datafusion_cli/object_storage/
instrumented.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    cmp, fmt,
20    ops::AddAssign,
21    str::FromStr,
22    sync::{
23        atomic::{AtomicU8, Ordering},
24        Arc,
25    },
26    time::Duration,
27};
28
29use arrow::array::{ArrayRef, RecordBatch, StringArray};
30use arrow::util::pretty::pretty_format_batches;
31use async_trait::async_trait;
32use chrono::Utc;
33use datafusion::{
34    common::{instant::Instant, HashMap},
35    error::DataFusionError,
36    execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
37};
38use futures::stream::BoxStream;
39use object_store::{
40    path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
41    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
42};
43use parking_lot::{Mutex, RwLock};
44use url::Url;
45
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
///
/// The discriminants are spelled out because the mode is stored as a `u8` in an atomic
/// (`mode as u8`) and decoded again through `From<u8>`; they must stay in sync.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum InstrumentedObjectStoreMode {
    /// Disable collection of profiling data
    #[default]
    Disabled = 0,
    /// Enable collection of profiling data and output a summary
    Summary = 1,
    /// Enable collection of profiling data and output a summary and all details
    Trace = 2,
}
58
59impl fmt::Display for InstrumentedObjectStoreMode {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{self:?}")
62    }
63}
64
65impl FromStr for InstrumentedObjectStoreMode {
66    type Err = DataFusionError;
67
68    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
69        match s.to_lowercase().as_str() {
70            "disabled" => Ok(Self::Disabled),
71            "summary" => Ok(Self::Summary),
72            "trace" => Ok(Self::Trace),
73            _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
74        }
75    }
76}
77
78impl From<u8> for InstrumentedObjectStoreMode {
79    fn from(value: u8) -> Self {
80        match value {
81            1 => InstrumentedObjectStoreMode::Summary,
82            2 => InstrumentedObjectStoreMode::Trace,
83            _ => InstrumentedObjectStoreMode::Disabled,
84        }
85    }
86}
87
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
/// inner [`ObjectStore`]
#[derive(Debug)]
pub struct InstrumentedObjectStore {
    /// The wrapped store; every request is forwarded to it
    inner: Arc<dyn ObjectStore>,
    /// Current [`InstrumentedObjectStoreMode`], kept as its `u8` discriminant so it can be
    /// changed through `&self`
    instrument_mode: AtomicU8,
    /// Requests recorded while the mode is not `Disabled`; emptied by `take_requests`
    requests: Mutex<Vec<RequestDetails>>,
}
96
97impl InstrumentedObjectStore {
98    /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
99    fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
100        Self {
101            inner: object_store,
102            instrument_mode,
103            requests: Mutex::new(Vec::new()),
104        }
105    }
106
107    fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
108        self.instrument_mode.store(mode as u8, Ordering::Relaxed)
109    }
110
111    /// Returns all [`RequestDetails`] accumulated in this [`InstrumentedObjectStore`] and clears
112    /// the stored requests
113    pub fn take_requests(&self) -> Vec<RequestDetails> {
114        let mut req = self.requests.lock();
115
116        req.drain(..).collect()
117    }
118
119    fn enabled(&self) -> bool {
120        self.instrument_mode.load(Ordering::Relaxed)
121            != InstrumentedObjectStoreMode::Disabled as u8
122    }
123
124    async fn instrumented_put_opts(
125        &self,
126        location: &Path,
127        payload: PutPayload,
128        opts: PutOptions,
129    ) -> Result<PutResult> {
130        let timestamp = Utc::now();
131        let start = Instant::now();
132        let size = payload.content_length();
133        let ret = self.inner.put_opts(location, payload, opts).await?;
134        let elapsed = start.elapsed();
135
136        self.requests.lock().push(RequestDetails {
137            op: Operation::Put,
138            path: location.clone(),
139            timestamp,
140            duration: Some(elapsed),
141            size: Some(size),
142            range: None,
143            extra_display: None,
144        });
145
146        Ok(ret)
147    }
148
149    async fn instrumented_put_multipart(
150        &self,
151        location: &Path,
152        opts: PutMultipartOptions,
153    ) -> Result<Box<dyn MultipartUpload>> {
154        let timestamp = Utc::now();
155        let start = Instant::now();
156        let ret = self.inner.put_multipart_opts(location, opts).await?;
157        let elapsed = start.elapsed();
158
159        self.requests.lock().push(RequestDetails {
160            op: Operation::Put,
161            path: location.clone(),
162            timestamp,
163            duration: Some(elapsed),
164            size: None,
165            range: None,
166            extra_display: None,
167        });
168
169        Ok(ret)
170    }
171
172    async fn instrumented_get_opts(
173        &self,
174        location: &Path,
175        options: GetOptions,
176    ) -> Result<GetResult> {
177        let timestamp = Utc::now();
178        let range = options.range.clone();
179
180        let start = Instant::now();
181        let ret = self.inner.get_opts(location, options).await?;
182        let elapsed = start.elapsed();
183
184        self.requests.lock().push(RequestDetails {
185            op: Operation::Get,
186            path: location.clone(),
187            timestamp,
188            duration: Some(elapsed),
189            size: Some((ret.range.end - ret.range.start) as usize),
190            range,
191            extra_display: None,
192        });
193
194        Ok(ret)
195    }
196
197    async fn instrumented_delete(&self, location: &Path) -> Result<()> {
198        let timestamp = Utc::now();
199        let start = Instant::now();
200        self.inner.delete(location).await?;
201        let elapsed = start.elapsed();
202
203        self.requests.lock().push(RequestDetails {
204            op: Operation::Delete,
205            path: location.clone(),
206            timestamp,
207            duration: Some(elapsed),
208            size: None,
209            range: None,
210            extra_display: None,
211        });
212
213        Ok(())
214    }
215
216    fn instrumented_list(
217        &self,
218        prefix: Option<&Path>,
219    ) -> BoxStream<'static, Result<ObjectMeta>> {
220        let timestamp = Utc::now();
221        let ret = self.inner.list(prefix);
222
223        self.requests.lock().push(RequestDetails {
224            op: Operation::List,
225            path: prefix.cloned().unwrap_or_else(|| Path::from("")),
226            timestamp,
227            duration: None, // list returns a stream, so the duration isn't meaningful
228            size: None,
229            range: None,
230            extra_display: None,
231        });
232
233        ret
234    }
235
236    async fn instrumented_list_with_delimiter(
237        &self,
238        prefix: Option<&Path>,
239    ) -> Result<ListResult> {
240        let timestamp = Utc::now();
241        let start = Instant::now();
242        let ret = self.inner.list_with_delimiter(prefix).await?;
243        let elapsed = start.elapsed();
244
245        self.requests.lock().push(RequestDetails {
246            op: Operation::List,
247            path: prefix.cloned().unwrap_or_else(|| Path::from("")),
248            timestamp,
249            duration: Some(elapsed),
250            size: None,
251            range: None,
252            extra_display: None,
253        });
254
255        Ok(ret)
256    }
257
258    async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
259        let timestamp = Utc::now();
260        let start = Instant::now();
261        self.inner.copy(from, to).await?;
262        let elapsed = start.elapsed();
263
264        self.requests.lock().push(RequestDetails {
265            op: Operation::Copy,
266            path: from.clone(),
267            timestamp,
268            duration: Some(elapsed),
269            size: None,
270            range: None,
271            extra_display: Some(format!("copy_to: {to}")),
272        });
273
274        Ok(())
275    }
276
277    async fn instrumented_copy_if_not_exists(
278        &self,
279        from: &Path,
280        to: &Path,
281    ) -> Result<()> {
282        let timestamp = Utc::now();
283        let start = Instant::now();
284        self.inner.copy_if_not_exists(from, to).await?;
285        let elapsed = start.elapsed();
286
287        self.requests.lock().push(RequestDetails {
288            op: Operation::Copy,
289            path: from.clone(),
290            timestamp,
291            duration: Some(elapsed),
292            size: None,
293            range: None,
294            extra_display: Some(format!("copy_to: {to}")),
295        });
296
297        Ok(())
298    }
299
300    async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
301        let timestamp = Utc::now();
302        let start = Instant::now();
303        let ret = self.inner.head(location).await?;
304        let elapsed = start.elapsed();
305
306        self.requests.lock().push(RequestDetails {
307            op: Operation::Head,
308            path: location.clone(),
309            timestamp,
310            duration: Some(elapsed),
311            size: None,
312            range: None,
313            extra_display: None,
314        });
315
316        Ok(ret)
317    }
318}
319
320impl fmt::Display for InstrumentedObjectStore {
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        let mode: InstrumentedObjectStoreMode =
323            self.instrument_mode.load(Ordering::Relaxed).into();
324        write!(
325            f,
326            "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
327            self.inner
328        )
329    }
330}
331
332#[async_trait]
333impl ObjectStore for InstrumentedObjectStore {
334    async fn put_opts(
335        &self,
336        location: &Path,
337        payload: PutPayload,
338        opts: PutOptions,
339    ) -> Result<PutResult> {
340        if self.enabled() {
341            return self.instrumented_put_opts(location, payload, opts).await;
342        }
343
344        self.inner.put_opts(location, payload, opts).await
345    }
346
347    async fn put_multipart_opts(
348        &self,
349        location: &Path,
350        opts: PutMultipartOptions,
351    ) -> Result<Box<dyn MultipartUpload>> {
352        if self.enabled() {
353            return self.instrumented_put_multipart(location, opts).await;
354        }
355
356        self.inner.put_multipart_opts(location, opts).await
357    }
358
359    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
360        if self.enabled() {
361            return self.instrumented_get_opts(location, options).await;
362        }
363
364        self.inner.get_opts(location, options).await
365    }
366
367    async fn delete(&self, location: &Path) -> Result<()> {
368        if self.enabled() {
369            return self.instrumented_delete(location).await;
370        }
371
372        self.inner.delete(location).await
373    }
374
375    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
376        if self.enabled() {
377            return self.instrumented_list(prefix);
378        }
379
380        self.inner.list(prefix)
381    }
382
383    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
384        if self.enabled() {
385            return self.instrumented_list_with_delimiter(prefix).await;
386        }
387
388        self.inner.list_with_delimiter(prefix).await
389    }
390
391    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
392        if self.enabled() {
393            return self.instrumented_copy(from, to).await;
394        }
395
396        self.inner.copy(from, to).await
397    }
398
399    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
400        if self.enabled() {
401            return self.instrumented_copy_if_not_exists(from, to).await;
402        }
403
404        self.inner.copy_if_not_exists(from, to).await
405    }
406
407    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
408        if self.enabled() {
409            return self.instrumented_head(location).await;
410        }
411
412        self.inner.head(location).await
413    }
414}
415
/// Object store operation types tracked by [`InstrumentedObjectStore`]
///
/// The derived `Ord` follows declaration order, which is the order summaries are sorted in
/// (`RequestSummaries::new` sorts by operation), so keep the variants in this order.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
    /// Recorded for `copy` and `copy_if_not_exists`
    Copy,
    /// Recorded for `delete`
    Delete,
    /// Recorded for `get_opts`
    Get,
    /// Recorded for `head`
    Head,
    /// Recorded for `list` and `list_with_delimiter`
    List,
    /// Recorded for `put_opts` and `put_multipart_opts`
    Put,
}
426
427impl fmt::Display for Operation {
428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429        write!(f, "{self:?}")
430    }
431}
432
/// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`]
#[derive(Debug)]
pub struct RequestDetails {
    /// Kind of request
    op: Operation,
    /// Object path. For lists this is the prefix (empty when none was given); for copies it
    /// is the source path
    path: Path,
    /// Wall-clock (UTC) time captured just before the request was issued
    timestamp: chrono::DateTime<Utc>,
    /// Time spent awaiting the inner store call; `None` for `list`, which only builds a stream
    duration: Option<Duration>,
    /// Bytes: the payload length for `put_opts`, or the length of the returned range for
    /// `get_opts`; `None` for other requests
    size: Option<usize>,
    /// The range requested by a `get_opts` call, if any
    range: Option<GetRange>,
    /// Extra text appended to the trace line (copies store `copy_to: <destination>`)
    extra_display: Option<String>,
}
444
445impl fmt::Display for RequestDetails {
446    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
447        let mut output_parts = vec![format!(
448            "{} operation={:?}",
449            self.timestamp.to_rfc3339(),
450            self.op
451        )];
452
453        if let Some(d) = self.duration {
454            output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
455        }
456        if let Some(s) = self.size {
457            output_parts.push(format!("size={s}"));
458        }
459        if let Some(r) = &self.range {
460            output_parts.push(format!("range: {r}"));
461        }
462        output_parts.push(format!("path={}", self.path));
463
464        if let Some(ed) = &self.extra_display {
465            output_parts.push(ed.clone());
466        }
467
468        write!(f, "{}", output_parts.join(" "))
469    }
470}
471
/// Summary statistics for all requests recorded in an [`InstrumentedObjectStore`]
#[derive(Default)]
pub struct RequestSummaries {
    /// One entry per [`Operation`] that occurred, sorted by operation (see `RequestSummaries::new`)
    summaries: Vec<RequestSummary>,
}
477
478/// Display the summary as a table
479impl fmt::Display for RequestSummaries {
480    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
481        // Don't expect an error, but avoid panicking if it happens
482        match pretty_format_batches(&[self.to_batch()]) {
483            Err(e) => {
484                write!(f, "Error formatting summary: {e}")
485            }
486            Ok(displayable) => {
487                write!(f, "{displayable}")
488            }
489        }
490    }
491}
492
493impl RequestSummaries {
494    /// Summarizes input [`RequestDetails`]
495    pub fn new(requests: &[RequestDetails]) -> Self {
496        let mut summaries: HashMap<Operation, RequestSummary> = HashMap::new();
497        for rd in requests {
498            match summaries.get_mut(&rd.op) {
499                Some(rs) => rs.push(rd),
500                None => {
501                    let mut rs = RequestSummary::new(rd.op);
502                    rs.push(rd);
503                    summaries.insert(rd.op, rs);
504                }
505            }
506        }
507        // Convert to a Vec with consistent ordering
508        let mut summaries: Vec<RequestSummary> = summaries.into_values().collect();
509        summaries.sort_by_key(|s| s.operation);
510        Self { summaries }
511    }
512
513    /// Convert the summaries into a `RecordBatch` for display
514    ///
515    /// Results in a table like:
516    /// ```text
517    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
518    /// | Operation | Metric   | min       | max       | avg       | sum       | count     |
519    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
520    /// | Get       | duration | 5.000000s | 5.000000s | 5.000000s |           | 1         |
521    /// | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1         |
522    /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+
523    /// ```
524    pub fn to_batch(&self) -> RecordBatch {
525        let operations: StringArray = self
526            .iter()
527            .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2))
528            .collect();
529        let metrics: StringArray = self
530            .iter()
531            .flat_map(|_s| [Some("duration"), Some("size")])
532            .collect();
533        let mins: StringArray = self
534            .stats_iter()
535            .flat_map(|(duration_stats, size_stats)| {
536                let dur_min =
537                    duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32()));
538                let size_min = size_stats.map(|s| format!("{} B", s.min));
539                [dur_min, size_min]
540            })
541            .collect();
542        let maxs: StringArray = self
543            .stats_iter()
544            .flat_map(|(duration_stats, size_stats)| {
545                let dur_max =
546                    duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32()));
547                let size_max = size_stats.map(|s| format!("{} B", s.max));
548                [dur_max, size_max]
549            })
550            .collect();
551        let avgs: StringArray = self
552            .iter()
553            .flat_map(|s| {
554                let count = s.count as f32;
555                let duration_stats = s.duration_stats.as_ref();
556                let size_stats = s.size_stats.as_ref();
557                let dur_avg = duration_stats.map(|d| {
558                    let avg = d.sum.as_secs_f32() / count;
559                    format!("{avg:.6}s")
560                });
561                let size_avg = size_stats.map(|s| {
562                    let avg = s.sum as f32 / count;
563                    format!("{avg} B")
564                });
565                [dur_avg, size_avg]
566            })
567            .collect();
568        let sums: StringArray = self
569            .stats_iter()
570            .flat_map(|(duration_stats, size_stats)| {
571                // Omit a sum stat for duration in the initial
572                // implementation because it can be a bit misleading (at least
573                // at first glance). For example, particularly large queries the
574                // sum of the durations was often larger than the total time of
575                // the query itself, can be confusing without additional
576                // explanation (e.g. that the sum is of individual requests,
577                // which may be concurrent).
578                let dur_sum =
579                    duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32()));
580                let size_sum = size_stats.map(|s| format!("{} B", s.sum));
581                [dur_sum, size_sum]
582            })
583            .collect();
584        let counts: StringArray = self
585            .iter()
586            .flat_map(|s| {
587                let count = s.count.to_string();
588                [Some(count.clone()), Some(count)]
589            })
590            .collect();
591
592        RecordBatch::try_from_iter(vec![
593            ("Operation", Arc::new(operations) as ArrayRef),
594            ("Metric", Arc::new(metrics) as ArrayRef),
595            ("min", Arc::new(mins) as ArrayRef),
596            ("max", Arc::new(maxs) as ArrayRef),
597            ("avg", Arc::new(avgs) as ArrayRef),
598            ("sum", Arc::new(sums) as ArrayRef),
599            ("count", Arc::new(counts) as ArrayRef),
600        ])
601        .expect("Created the batch correctly")
602    }
603
604    /// Return an iterator over the summaries
605    fn iter(&self) -> impl Iterator<Item = &RequestSummary> {
606        self.summaries.iter()
607    }
608
609    /// Return an iterator over (duration_stats, size_stats) tuples
610    /// for each summary
611    fn stats_iter(
612        &self,
613    ) -> impl Iterator<Item = (Option<&Stats<Duration>>, Option<&Stats<usize>>)> {
614        self.summaries
615            .iter()
616            .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref()))
617    }
618}
619
620/// Summary statistics for a particular type of [`Operation`] (e.g. `GET` or `PUT`)
621/// in an [`InstrumentedObjectStore`]'s [`RequestDetails`]
622pub struct RequestSummary {
623    operation: Operation,
624    count: usize,
625    duration_stats: Option<Stats<Duration>>,
626    size_stats: Option<Stats<usize>>,
627}
628
629impl RequestSummary {
630    fn new(operation: Operation) -> Self {
631        Self {
632            operation,
633            count: 0,
634            duration_stats: None,
635            size_stats: None,
636        }
637    }
638    fn push(&mut self, request: &RequestDetails) {
639        self.count += 1;
640        if let Some(dur) = request.duration {
641            self.duration_stats.get_or_insert_default().push(dur)
642        }
643        if let Some(size) = request.size {
644            self.size_stats.get_or_insert_default().push(size)
645        }
646    }
647}
648
649struct Stats<T: Copy + Ord + AddAssign<T>> {
650    min: T,
651    max: T,
652    sum: T,
653}
654
655impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
656    fn push(&mut self, val: T) {
657        self.min = cmp::min(val, self.min);
658        self.max = cmp::max(val, self.max);
659        self.sum += val;
660    }
661}
662
663impl Default for Stats<Duration> {
664    fn default() -> Self {
665        Self {
666            min: Duration::MAX,
667            max: Duration::ZERO,
668            sum: Duration::ZERO,
669        }
670    }
671}
672
673impl Default for Stats<usize> {
674    fn default() -> Self {
675        Self {
676            min: usize::MAX,
677            max: usize::MIN,
678            sum: 0,
679        }
680    }
681}
682
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
    /// Registry holding the (instrumented) stores by URL; lookups and deregistration are
    /// delegated to it
    inner: Arc<dyn ObjectStoreRegistry>,
    /// Mode, as its `u8` discriminant, given to newly registered stores and propagated to
    /// existing ones by `set_instrument_mode`
    instrument_mode: AtomicU8,
    /// Every store wrapped by `register_store`.
    /// NOTE(review): entries are never removed here, not even by `deregister_store`.
    stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
}
690
691impl Default for InstrumentedObjectStoreRegistry {
692    fn default() -> Self {
693        Self::new()
694    }
695}
696
697impl InstrumentedObjectStoreRegistry {
698    /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
699    /// [`ObjectStoreRegistry`]
700    pub fn new() -> Self {
701        Self {
702            inner: Arc::new(DefaultObjectStoreRegistry::new()),
703            instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
704            stores: RwLock::new(Vec::new()),
705        }
706    }
707
708    pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
709        self.instrument_mode.store(mode as u8, Ordering::Relaxed);
710        self
711    }
712
713    /// Provides access to all of the [`InstrumentedObjectStore`]s managed by this
714    /// [`InstrumentedObjectStoreRegistry`]
715    pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
716        self.stores.read().clone()
717    }
718
719    /// Returns the current [`InstrumentedObjectStoreMode`] for this
720    /// [`InstrumentedObjectStoreRegistry`]
721    pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
722        self.instrument_mode.load(Ordering::Relaxed).into()
723    }
724
725    /// Sets the [`InstrumentedObjectStoreMode`] for this [`InstrumentedObjectStoreRegistry`]
726    pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
727        self.instrument_mode.store(mode as u8, Ordering::Relaxed);
728        for s in self.stores.read().iter() {
729            s.set_instrument_mode(mode)
730        }
731    }
732}
733
734impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
735    fn register_store(
736        &self,
737        url: &Url,
738        store: Arc<dyn ObjectStore>,
739    ) -> Option<Arc<dyn ObjectStore>> {
740        let mode = self.instrument_mode.load(Ordering::Relaxed);
741        let instrumented =
742            Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
743        self.stores.write().push(Arc::clone(&instrumented));
744        self.inner.register_store(url, instrumented)
745    }
746
747    fn deregister_store(
748        &self,
749        url: &Url,
750    ) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
751        self.inner.deregister_store(url)
752    }
753
754    fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
755        self.inner.get_store(url)
756    }
757}
758
759#[cfg(test)]
760mod tests {
761    use object_store::WriteMultipart;
762
763    use super::*;
764    use insta::assert_snapshot;
765
766    #[test]
767    fn instrumented_mode() {
768        assert!(matches!(
769            InstrumentedObjectStoreMode::default(),
770            InstrumentedObjectStoreMode::Disabled
771        ));
772
773        assert!(matches!(
774            "dIsABleD".parse().unwrap(),
775            InstrumentedObjectStoreMode::Disabled
776        ));
777        assert!(matches!(
778            "SUmMaRy".parse().unwrap(),
779            InstrumentedObjectStoreMode::Summary
780        ));
781        assert!(matches!(
782            "TRaCe".parse().unwrap(),
783            InstrumentedObjectStoreMode::Trace
784        ));
785        assert!("does_not_exist"
786            .parse::<InstrumentedObjectStoreMode>()
787            .is_err());
788
789        assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
790        assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary));
791        assert!(matches!(2.into(), InstrumentedObjectStoreMode::Trace));
792        assert!(matches!(3.into(), InstrumentedObjectStoreMode::Disabled));
793    }
794
795    #[test]
796    fn instrumented_registry() {
797        let mut reg = InstrumentedObjectStoreRegistry::new();
798        assert!(reg.stores().is_empty());
799        assert_eq!(
800            reg.instrument_mode(),
801            InstrumentedObjectStoreMode::default()
802        );
803
804        reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Trace);
805        assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Trace);
806
807        let store = object_store::memory::InMemory::new();
808        let url = "mem://test".parse().unwrap();
809        let registered = reg.register_store(&url, Arc::new(store));
810        assert!(registered.is_none());
811
812        let fetched = reg.get_store(&url);
813        assert!(fetched.is_ok());
814        assert_eq!(reg.stores().len(), 1);
815    }
816
817    // Returns an `InstrumentedObjectStore` with some data loaded for testing and the path to
818    // access the data
819    async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
820        let store = Arc::new(object_store::memory::InMemory::new());
821        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
822        let instrumented = InstrumentedObjectStore::new(store, mode);
823
824        // Load the test store with some data we can read
825        let path = Path::from("test/data");
826        let payload = PutPayload::from_static(b"test_data");
827        instrumented.put(&path, payload).await.unwrap();
828
829        (instrumented, path)
830    }
831
832    #[tokio::test]
833    async fn instrumented_store_get() {
834        let (instrumented, path) = setup_test_store().await;
835
836        // By default no requests should be instrumented/stored
837        assert!(instrumented.requests.lock().is_empty());
838        let _ = instrumented.get(&path).await.unwrap();
839        assert!(instrumented.requests.lock().is_empty());
840
841        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
842        assert!(instrumented.requests.lock().is_empty());
843        let _ = instrumented.get(&path).await.unwrap();
844        assert_eq!(instrumented.requests.lock().len(), 1);
845
846        let mut requests = instrumented.take_requests();
847        assert_eq!(requests.len(), 1);
848        assert!(instrumented.requests.lock().is_empty());
849
850        let request = requests.pop().unwrap();
851        assert_eq!(request.op, Operation::Get);
852        assert_eq!(request.path, path);
853        assert!(request.duration.is_some());
854        assert_eq!(request.size, Some(9));
855        assert_eq!(request.range, None);
856        assert!(request.extra_display.is_none());
857    }
858
859    #[tokio::test]
860    async fn instrumented_store_delete() {
861        let (instrumented, path) = setup_test_store().await;
862
863        // By default no requests should be instrumented/stored
864        assert!(instrumented.requests.lock().is_empty());
865        instrumented.delete(&path).await.unwrap();
866        assert!(instrumented.requests.lock().is_empty());
867
868        // We need a new store so we have data to delete again
869        let (instrumented, path) = setup_test_store().await;
870        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
871        assert!(instrumented.requests.lock().is_empty());
872        instrumented.delete(&path).await.unwrap();
873        assert_eq!(instrumented.requests.lock().len(), 1);
874
875        let mut requests = instrumented.take_requests();
876        assert_eq!(requests.len(), 1);
877        assert!(instrumented.requests.lock().is_empty());
878
879        let request = requests.pop().unwrap();
880        assert_eq!(request.op, Operation::Delete);
881        assert_eq!(request.path, path);
882        assert!(request.duration.is_some());
883        assert!(request.size.is_none());
884        assert!(request.range.is_none());
885        assert!(request.extra_display.is_none());
886    }
887
888    #[tokio::test]
889    async fn instrumented_store_list() {
890        let (instrumented, path) = setup_test_store().await;
891
892        // By default no requests should be instrumented/stored
893        assert!(instrumented.requests.lock().is_empty());
894        let _ = instrumented.list(Some(&path));
895        assert!(instrumented.requests.lock().is_empty());
896
897        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
898        assert!(instrumented.requests.lock().is_empty());
899        let _ = instrumented.list(Some(&path));
900        assert_eq!(instrumented.requests.lock().len(), 1);
901
902        let request = instrumented.take_requests().pop().unwrap();
903        assert_eq!(request.op, Operation::List);
904        assert_eq!(request.path, path);
905        assert!(request.duration.is_none());
906        assert!(request.size.is_none());
907        assert!(request.range.is_none());
908        assert!(request.extra_display.is_none());
909    }
910
911    #[tokio::test]
912    async fn instrumented_store_list_with_delimiter() {
913        let (instrumented, path) = setup_test_store().await;
914
915        // By default no requests should be instrumented/stored
916        assert!(instrumented.requests.lock().is_empty());
917        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
918        assert!(instrumented.requests.lock().is_empty());
919
920        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
921        assert!(instrumented.requests.lock().is_empty());
922        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
923        assert_eq!(instrumented.requests.lock().len(), 1);
924
925        let request = instrumented.take_requests().pop().unwrap();
926        assert_eq!(request.op, Operation::List);
927        assert_eq!(request.path, path);
928        assert!(request.duration.is_some());
929        assert!(request.size.is_none());
930        assert!(request.range.is_none());
931        assert!(request.extra_display.is_none());
932    }
933
934    #[tokio::test]
935    async fn instrumented_store_put_opts() {
936        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
937        // manually for this test
938        let store = Arc::new(object_store::memory::InMemory::new());
939        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
940        let instrumented = InstrumentedObjectStore::new(store, mode);
941
942        let path = Path::from("test/data");
943        let payload = PutPayload::from_static(b"test_data");
944        let size = payload.content_length();
945
946        // By default no requests should be instrumented/stored
947        assert!(instrumented.requests.lock().is_empty());
948        instrumented.put(&path, payload.clone()).await.unwrap();
949        assert!(instrumented.requests.lock().is_empty());
950
951        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
952        assert!(instrumented.requests.lock().is_empty());
953        instrumented.put(&path, payload).await.unwrap();
954        assert_eq!(instrumented.requests.lock().len(), 1);
955
956        let request = instrumented.take_requests().pop().unwrap();
957        assert_eq!(request.op, Operation::Put);
958        assert_eq!(request.path, path);
959        assert!(request.duration.is_some());
960        assert_eq!(request.size.unwrap(), size);
961        assert!(request.range.is_none());
962        assert!(request.extra_display.is_none());
963    }
964
965    #[tokio::test]
966    async fn instrumented_store_put_multipart() {
967        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
968        // manually for this test
969        let store = Arc::new(object_store::memory::InMemory::new());
970        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
971        let instrumented = InstrumentedObjectStore::new(store, mode);
972
973        let path = Path::from("test/data");
974
975        // By default no requests should be instrumented/stored
976        assert!(instrumented.requests.lock().is_empty());
977        let mp = instrumented.put_multipart(&path).await.unwrap();
978        let mut write = WriteMultipart::new(mp);
979        write.write(b"test_data");
980        write.finish().await.unwrap();
981        assert!(instrumented.requests.lock().is_empty());
982
983        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
984        assert!(instrumented.requests.lock().is_empty());
985        let mp = instrumented.put_multipart(&path).await.unwrap();
986        let mut write = WriteMultipart::new(mp);
987        write.write(b"test_data");
988        write.finish().await.unwrap();
989        assert_eq!(instrumented.requests.lock().len(), 1);
990
991        let request = instrumented.take_requests().pop().unwrap();
992        assert_eq!(request.op, Operation::Put);
993        assert_eq!(request.path, path);
994        assert!(request.duration.is_some());
995        assert!(request.size.is_none());
996        assert!(request.range.is_none());
997        assert!(request.extra_display.is_none());
998    }
999
1000    #[tokio::test]
1001    async fn instrumented_store_copy() {
1002        let (instrumented, path) = setup_test_store().await;
1003        let copy_to = Path::from("test/copied");
1004
1005        // By default no requests should be instrumented/stored
1006        assert!(instrumented.requests.lock().is_empty());
1007        instrumented.copy(&path, &copy_to).await.unwrap();
1008        assert!(instrumented.requests.lock().is_empty());
1009
1010        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1011        assert!(instrumented.requests.lock().is_empty());
1012        instrumented.copy(&path, &copy_to).await.unwrap();
1013        assert_eq!(instrumented.requests.lock().len(), 1);
1014
1015        let mut requests = instrumented.take_requests();
1016        assert_eq!(requests.len(), 1);
1017        assert!(instrumented.requests.lock().is_empty());
1018
1019        let request = requests.pop().unwrap();
1020        assert_eq!(request.op, Operation::Copy);
1021        assert_eq!(request.path, path);
1022        assert!(request.duration.is_some());
1023        assert!(request.size.is_none());
1024        assert!(request.range.is_none());
1025        assert_eq!(
1026            request.extra_display.unwrap(),
1027            format!("copy_to: {copy_to}")
1028        );
1029    }
1030
1031    #[tokio::test]
1032    async fn instrumented_store_copy_if_not_exists() {
1033        let (instrumented, path) = setup_test_store().await;
1034        let mut copy_to = Path::from("test/copied");
1035
1036        // By default no requests should be instrumented/stored
1037        assert!(instrumented.requests.lock().is_empty());
1038        instrumented
1039            .copy_if_not_exists(&path, &copy_to)
1040            .await
1041            .unwrap();
1042        assert!(instrumented.requests.lock().is_empty());
1043
1044        // Use a new destination since the previous one already exists
1045        copy_to = Path::from("test/copied_again");
1046        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1047        assert!(instrumented.requests.lock().is_empty());
1048        instrumented
1049            .copy_if_not_exists(&path, &copy_to)
1050            .await
1051            .unwrap();
1052        assert_eq!(instrumented.requests.lock().len(), 1);
1053
1054        let mut requests = instrumented.take_requests();
1055        assert_eq!(requests.len(), 1);
1056        assert!(instrumented.requests.lock().is_empty());
1057
1058        let request = requests.pop().unwrap();
1059        assert_eq!(request.op, Operation::Copy);
1060        assert_eq!(request.path, path);
1061        assert!(request.duration.is_some());
1062        assert!(request.size.is_none());
1063        assert!(request.range.is_none());
1064        assert_eq!(
1065            request.extra_display.unwrap(),
1066            format!("copy_to: {copy_to}")
1067        );
1068    }
1069
1070    #[tokio::test]
1071    async fn instrumented_store_head() {
1072        let (instrumented, path) = setup_test_store().await;
1073
1074        // By default no requests should be instrumented/stored
1075        assert!(instrumented.requests.lock().is_empty());
1076        let _ = instrumented.head(&path).await.unwrap();
1077        assert!(instrumented.requests.lock().is_empty());
1078
1079        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1080        assert!(instrumented.requests.lock().is_empty());
1081        let _ = instrumented.head(&path).await.unwrap();
1082        assert_eq!(instrumented.requests.lock().len(), 1);
1083
1084        let mut requests = instrumented.take_requests();
1085        assert_eq!(requests.len(), 1);
1086        assert!(instrumented.requests.lock().is_empty());
1087
1088        let request = requests.pop().unwrap();
1089        assert_eq!(request.op, Operation::Head);
1090        assert_eq!(request.path, path);
1091        assert!(request.duration.is_some());
1092        assert!(request.size.is_none());
1093        assert!(request.range.is_none());
1094        assert!(request.extra_display.is_none());
1095    }
1096
1097    #[test]
1098    fn request_details() {
1099        let rd = RequestDetails {
1100            op: Operation::Get,
1101            path: Path::from("test"),
1102            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1103            duration: Some(Duration::new(5, 0)),
1104            size: Some(10),
1105            range: Some((..10).into()),
1106            extra_display: Some(String::from("extra info")),
1107        };
1108
1109        assert_eq!(
1110            format!("{rd}"),
1111            "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
1112        );
1113    }
1114
1115    #[test]
1116    fn request_summary() {
1117        // Test empty request list
1118        let mut requests = Vec::new();
1119        assert_snapshot!(RequestSummaries::new(&requests), @r"
1120        +-----------+--------+-----+-----+-----+-----+-------+
1121        | Operation | Metric | min | max | avg | sum | count |
1122        +-----------+--------+-----+-----+-----+-----+-------+
1123        +-----------+--------+-----+-----+-----+-----+-------+
1124        ");
1125
1126        requests.push(RequestDetails {
1127            op: Operation::Get,
1128            path: Path::from("test1"),
1129            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1130            duration: Some(Duration::from_secs(5)),
1131            size: Some(100),
1132            range: None,
1133            extra_display: None,
1134        });
1135
1136        assert_snapshot!(RequestSummaries::new(&requests), @r"
1137        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1138        | Operation | Metric   | min       | max       | avg       | sum       | count |
1139        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1140        | Get       | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1     |
1141        | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1     |
1142        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1143        ");
1144
1145        // Add more Get requests to test aggregation
1146        requests.push(RequestDetails {
1147            op: Operation::Get,
1148            path: Path::from("test2"),
1149            timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
1150            duration: Some(Duration::from_secs(8)),
1151            size: Some(150),
1152            range: None,
1153            extra_display: None,
1154        });
1155        requests.push(RequestDetails {
1156            op: Operation::Get,
1157            path: Path::from("test3"),
1158            timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
1159            duration: Some(Duration::from_secs(2)),
1160            size: Some(50),
1161            range: None,
1162            extra_display: None,
1163        });
1164        assert_snapshot!(RequestSummaries::new(&requests), @r"
1165        +-----------+----------+-----------+-----------+-----------+------------+-------+
1166        | Operation | Metric   | min       | max       | avg       | sum        | count |
1167        +-----------+----------+-----------+-----------+-----------+------------+-------+
1168        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
1169        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
1170        +-----------+----------+-----------+-----------+-----------+------------+-------+
1171        ");
1172
1173        // Add Put requests to test grouping
1174        requests.push(RequestDetails {
1175            op: Operation::Put,
1176            path: Path::from("test4"),
1177            timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
1178            duration: Some(Duration::from_millis(200)),
1179            size: Some(75),
1180            range: None,
1181            extra_display: None,
1182        });
1183
1184        assert_snapshot!(RequestSummaries::new(&requests), @r"
1185        +-----------+----------+-----------+-----------+-----------+------------+-------+
1186        | Operation | Metric   | min       | max       | avg       | sum        | count |
1187        +-----------+----------+-----------+-----------+-----------+------------+-------+
1188        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
1189        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
1190        | Put       | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s  | 1     |
1191        | Put       | size     | 75 B      | 75 B      | 75 B      | 75 B       | 1     |
1192        +-----------+----------+-----------+-----------+-----------+------------+-------+
1193        ");
1194    }
1195
1196    #[test]
1197    fn request_summary_only_duration() {
1198        // Test request with only duration (no size)
1199        let only_duration = vec![RequestDetails {
1200            op: Operation::Get,
1201            path: Path::from("test1"),
1202            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1203            duration: Some(Duration::from_secs(3)),
1204            size: None,
1205            range: None,
1206            extra_display: None,
1207        }];
1208        assert_snapshot!(RequestSummaries::new(&only_duration), @r"
1209        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1210        | Operation | Metric   | min       | max       | avg       | sum       | count |
1211        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1212        | Get       | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1     |
1213        | Get       | size     |           |           |           |           | 1     |
1214        +-----------+----------+-----------+-----------+-----------+-----------+-------+
1215        ");
1216    }
1217
1218    #[test]
1219    fn request_summary_only_size() {
1220        // Test request with only size (no duration)
1221        let only_size = vec![RequestDetails {
1222            op: Operation::Get,
1223            path: Path::from("test1"),
1224            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1225            duration: None,
1226            size: Some(200),
1227            range: None,
1228            extra_display: None,
1229        }];
1230        assert_snapshot!(RequestSummaries::new(&only_size), @r"
1231        +-----------+----------+-------+-------+-------+-------+-------+
1232        | Operation | Metric   | min   | max   | avg   | sum   | count |
1233        +-----------+----------+-------+-------+-------+-------+-------+
1234        | Get       | duration |       |       |       |       | 1     |
1235        | Get       | size     | 200 B | 200 B | 200 B | 200 B | 1     |
1236        +-----------+----------+-------+-------+-------+-------+-------+
1237        ");
1238    }
1239
1240    #[test]
1241    fn request_summary_neither_duration_or_size() {
1242        // Test request with neither duration nor size
1243        let no_stats = vec![RequestDetails {
1244            op: Operation::Get,
1245            path: Path::from("test1"),
1246            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1247            duration: None,
1248            size: None,
1249            range: None,
1250            extra_display: None,
1251        }];
1252        assert_snapshot!(RequestSummaries::new(&no_stats), @r"
1253        +-----------+----------+-----+-----+-----+-----+-------+
1254        | Operation | Metric   | min | max | avg | sum | count |
1255        +-----------+----------+-----+-----+-----+-----+-------+
1256        | Get       | duration |     |     |     |     | 1     |
1257        | Get       | size     |     |     |     |     | 1     |
1258        +-----------+----------+-----+-----+-----+-----+-------+
1259        ");
1260    }
1261}