datafusion_cli/object_storage/
instrumented.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    cmp, fmt,
20    ops::AddAssign,
21    str::FromStr,
22    sync::{
23        Arc,
24        atomic::{AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use arrow::array::{ArrayRef, RecordBatch, StringArray};
30use arrow::util::pretty::pretty_format_batches;
31use async_trait::async_trait;
32use chrono::Utc;
33use datafusion::{
34    common::{HashMap, instant::Instant},
35    error::DataFusionError,
36    execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
37};
38use futures::stream::{BoxStream, Stream};
39use object_store::{
40    GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
41    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
42    path::Path,
43};
44use parking_lot::{Mutex, RwLock};
45use url::Url;
46
47/// A stream wrapper that measures the time until the first response(item or end of stream) is yielded
48struct TimeToFirstItemStream<S> {
49    inner: S,
50    start: Instant,
51    request_index: usize,
52    requests: Arc<Mutex<Vec<RequestDetails>>>,
53    first_item_yielded: bool,
54}
55
56impl<S> TimeToFirstItemStream<S> {
57    fn new(
58        inner: S,
59        start: Instant,
60        request_index: usize,
61        requests: Arc<Mutex<Vec<RequestDetails>>>,
62    ) -> Self {
63        Self {
64            inner,
65            start,
66            request_index,
67            requests,
68            first_item_yielded: false,
69        }
70    }
71}
72
73impl<S> Stream for TimeToFirstItemStream<S>
74where
75    S: Stream<Item = Result<ObjectMeta>> + Unpin,
76{
77    type Item = Result<ObjectMeta>;
78
79    fn poll_next(
80        mut self: std::pin::Pin<&mut Self>,
81        cx: &mut std::task::Context<'_>,
82    ) -> std::task::Poll<Option<Self::Item>> {
83        let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
84
85        if !self.first_item_yielded && poll_result.is_ready() {
86            self.first_item_yielded = true;
87            let elapsed = self.start.elapsed();
88
89            let mut requests = self.requests.lock();
90            if let Some(request) = requests.get_mut(self.request_index) {
91                request.duration = Some(elapsed);
92            }
93        }
94
95        poll_result
96    }
97}
98
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum InstrumentedObjectStoreMode {
    /// Disable collection of profiling data
    #[default]
    Disabled,
    /// Enable collection of profiling data and output a summary
    Summary,
    /// Enable collection of profiling data and output a summary and all details
    Trace,
}

impl fmt::Display for InstrumentedObjectStoreMode {
    /// Writes the name of the variant (`Disabled`, `Summary` or `Trace`)
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Disabled => "Disabled",
            Self::Summary => "Summary",
            Self::Trace => "Trace",
        };
        f.write_str(name)
    }
}
117
118impl FromStr for InstrumentedObjectStoreMode {
119    type Err = DataFusionError;
120
121    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
122        match s.to_lowercase().as_str() {
123            "disabled" => Ok(Self::Disabled),
124            "summary" => Ok(Self::Summary),
125            "trace" => Ok(Self::Trace),
126            _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
127        }
128    }
129}
130
131impl From<u8> for InstrumentedObjectStoreMode {
132    fn from(value: u8) -> Self {
133        match value {
134            1 => InstrumentedObjectStoreMode::Summary,
135            2 => InstrumentedObjectStoreMode::Trace,
136            _ => InstrumentedObjectStoreMode::Disabled,
137        }
138    }
139}
140
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
/// inner [`ObjectStore`]
#[derive(Debug)]
pub struct InstrumentedObjectStore {
    /// The wrapped store that every operation is forwarded to
    inner: Arc<dyn ObjectStore>,
    /// The current [`InstrumentedObjectStoreMode`], stored as its `u8` value
    instrument_mode: AtomicU8,
    /// Details of the requests recorded while instrumentation is enabled
    requests: Arc<Mutex<Vec<RequestDetails>>>,
}
149
150impl InstrumentedObjectStore {
151    /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
152    fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
153        Self {
154            inner: object_store,
155            instrument_mode,
156            requests: Arc::new(Mutex::new(Vec::new())),
157        }
158    }
159
160    fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
161        self.instrument_mode.store(mode as u8, Ordering::Relaxed)
162    }
163
164    /// Returns all [`RequestDetails`] accumulated in this [`InstrumentedObjectStore`] and clears
165    /// the stored requests
166    pub fn take_requests(&self) -> Vec<RequestDetails> {
167        let mut req = self.requests.lock();
168
169        req.drain(..).collect()
170    }
171
172    fn enabled(&self) -> bool {
173        self.instrument_mode.load(Ordering::Relaxed)
174            != InstrumentedObjectStoreMode::Disabled as u8
175    }
176
177    async fn instrumented_put_opts(
178        &self,
179        location: &Path,
180        payload: PutPayload,
181        opts: PutOptions,
182    ) -> Result<PutResult> {
183        let timestamp = Utc::now();
184        let start = Instant::now();
185        let size = payload.content_length();
186        let ret = self.inner.put_opts(location, payload, opts).await?;
187        let elapsed = start.elapsed();
188
189        self.requests.lock().push(RequestDetails {
190            op: Operation::Put,
191            path: location.clone(),
192            timestamp,
193            duration: Some(elapsed),
194            size: Some(size),
195            range: None,
196            extra_display: None,
197        });
198
199        Ok(ret)
200    }
201
202    async fn instrumented_put_multipart(
203        &self,
204        location: &Path,
205        opts: PutMultipartOptions,
206    ) -> Result<Box<dyn MultipartUpload>> {
207        let timestamp = Utc::now();
208        let start = Instant::now();
209        let ret = self.inner.put_multipart_opts(location, opts).await?;
210        let elapsed = start.elapsed();
211
212        self.requests.lock().push(RequestDetails {
213            op: Operation::Put,
214            path: location.clone(),
215            timestamp,
216            duration: Some(elapsed),
217            size: None,
218            range: None,
219            extra_display: None,
220        });
221
222        Ok(ret)
223    }
224
225    async fn instrumented_get_opts(
226        &self,
227        location: &Path,
228        options: GetOptions,
229    ) -> Result<GetResult> {
230        let timestamp = Utc::now();
231        let range = options.range.clone();
232
233        let start = Instant::now();
234        let ret = self.inner.get_opts(location, options).await?;
235        let elapsed = start.elapsed();
236
237        self.requests.lock().push(RequestDetails {
238            op: Operation::Get,
239            path: location.clone(),
240            timestamp,
241            duration: Some(elapsed),
242            size: Some((ret.range.end - ret.range.start) as usize),
243            range,
244            extra_display: None,
245        });
246
247        Ok(ret)
248    }
249
250    async fn instrumented_delete(&self, location: &Path) -> Result<()> {
251        let timestamp = Utc::now();
252        let start = Instant::now();
253        self.inner.delete(location).await?;
254        let elapsed = start.elapsed();
255
256        self.requests.lock().push(RequestDetails {
257            op: Operation::Delete,
258            path: location.clone(),
259            timestamp,
260            duration: Some(elapsed),
261            size: None,
262            range: None,
263            extra_display: None,
264        });
265
266        Ok(())
267    }
268
269    fn instrumented_list(
270        &self,
271        prefix: Option<&Path>,
272    ) -> BoxStream<'static, Result<ObjectMeta>> {
273        let timestamp = Utc::now();
274        let start = Instant::now();
275        let inner_stream = self.inner.list(prefix);
276
277        let request_index = {
278            let mut requests = self.requests.lock();
279            requests.push(RequestDetails {
280                op: Operation::List,
281                path: prefix.cloned().unwrap_or_else(|| Path::from("")),
282                timestamp,
283                duration: None,
284                size: None,
285                range: None,
286                extra_display: None,
287            });
288            requests.len() - 1
289        };
290
291        let wrapped_stream = TimeToFirstItemStream::new(
292            inner_stream,
293            start,
294            request_index,
295            Arc::clone(&self.requests),
296        );
297
298        Box::pin(wrapped_stream)
299    }
300
301    async fn instrumented_list_with_delimiter(
302        &self,
303        prefix: Option<&Path>,
304    ) -> Result<ListResult> {
305        let timestamp = Utc::now();
306        let start = Instant::now();
307        let ret = self.inner.list_with_delimiter(prefix).await?;
308        let elapsed = start.elapsed();
309
310        self.requests.lock().push(RequestDetails {
311            op: Operation::List,
312            path: prefix.cloned().unwrap_or_else(|| Path::from("")),
313            timestamp,
314            duration: Some(elapsed),
315            size: None,
316            range: None,
317            extra_display: None,
318        });
319
320        Ok(ret)
321    }
322
323    async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
324        let timestamp = Utc::now();
325        let start = Instant::now();
326        self.inner.copy(from, to).await?;
327        let elapsed = start.elapsed();
328
329        self.requests.lock().push(RequestDetails {
330            op: Operation::Copy,
331            path: from.clone(),
332            timestamp,
333            duration: Some(elapsed),
334            size: None,
335            range: None,
336            extra_display: Some(format!("copy_to: {to}")),
337        });
338
339        Ok(())
340    }
341
342    async fn instrumented_copy_if_not_exists(
343        &self,
344        from: &Path,
345        to: &Path,
346    ) -> Result<()> {
347        let timestamp = Utc::now();
348        let start = Instant::now();
349        self.inner.copy_if_not_exists(from, to).await?;
350        let elapsed = start.elapsed();
351
352        self.requests.lock().push(RequestDetails {
353            op: Operation::Copy,
354            path: from.clone(),
355            timestamp,
356            duration: Some(elapsed),
357            size: None,
358            range: None,
359            extra_display: Some(format!("copy_to: {to}")),
360        });
361
362        Ok(())
363    }
364
365    async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
366        let timestamp = Utc::now();
367        let start = Instant::now();
368        let ret = self.inner.head(location).await?;
369        let elapsed = start.elapsed();
370
371        self.requests.lock().push(RequestDetails {
372            op: Operation::Head,
373            path: location.clone(),
374            timestamp,
375            duration: Some(elapsed),
376            size: None,
377            range: None,
378            extra_display: None,
379        });
380
381        Ok(ret)
382    }
383}
384
385impl fmt::Display for InstrumentedObjectStore {
386    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387        let mode: InstrumentedObjectStoreMode =
388            self.instrument_mode.load(Ordering::Relaxed).into();
389        write!(
390            f,
391            "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
392            self.inner
393        )
394    }
395}
396
397#[async_trait]
398impl ObjectStore for InstrumentedObjectStore {
399    async fn put_opts(
400        &self,
401        location: &Path,
402        payload: PutPayload,
403        opts: PutOptions,
404    ) -> Result<PutResult> {
405        if self.enabled() {
406            return self.instrumented_put_opts(location, payload, opts).await;
407        }
408
409        self.inner.put_opts(location, payload, opts).await
410    }
411
412    async fn put_multipart_opts(
413        &self,
414        location: &Path,
415        opts: PutMultipartOptions,
416    ) -> Result<Box<dyn MultipartUpload>> {
417        if self.enabled() {
418            return self.instrumented_put_multipart(location, opts).await;
419        }
420
421        self.inner.put_multipart_opts(location, opts).await
422    }
423
424    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
425        if self.enabled() {
426            return self.instrumented_get_opts(location, options).await;
427        }
428
429        self.inner.get_opts(location, options).await
430    }
431
432    async fn delete(&self, location: &Path) -> Result<()> {
433        if self.enabled() {
434            return self.instrumented_delete(location).await;
435        }
436
437        self.inner.delete(location).await
438    }
439
440    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
441        if self.enabled() {
442            return self.instrumented_list(prefix);
443        }
444
445        self.inner.list(prefix)
446    }
447
448    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
449        if self.enabled() {
450            return self.instrumented_list_with_delimiter(prefix).await;
451        }
452
453        self.inner.list_with_delimiter(prefix).await
454    }
455
456    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
457        if self.enabled() {
458            return self.instrumented_copy(from, to).await;
459        }
460
461        self.inner.copy(from, to).await
462    }
463
464    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
465        if self.enabled() {
466            return self.instrumented_copy_if_not_exists(from, to).await;
467        }
468
469        self.inner.copy_if_not_exists(from, to).await
470    }
471
472    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
473        if self.enabled() {
474            return self.instrumented_head(location).await;
475        }
476
477        self.inner.head(location).await
478    }
479}
480
/// Object store operation types tracked by [`InstrumentedObjectStore`]
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
    Copy,
    Delete,
    Get,
    Head,
    List,
    Put,
}

impl fmt::Display for Operation {
    /// Writes the name of the variant, e.g. `Get`
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Copy => "Copy",
            Self::Delete => "Delete",
            Self::Get => "Get",
            Self::Head => "Head",
            Self::List => "List",
            Self::Put => "Put",
        };
        f.write_str(name)
    }
}
497
/// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`]
#[derive(Debug)]
pub struct RequestDetails {
    /// The kind of operation that was performed
    op: Operation,
    /// The path the request targeted: the listing prefix (or an empty path) for lists and the
    /// source path for copies
    path: Path,
    /// Wall-clock time captured just before the request was issued
    timestamp: chrono::DateTime<Utc>,
    /// Time the request took; `None` for a list whose stream has not yielded a response yet
    duration: Option<Duration>,
    /// Payload length for puts and length of the returned byte range for gets
    size: Option<usize>,
    /// The range requested by a get, if any
    range: Option<GetRange>,
    /// Additional text appended when displaying the request (e.g. the destination of a copy)
    extra_display: Option<String>,
}
509
510impl fmt::Display for RequestDetails {
511    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512        let mut output_parts = vec![format!(
513            "{} operation={:?}",
514            self.timestamp.to_rfc3339(),
515            self.op
516        )];
517
518        if let Some(d) = self.duration {
519            output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
520        }
521        if let Some(s) = self.size {
522            output_parts.push(format!("size={s}"));
523        }
524        if let Some(r) = &self.range {
525            output_parts.push(format!("range: {r}"));
526        }
527        output_parts.push(format!("path={}", self.path));
528
529        if let Some(ed) = &self.extra_display {
530            output_parts.push(ed.clone());
531        }
532
533        write!(f, "{}", output_parts.join(" "))
534    }
535}
536
537/// Summary statistics for all requests recorded in an [`InstrumentedObjectStore`]
538#[derive(Default)]
539pub struct RequestSummaries {
540    summaries: Vec<RequestSummary>,
541}
542
543/// Display the summary as a table
544impl fmt::Display for RequestSummaries {
545    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546        // Don't expect an error, but avoid panicking if it happens
547        match pretty_format_batches(&[self.to_batch()]) {
548            Err(e) => {
549                write!(f, "Error formatting summary: {e}")
550            }
551            Ok(displayable) => {
552                write!(f, "{displayable}")
553            }
554        }
555    }
556}
557
558impl RequestSummaries {
559    /// Summarizes input [`RequestDetails`]
560    pub fn new(requests: &[RequestDetails]) -> Self {
561        let mut summaries: HashMap<Operation, RequestSummary> = HashMap::new();
562        for rd in requests {
563            match summaries.get_mut(&rd.op) {
564                Some(rs) => rs.push(rd),
565                None => {
566                    let mut rs = RequestSummary::new(rd.op);
567                    rs.push(rd);
568                    summaries.insert(rd.op, rs);
569                }
570            }
571        }
572        // Convert to a Vec with consistent ordering
573        let mut summaries: Vec<RequestSummary> = summaries.into_values().collect();
574        summaries.sort_by_key(|s| s.operation);
575        Self { summaries }
576    }
577
578    /// Convert the summaries into a `RecordBatch` for display
579    ///
580    /// Results in a table like:
581    /// ```text
582    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
583    /// | Operation | Metric   | min       | max       | avg       | sum       | count     |
584    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
585    /// | Get       | duration | 5.000000s | 5.000000s | 5.000000s |           | 1         |
586    /// | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1         |
587    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
588    /// ```
589    pub fn to_batch(&self) -> RecordBatch {
590        let operations: StringArray = self
591            .iter()
592            .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2))
593            .collect();
594        let metrics: StringArray = self
595            .iter()
596            .flat_map(|_s| [Some("duration"), Some("size")])
597            .collect();
598        let mins: StringArray = self
599            .stats_iter()
600            .flat_map(|(duration_stats, size_stats)| {
601                let dur_min =
602                    duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32()));
603                let size_min = size_stats.map(|s| format!("{} B", s.min));
604                [dur_min, size_min]
605            })
606            .collect();
607        let maxs: StringArray = self
608            .stats_iter()
609            .flat_map(|(duration_stats, size_stats)| {
610                let dur_max =
611                    duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32()));
612                let size_max = size_stats.map(|s| format!("{} B", s.max));
613                [dur_max, size_max]
614            })
615            .collect();
616        let avgs: StringArray = self
617            .iter()
618            .flat_map(|s| {
619                let count = s.count as f32;
620                let duration_stats = s.duration_stats.as_ref();
621                let size_stats = s.size_stats.as_ref();
622                let dur_avg = duration_stats.map(|d| {
623                    let avg = d.sum.as_secs_f32() / count;
624                    format!("{avg:.6}s")
625                });
626                let size_avg = size_stats.map(|s| {
627                    let avg = s.sum as f32 / count;
628                    format!("{avg} B")
629                });
630                [dur_avg, size_avg]
631            })
632            .collect();
633        let sums: StringArray = self
634            .stats_iter()
635            .flat_map(|(duration_stats, size_stats)| {
636                // Omit a sum stat for duration in the initial
637                // implementation because it can be a bit misleading (at least
638                // at first glance). For example, particularly large queries the
639                // sum of the durations was often larger than the total time of
640                // the query itself, can be confusing without additional
641                // explanation (e.g. that the sum is of individual requests,
642                // which may be concurrent).
643                let dur_sum =
644                    duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32()));
645                let size_sum = size_stats.map(|s| format!("{} B", s.sum));
646                [dur_sum, size_sum]
647            })
648            .collect();
649        let counts: StringArray = self
650            .iter()
651            .flat_map(|s| {
652                let count = s.count.to_string();
653                [Some(count.clone()), Some(count)]
654            })
655            .collect();
656
657        RecordBatch::try_from_iter(vec![
658            ("Operation", Arc::new(operations) as ArrayRef),
659            ("Metric", Arc::new(metrics) as ArrayRef),
660            ("min", Arc::new(mins) as ArrayRef),
661            ("max", Arc::new(maxs) as ArrayRef),
662            ("avg", Arc::new(avgs) as ArrayRef),
663            ("sum", Arc::new(sums) as ArrayRef),
664            ("count", Arc::new(counts) as ArrayRef),
665        ])
666        .expect("Created the batch correctly")
667    }
668
669    /// Return an iterator over the summaries
670    fn iter(&self) -> impl Iterator<Item = &RequestSummary> {
671        self.summaries.iter()
672    }
673
674    /// Return an iterator over (duration_stats, size_stats) tuples
675    /// for each summary
676    fn stats_iter(
677        &self,
678    ) -> impl Iterator<Item = (Option<&Stats<Duration>>, Option<&Stats<usize>>)> {
679        self.summaries
680            .iter()
681            .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref()))
682    }
683}
684
685/// Summary statistics for a particular type of [`Operation`] (e.g. `GET` or `PUT`)
686/// in an [`InstrumentedObjectStore`]'s [`RequestDetails`]
687pub struct RequestSummary {
688    operation: Operation,
689    count: usize,
690    duration_stats: Option<Stats<Duration>>,
691    size_stats: Option<Stats<usize>>,
692}
693
694impl RequestSummary {
695    fn new(operation: Operation) -> Self {
696        Self {
697            operation,
698            count: 0,
699            duration_stats: None,
700            size_stats: None,
701        }
702    }
703    fn push(&mut self, request: &RequestDetails) {
704        self.count += 1;
705        if let Some(dur) = request.duration {
706            self.duration_stats.get_or_insert_default().push(dur)
707        }
708        if let Some(size) = request.size {
709            self.size_stats.get_or_insert_default().push(size)
710        }
711    }
712}
713
/// Running minimum, maximum and sum over the values pushed so far
struct Stats<T: Copy + Ord + AddAssign<T>> {
    min: T,
    max: T,
    sum: T,
}

impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
    /// Folds `val` into the minimum, the maximum and the sum
    fn push(&mut self, val: T) {
        let Self { min, max, sum } = self;
        *min = cmp::min(val, *min);
        *max = cmp::max(val, *max);
        *sum += val;
    }
}

impl Default for Stats<Duration> {
    /// Starts with the largest possible minimum and a zero maximum, so that the first
    /// pushed value replaces both
    fn default() -> Self {
        Stats {
            sum: Duration::ZERO,
            max: Duration::ZERO,
            min: Duration::MAX,
        }
    }
}

impl Default for Stats<usize> {
    /// Starts with the largest possible minimum and the smallest possible maximum, so that
    /// the first pushed value replaces both
    fn default() -> Self {
        Stats {
            sum: 0,
            max: usize::MIN,
            min: usize::MAX,
        }
    }
}
747
748/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
749#[derive(Debug)]
750pub struct InstrumentedObjectStoreRegistry {
751    inner: Arc<dyn ObjectStoreRegistry>,
752    instrument_mode: AtomicU8,
753    stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
754}
755
756impl Default for InstrumentedObjectStoreRegistry {
757    fn default() -> Self {
758        Self::new()
759    }
760}
761
762impl InstrumentedObjectStoreRegistry {
763    /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
764    /// [`ObjectStoreRegistry`]
765    pub fn new() -> Self {
766        Self {
767            inner: Arc::new(DefaultObjectStoreRegistry::new()),
768            instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
769            stores: RwLock::new(Vec::new()),
770        }
771    }
772
773    pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
774        self.instrument_mode.store(mode as u8, Ordering::Relaxed);
775        self
776    }
777
778    /// Provides access to all of the [`InstrumentedObjectStore`]s managed by this
779    /// [`InstrumentedObjectStoreRegistry`]
780    pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
781        self.stores.read().clone()
782    }
783
784    /// Returns the current [`InstrumentedObjectStoreMode`] for this
785    /// [`InstrumentedObjectStoreRegistry`]
786    pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
787        self.instrument_mode.load(Ordering::Relaxed).into()
788    }
789
790    /// Sets the [`InstrumentedObjectStoreMode`] for this [`InstrumentedObjectStoreRegistry`]
791    pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
792        self.instrument_mode.store(mode as u8, Ordering::Relaxed);
793        for s in self.stores.read().iter() {
794            s.set_instrument_mode(mode)
795        }
796    }
797}
798
799impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
800    fn register_store(
801        &self,
802        url: &Url,
803        store: Arc<dyn ObjectStore>,
804    ) -> Option<Arc<dyn ObjectStore>> {
805        let mode = self.instrument_mode.load(Ordering::Relaxed);
806        let instrumented =
807            Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
808        self.stores.write().push(Arc::clone(&instrumented));
809        self.inner.register_store(url, instrumented)
810    }
811
812    fn deregister_store(
813        &self,
814        url: &Url,
815    ) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
816        self.inner.deregister_store(url)
817    }
818
819    fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
820        self.inner.get_store(url)
821    }
822}
823
824#[cfg(test)]
825mod tests {
826    use futures::StreamExt;
827    use object_store::WriteMultipart;
828
829    use super::*;
830    use insta::assert_snapshot;
831
832    #[test]
833    fn instrumented_mode() {
834        assert!(matches!(
835            InstrumentedObjectStoreMode::default(),
836            InstrumentedObjectStoreMode::Disabled
837        ));
838
839        assert!(matches!(
840            "dIsABleD".parse().unwrap(),
841            InstrumentedObjectStoreMode::Disabled
842        ));
843        assert!(matches!(
844            "SUmMaRy".parse().unwrap(),
845            InstrumentedObjectStoreMode::Summary
846        ));
847        assert!(matches!(
848            "TRaCe".parse().unwrap(),
849            InstrumentedObjectStoreMode::Trace
850        ));
851        assert!(
852            "does_not_exist"
853                .parse::<InstrumentedObjectStoreMode>()
854                .is_err()
855        );
856
857        assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
858        assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary));
859        assert!(matches!(2.into(), InstrumentedObjectStoreMode::Trace));
860        assert!(matches!(3.into(), InstrumentedObjectStoreMode::Disabled));
861    }
862
    // Checks the registry defaults, the builder-style mode override and that a registered
    // store can be fetched again and is tracked by the registry
    #[test]
    fn instrumented_registry() {
        // A fresh registry tracks no stores and uses the default mode
        let mut reg = InstrumentedObjectStoreRegistry::new();
        assert!(reg.stores().is_empty());
        assert_eq!(
            reg.instrument_mode(),
            InstrumentedObjectStoreMode::default()
        );

        reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Trace);
        assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Trace);

        // Nothing was registered under this URL before, so no previous store is returned
        let store = object_store::memory::InMemory::new();
        let url = "mem://test".parse().unwrap();
        let registered = reg.register_store(&url, Arc::new(store));
        assert!(registered.is_none());

        let fetched = reg.get_store(&url);
        assert!(fetched.is_ok());
        assert_eq!(reg.stores().len(), 1);
    }
884
885    // Returns an `InstrumentedObjectStore` with some data loaded for testing and the path to
886    // access the data
887    async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
888        let store = Arc::new(object_store::memory::InMemory::new());
889        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
890        let instrumented = InstrumentedObjectStore::new(store, mode);
891
892        // Load the test store with some data we can read
893        let path = Path::from("test/data");
894        let payload = PutPayload::from_static(b"test_data");
895        instrumented.put(&path, payload).await.unwrap();
896
897        (instrumented, path)
898    }
899
    // Checks that a `get` is recorded only once instrumentation is enabled and that the
    // recorded details describe the request
    #[tokio::test]
    async fn instrumented_store_get() {
        let (instrumented, path) = setup_test_store().await;

        // By default no requests should be instrumented/stored
        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.get(&path).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        // Switching the mode records nothing by itself; only the following `get` is recorded
        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.get(&path).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        // Taking the requests hands them over and empties the store's log
        let mut requests = instrumented.take_requests();
        assert_eq!(requests.len(), 1);
        assert!(instrumented.requests.lock().is_empty());

        let request = requests.pop().unwrap();
        assert_eq!(request.op, Operation::Get);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        // The object written by `setup_test_store` is 9 bytes long
        assert_eq!(request.size, Some(9));
        assert_eq!(request.range, None);
        assert!(request.extra_display.is_none());
    }
926
    // Checks that a `delete` is recorded only once instrumentation is enabled and that the
    // recorded details describe the request
    #[tokio::test]
    async fn instrumented_store_delete() {
        let (instrumented, path) = setup_test_store().await;

        // By default no requests should be instrumented/stored
        assert!(instrumented.requests.lock().is_empty());
        instrumented.delete(&path).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        // We need a new store so we have data to delete again
        let (instrumented, path) = setup_test_store().await;
        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        instrumented.delete(&path).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        // Taking the requests hands them over and empties the store's log
        let mut requests = instrumented.take_requests();
        assert_eq!(requests.len(), 1);
        assert!(instrumented.requests.lock().is_empty());

        // A delete carries a duration but neither a size nor a range
        let request = requests.pop().unwrap();
        assert_eq!(request.op, Operation::Delete);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert!(request.extra_display.is_none());
    }
955
956    #[tokio::test]
957    async fn instrumented_store_list() {
958        let (instrumented, path) = setup_test_store().await;
959
960        // By default no requests should be instrumented/stored
961        assert!(instrumented.requests.lock().is_empty());
962        let _ = instrumented.list(Some(&path));
963        assert!(instrumented.requests.lock().is_empty());
964
965        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
966        assert!(instrumented.requests.lock().is_empty());
967        let mut stream = instrumented.list(Some(&path));
968        // Consume at least one item from the stream to trigger duration measurement
969        let _ = stream.next().await;
970        assert_eq!(instrumented.requests.lock().len(), 1);
971
972        let request = instrumented.take_requests().pop().unwrap();
973        assert_eq!(request.op, Operation::List);
974        assert_eq!(request.path, path);
975        assert!(request.duration.is_some());
976        assert!(request.size.is_none());
977        assert!(request.range.is_none());
978        assert!(request.extra_display.is_none());
979    }
980
981    #[tokio::test]
982    async fn instrumented_store_list_with_delimiter() {
983        let (instrumented, path) = setup_test_store().await;
984
985        // By default no requests should be instrumented/stored
986        assert!(instrumented.requests.lock().is_empty());
987        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
988        assert!(instrumented.requests.lock().is_empty());
989
990        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
991        assert!(instrumented.requests.lock().is_empty());
992        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
993        assert_eq!(instrumented.requests.lock().len(), 1);
994
995        let request = instrumented.take_requests().pop().unwrap();
996        assert_eq!(request.op, Operation::List);
997        assert_eq!(request.path, path);
998        assert!(request.duration.is_some());
999        assert!(request.size.is_none());
1000        assert!(request.range.is_none());
1001        assert!(request.extra_display.is_none());
1002    }
1003
1004    #[tokio::test]
1005    async fn instrumented_store_put_opts() {
1006        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
1007        // manually for this test
1008        let store = Arc::new(object_store::memory::InMemory::new());
1009        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1010        let instrumented = InstrumentedObjectStore::new(store, mode);
1011
1012        let path = Path::from("test/data");
1013        let payload = PutPayload::from_static(b"test_data");
1014        let size = payload.content_length();
1015
1016        // By default no requests should be instrumented/stored
1017        assert!(instrumented.requests.lock().is_empty());
1018        instrumented.put(&path, payload.clone()).await.unwrap();
1019        assert!(instrumented.requests.lock().is_empty());
1020
1021        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1022        assert!(instrumented.requests.lock().is_empty());
1023        instrumented.put(&path, payload).await.unwrap();
1024        assert_eq!(instrumented.requests.lock().len(), 1);
1025
1026        let request = instrumented.take_requests().pop().unwrap();
1027        assert_eq!(request.op, Operation::Put);
1028        assert_eq!(request.path, path);
1029        assert!(request.duration.is_some());
1030        assert_eq!(request.size.unwrap(), size);
1031        assert!(request.range.is_none());
1032        assert!(request.extra_display.is_none());
1033    }
1034
1035    #[tokio::test]
1036    async fn instrumented_store_put_multipart() {
1037        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
1038        // manually for this test
1039        let store = Arc::new(object_store::memory::InMemory::new());
1040        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1041        let instrumented = InstrumentedObjectStore::new(store, mode);
1042
1043        let path = Path::from("test/data");
1044
1045        // By default no requests should be instrumented/stored
1046        assert!(instrumented.requests.lock().is_empty());
1047        let mp = instrumented.put_multipart(&path).await.unwrap();
1048        let mut write = WriteMultipart::new(mp);
1049        write.write(b"test_data");
1050        write.finish().await.unwrap();
1051        assert!(instrumented.requests.lock().is_empty());
1052
1053        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1054        assert!(instrumented.requests.lock().is_empty());
1055        let mp = instrumented.put_multipart(&path).await.unwrap();
1056        let mut write = WriteMultipart::new(mp);
1057        write.write(b"test_data");
1058        write.finish().await.unwrap();
1059        assert_eq!(instrumented.requests.lock().len(), 1);
1060
1061        let request = instrumented.take_requests().pop().unwrap();
1062        assert_eq!(request.op, Operation::Put);
1063        assert_eq!(request.path, path);
1064        assert!(request.duration.is_some());
1065        assert!(request.size.is_none());
1066        assert!(request.range.is_none());
1067        assert!(request.extra_display.is_none());
1068    }
1069
1070    #[tokio::test]
1071    async fn instrumented_store_copy() {
1072        let (instrumented, path) = setup_test_store().await;
1073        let copy_to = Path::from("test/copied");
1074
1075        // By default no requests should be instrumented/stored
1076        assert!(instrumented.requests.lock().is_empty());
1077        instrumented.copy(&path, &copy_to).await.unwrap();
1078        assert!(instrumented.requests.lock().is_empty());
1079
1080        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1081        assert!(instrumented.requests.lock().is_empty());
1082        instrumented.copy(&path, &copy_to).await.unwrap();
1083        assert_eq!(instrumented.requests.lock().len(), 1);
1084
1085        let mut requests = instrumented.take_requests();
1086        assert_eq!(requests.len(), 1);
1087        assert!(instrumented.requests.lock().is_empty());
1088
1089        let request = requests.pop().unwrap();
1090        assert_eq!(request.op, Operation::Copy);
1091        assert_eq!(request.path, path);
1092        assert!(request.duration.is_some());
1093        assert!(request.size.is_none());
1094        assert!(request.range.is_none());
1095        assert_eq!(
1096            request.extra_display.unwrap(),
1097            format!("copy_to: {copy_to}")
1098        );
1099    }
1100
1101    #[tokio::test]
1102    async fn instrumented_store_copy_if_not_exists() {
1103        let (instrumented, path) = setup_test_store().await;
1104        let mut copy_to = Path::from("test/copied");
1105
1106        // By default no requests should be instrumented/stored
1107        assert!(instrumented.requests.lock().is_empty());
1108        instrumented
1109            .copy_if_not_exists(&path, &copy_to)
1110            .await
1111            .unwrap();
1112        assert!(instrumented.requests.lock().is_empty());
1113
1114        // Use a new destination since the previous one already exists
1115        copy_to = Path::from("test/copied_again");
1116        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1117        assert!(instrumented.requests.lock().is_empty());
1118        instrumented
1119            .copy_if_not_exists(&path, &copy_to)
1120            .await
1121            .unwrap();
1122        assert_eq!(instrumented.requests.lock().len(), 1);
1123
1124        let mut requests = instrumented.take_requests();
1125        assert_eq!(requests.len(), 1);
1126        assert!(instrumented.requests.lock().is_empty());
1127
1128        let request = requests.pop().unwrap();
1129        assert_eq!(request.op, Operation::Copy);
1130        assert_eq!(request.path, path);
1131        assert!(request.duration.is_some());
1132        assert!(request.size.is_none());
1133        assert!(request.range.is_none());
1134        assert_eq!(
1135            request.extra_display.unwrap(),
1136            format!("copy_to: {copy_to}")
1137        );
1138    }
1139
1140    #[tokio::test]
1141    async fn instrumented_store_head() {
1142        let (instrumented, path) = setup_test_store().await;
1143
1144        // By default no requests should be instrumented/stored
1145        assert!(instrumented.requests.lock().is_empty());
1146        let _ = instrumented.head(&path).await.unwrap();
1147        assert!(instrumented.requests.lock().is_empty());
1148
1149        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1150        assert!(instrumented.requests.lock().is_empty());
1151        let _ = instrumented.head(&path).await.unwrap();
1152        assert_eq!(instrumented.requests.lock().len(), 1);
1153
1154        let mut requests = instrumented.take_requests();
1155        assert_eq!(requests.len(), 1);
1156        assert!(instrumented.requests.lock().is_empty());
1157
1158        let request = requests.pop().unwrap();
1159        assert_eq!(request.op, Operation::Head);
1160        assert_eq!(request.path, path);
1161        assert!(request.duration.is_some());
1162        assert!(request.size.is_none());
1163        assert!(request.range.is_none());
1164        assert!(request.extra_display.is_none());
1165    }
1166
1167    #[test]
1168    fn request_details() {
1169        let rd = RequestDetails {
1170            op: Operation::Get,
1171            path: Path::from("test"),
1172            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1173            duration: Some(Duration::new(5, 0)),
1174            size: Some(10),
1175            range: Some((..10).into()),
1176            extra_display: Some(String::from("extra info")),
1177        };
1178
1179        assert_eq!(
1180            format!("{rd}"),
1181            "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
1182        );
1183    }
1184
1185    #[test]
1186    fn request_summary() {
1187        // Test empty request list
1188        let mut requests = Vec::new();
1189        assert_snapshot!(RequestSummaries::new(&requests), @r"
1190        +-----------+--------+-----+-----+-----+-----+-------+
1191        | Operation | Metric | min | max | avg | sum | count |
1192        +-----------+--------+-----+-----+-----+-----+-------+
1193        +-----------+--------+-----+-----+-----+-----+-------+
1194        ");
1195
1196        requests.push(RequestDetails {
1197            op: Operation::Get,
1198            path: Path::from("test1"),
1199            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1200            duration: Some(Duration::from_secs(5)),
1201            size: Some(100),
1202            range: None,
1203            extra_display: None,
1204        });
1205
1206        assert_snapshot!(RequestSummaries::new(&requests), @r"
1207        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1208        | Operation | Metric   | min       | max       | avg       | sum       | count |
1209        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1210        | Get       | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1     |
1211        | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1     |
1212        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1213        ");
1214
1215        // Add more Get requests to test aggregation
1216        requests.push(RequestDetails {
1217            op: Operation::Get,
1218            path: Path::from("test2"),
1219            timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
1220            duration: Some(Duration::from_secs(8)),
1221            size: Some(150),
1222            range: None,
1223            extra_display: None,
1224        });
1225        requests.push(RequestDetails {
1226            op: Operation::Get,
1227            path: Path::from("test3"),
1228            timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
1229            duration: Some(Duration::from_secs(2)),
1230            size: Some(50),
1231            range: None,
1232            extra_display: None,
1233        });
1234        assert_snapshot!(RequestSummaries::new(&requests), @r"
1235        +-----------+----------+-----------+-----------+-----------+------------+-------+
1236        | Operation | Metric   | min       | max       | avg       | sum        | count |
1237        +-----------+----------+-----------+-----------+-----------+------------+-------+
1238        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
1239        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
1240        +-----------+----------+-----------+-----------+-----------+------------+-------+
1241        ");
1242
1243        // Add Put requests to test grouping
1244        requests.push(RequestDetails {
1245            op: Operation::Put,
1246            path: Path::from("test4"),
1247            timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
1248            duration: Some(Duration::from_millis(200)),
1249            size: Some(75),
1250            range: None,
1251            extra_display: None,
1252        });
1253
1254        assert_snapshot!(RequestSummaries::new(&requests), @r"
1255        +-----------+----------+-----------+-----------+-----------+------------+-------+
1256        | Operation | Metric   | min       | max       | avg       | sum        | count |
1257        +-----------+----------+-----------+-----------+-----------+------------+-------+
1258        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
1259        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
1260        | Put       | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s  | 1     |
1261        | Put       | size     | 75 B      | 75 B      | 75 B      | 75 B       | 1     |
1262        +-----------+----------+-----------+-----------+-----------+------------+-------+
1263        ");
1264    }
1265
1266    #[test]
1267    fn request_summary_only_duration() {
1268        // Test request with only duration (no size)
1269        let only_duration = vec![RequestDetails {
1270            op: Operation::Get,
1271            path: Path::from("test1"),
1272            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1273            duration: Some(Duration::from_secs(3)),
1274            size: None,
1275            range: None,
1276            extra_display: None,
1277        }];
1278        assert_snapshot!(RequestSummaries::new(&only_duration), @r"
1279        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1280        | Operation | Metric   | min       | max       | avg       | sum       | count |
1281        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1282        | Get       | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1     |
1283        | Get       | size     |           |           |           |           | 1     |
1284        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1285        ");
1286    }
1287
1288    #[test]
1289    fn request_summary_only_size() {
1290        // Test request with only size (no duration)
1291        let only_size = vec![RequestDetails {
1292            op: Operation::Get,
1293            path: Path::from("test1"),
1294            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1295            duration: None,
1296            size: Some(200),
1297            range: None,
1298            extra_display: None,
1299        }];
1300        assert_snapshot!(RequestSummaries::new(&only_size), @r"
1301        +-----------+----------+-------+-------+-------+-------+-------+
1302        | Operation | Metric   | min   | max   | avg   | sum   | count |
1303        +-----------+----------+-------+-------+-------+-------+-------+
1304        | Get       | duration |       |       |       |       | 1     |
1305        | Get       | size     | 200 B | 200 B | 200 B | 200 B | 1     |
1306        +-----------+----------+-------+-------+-------+-------+-------+
1307        ");
1308    }
1309
1310    #[test]
1311    fn request_summary_neither_duration_or_size() {
1312        // Test request with neither duration nor size
1313        let no_stats = vec![RequestDetails {
1314            op: Operation::Get,
1315            path: Path::from("test1"),
1316            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1317            duration: None,
1318            size: None,
1319            range: None,
1320            extra_display: None,
1321        }];
1322        assert_snapshot!(RequestSummaries::new(&no_stats), @r"
1323        +-----------+----------+-----+-----+-----+-----+-------+
1324        | Operation | Metric   | min | max | avg | sum | count |
1325        +-----------+----------+-----+-----+-----+-----+-------+
1326        | Get       | duration |     |     |     |     | 1     |
1327        | Get       | size     |     |     |     |     | 1     |
1328        +-----------+----------+-----+-----+-----+-----+-------+
1329        ");
1330    }
1331}