1use std::{
19 cmp, fmt,
20 ops::AddAssign,
21 str::FromStr,
22 sync::{
23 atomic::{AtomicU8, Ordering},
24 Arc,
25 },
26 time::Duration,
27};
28
29use arrow::array::{ArrayRef, RecordBatch, StringArray};
30use arrow::util::pretty::pretty_format_batches;
31use async_trait::async_trait;
32use chrono::Utc;
33use datafusion::{
34 common::{instant::Instant, HashMap},
35 error::DataFusionError,
36 execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
37};
38use futures::stream::BoxStream;
39use object_store::{
40 path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
41 ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
42};
43use parking_lot::{Mutex, RwLock};
44use url::Url;
45
/// Controls whether an [`InstrumentedObjectStore`] records the requests it forwards.
///
/// The mode is stored in an `AtomicU8` via `mode as u8` and decoded with the
/// `From<u8>` impl, so the declaration order of the variants (0, 1, 2) is part of
/// that encoding and must not be changed independently of `From<u8>`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum InstrumentedObjectStoreMode {
    /// Nothing is recorded; calls go straight to the inner store. This is the default.
    #[default]
    Disabled,
    /// Requests are recorded. NOTE(review): presumably reported in aggregate
    /// (see `RequestSummaries`) by the caller — confirm.
    Summary,
    /// Requests are recorded. NOTE(review): presumably reported one line per
    /// request by the caller — confirm.
    Trace,
}
58
59impl fmt::Display for InstrumentedObjectStoreMode {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{self:?}")
62 }
63}
64
65impl FromStr for InstrumentedObjectStoreMode {
66 type Err = DataFusionError;
67
68 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
69 match s.to_lowercase().as_str() {
70 "disabled" => Ok(Self::Disabled),
71 "summary" => Ok(Self::Summary),
72 "trace" => Ok(Self::Trace),
73 _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
74 }
75 }
76}
77
78impl From<u8> for InstrumentedObjectStoreMode {
79 fn from(value: u8) -> Self {
80 match value {
81 1 => InstrumentedObjectStoreMode::Summary,
82 2 => InstrumentedObjectStoreMode::Trace,
83 _ => InstrumentedObjectStoreMode::Disabled,
84 }
85 }
86}
87
/// An [`ObjectStore`] wrapper that forwards every call to `inner` and, while its
/// mode is not `Disabled`, records a [`RequestDetails`] for each request that
/// succeeds (for `list`, when the stream is created).
#[derive(Debug)]
pub struct InstrumentedObjectStore {
    /// The wrapped store that actually serves requests.
    inner: Arc<dyn ObjectStore>,
    /// Current mode, stored as `InstrumentedObjectStoreMode as u8` and decoded
    /// with `From<u8>`.
    instrument_mode: AtomicU8,
    /// Requests recorded since the last `take_requests` call.
    requests: Mutex<Vec<RequestDetails>>,
}
96
97impl InstrumentedObjectStore {
98 fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
100 Self {
101 inner: object_store,
102 instrument_mode,
103 requests: Mutex::new(Vec::new()),
104 }
105 }
106
107 fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
108 self.instrument_mode.store(mode as u8, Ordering::Relaxed)
109 }
110
111 pub fn take_requests(&self) -> Vec<RequestDetails> {
114 let mut req = self.requests.lock();
115
116 req.drain(..).collect()
117 }
118
119 fn enabled(&self) -> bool {
120 self.instrument_mode.load(Ordering::Relaxed)
121 != InstrumentedObjectStoreMode::Disabled as u8
122 }
123
124 async fn instrumented_put_opts(
125 &self,
126 location: &Path,
127 payload: PutPayload,
128 opts: PutOptions,
129 ) -> Result<PutResult> {
130 let timestamp = Utc::now();
131 let start = Instant::now();
132 let size = payload.content_length();
133 let ret = self.inner.put_opts(location, payload, opts).await?;
134 let elapsed = start.elapsed();
135
136 self.requests.lock().push(RequestDetails {
137 op: Operation::Put,
138 path: location.clone(),
139 timestamp,
140 duration: Some(elapsed),
141 size: Some(size),
142 range: None,
143 extra_display: None,
144 });
145
146 Ok(ret)
147 }
148
149 async fn instrumented_put_multipart(
150 &self,
151 location: &Path,
152 opts: PutMultipartOptions,
153 ) -> Result<Box<dyn MultipartUpload>> {
154 let timestamp = Utc::now();
155 let start = Instant::now();
156 let ret = self.inner.put_multipart_opts(location, opts).await?;
157 let elapsed = start.elapsed();
158
159 self.requests.lock().push(RequestDetails {
160 op: Operation::Put,
161 path: location.clone(),
162 timestamp,
163 duration: Some(elapsed),
164 size: None,
165 range: None,
166 extra_display: None,
167 });
168
169 Ok(ret)
170 }
171
172 async fn instrumented_get_opts(
173 &self,
174 location: &Path,
175 options: GetOptions,
176 ) -> Result<GetResult> {
177 let timestamp = Utc::now();
178 let range = options.range.clone();
179
180 let start = Instant::now();
181 let ret = self.inner.get_opts(location, options).await?;
182 let elapsed = start.elapsed();
183
184 self.requests.lock().push(RequestDetails {
185 op: Operation::Get,
186 path: location.clone(),
187 timestamp,
188 duration: Some(elapsed),
189 size: Some((ret.range.end - ret.range.start) as usize),
190 range,
191 extra_display: None,
192 });
193
194 Ok(ret)
195 }
196
197 async fn instrumented_delete(&self, location: &Path) -> Result<()> {
198 let timestamp = Utc::now();
199 let start = Instant::now();
200 self.inner.delete(location).await?;
201 let elapsed = start.elapsed();
202
203 self.requests.lock().push(RequestDetails {
204 op: Operation::Delete,
205 path: location.clone(),
206 timestamp,
207 duration: Some(elapsed),
208 size: None,
209 range: None,
210 extra_display: None,
211 });
212
213 Ok(())
214 }
215
216 fn instrumented_list(
217 &self,
218 prefix: Option<&Path>,
219 ) -> BoxStream<'static, Result<ObjectMeta>> {
220 let timestamp = Utc::now();
221 let ret = self.inner.list(prefix);
222
223 self.requests.lock().push(RequestDetails {
224 op: Operation::List,
225 path: prefix.cloned().unwrap_or_else(|| Path::from("")),
226 timestamp,
227 duration: None, size: None,
229 range: None,
230 extra_display: None,
231 });
232
233 ret
234 }
235
236 async fn instrumented_list_with_delimiter(
237 &self,
238 prefix: Option<&Path>,
239 ) -> Result<ListResult> {
240 let timestamp = Utc::now();
241 let start = Instant::now();
242 let ret = self.inner.list_with_delimiter(prefix).await?;
243 let elapsed = start.elapsed();
244
245 self.requests.lock().push(RequestDetails {
246 op: Operation::List,
247 path: prefix.cloned().unwrap_or_else(|| Path::from("")),
248 timestamp,
249 duration: Some(elapsed),
250 size: None,
251 range: None,
252 extra_display: None,
253 });
254
255 Ok(ret)
256 }
257
258 async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
259 let timestamp = Utc::now();
260 let start = Instant::now();
261 self.inner.copy(from, to).await?;
262 let elapsed = start.elapsed();
263
264 self.requests.lock().push(RequestDetails {
265 op: Operation::Copy,
266 path: from.clone(),
267 timestamp,
268 duration: Some(elapsed),
269 size: None,
270 range: None,
271 extra_display: Some(format!("copy_to: {to}")),
272 });
273
274 Ok(())
275 }
276
277 async fn instrumented_copy_if_not_exists(
278 &self,
279 from: &Path,
280 to: &Path,
281 ) -> Result<()> {
282 let timestamp = Utc::now();
283 let start = Instant::now();
284 self.inner.copy_if_not_exists(from, to).await?;
285 let elapsed = start.elapsed();
286
287 self.requests.lock().push(RequestDetails {
288 op: Operation::Copy,
289 path: from.clone(),
290 timestamp,
291 duration: Some(elapsed),
292 size: None,
293 range: None,
294 extra_display: Some(format!("copy_to: {to}")),
295 });
296
297 Ok(())
298 }
299
300 async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
301 let timestamp = Utc::now();
302 let start = Instant::now();
303 let ret = self.inner.head(location).await?;
304 let elapsed = start.elapsed();
305
306 self.requests.lock().push(RequestDetails {
307 op: Operation::Head,
308 path: location.clone(),
309 timestamp,
310 duration: Some(elapsed),
311 size: None,
312 range: None,
313 extra_display: None,
314 });
315
316 Ok(ret)
317 }
318}
319
320impl fmt::Display for InstrumentedObjectStore {
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 let mode: InstrumentedObjectStoreMode =
323 self.instrument_mode.load(Ordering::Relaxed).into();
324 write!(
325 f,
326 "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
327 self.inner
328 )
329 }
330}
331
332#[async_trait]
333impl ObjectStore for InstrumentedObjectStore {
334 async fn put_opts(
335 &self,
336 location: &Path,
337 payload: PutPayload,
338 opts: PutOptions,
339 ) -> Result<PutResult> {
340 if self.enabled() {
341 return self.instrumented_put_opts(location, payload, opts).await;
342 }
343
344 self.inner.put_opts(location, payload, opts).await
345 }
346
347 async fn put_multipart_opts(
348 &self,
349 location: &Path,
350 opts: PutMultipartOptions,
351 ) -> Result<Box<dyn MultipartUpload>> {
352 if self.enabled() {
353 return self.instrumented_put_multipart(location, opts).await;
354 }
355
356 self.inner.put_multipart_opts(location, opts).await
357 }
358
359 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
360 if self.enabled() {
361 return self.instrumented_get_opts(location, options).await;
362 }
363
364 self.inner.get_opts(location, options).await
365 }
366
367 async fn delete(&self, location: &Path) -> Result<()> {
368 if self.enabled() {
369 return self.instrumented_delete(location).await;
370 }
371
372 self.inner.delete(location).await
373 }
374
375 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
376 if self.enabled() {
377 return self.instrumented_list(prefix);
378 }
379
380 self.inner.list(prefix)
381 }
382
383 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
384 if self.enabled() {
385 return self.instrumented_list_with_delimiter(prefix).await;
386 }
387
388 self.inner.list_with_delimiter(prefix).await
389 }
390
391 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
392 if self.enabled() {
393 return self.instrumented_copy(from, to).await;
394 }
395
396 self.inner.copy(from, to).await
397 }
398
399 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
400 if self.enabled() {
401 return self.instrumented_copy_if_not_exists(from, to).await;
402 }
403
404 self.inner.copy_if_not_exists(from, to).await
405 }
406
407 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
408 if self.enabled() {
409 return self.instrumented_head(location).await;
410 }
411
412 self.inner.head(location).await
413 }
414}
415
/// Kind of object store request captured in a [`RequestDetails`].
///
/// The derived `Ord` follows declaration order. `RequestSummaries` sorts by it,
/// so this order is also the row order of the summary table.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
    Copy,
    Delete,
    Get,
    Head,
    List,
    Put,
}
426
427impl fmt::Display for Operation {
428 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429 write!(f, "{self:?}")
430 }
431}
432
/// One recorded object store request.
#[derive(Debug)]
pub struct RequestDetails {
    /// Which operation was performed.
    op: Operation,
    /// Target path: the source path for copies, and the prefix (or the empty
    /// path) for lists.
    path: Path,
    /// Wall-clock time (UTC) taken just before the request was issued.
    timestamp: chrono::DateTime<Utc>,
    /// Time the request took, when measured (`None` for streaming `list`).
    duration: Option<Duration>,
    /// Bytes sent (puts) or returned (gets), when known.
    size: Option<usize>,
    /// Byte range requested by a get, if any.
    range: Option<GetRange>,
    /// Extra free-form text appended when displayed (e.g. the copy destination).
    extra_display: Option<String>,
}
444
445impl fmt::Display for RequestDetails {
446 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
447 let mut output_parts = vec![format!(
448 "{} operation={:?}",
449 self.timestamp.to_rfc3339(),
450 self.op
451 )];
452
453 if let Some(d) = self.duration {
454 output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
455 }
456 if let Some(s) = self.size {
457 output_parts.push(format!("size={s}"));
458 }
459 if let Some(r) = &self.range {
460 output_parts.push(format!("range: {r}"));
461 }
462 output_parts.push(format!("path={}", self.path));
463
464 if let Some(ed) = &self.extra_display {
465 output_parts.push(ed.clone());
466 }
467
468 write!(f, "{}", output_parts.join(" "))
469 }
470}
471
/// Aggregates over a set of [`RequestDetails`], one entry per [`Operation`].
/// `Display` renders them as a table.
#[derive(Default)]
pub struct RequestSummaries {
    /// One entry per operation present in the input, sorted by operation.
    summaries: Vec<RequestSummary>,
}
477
478impl fmt::Display for RequestSummaries {
480 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
481 match pretty_format_batches(&[self.to_batch()]) {
483 Err(e) => {
484 write!(f, "Error formatting summary: {e}")
485 }
486 Ok(displayable) => {
487 write!(f, "{displayable}")
488 }
489 }
490 }
491}
492
493impl RequestSummaries {
494 pub fn new(requests: &[RequestDetails]) -> Self {
496 let mut summaries: HashMap<Operation, RequestSummary> = HashMap::new();
497 for rd in requests {
498 match summaries.get_mut(&rd.op) {
499 Some(rs) => rs.push(rd),
500 None => {
501 let mut rs = RequestSummary::new(rd.op);
502 rs.push(rd);
503 summaries.insert(rd.op, rs);
504 }
505 }
506 }
507 let mut summaries: Vec<RequestSummary> = summaries.into_values().collect();
509 summaries.sort_by_key(|s| s.operation);
510 Self { summaries }
511 }
512
513 pub fn to_batch(&self) -> RecordBatch {
525 let operations: StringArray = self
526 .iter()
527 .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2))
528 .collect();
529 let metrics: StringArray = self
530 .iter()
531 .flat_map(|_s| [Some("duration"), Some("size")])
532 .collect();
533 let mins: StringArray = self
534 .stats_iter()
535 .flat_map(|(duration_stats, size_stats)| {
536 let dur_min =
537 duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32()));
538 let size_min = size_stats.map(|s| format!("{} B", s.min));
539 [dur_min, size_min]
540 })
541 .collect();
542 let maxs: StringArray = self
543 .stats_iter()
544 .flat_map(|(duration_stats, size_stats)| {
545 let dur_max =
546 duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32()));
547 let size_max = size_stats.map(|s| format!("{} B", s.max));
548 [dur_max, size_max]
549 })
550 .collect();
551 let avgs: StringArray = self
552 .iter()
553 .flat_map(|s| {
554 let count = s.count as f32;
555 let duration_stats = s.duration_stats.as_ref();
556 let size_stats = s.size_stats.as_ref();
557 let dur_avg = duration_stats.map(|d| {
558 let avg = d.sum.as_secs_f32() / count;
559 format!("{avg:.6}s")
560 });
561 let size_avg = size_stats.map(|s| {
562 let avg = s.sum as f32 / count;
563 format!("{avg} B")
564 });
565 [dur_avg, size_avg]
566 })
567 .collect();
568 let sums: StringArray = self
569 .stats_iter()
570 .flat_map(|(duration_stats, size_stats)| {
571 let dur_sum =
579 duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32()));
580 let size_sum = size_stats.map(|s| format!("{} B", s.sum));
581 [dur_sum, size_sum]
582 })
583 .collect();
584 let counts: StringArray = self
585 .iter()
586 .flat_map(|s| {
587 let count = s.count.to_string();
588 [Some(count.clone()), Some(count)]
589 })
590 .collect();
591
592 RecordBatch::try_from_iter(vec![
593 ("Operation", Arc::new(operations) as ArrayRef),
594 ("Metric", Arc::new(metrics) as ArrayRef),
595 ("min", Arc::new(mins) as ArrayRef),
596 ("max", Arc::new(maxs) as ArrayRef),
597 ("avg", Arc::new(avgs) as ArrayRef),
598 ("sum", Arc::new(sums) as ArrayRef),
599 ("count", Arc::new(counts) as ArrayRef),
600 ])
601 .expect("Created the batch correctly")
602 }
603
604 fn iter(&self) -> impl Iterator<Item = &RequestSummary> {
606 self.summaries.iter()
607 }
608
609 fn stats_iter(
612 &self,
613 ) -> impl Iterator<Item = (Option<&Stats<Duration>>, Option<&Stats<usize>>)> {
614 self.summaries
615 .iter()
616 .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref()))
617 }
618}
619
620pub struct RequestSummary {
623 operation: Operation,
624 count: usize,
625 duration_stats: Option<Stats<Duration>>,
626 size_stats: Option<Stats<usize>>,
627}
628
629impl RequestSummary {
630 fn new(operation: Operation) -> Self {
631 Self {
632 operation,
633 count: 0,
634 duration_stats: None,
635 size_stats: None,
636 }
637 }
638 fn push(&mut self, request: &RequestDetails) {
639 self.count += 1;
640 if let Some(dur) = request.duration {
641 self.duration_stats.get_or_insert_default().push(dur)
642 }
643 if let Some(size) = request.size {
644 self.size_stats.get_or_insert_default().push(size)
645 }
646 }
647}
648
649struct Stats<T: Copy + Ord + AddAssign<T>> {
650 min: T,
651 max: T,
652 sum: T,
653}
654
655impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
656 fn push(&mut self, val: T) {
657 self.min = cmp::min(val, self.min);
658 self.max = cmp::max(val, self.max);
659 self.sum += val;
660 }
661}
662
663impl Default for Stats<Duration> {
664 fn default() -> Self {
665 Self {
666 min: Duration::MAX,
667 max: Duration::ZERO,
668 sum: Duration::ZERO,
669 }
670 }
671}
672
673impl Default for Stats<usize> {
674 fn default() -> Self {
675 Self {
676 min: usize::MAX,
677 max: usize::MIN,
678 sum: 0,
679 }
680 }
681}
682
/// An [`ObjectStoreRegistry`] that wraps every registered store in an
/// [`InstrumentedObjectStore`]. It keeps handles to the wrappers so their
/// recorded requests can be collected later.
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
    /// Underlying registry that holds the wrapped stores and serves lookups.
    inner: Arc<dyn ObjectStoreRegistry>,
    /// Mode given to newly registered stores, encoded as `u8`.
    instrument_mode: AtomicU8,
    /// Every instrumented store created by `register_store`.
    stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
}
690
691impl Default for InstrumentedObjectStoreRegistry {
692 fn default() -> Self {
693 Self::new()
694 }
695}
696
697impl InstrumentedObjectStoreRegistry {
698 pub fn new() -> Self {
701 Self {
702 inner: Arc::new(DefaultObjectStoreRegistry::new()),
703 instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
704 stores: RwLock::new(Vec::new()),
705 }
706 }
707
708 pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
709 self.instrument_mode.store(mode as u8, Ordering::Relaxed);
710 self
711 }
712
713 pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
716 self.stores.read().clone()
717 }
718
719 pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
722 self.instrument_mode.load(Ordering::Relaxed).into()
723 }
724
725 pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
727 self.instrument_mode.store(mode as u8, Ordering::Relaxed);
728 for s in self.stores.read().iter() {
729 s.set_instrument_mode(mode)
730 }
731 }
732}
733
734impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
735 fn register_store(
736 &self,
737 url: &Url,
738 store: Arc<dyn ObjectStore>,
739 ) -> Option<Arc<dyn ObjectStore>> {
740 let mode = self.instrument_mode.load(Ordering::Relaxed);
741 let instrumented =
742 Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
743 self.stores.write().push(Arc::clone(&instrumented));
744 self.inner.register_store(url, instrumented)
745 }
746
747 fn deregister_store(
748 &self,
749 url: &Url,
750 ) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
751 self.inner.deregister_store(url)
752 }
753
754 fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
755 self.inner.get_store(url)
756 }
757}
758
// Unit tests for the instrumented store, the registry, and the request
// formatting/summaries.
#[cfg(test)]
mod tests {
    use object_store::WriteMultipart;

    use super::*;
    use insta::assert_snapshot;

    #[test]
    fn instrumented_mode() {
        assert!(matches!(
            InstrumentedObjectStoreMode::default(),
            InstrumentedObjectStoreMode::Disabled
        ));

        assert!(matches!(
            "dIsABleD".parse().unwrap(),
            InstrumentedObjectStoreMode::Disabled
        ));
        assert!(matches!(
            "SUmMaRy".parse().unwrap(),
            InstrumentedObjectStoreMode::Summary
        ));
        assert!(matches!(
            "TRaCe".parse().unwrap(),
            InstrumentedObjectStoreMode::Trace
        ));
        assert!("does_not_exist"
            .parse::<InstrumentedObjectStoreMode>()
            .is_err());

        assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
        assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary));
        assert!(matches!(2.into(), InstrumentedObjectStoreMode::Trace));
        assert!(matches!(3.into(), InstrumentedObjectStoreMode::Disabled));
    }

    #[test]
    fn instrumented_registry() {
        let mut reg = InstrumentedObjectStoreRegistry::new();
        assert!(reg.stores().is_empty());
        assert_eq!(
            reg.instrument_mode(),
            InstrumentedObjectStoreMode::default()
        );

        reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Trace);
        assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Trace);

        let store = object_store::memory::InMemory::new();
        let url = "mem://test".parse().unwrap();
        let registered = reg.register_store(&url, Arc::new(store));
        assert!(registered.is_none());

        let fetched = reg.get_store(&url);
        assert!(fetched.is_ok());
        assert_eq!(reg.stores().len(), 1);
    }

    /// Builds a disabled instrumented in-memory store holding one object at
    /// `test/data` (9 bytes).
    async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
        let store = Arc::new(object_store::memory::InMemory::new());
        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
        let instrumented = InstrumentedObjectStore::new(store, mode);

        let path = Path::from("test/data");
        let payload = PutPayload::from_static(b"test_data");
        instrumented.put(&path, payload).await.unwrap();

        (instrumented, path)
    }

    #[tokio::test]
    async fn instrumented_store_get() {
        let (instrumented, path) = setup_test_store().await;

        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.get(&path).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.get(&path).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let mut requests = instrumented.take_requests();
        assert_eq!(requests.len(), 1);
        assert!(instrumented.requests.lock().is_empty());

        let request = requests.pop().unwrap();
        assert_eq!(request.op, Operation::Get);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert_eq!(request.size, Some(9));
        assert_eq!(request.range, None);
        assert!(request.extra_display.is_none());
    }

    #[tokio::test]
    async fn instrumented_store_delete() {
        let (instrumented, path) = setup_test_store().await;

        assert!(instrumented.requests.lock().is_empty());
        instrumented.delete(&path).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        let (instrumented, path) = setup_test_store().await;
        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        instrumented.delete(&path).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let mut requests = instrumented.take_requests();
        assert_eq!(requests.len(), 1);
        assert!(instrumented.requests.lock().is_empty());

        let request = requests.pop().unwrap();
        assert_eq!(request.op, Operation::Delete);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert!(request.extra_display.is_none());
    }

    #[tokio::test]
    async fn instrumented_store_list() {
        let (instrumented, path) = setup_test_store().await;

        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.list(Some(&path));
        assert!(instrumented.requests.lock().is_empty());

        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.list(Some(&path));
        assert_eq!(instrumented.requests.lock().len(), 1);

        let request = instrumented.take_requests().pop().unwrap();
        assert_eq!(request.op, Operation::List);
        assert_eq!(request.path, path);
        assert!(request.duration.is_none());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert!(request.extra_display.is_none());
    }

    #[tokio::test]
    async fn instrumented_store_list_with_delimiter() {
        let (instrumented, path) = setup_test_store().await;

        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let request = instrumented.take_requests().pop().unwrap();
        assert_eq!(request.op, Operation::List);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert!(request.extra_display.is_none());
    }

    #[tokio::test]
    async fn instrumented_store_put_opts() {
        let store = Arc::new(object_store::memory::InMemory::new());
        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
        let instrumented = InstrumentedObjectStore::new(store, mode);

        let path = Path::from("test/data");
        let payload = PutPayload::from_static(b"test_data");
        let size = payload.content_length();

        assert!(instrumented.requests.lock().is_empty());
        instrumented.put(&path, payload.clone()).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        instrumented.put(&path, payload).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let request = instrumented.take_requests().pop().unwrap();
        assert_eq!(request.op, Operation::Put);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert_eq!(request.size.unwrap(), size);
        assert!(request.range.is_none());
        assert!(request.extra_display.is_none());
    }

    #[tokio::test]
    async fn instrumented_store_put_multipart() {
        let store = Arc::new(object_store::memory::InMemory::new());
        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
        let instrumented = InstrumentedObjectStore::new(store, mode);

        let path = Path::from("test/data");

        assert!(instrumented.requests.lock().is_empty());
        let mp = instrumented.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new(mp);
        write.write(b"test_data");
        write.finish().await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        let mp = instrumented.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new(mp);
        write.write(b"test_data");
        write.finish().await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let request = instrumented.take_requests().pop().unwrap();
        assert_eq!(request.op, Operation::Put);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert!(request.extra_display.is_none());
    }

    #[tokio::test]
    async fn instrumented_store_copy() {
        let (instrumented, path) = setup_test_store().await;
        let copy_to = Path::from("test/copied");

        assert!(instrumented.requests.lock().is_empty());
        instrumented.copy(&path, &copy_to).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        instrumented.copy(&path, &copy_to).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let mut requests = instrumented.take_requests();
        assert_eq!(requests.len(), 1);
        assert!(instrumented.requests.lock().is_empty());

        let request = requests.pop().unwrap();
        assert_eq!(request.op, Operation::Copy);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert_eq!(
            request.extra_display.unwrap(),
            format!("copy_to: {copy_to}")
        );
    }

    #[tokio::test]
    async fn instrumented_store_copy_if_not_exists() {
        let (instrumented, path) = setup_test_store().await;
        let mut copy_to = Path::from("test/copied");

        assert!(instrumented.requests.lock().is_empty());
        instrumented
            .copy_if_not_exists(&path, &copy_to)
            .await
            .unwrap();
        assert!(instrumented.requests.lock().is_empty());

        // The first destination now exists, so use a fresh one.
        copy_to = Path::from("test/copied_again");
        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        instrumented
            .copy_if_not_exists(&path, &copy_to)
            .await
            .unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let mut requests = instrumented.take_requests();
        assert_eq!(requests.len(), 1);
        assert!(instrumented.requests.lock().is_empty());

        let request = requests.pop().unwrap();
        assert_eq!(request.op, Operation::Copy);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert_eq!(
            request.extra_display.unwrap(),
            format!("copy_to: {copy_to}")
        );
    }

    #[tokio::test]
    async fn instrumented_store_head() {
        let (instrumented, path) = setup_test_store().await;

        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.head(&path).await.unwrap();
        assert!(instrumented.requests.lock().is_empty());

        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        assert!(instrumented.requests.lock().is_empty());
        let _ = instrumented.head(&path).await.unwrap();
        assert_eq!(instrumented.requests.lock().len(), 1);

        let mut requests = instrumented.take_requests();
        assert_eq!(requests.len(), 1);
        assert!(instrumented.requests.lock().is_empty());

        let request = requests.pop().unwrap();
        assert_eq!(request.op, Operation::Head);
        assert_eq!(request.path, path);
        assert!(request.duration.is_some());
        assert!(request.size.is_none());
        assert!(request.range.is_none());
        assert!(request.extra_display.is_none());
    }

    #[test]
    fn request_details() {
        let rd = RequestDetails {
            op: Operation::Get,
            path: Path::from("test"),
            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
            duration: Some(Duration::new(5, 0)),
            size: Some(10),
            range: Some((..10).into()),
            extra_display: Some(String::from("extra info")),
        };

        assert_eq!(
            format!("{rd}"),
            "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
        );
    }

    #[test]
    fn request_summary() {
        let mut requests = Vec::new();
        assert_snapshot!(RequestSummaries::new(&requests), @r"
        +-----------+--------+-----+-----+-----+-----+-------+
        | Operation | Metric | min | max | avg | sum | count |
        +-----------+--------+-----+-----+-----+-----+-------+
        +-----------+--------+-----+-----+-----+-----+-------+
        ");

        requests.push(RequestDetails {
            op: Operation::Get,
            path: Path::from("test1"),
            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
            duration: Some(Duration::from_secs(5)),
            size: Some(100),
            range: None,
            extra_display: None,
        });

        assert_snapshot!(RequestSummaries::new(&requests), @r"
        +-----------+----------+-----------+-----------+-----------+-----------+-------+
        | Operation | Metric   | min       | max       | avg       | sum       | count |
        +-----------+----------+-----------+-----------+-----------+-----------+-------+
        | Get       | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1     |
        | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1     |
        +-----------+----------+-----------+-----------+-----------+-----------+-------+
        ");

        requests.push(RequestDetails {
            op: Operation::Get,
            path: Path::from("test2"),
            timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
            duration: Some(Duration::from_secs(8)),
            size: Some(150),
            range: None,
            extra_display: None,
        });
        requests.push(RequestDetails {
            op: Operation::Get,
            path: Path::from("test3"),
            timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
            duration: Some(Duration::from_secs(2)),
            size: Some(50),
            range: None,
            extra_display: None,
        });
        assert_snapshot!(RequestSummaries::new(&requests), @r"
        +-----------+----------+-----------+-----------+-----------+------------+-------+
        | Operation | Metric   | min       | max       | avg       | sum        | count |
        +-----------+----------+-----------+-----------+-----------+------------+-------+
        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
        +-----------+----------+-----------+-----------+-----------+------------+-------+
        ");

        requests.push(RequestDetails {
            op: Operation::Put,
            path: Path::from("test4"),
            timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
            duration: Some(Duration::from_millis(200)),
            size: Some(75),
            range: None,
            extra_display: None,
        });

        assert_snapshot!(RequestSummaries::new(&requests), @r"
        +-----------+----------+-----------+-----------+-----------+------------+-------+
        | Operation | Metric   | min       | max       | avg       | sum        | count |
        +-----------+----------+-----------+-----------+-----------+------------+-------+
        | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
        | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
        | Put       | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s  | 1     |
        | Put       | size     | 75 B      | 75 B      | 75 B      | 75 B       | 1     |
        +-----------+----------+-----------+-----------+-----------+------------+-------+
        ");
    }

    #[test]
    fn request_summary_only_duration() {
        let only_duration = vec![RequestDetails {
            op: Operation::Get,
            path: Path::from("test1"),
            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
            duration: Some(Duration::from_secs(3)),
            size: None,
            range: None,
            extra_display: None,
        }];
        assert_snapshot!(RequestSummaries::new(&only_duration), @r"
        +-----------+----------+-----------+-----------+-----------+-----------+-------+
        | Operation | Metric   | min       | max       | avg       | sum       | count |
        +-----------+----------+-----------+-----------+-----------+-----------+-------+
        | Get       | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1     |
        | Get       | size     |           |           |           |           | 1     |
        +-----------+----------+-----------+-----------+-----------+-----------+-------+
        ");
    }

    #[test]
    fn request_summary_only_size() {
        let only_size = vec![RequestDetails {
            op: Operation::Get,
            path: Path::from("test1"),
            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
            duration: None,
            size: Some(200),
            range: None,
            extra_display: None,
        }];
        assert_snapshot!(RequestSummaries::new(&only_size), @r"
        +-----------+----------+-------+-------+-------+-------+-------+
        | Operation | Metric   | min   | max   | avg   | sum   | count |
        +-----------+----------+-------+-------+-------+-------+-------+
        | Get       | duration |       |       |       |       | 1     |
        | Get       | size     | 200 B | 200 B | 200 B | 200 B | 1     |
        +-----------+----------+-------+-------+-------+-------+-------+
        ");
    }

    #[test]
    fn request_summary_neither_duration_or_size() {
        let no_stats = vec![RequestDetails {
            op: Operation::Get,
            path: Path::from("test1"),
            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
            duration: None,
            size: None,
            range: None,
            extra_display: None,
        }];
        assert_snapshot!(RequestSummaries::new(&no_stats), @r"
        +-----------+----------+-----+-----+-----+-----+-------+
        | Operation | Metric   | min | max | avg | sum | count |
        +-----------+----------+-----+-----+-----+-----+-------+
        | Get       | duration |     |     |     |     | 1     |
        | Get       | size     |     |     |     |     | 1     |
        +-----------+----------+-----+-----+-----+-----+-------+
        ");
    }
}