Skip to main content

datafusion_cli/object_storage/
instrumented.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    cmp, fmt,
20    ops::AddAssign,
21    str::FromStr,
22    sync::{
23        Arc,
24        atomic::{AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use arrow::array::{ArrayRef, RecordBatch, StringArray};
30use arrow::util::pretty::pretty_format_batches;
31use async_trait::async_trait;
32use chrono::Utc;
33use datafusion::{
34    common::{HashMap, instant::Instant},
35    error::DataFusionError,
36    execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
37};
38use futures::stream::{BoxStream, Stream};
39use futures::{StreamExt, TryStreamExt};
40use object_store::{
41    CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload,
42    ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload,
43    PutResult, Result, path::Path,
44};
45use parking_lot::{Mutex, RwLock};
46use url::Url;
47
/// A stream wrapper that measures the time until the first response (an item or the end of the
/// stream) is yielded by the wrapped stream.
///
/// The measured time is written into the `duration` of the [`RequestDetails`] found at
/// `request_index` in the shared `requests` list.
struct TimeToFirstItemStream<S> {
    /// The wrapped stream that is polled for items
    inner: S,
    /// The instant from which the elapsed time is measured
    start: Instant,
    /// Position in `requests` of the entry whose `duration` gets filled in
    request_index: usize,
    /// Shared list of recorded requests
    requests: Arc<Mutex<Vec<RequestDetails>>>,
    /// Set once the first ready poll has been observed so the duration is only recorded once
    first_item_yielded: bool,
}
56
57impl<S> TimeToFirstItemStream<S> {
58    fn new(
59        inner: S,
60        start: Instant,
61        request_index: usize,
62        requests: Arc<Mutex<Vec<RequestDetails>>>,
63    ) -> Self {
64        Self {
65            inner,
66            start,
67            request_index,
68            requests,
69            first_item_yielded: false,
70        }
71    }
72}
73
74impl<S> Stream for TimeToFirstItemStream<S>
75where
76    S: Stream<Item = Result<ObjectMeta>> + Unpin,
77{
78    type Item = Result<ObjectMeta>;
79
80    fn poll_next(
81        mut self: std::pin::Pin<&mut Self>,
82        cx: &mut std::task::Context<'_>,
83    ) -> std::task::Poll<Option<Self::Item>> {
84        let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
85
86        if !self.first_item_yielded && poll_result.is_ready() {
87            self.first_item_yielded = true;
88            let elapsed = self.start.elapsed();
89
90            let mut requests = self.requests.lock();
91            if let Some(request) = requests.get_mut(self.request_index) {
92                request.duration = Some(elapsed);
93            }
94        }
95
96        poll_result
97    }
98}
99
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
///
/// The discriminants are spelled out because the mode is stored as a `u8` (`mode as u8`) and
/// decoded again through `From<u8>`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum InstrumentedObjectStoreMode {
    /// Disable collection of profiling data
    #[default]
    Disabled = 0,
    /// Enable collection of profiling data and output a summary
    Summary = 1,
    /// Enable collection of profiling data and output a summary and all details
    Trace = 2,
}

impl fmt::Display for InstrumentedObjectStoreMode {
    /// Displays the mode as its variant name, exactly as the derived `Debug` does
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}
118
119impl FromStr for InstrumentedObjectStoreMode {
120    type Err = DataFusionError;
121
122    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
123        match s.to_lowercase().as_str() {
124            "disabled" => Ok(Self::Disabled),
125            "summary" => Ok(Self::Summary),
126            "trace" => Ok(Self::Trace),
127            _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
128        }
129    }
130}
131
132impl From<u8> for InstrumentedObjectStoreMode {
133    fn from(value: u8) -> Self {
134        match value {
135            1 => InstrumentedObjectStoreMode::Summary,
136            2 => InstrumentedObjectStoreMode::Trace,
137            _ => InstrumentedObjectStoreMode::Disabled,
138        }
139    }
140}
141
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
/// inner [`ObjectStore`]
#[derive(Debug)]
pub struct InstrumentedObjectStore {
    /// The wrapped store that every request is forwarded to
    inner: Arc<dyn ObjectStore>,
    /// The current [`InstrumentedObjectStoreMode`], held as its `u8` value
    instrument_mode: AtomicU8,
    /// Details of the requests recorded while instrumentation is enabled
    requests: Arc<Mutex<Vec<RequestDetails>>>,
}
150
151impl InstrumentedObjectStore {
152    /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
153    fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
154        Self {
155            inner: object_store,
156            instrument_mode,
157            requests: Arc::new(Mutex::new(Vec::new())),
158        }
159    }
160
161    fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
162        self.instrument_mode.store(mode as u8, Ordering::Relaxed)
163    }
164
165    /// Returns all [`RequestDetails`] accumulated in this [`InstrumentedObjectStore`] and clears
166    /// the stored requests
167    pub fn take_requests(&self) -> Vec<RequestDetails> {
168        let mut req = self.requests.lock();
169
170        req.drain(..).collect()
171    }
172
173    fn enabled(&self) -> bool {
174        self.instrument_mode.load(Ordering::Relaxed)
175            != InstrumentedObjectStoreMode::Disabled as u8
176    }
177
178    async fn instrumented_put_opts(
179        &self,
180        location: &Path,
181        payload: PutPayload,
182        opts: PutOptions,
183    ) -> Result<PutResult> {
184        let timestamp = Utc::now();
185        let start = Instant::now();
186        let size = payload.content_length();
187        let ret = self.inner.put_opts(location, payload, opts).await?;
188        let elapsed = start.elapsed();
189
190        self.requests.lock().push(RequestDetails {
191            op: Operation::Put,
192            path: location.clone(),
193            timestamp,
194            duration: Some(elapsed),
195            size: Some(size),
196            range: None,
197            extra_display: None,
198        });
199
200        Ok(ret)
201    }
202
203    async fn instrumented_put_multipart(
204        &self,
205        location: &Path,
206        opts: PutMultipartOptions,
207    ) -> Result<Box<dyn MultipartUpload>> {
208        let timestamp = Utc::now();
209        let start = Instant::now();
210        let ret = self.inner.put_multipart_opts(location, opts).await?;
211        let elapsed = start.elapsed();
212
213        self.requests.lock().push(RequestDetails {
214            op: Operation::Put,
215            path: location.clone(),
216            timestamp,
217            duration: Some(elapsed),
218            size: None,
219            range: None,
220            extra_display: None,
221        });
222
223        Ok(ret)
224    }
225
226    async fn instrumented_get_opts(
227        &self,
228        location: &Path,
229        options: GetOptions,
230    ) -> Result<GetResult> {
231        let timestamp = Utc::now();
232        let range = options.range.clone();
233
234        let head = options.head;
235        let start = Instant::now();
236        let ret = self.inner.get_opts(location, options).await?;
237        let elapsed = start.elapsed();
238
239        let (op, size) = if head {
240            (Operation::Head, None)
241        } else {
242            (
243                Operation::Get,
244                Some((ret.range.end - ret.range.start) as usize),
245            )
246        };
247
248        self.requests.lock().push(RequestDetails {
249            op,
250            path: location.clone(),
251            timestamp,
252            duration: Some(elapsed),
253            size,
254            range,
255            extra_display: None,
256        });
257
258        Ok(ret)
259    }
260
261    fn instrumented_delete_stream(
262        &self,
263        locations: BoxStream<'static, Result<Path>>,
264    ) -> BoxStream<'static, Result<Path>> {
265        let requests_captured = Arc::clone(&self.requests);
266
267        let timestamp = Utc::now();
268        let start = Instant::now();
269        self.inner
270            .delete_stream(locations)
271            .and_then(move |location| {
272                let elapsed = start.elapsed();
273                requests_captured.lock().push(RequestDetails {
274                    op: Operation::Delete,
275                    path: location.clone(),
276                    timestamp,
277                    duration: Some(elapsed),
278                    size: None,
279                    range: None,
280                    extra_display: None,
281                });
282                futures::future::ok(location)
283            })
284            .boxed()
285    }
286
287    fn instrumented_list(
288        &self,
289        prefix: Option<&Path>,
290    ) -> BoxStream<'static, Result<ObjectMeta>> {
291        let timestamp = Utc::now();
292        let start = Instant::now();
293        let inner_stream = self.inner.list(prefix);
294
295        let request_index = {
296            let mut requests = self.requests.lock();
297            requests.push(RequestDetails {
298                op: Operation::List,
299                path: prefix.cloned().unwrap_or_else(|| Path::from("")),
300                timestamp,
301                duration: None,
302                size: None,
303                range: None,
304                extra_display: None,
305            });
306            requests.len() - 1
307        };
308
309        let wrapped_stream = TimeToFirstItemStream::new(
310            inner_stream,
311            start,
312            request_index,
313            Arc::clone(&self.requests),
314        );
315
316        Box::pin(wrapped_stream)
317    }
318
319    async fn instrumented_list_with_delimiter(
320        &self,
321        prefix: Option<&Path>,
322    ) -> Result<ListResult> {
323        let timestamp = Utc::now();
324        let start = Instant::now();
325        let ret = self.inner.list_with_delimiter(prefix).await?;
326        let elapsed = start.elapsed();
327
328        self.requests.lock().push(RequestDetails {
329            op: Operation::List,
330            path: prefix.cloned().unwrap_or_else(|| Path::from("")),
331            timestamp,
332            duration: Some(elapsed),
333            size: None,
334            range: None,
335            extra_display: None,
336        });
337
338        Ok(ret)
339    }
340
341    async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
342        let timestamp = Utc::now();
343        let start = Instant::now();
344        self.inner.copy(from, to).await?;
345        let elapsed = start.elapsed();
346
347        self.requests.lock().push(RequestDetails {
348            op: Operation::Copy,
349            path: from.clone(),
350            timestamp,
351            duration: Some(elapsed),
352            size: None,
353            range: None,
354            extra_display: Some(format!("copy_to: {to}")),
355        });
356
357        Ok(())
358    }
359
360    async fn instrumented_copy_if_not_exists(
361        &self,
362        from: &Path,
363        to: &Path,
364    ) -> Result<()> {
365        let timestamp = Utc::now();
366        let start = Instant::now();
367        self.inner.copy_if_not_exists(from, to).await?;
368        let elapsed = start.elapsed();
369
370        self.requests.lock().push(RequestDetails {
371            op: Operation::Copy,
372            path: from.clone(),
373            timestamp,
374            duration: Some(elapsed),
375            size: None,
376            range: None,
377            extra_display: Some(format!("copy_to: {to}")),
378        });
379
380        Ok(())
381    }
382}
383
384impl fmt::Display for InstrumentedObjectStore {
385    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386        let mode: InstrumentedObjectStoreMode =
387            self.instrument_mode.load(Ordering::Relaxed).into();
388        write!(
389            f,
390            "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
391            self.inner
392        )
393    }
394}
395
396#[async_trait]
397impl ObjectStore for InstrumentedObjectStore {
398    async fn put_opts(
399        &self,
400        location: &Path,
401        payload: PutPayload,
402        opts: PutOptions,
403    ) -> Result<PutResult> {
404        if self.enabled() {
405            return self.instrumented_put_opts(location, payload, opts).await;
406        }
407
408        self.inner.put_opts(location, payload, opts).await
409    }
410
411    async fn put_multipart_opts(
412        &self,
413        location: &Path,
414        opts: PutMultipartOptions,
415    ) -> Result<Box<dyn MultipartUpload>> {
416        if self.enabled() {
417            return self.instrumented_put_multipart(location, opts).await;
418        }
419
420        self.inner.put_multipart_opts(location, opts).await
421    }
422
423    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
424        if self.enabled() {
425            return self.instrumented_get_opts(location, options).await;
426        }
427
428        self.inner.get_opts(location, options).await
429    }
430
431    fn delete_stream(
432        &self,
433        locations: BoxStream<'static, Result<Path>>,
434    ) -> BoxStream<'static, Result<Path>> {
435        if self.enabled() {
436            return self.instrumented_delete_stream(locations);
437        }
438
439        self.inner.delete_stream(locations)
440    }
441
442    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
443        if self.enabled() {
444            return self.instrumented_list(prefix);
445        }
446
447        self.inner.list(prefix)
448    }
449
450    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
451        if self.enabled() {
452            return self.instrumented_list_with_delimiter(prefix).await;
453        }
454
455        self.inner.list_with_delimiter(prefix).await
456    }
457
458    async fn copy_opts(
459        &self,
460        from: &Path,
461        to: &Path,
462        options: CopyOptions,
463    ) -> Result<()> {
464        if self.enabled() {
465            return match options.mode {
466                object_store::CopyMode::Create => {
467                    self.instrumented_copy_if_not_exists(from, to).await
468                }
469                object_store::CopyMode::Overwrite => {
470                    self.instrumented_copy(from, to).await
471                }
472            };
473        }
474
475        self.inner.copy_opts(from, to, options).await
476    }
477}
478
/// Object store operation types tracked by [`InstrumentedObjectStore`]
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
    /// A copy from one path to another
    Copy,
    /// A deletion of a path
    Delete,
    /// A read of an object
    Get,
    /// A get request made with the `head` option set
    Head,
    /// A listing of a prefix
    List,
    /// A write of an object
    Put,
}

impl fmt::Display for Operation {
    /// Displays the operation as its variant name, exactly as the derived `Debug` does
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}
495
/// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`]
#[derive(Debug)]
pub struct RequestDetails {
    /// The kind of operation that was requested
    op: Operation,
    /// The path the request was made for; copies record the source path and listings record
    /// the prefix (or the empty path when there is none)
    path: Path,
    /// Wall-clock time taken just before the request was issued
    timestamp: chrono::DateTime<Utc>,
    /// How long the request took, when it has been measured
    duration: Option<Duration>,
    /// Number of bytes involved, for the requests that record one
    size: Option<usize>,
    /// The range that was asked for in a get request, if any
    range: Option<GetRange>,
    /// Additional text appended when the request is displayed, e.g. the target of a copy
    extra_display: Option<String>,
}
507
508impl fmt::Display for RequestDetails {
509    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510        let mut output_parts = vec![format!(
511            "{} operation={:?}",
512            self.timestamp.to_rfc3339(),
513            self.op
514        )];
515
516        if let Some(d) = self.duration {
517            output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
518        }
519        if let Some(s) = self.size {
520            output_parts.push(format!("size={s}"));
521        }
522        if let Some(r) = &self.range {
523            output_parts.push(format!("range: {r}"));
524        }
525        output_parts.push(format!("path={}", self.path));
526
527        if let Some(ed) = &self.extra_display {
528            output_parts.push(ed.clone());
529        }
530
531        write!(f, "{}", output_parts.join(" "))
532    }
533}
534
/// Summary statistics for all requests recorded in an [`InstrumentedObjectStore`]
#[derive(Default)]
pub struct RequestSummaries {
    /// The per-operation summaries; `new` stores one per operation, sorted by operation
    summaries: Vec<RequestSummary>,
}
540
541/// Display the summary as a table
542impl fmt::Display for RequestSummaries {
543    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544        // Don't expect an error, but avoid panicking if it happens
545        match pretty_format_batches(&[self.to_batch()]) {
546            Err(e) => {
547                write!(f, "Error formatting summary: {e}")
548            }
549            Ok(displayable) => {
550                write!(f, "{displayable}")
551            }
552        }
553    }
554}
555
556impl RequestSummaries {
557    /// Summarizes input [`RequestDetails`]
558    pub fn new(requests: &[RequestDetails]) -> Self {
559        let mut summaries: HashMap<Operation, RequestSummary> = HashMap::new();
560        for rd in requests {
561            match summaries.get_mut(&rd.op) {
562                Some(rs) => rs.push(rd),
563                None => {
564                    let mut rs = RequestSummary::new(rd.op);
565                    rs.push(rd);
566                    summaries.insert(rd.op, rs);
567                }
568            }
569        }
570        // Convert to a Vec with consistent ordering
571        let mut summaries: Vec<RequestSummary> = summaries.into_values().collect();
572        summaries.sort_by_key(|s| s.operation);
573        Self { summaries }
574    }
575
576    /// Convert the summaries into a `RecordBatch` for display
577    ///
578    /// Results in a table like:
579    /// ```text
580    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
581    /// | Operation | Metric   | min       | max       | avg       | sum       | count     |
582    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
583    /// | Get       | duration | 5.000000s | 5.000000s | 5.000000s |           | 1         |
584    /// | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1         |
585    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
586    /// ```
587    pub fn to_batch(&self) -> RecordBatch {
588        let operations: StringArray = self
589            .iter()
590            .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2))
591            .collect();
592        let metrics: StringArray = self
593            .iter()
594            .flat_map(|_s| [Some("duration"), Some("size")])
595            .collect();
596        let mins: StringArray = self
597            .stats_iter()
598            .flat_map(|(duration_stats, size_stats)| {
599                let dur_min =
600                    duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32()));
601                let size_min = size_stats.map(|s| format!("{} B", s.min));
602                [dur_min, size_min]
603            })
604            .collect();
605        let maxs: StringArray = self
606            .stats_iter()
607            .flat_map(|(duration_stats, size_stats)| {
608                let dur_max =
609                    duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32()));
610                let size_max = size_stats.map(|s| format!("{} B", s.max));
611                [dur_max, size_max]
612            })
613            .collect();
614        let avgs: StringArray = self
615            .iter()
616            .flat_map(|s| {
617                let count = s.count as f32;
618                let duration_stats = s.duration_stats.as_ref();
619                let size_stats = s.size_stats.as_ref();
620                let dur_avg = duration_stats.map(|d| {
621                    let avg = d.sum.as_secs_f32() / count;
622                    format!("{avg:.6}s")
623                });
624                let size_avg = size_stats.map(|s| {
625                    let avg = s.sum as f32 / count;
626                    format!("{avg} B")
627                });
628                [dur_avg, size_avg]
629            })
630            .collect();
631        let sums: StringArray = self
632            .stats_iter()
633            .flat_map(|(duration_stats, size_stats)| {
634                // Omit a sum stat for duration in the initial
635                // implementation because it can be a bit misleading (at least
636                // at first glance). For example, particularly large queries the
637                // sum of the durations was often larger than the total time of
638                // the query itself, can be confusing without additional
639                // explanation (e.g. that the sum is of individual requests,
640                // which may be concurrent).
641                let dur_sum =
642                    duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32()));
643                let size_sum = size_stats.map(|s| format!("{} B", s.sum));
644                [dur_sum, size_sum]
645            })
646            .collect();
647        let counts: StringArray = self
648            .iter()
649            .flat_map(|s| {
650                let count = s.count.to_string();
651                [Some(count.clone()), Some(count)]
652            })
653            .collect();
654
655        RecordBatch::try_from_iter(vec![
656            ("Operation", Arc::new(operations) as ArrayRef),
657            ("Metric", Arc::new(metrics) as ArrayRef),
658            ("min", Arc::new(mins) as ArrayRef),
659            ("max", Arc::new(maxs) as ArrayRef),
660            ("avg", Arc::new(avgs) as ArrayRef),
661            ("sum", Arc::new(sums) as ArrayRef),
662            ("count", Arc::new(counts) as ArrayRef),
663        ])
664        .expect("Created the batch correctly")
665    }
666
667    /// Return an iterator over the summaries
668    fn iter(&self) -> impl Iterator<Item = &RequestSummary> {
669        self.summaries.iter()
670    }
671
672    /// Return an iterator over (duration_stats, size_stats) tuples
673    /// for each summary
674    fn stats_iter(
675        &self,
676    ) -> impl Iterator<Item = (Option<&Stats<Duration>>, Option<&Stats<usize>>)> {
677        self.summaries
678            .iter()
679            .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref()))
680    }
681}
682
/// Summary statistics for a particular type of [`Operation`] (e.g. `GET` or `PUT`)
/// in an [`InstrumentedObjectStore`]'s [`RequestDetails`]
pub struct RequestSummary {
    /// The operation being summarized
    operation: Operation,
    /// Number of requests pushed into this summary
    count: usize,
    /// Statistics over the durations of the pushed requests that had one; `None` until the
    /// first such request
    duration_stats: Option<Stats<Duration>>,
    /// Statistics over the sizes of the pushed requests that had one; `None` until the first
    /// such request
    size_stats: Option<Stats<usize>>,
}
691
692impl RequestSummary {
693    fn new(operation: Operation) -> Self {
694        Self {
695            operation,
696            count: 0,
697            duration_stats: None,
698            size_stats: None,
699        }
700    }
701    fn push(&mut self, request: &RequestDetails) {
702        self.count += 1;
703        if let Some(dur) = request.duration {
704            self.duration_stats.get_or_insert_default().push(dur)
705        }
706        if let Some(size) = request.size {
707            self.size_stats.get_or_insert_default().push(size)
708        }
709    }
710}
711
/// Running minimum, maximum, and total of the values pushed so far
struct Stats<T>
where
    T: Copy + Ord + AddAssign<T>,
{
    min: T,
    max: T,
    sum: T,
}

impl<T> Stats<T>
where
    T: Copy + Ord + AddAssign<T>,
{
    /// Folds `val` into the running minimum, maximum, and total
    fn push(&mut self, val: T) {
        let Self { min, max, sum } = self;

        *min = cmp::min(val, *min);
        *max = cmp::max(val, *max);
        *sum += val;
    }
}

impl Default for Stats<usize> {
    /// Starts with the extremes swapped so that the first pushed value replaces both
    fn default() -> Self {
        Self {
            sum: 0,
            max: usize::MIN,
            min: usize::MAX,
        }
    }
}

impl Default for Stats<Duration> {
    /// Starts with the extremes swapped so that the first pushed value replaces both
    fn default() -> Self {
        Self {
            sum: Duration::ZERO,
            max: Duration::ZERO,
            min: Duration::MAX,
        }
    }
}
745
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
    /// The registry that stores are registered with and looked up from
    inner: Arc<dyn ObjectStoreRegistry>,
    /// The current [`InstrumentedObjectStoreMode`], held as its `u8` value; newly registered
    /// stores start out in this mode
    instrument_mode: AtomicU8,
    /// The instrumented stores created by `register_store`
    stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
}
753
754impl Default for InstrumentedObjectStoreRegistry {
755    fn default() -> Self {
756        Self::new()
757    }
758}
759
760impl InstrumentedObjectStoreRegistry {
761    /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
762    /// [`ObjectStoreRegistry`]
763    pub fn new() -> Self {
764        Self {
765            inner: Arc::new(DefaultObjectStoreRegistry::new()),
766            instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
767            stores: RwLock::new(Vec::new()),
768        }
769    }
770
771    pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
772        self.instrument_mode.store(mode as u8, Ordering::Relaxed);
773        self
774    }
775
776    /// Provides access to all of the [`InstrumentedObjectStore`]s managed by this
777    /// [`InstrumentedObjectStoreRegistry`]
778    pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
779        self.stores.read().clone()
780    }
781
782    /// Returns the current [`InstrumentedObjectStoreMode`] for this
783    /// [`InstrumentedObjectStoreRegistry`]
784    pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
785        self.instrument_mode.load(Ordering::Relaxed).into()
786    }
787
788    /// Sets the [`InstrumentedObjectStoreMode`] for this [`InstrumentedObjectStoreRegistry`]
789    pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
790        self.instrument_mode.store(mode as u8, Ordering::Relaxed);
791        for s in self.stores.read().iter() {
792            s.set_instrument_mode(mode)
793        }
794    }
795}
796
797impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
798    fn register_store(
799        &self,
800        url: &Url,
801        store: Arc<dyn ObjectStore>,
802    ) -> Option<Arc<dyn ObjectStore>> {
803        let mode = self.instrument_mode.load(Ordering::Relaxed);
804        let instrumented =
805            Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
806        self.stores.write().push(Arc::clone(&instrumented));
807        self.inner.register_store(url, instrumented)
808    }
809
810    fn deregister_store(
811        &self,
812        url: &Url,
813    ) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
814        self.inner.deregister_store(url)
815    }
816
817    fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
818        self.inner.get_store(url)
819    }
820}
821
822#[cfg(test)]
823mod tests {
824    use futures::StreamExt;
825    use object_store::WriteMultipart;
826
827    use super::*;
828    use insta::assert_snapshot;
829
830    #[test]
831    fn instrumented_mode() {
832        assert!(matches!(
833            InstrumentedObjectStoreMode::default(),
834            InstrumentedObjectStoreMode::Disabled
835        ));
836
837        assert!(matches!(
838            "dIsABleD".parse().unwrap(),
839            InstrumentedObjectStoreMode::Disabled
840        ));
841        assert!(matches!(
842            "SUmMaRy".parse().unwrap(),
843            InstrumentedObjectStoreMode::Summary
844        ));
845        assert!(matches!(
846            "TRaCe".parse().unwrap(),
847            InstrumentedObjectStoreMode::Trace
848        ));
849        assert!(
850            "does_not_exist"
851                .parse::<InstrumentedObjectStoreMode>()
852                .is_err()
853        );
854
855        assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
856        assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary));
857        assert!(matches!(2.into(), InstrumentedObjectStoreMode::Trace));
858        assert!(matches!(3.into(), InstrumentedObjectStoreMode::Disabled));
859    }
860
861    #[test]
862    fn instrumented_registry() {
863        let mut reg = InstrumentedObjectStoreRegistry::new();
864        assert!(reg.stores().is_empty());
865        assert_eq!(
866            reg.instrument_mode(),
867            InstrumentedObjectStoreMode::default()
868        );
869
870        reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Trace);
871        assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Trace);
872
873        let store = object_store::memory::InMemory::new();
874        let url = "mem://test".parse().unwrap();
875        let registered = reg.register_store(&url, Arc::new(store));
876        assert!(registered.is_none());
877
878        let fetched = reg.get_store(&url);
879        assert!(fetched.is_ok());
880        assert_eq!(reg.stores().len(), 1);
881    }
882
883    // Returns an `InstrumentedObjectStore` with some data loaded for testing and the path to
884    // access the data
885    async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
886        let store = Arc::new(object_store::memory::InMemory::new());
887        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
888        let instrumented = InstrumentedObjectStore::new(store, mode);
889
890        // Load the test store with some data we can read
891        let path = Path::from("test/data");
892        let payload = PutPayload::from_static(b"test_data");
893        instrumented.put(&path, payload).await.unwrap();
894
895        (instrumented, path)
896    }
897
898    #[tokio::test]
899    async fn instrumented_store_get() {
900        let (instrumented, path) = setup_test_store().await;
901
902        // By default no requests should be instrumented/stored
903        assert!(instrumented.requests.lock().is_empty());
904        let _ = instrumented.get(&path).await.unwrap();
905        assert!(instrumented.requests.lock().is_empty());
906
907        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
908        assert!(instrumented.requests.lock().is_empty());
909        let _ = instrumented.get(&path).await.unwrap();
910        assert_eq!(instrumented.requests.lock().len(), 1);
911
912        let mut requests = instrumented.take_requests();
913        assert_eq!(requests.len(), 1);
914        assert!(instrumented.requests.lock().is_empty());
915
916        let request = requests.pop().unwrap();
917        assert_eq!(request.op, Operation::Get);
918        assert_eq!(request.path, path);
919        assert!(request.duration.is_some());
920        assert_eq!(request.size, Some(9));
921        assert_eq!(request.range, None);
922        assert!(request.extra_display.is_none());
923    }
924
925    #[tokio::test]
926    async fn instrumented_store_delete() {
927        let (instrumented, path) = setup_test_store().await;
928
929        // By default no requests should be instrumented/stored
930        assert!(instrumented.requests.lock().is_empty());
931        instrumented.delete(&path).await.unwrap();
932        assert!(instrumented.requests.lock().is_empty());
933
934        // We need a new store so we have data to delete again
935        let (instrumented, path) = setup_test_store().await;
936        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
937        assert!(instrumented.requests.lock().is_empty());
938        instrumented.delete(&path).await.unwrap();
939        assert_eq!(instrumented.requests.lock().len(), 1);
940
941        let mut requests = instrumented.take_requests();
942        assert_eq!(requests.len(), 1);
943        assert!(instrumented.requests.lock().is_empty());
944
945        let request = requests.pop().unwrap();
946        assert_eq!(request.op, Operation::Delete);
947        assert_eq!(request.path, path);
948        assert!(request.duration.is_some());
949        assert!(request.size.is_none());
950        assert!(request.range.is_none());
951        assert!(request.extra_display.is_none());
952    }
953
954    #[tokio::test]
955    async fn instrumented_store_list() {
956        let (instrumented, path) = setup_test_store().await;
957
958        // By default no requests should be instrumented/stored
959        assert!(instrumented.requests.lock().is_empty());
960        let _ = instrumented.list(Some(&path));
961        assert!(instrumented.requests.lock().is_empty());
962
963        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
964        assert!(instrumented.requests.lock().is_empty());
965        let mut stream = instrumented.list(Some(&path));
966        // Consume at least one item from the stream to trigger duration measurement
967        let _ = stream.next().await;
968        assert_eq!(instrumented.requests.lock().len(), 1);
969
970        let request = instrumented.take_requests().pop().unwrap();
971        assert_eq!(request.op, Operation::List);
972        assert_eq!(request.path, path);
973        assert!(request.duration.is_some());
974        assert!(request.size.is_none());
975        assert!(request.range.is_none());
976        assert!(request.extra_display.is_none());
977    }
978
979    #[tokio::test]
980    async fn instrumented_store_list_with_delimiter() {
981        let (instrumented, path) = setup_test_store().await;
982
983        // By default no requests should be instrumented/stored
984        assert!(instrumented.requests.lock().is_empty());
985        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
986        assert!(instrumented.requests.lock().is_empty());
987
988        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
989        assert!(instrumented.requests.lock().is_empty());
990        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
991        assert_eq!(instrumented.requests.lock().len(), 1);
992
993        let request = instrumented.take_requests().pop().unwrap();
994        assert_eq!(request.op, Operation::List);
995        assert_eq!(request.path, path);
996        assert!(request.duration.is_some());
997        assert!(request.size.is_none());
998        assert!(request.range.is_none());
999        assert!(request.extra_display.is_none());
1000    }
1001
1002    #[tokio::test]
1003    async fn instrumented_store_put_opts() {
1004        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
1005        // manually for this test
1006        let store = Arc::new(object_store::memory::InMemory::new());
1007        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1008        let instrumented = InstrumentedObjectStore::new(store, mode);
1009
1010        let path = Path::from("test/data");
1011        let payload = PutPayload::from_static(b"test_data");
1012        let size = payload.content_length();
1013
1014        // By default no requests should be instrumented/stored
1015        assert!(instrumented.requests.lock().is_empty());
1016        instrumented.put(&path, payload.clone()).await.unwrap();
1017        assert!(instrumented.requests.lock().is_empty());
1018
1019        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1020        assert!(instrumented.requests.lock().is_empty());
1021        instrumented.put(&path, payload).await.unwrap();
1022        assert_eq!(instrumented.requests.lock().len(), 1);
1023
1024        let request = instrumented.take_requests().pop().unwrap();
1025        assert_eq!(request.op, Operation::Put);
1026        assert_eq!(request.path, path);
1027        assert!(request.duration.is_some());
1028        assert_eq!(request.size.unwrap(), size);
1029        assert!(request.range.is_none());
1030        assert!(request.extra_display.is_none());
1031    }
1032
1033    #[tokio::test]
1034    async fn instrumented_store_put_multipart() {
1035        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
1036        // manually for this test
1037        let store = Arc::new(object_store::memory::InMemory::new());
1038        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1039        let instrumented = InstrumentedObjectStore::new(store, mode);
1040
1041        let path = Path::from("test/data");
1042
1043        // By default no requests should be instrumented/stored
1044        assert!(instrumented.requests.lock().is_empty());
1045        let mp = instrumented.put_multipart(&path).await.unwrap();
1046        let mut write = WriteMultipart::new(mp);
1047        write.write(b"test_data");
1048        write.finish().await.unwrap();
1049        assert!(instrumented.requests.lock().is_empty());
1050
1051        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1052        assert!(instrumented.requests.lock().is_empty());
1053        let mp = instrumented.put_multipart(&path).await.unwrap();
1054        let mut write = WriteMultipart::new(mp);
1055        write.write(b"test_data");
1056        write.finish().await.unwrap();
1057        assert_eq!(instrumented.requests.lock().len(), 1);
1058
1059        let request = instrumented.take_requests().pop().unwrap();
1060        assert_eq!(request.op, Operation::Put);
1061        assert_eq!(request.path, path);
1062        assert!(request.duration.is_some());
1063        assert!(request.size.is_none());
1064        assert!(request.range.is_none());
1065        assert!(request.extra_display.is_none());
1066    }
1067
1068    #[tokio::test]
1069    async fn instrumented_store_copy() {
1070        let (instrumented, path) = setup_test_store().await;
1071        let copy_to = Path::from("test/copied");
1072
1073        // By default no requests should be instrumented/stored
1074        assert!(instrumented.requests.lock().is_empty());
1075        instrumented.copy(&path, &copy_to).await.unwrap();
1076        assert!(instrumented.requests.lock().is_empty());
1077
1078        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1079        assert!(instrumented.requests.lock().is_empty());
1080        instrumented.copy(&path, &copy_to).await.unwrap();
1081        assert_eq!(instrumented.requests.lock().len(), 1);
1082
1083        let mut requests = instrumented.take_requests();
1084        assert_eq!(requests.len(), 1);
1085        assert!(instrumented.requests.lock().is_empty());
1086
1087        let request = requests.pop().unwrap();
1088        assert_eq!(request.op, Operation::Copy);
1089        assert_eq!(request.path, path);
1090        assert!(request.duration.is_some());
1091        assert!(request.size.is_none());
1092        assert!(request.range.is_none());
1093        assert_eq!(
1094            request.extra_display.unwrap(),
1095            format!("copy_to: {copy_to}")
1096        );
1097    }
1098
1099    #[tokio::test]
1100    async fn instrumented_store_copy_if_not_exists() {
1101        let (instrumented, path) = setup_test_store().await;
1102        let mut copy_to = Path::from("test/copied");
1103
1104        // By default no requests should be instrumented/stored
1105        assert!(instrumented.requests.lock().is_empty());
1106        instrumented
1107            .copy_if_not_exists(&path, &copy_to)
1108            .await
1109            .unwrap();
1110        assert!(instrumented.requests.lock().is_empty());
1111
1112        // Use a new destination since the previous one already exists
1113        copy_to = Path::from("test/copied_again");
1114        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1115        assert!(instrumented.requests.lock().is_empty());
1116        instrumented
1117            .copy_if_not_exists(&path, &copy_to)
1118            .await
1119            .unwrap();
1120        assert_eq!(instrumented.requests.lock().len(), 1);
1121
1122        let mut requests = instrumented.take_requests();
1123        assert_eq!(requests.len(), 1);
1124        assert!(instrumented.requests.lock().is_empty());
1125
1126        let request = requests.pop().unwrap();
1127        assert_eq!(request.op, Operation::Copy);
1128        assert_eq!(request.path, path);
1129        assert!(request.duration.is_some());
1130        assert!(request.size.is_none());
1131        assert!(request.range.is_none());
1132        assert_eq!(
1133            request.extra_display.unwrap(),
1134            format!("copy_to: {copy_to}")
1135        );
1136    }
1137
1138    #[tokio::test]
1139    async fn instrumented_store_head() {
1140        let (instrumented, path) = setup_test_store().await;
1141
1142        // By default no requests should be instrumented/stored
1143        assert!(instrumented.requests.lock().is_empty());
1144        let _ = instrumented.head(&path).await.unwrap();
1145        assert!(instrumented.requests.lock().is_empty());
1146
1147        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1148        assert!(instrumented.requests.lock().is_empty());
1149        let _ = instrumented.head(&path).await.unwrap();
1150        assert_eq!(instrumented.requests.lock().len(), 1);
1151
1152        let mut requests = instrumented.take_requests();
1153        assert_eq!(requests.len(), 1);
1154        assert!(instrumented.requests.lock().is_empty());
1155
1156        let request = requests.pop().unwrap();
1157        assert_eq!(request.op, Operation::Head);
1158        assert_eq!(request.path, path);
1159        assert!(request.duration.is_some());
1160        assert!(request.size.is_none());
1161        assert!(request.range.is_none());
1162        assert!(request.extra_display.is_none());
1163    }
1164
1165    #[test]
1166    fn request_details() {
1167        let rd = RequestDetails {
1168            op: Operation::Get,
1169            path: Path::from("test"),
1170            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1171            duration: Some(Duration::new(5, 0)),
1172            size: Some(10),
1173            range: Some((..10).into()),
1174            extra_display: Some(String::from("extra info")),
1175        };
1176
1177        assert_eq!(
1178            format!("{rd}"),
1179            "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
1180        );
1181    }
1182
1183    #[test]
1184    fn request_summary() {
1185        // Test empty request list
1186        let mut requests = Vec::new();
1187        assert_snapshot!(RequestSummaries::new(&requests), @r"
1188        +-----------+--------+-----+-----+-----+-----+-------+
1189        | Operation | Metric | min | max | avg | sum | count |
1190        +-----------+--------+-----+-----+-----+-----+-------+
1191        +-----------+--------+-----+-----+-----+-----+-------+
1192        ");
1193
1194        requests.push(RequestDetails {
1195            op: Operation::Get,
1196            path: Path::from("test1"),
1197            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1198            duration: Some(Duration::from_secs(5)),
1199            size: Some(100),
1200            range: None,
1201            extra_display: None,
1202        });
1203
1204        assert_snapshot!(RequestSummaries::new(&requests), @r"
1205        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1206        | Operation | Metric   | min       | max       | avg       | sum       | count |
1207        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1208        | Get       | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1     |
1209        | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1     |
1210        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1211        ");
1212
1213        // Add more Get requests to test aggregation
1214        requests.push(RequestDetails {
1215            op: Operation::Get,
1216            path: Path::from("test2"),
1217            timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
1218            duration: Some(Duration::from_secs(8)),
1219            size: Some(150),
1220            range: None,
1221            extra_display: None,
1222        });
1223        requests.push(RequestDetails {
1224            op: Operation::Get,
1225            path: Path::from("test3"),
1226            timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
1227            duration: Some(Duration::from_secs(2)),
1228            size: Some(50),
1229            range: None,
1230            extra_display: None,
1231        });
1232        assert_snapshot!(RequestSummaries::new(&requests), @r"
1233        +-----------+----------+-----------+-----------+-----------+------------+-------+
1234        | Operation | Metric   | min       | max       | avg       | sum        | count |
1235        +-----------+----------+-----------+-----------+-----------+------------+-------+
1236        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
1237        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
1238        +-----------+----------+-----------+-----------+-----------+------------+-------+
1239        ");
1240
1241        // Add Put requests to test grouping
1242        requests.push(RequestDetails {
1243            op: Operation::Put,
1244            path: Path::from("test4"),
1245            timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
1246            duration: Some(Duration::from_millis(200)),
1247            size: Some(75),
1248            range: None,
1249            extra_display: None,
1250        });
1251
1252        assert_snapshot!(RequestSummaries::new(&requests), @r"
1253        +-----------+----------+-----------+-----------+-----------+------------+-------+
1254        | Operation | Metric   | min       | max       | avg       | sum        | count |
1255        +-----------+----------+-----------+-----------+-----------+------------+-------+
1256        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
1257        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
1258        | Put       | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s  | 1     |
1259        | Put       | size     | 75 B      | 75 B      | 75 B      | 75 B       | 1     |
1260        +-----------+----------+-----------+-----------+-----------+------------+-------+
1261        ");
1262    }
1263
1264    #[test]
1265    fn request_summary_only_duration() {
1266        // Test request with only duration (no size)
1267        let only_duration = vec![RequestDetails {
1268            op: Operation::Get,
1269            path: Path::from("test1"),
1270            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1271            duration: Some(Duration::from_secs(3)),
1272            size: None,
1273            range: None,
1274            extra_display: None,
1275        }];
1276        assert_snapshot!(RequestSummaries::new(&only_duration), @r"
1277        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1278        | Operation | Metric   | min       | max       | avg       | sum       | count |
1279        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1280        | Get       | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1     |
1281        | Get       | size     |           |           |           |           | 1     |
1282        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1283        ");
1284    }
1285
1286    #[test]
1287    fn request_summary_only_size() {
1288        // Test request with only size (no duration)
1289        let only_size = vec![RequestDetails {
1290            op: Operation::Get,
1291            path: Path::from("test1"),
1292            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1293            duration: None,
1294            size: Some(200),
1295            range: None,
1296            extra_display: None,
1297        }];
1298        assert_snapshot!(RequestSummaries::new(&only_size), @r"
1299        +-----------+----------+-------+-------+-------+-------+-------+
1300        | Operation | Metric   | min   | max   | avg   | sum   | count |
1301        +-----------+----------+-------+-------+-------+-------+-------+
1302        | Get       | duration |       |       |       |       | 1     |
1303        | Get       | size     | 200 B | 200 B | 200 B | 200 B | 1     |
1304        +-----------+----------+-------+-------+-------+-------+-------+
1305        ");
1306    }
1307
1308    #[test]
1309    fn request_summary_neither_duration_or_size() {
1310        // Test request with neither duration nor size
1311        let no_stats = vec![RequestDetails {
1312            op: Operation::Get,
1313            path: Path::from("test1"),
1314            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1315            duration: None,
1316            size: None,
1317            range: None,
1318            extra_display: None,
1319        }];
1320        assert_snapshot!(RequestSummaries::new(&no_stats), @r"
1321        +-----------+----------+-----+-----+-----+-----+-------+
1322        | Operation | Metric   | min | max | avg | sum | count |
1323        +-----------+----------+-----+-----+-----+-----+-------+
1324        | Get       | duration |     |     |     |     | 1     |
1325        | Get       | size     |     |     |     |     | 1     |
1326        +-----------+----------+-----+-----+-----+-----+-------+
1327        ");
1328    }
1329}