1use std::{
19 cmp, fmt,
20 ops::AddAssign,
21 str::FromStr,
22 sync::{
23 Arc,
24 atomic::{AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use arrow::array::{ArrayRef, RecordBatch, StringArray};
30use arrow::util::pretty::pretty_format_batches;
31use async_trait::async_trait;
32use chrono::Utc;
33use datafusion::{
34 common::{HashMap, instant::Instant},
35 error::DataFusionError,
36 execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
37};
38use futures::stream::{BoxStream, Stream};
39use object_store::{
40 GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
41 ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
42 path::Path,
43};
44use parking_lot::{Mutex, RwLock};
45use url::Url;
46
/// Stream adapter used for instrumented listings: the first time `inner` returns
/// `Ready` (an item, an error, or end-of-stream), the time elapsed since `start` is
/// recorded against the entry at `request_index` in the shared `requests` log.
struct TimeToFirstItemStream<S> {
    /// The wrapped listing stream; its results are passed through unchanged
    inner: S,
    /// Instant captured when the listing was started
    start: Instant,
    /// Position in `requests` of the entry whose `duration` is filled in
    request_index: usize,
    /// Request log shared with the owning store
    requests: Arc<Mutex<Vec<RequestDetails>>>,
    /// Set once the first `Ready` poll has been observed, so timing happens only once
    first_item_yielded: bool,
}
55
56impl<S> TimeToFirstItemStream<S> {
57 fn new(
58 inner: S,
59 start: Instant,
60 request_index: usize,
61 requests: Arc<Mutex<Vec<RequestDetails>>>,
62 ) -> Self {
63 Self {
64 inner,
65 start,
66 request_index,
67 requests,
68 first_item_yielded: false,
69 }
70 }
71}
72
73impl<S> Stream for TimeToFirstItemStream<S>
74where
75 S: Stream<Item = Result<ObjectMeta>> + Unpin,
76{
77 type Item = Result<ObjectMeta>;
78
79 fn poll_next(
80 mut self: std::pin::Pin<&mut Self>,
81 cx: &mut std::task::Context<'_>,
82 ) -> std::task::Poll<Option<Self::Item>> {
83 let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
84
85 if !self.first_item_yielded && poll_result.is_ready() {
86 self.first_item_yielded = true;
87 let elapsed = self.start.elapsed();
88
89 let mut requests = self.requests.lock();
90 if let Some(request) = requests.get_mut(self.request_index) {
91 request.duration = Some(elapsed);
92 }
93 }
94
95 poll_result
96 }
97}
98
/// Controls whether an `InstrumentedObjectStore` records the requests it forwards.
///
/// The mode is kept in an `AtomicU8` (written as `mode as u8`) and decoded again by
/// `From<u8>`. The discriminants are therefore spelled out explicitly so both
/// directions stay in sync.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum InstrumentedObjectStoreMode {
    /// Requests are forwarded without being recorded (the default)
    #[default]
    Disabled = 0,
    /// Requests are recorded. NOTE(review): presumably reported in aggregated form by
    /// callers; confirm against the code that consumes the recorded requests.
    Summary = 1,
    /// Requests are recorded. NOTE(review): presumably reported individually by
    /// callers; confirm against the code that consumes the recorded requests.
    Trace = 2,
}
111
112impl fmt::Display for InstrumentedObjectStoreMode {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 write!(f, "{self:?}")
115 }
116}
117
118impl FromStr for InstrumentedObjectStoreMode {
119 type Err = DataFusionError;
120
121 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
122 match s.to_lowercase().as_str() {
123 "disabled" => Ok(Self::Disabled),
124 "summary" => Ok(Self::Summary),
125 "trace" => Ok(Self::Trace),
126 _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
127 }
128 }
129}
130
131impl From<u8> for InstrumentedObjectStoreMode {
132 fn from(value: u8) -> Self {
133 match value {
134 1 => InstrumentedObjectStoreMode::Summary,
135 2 => InstrumentedObjectStoreMode::Trace,
136 _ => InstrumentedObjectStoreMode::Disabled,
137 }
138 }
139}
140
/// [`ObjectStore`] wrapper that forwards every call to `inner`.
///
/// While its mode is not `Disabled`, it also appends a [`RequestDetails`] entry for
/// each request that completes successfully.
#[derive(Debug)]
pub struct InstrumentedObjectStore {
    /// The store that actually serves the requests
    inner: Arc<dyn ObjectStore>,
    /// Current [`InstrumentedObjectStoreMode`], kept as its `u8` discriminant so it can
    /// be changed through `&self`
    instrument_mode: AtomicU8,
    /// Recorded requests; shared with listing streams that fill in durations later
    requests: Arc<Mutex<Vec<RequestDetails>>>,
}
149
150impl InstrumentedObjectStore {
151 fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
153 Self {
154 inner: object_store,
155 instrument_mode,
156 requests: Arc::new(Mutex::new(Vec::new())),
157 }
158 }
159
160 fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
161 self.instrument_mode.store(mode as u8, Ordering::Relaxed)
162 }
163
164 pub fn take_requests(&self) -> Vec<RequestDetails> {
167 let mut req = self.requests.lock();
168
169 req.drain(..).collect()
170 }
171
172 fn enabled(&self) -> bool {
173 self.instrument_mode.load(Ordering::Relaxed)
174 != InstrumentedObjectStoreMode::Disabled as u8
175 }
176
177 async fn instrumented_put_opts(
178 &self,
179 location: &Path,
180 payload: PutPayload,
181 opts: PutOptions,
182 ) -> Result<PutResult> {
183 let timestamp = Utc::now();
184 let start = Instant::now();
185 let size = payload.content_length();
186 let ret = self.inner.put_opts(location, payload, opts).await?;
187 let elapsed = start.elapsed();
188
189 self.requests.lock().push(RequestDetails {
190 op: Operation::Put,
191 path: location.clone(),
192 timestamp,
193 duration: Some(elapsed),
194 size: Some(size),
195 range: None,
196 extra_display: None,
197 });
198
199 Ok(ret)
200 }
201
202 async fn instrumented_put_multipart(
203 &self,
204 location: &Path,
205 opts: PutMultipartOptions,
206 ) -> Result<Box<dyn MultipartUpload>> {
207 let timestamp = Utc::now();
208 let start = Instant::now();
209 let ret = self.inner.put_multipart_opts(location, opts).await?;
210 let elapsed = start.elapsed();
211
212 self.requests.lock().push(RequestDetails {
213 op: Operation::Put,
214 path: location.clone(),
215 timestamp,
216 duration: Some(elapsed),
217 size: None,
218 range: None,
219 extra_display: None,
220 });
221
222 Ok(ret)
223 }
224
225 async fn instrumented_get_opts(
226 &self,
227 location: &Path,
228 options: GetOptions,
229 ) -> Result<GetResult> {
230 let timestamp = Utc::now();
231 let range = options.range.clone();
232
233 let start = Instant::now();
234 let ret = self.inner.get_opts(location, options).await?;
235 let elapsed = start.elapsed();
236
237 self.requests.lock().push(RequestDetails {
238 op: Operation::Get,
239 path: location.clone(),
240 timestamp,
241 duration: Some(elapsed),
242 size: Some((ret.range.end - ret.range.start) as usize),
243 range,
244 extra_display: None,
245 });
246
247 Ok(ret)
248 }
249
250 async fn instrumented_delete(&self, location: &Path) -> Result<()> {
251 let timestamp = Utc::now();
252 let start = Instant::now();
253 self.inner.delete(location).await?;
254 let elapsed = start.elapsed();
255
256 self.requests.lock().push(RequestDetails {
257 op: Operation::Delete,
258 path: location.clone(),
259 timestamp,
260 duration: Some(elapsed),
261 size: None,
262 range: None,
263 extra_display: None,
264 });
265
266 Ok(())
267 }
268
269 fn instrumented_list(
270 &self,
271 prefix: Option<&Path>,
272 ) -> BoxStream<'static, Result<ObjectMeta>> {
273 let timestamp = Utc::now();
274 let start = Instant::now();
275 let inner_stream = self.inner.list(prefix);
276
277 let request_index = {
278 let mut requests = self.requests.lock();
279 requests.push(RequestDetails {
280 op: Operation::List,
281 path: prefix.cloned().unwrap_or_else(|| Path::from("")),
282 timestamp,
283 duration: None,
284 size: None,
285 range: None,
286 extra_display: None,
287 });
288 requests.len() - 1
289 };
290
291 let wrapped_stream = TimeToFirstItemStream::new(
292 inner_stream,
293 start,
294 request_index,
295 Arc::clone(&self.requests),
296 );
297
298 Box::pin(wrapped_stream)
299 }
300
301 async fn instrumented_list_with_delimiter(
302 &self,
303 prefix: Option<&Path>,
304 ) -> Result<ListResult> {
305 let timestamp = Utc::now();
306 let start = Instant::now();
307 let ret = self.inner.list_with_delimiter(prefix).await?;
308 let elapsed = start.elapsed();
309
310 self.requests.lock().push(RequestDetails {
311 op: Operation::List,
312 path: prefix.cloned().unwrap_or_else(|| Path::from("")),
313 timestamp,
314 duration: Some(elapsed),
315 size: None,
316 range: None,
317 extra_display: None,
318 });
319
320 Ok(ret)
321 }
322
323 async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
324 let timestamp = Utc::now();
325 let start = Instant::now();
326 self.inner.copy(from, to).await?;
327 let elapsed = start.elapsed();
328
329 self.requests.lock().push(RequestDetails {
330 op: Operation::Copy,
331 path: from.clone(),
332 timestamp,
333 duration: Some(elapsed),
334 size: None,
335 range: None,
336 extra_display: Some(format!("copy_to: {to}")),
337 });
338
339 Ok(())
340 }
341
342 async fn instrumented_copy_if_not_exists(
343 &self,
344 from: &Path,
345 to: &Path,
346 ) -> Result<()> {
347 let timestamp = Utc::now();
348 let start = Instant::now();
349 self.inner.copy_if_not_exists(from, to).await?;
350 let elapsed = start.elapsed();
351
352 self.requests.lock().push(RequestDetails {
353 op: Operation::Copy,
354 path: from.clone(),
355 timestamp,
356 duration: Some(elapsed),
357 size: None,
358 range: None,
359 extra_display: Some(format!("copy_to: {to}")),
360 });
361
362 Ok(())
363 }
364
365 async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
366 let timestamp = Utc::now();
367 let start = Instant::now();
368 let ret = self.inner.head(location).await?;
369 let elapsed = start.elapsed();
370
371 self.requests.lock().push(RequestDetails {
372 op: Operation::Head,
373 path: location.clone(),
374 timestamp,
375 duration: Some(elapsed),
376 size: None,
377 range: None,
378 extra_display: None,
379 });
380
381 Ok(ret)
382 }
383}
384
385impl fmt::Display for InstrumentedObjectStore {
386 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387 let mode: InstrumentedObjectStoreMode =
388 self.instrument_mode.load(Ordering::Relaxed).into();
389 write!(
390 f,
391 "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
392 self.inner
393 )
394 }
395}
396
397#[async_trait]
398impl ObjectStore for InstrumentedObjectStore {
399 async fn put_opts(
400 &self,
401 location: &Path,
402 payload: PutPayload,
403 opts: PutOptions,
404 ) -> Result<PutResult> {
405 if self.enabled() {
406 return self.instrumented_put_opts(location, payload, opts).await;
407 }
408
409 self.inner.put_opts(location, payload, opts).await
410 }
411
412 async fn put_multipart_opts(
413 &self,
414 location: &Path,
415 opts: PutMultipartOptions,
416 ) -> Result<Box<dyn MultipartUpload>> {
417 if self.enabled() {
418 return self.instrumented_put_multipart(location, opts).await;
419 }
420
421 self.inner.put_multipart_opts(location, opts).await
422 }
423
424 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
425 if self.enabled() {
426 return self.instrumented_get_opts(location, options).await;
427 }
428
429 self.inner.get_opts(location, options).await
430 }
431
432 async fn delete(&self, location: &Path) -> Result<()> {
433 if self.enabled() {
434 return self.instrumented_delete(location).await;
435 }
436
437 self.inner.delete(location).await
438 }
439
440 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
441 if self.enabled() {
442 return self.instrumented_list(prefix);
443 }
444
445 self.inner.list(prefix)
446 }
447
448 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
449 if self.enabled() {
450 return self.instrumented_list_with_delimiter(prefix).await;
451 }
452
453 self.inner.list_with_delimiter(prefix).await
454 }
455
456 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
457 if self.enabled() {
458 return self.instrumented_copy(from, to).await;
459 }
460
461 self.inner.copy(from, to).await
462 }
463
464 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
465 if self.enabled() {
466 return self.instrumented_copy_if_not_exists(from, to).await;
467 }
468
469 self.inner.copy_if_not_exists(from, to).await
470 }
471
472 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
473 if self.enabled() {
474 return self.instrumented_head(location).await;
475 }
476
477 self.inner.head(location).await
478 }
479}
480
/// Kind of request recorded by [`InstrumentedObjectStore`].
///
/// The declaration order defines the derived `Ord`. That is the order in which
/// [`RequestSummaries::new`] sorts its summaries.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
    /// `copy` and `copy_if_not_exists`
    Copy,
    /// `delete`
    Delete,
    /// `get_opts`
    Get,
    /// `head`
    Head,
    /// `list` and `list_with_delimiter`
    List,
    /// `put_opts` and `put_multipart_opts`
    Put,
}
491
492impl fmt::Display for Operation {
493 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
494 write!(f, "{self:?}")
495 }
496}
497
/// A single request recorded by [`InstrumentedObjectStore`].
#[derive(Debug)]
pub struct RequestDetails {
    /// Kind of request
    op: Operation,
    /// Object path; the listing prefix for lists (empty when none was given); the
    /// source path for copies
    path: Path,
    /// Wall-clock time captured just before the request was issued
    timestamp: chrono::DateTime<Utc>,
    /// How long the request took; `None` for a `list` whose stream has not yet
    /// produced its first result
    duration: Option<Duration>,
    /// Number of bytes, when known (set by `put_opts` and `get_opts`)
    size: Option<usize>,
    /// Byte range requested by a `get_opts` call, if any
    range: Option<GetRange>,
    /// Extra operation-specific text appended to the display output (e.g. the copy
    /// destination)
    extra_display: Option<String>,
}
509
510impl fmt::Display for RequestDetails {
511 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512 let mut output_parts = vec![format!(
513 "{} operation={:?}",
514 self.timestamp.to_rfc3339(),
515 self.op
516 )];
517
518 if let Some(d) = self.duration {
519 output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
520 }
521 if let Some(s) = self.size {
522 output_parts.push(format!("size={s}"));
523 }
524 if let Some(r) = &self.range {
525 output_parts.push(format!("range: {r}"));
526 }
527 output_parts.push(format!("path={}", self.path));
528
529 if let Some(ed) = &self.extra_display {
530 output_parts.push(ed.clone());
531 }
532
533 write!(f, "{}", output_parts.join(" "))
534 }
535}
536
/// Per-operation aggregate statistics over a set of [`RequestDetails`].
///
/// Its `Display` output is a text table.
#[derive(Default)]
pub struct RequestSummaries {
    /// One entry per operation (sorted by [`Operation`] when built by `new`)
    summaries: Vec<RequestSummary>,
}
542
543impl fmt::Display for RequestSummaries {
545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546 match pretty_format_batches(&[self.to_batch()]) {
548 Err(e) => {
549 write!(f, "Error formatting summary: {e}")
550 }
551 Ok(displayable) => {
552 write!(f, "{displayable}")
553 }
554 }
555 }
556}
557
558impl RequestSummaries {
559 pub fn new(requests: &[RequestDetails]) -> Self {
561 let mut summaries: HashMap<Operation, RequestSummary> = HashMap::new();
562 for rd in requests {
563 match summaries.get_mut(&rd.op) {
564 Some(rs) => rs.push(rd),
565 None => {
566 let mut rs = RequestSummary::new(rd.op);
567 rs.push(rd);
568 summaries.insert(rd.op, rs);
569 }
570 }
571 }
572 let mut summaries: Vec<RequestSummary> = summaries.into_values().collect();
574 summaries.sort_by_key(|s| s.operation);
575 Self { summaries }
576 }
577
578 pub fn to_batch(&self) -> RecordBatch {
590 let operations: StringArray = self
591 .iter()
592 .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2))
593 .collect();
594 let metrics: StringArray = self
595 .iter()
596 .flat_map(|_s| [Some("duration"), Some("size")])
597 .collect();
598 let mins: StringArray = self
599 .stats_iter()
600 .flat_map(|(duration_stats, size_stats)| {
601 let dur_min =
602 duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32()));
603 let size_min = size_stats.map(|s| format!("{} B", s.min));
604 [dur_min, size_min]
605 })
606 .collect();
607 let maxs: StringArray = self
608 .stats_iter()
609 .flat_map(|(duration_stats, size_stats)| {
610 let dur_max =
611 duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32()));
612 let size_max = size_stats.map(|s| format!("{} B", s.max));
613 [dur_max, size_max]
614 })
615 .collect();
616 let avgs: StringArray = self
617 .iter()
618 .flat_map(|s| {
619 let count = s.count as f32;
620 let duration_stats = s.duration_stats.as_ref();
621 let size_stats = s.size_stats.as_ref();
622 let dur_avg = duration_stats.map(|d| {
623 let avg = d.sum.as_secs_f32() / count;
624 format!("{avg:.6}s")
625 });
626 let size_avg = size_stats.map(|s| {
627 let avg = s.sum as f32 / count;
628 format!("{avg} B")
629 });
630 [dur_avg, size_avg]
631 })
632 .collect();
633 let sums: StringArray = self
634 .stats_iter()
635 .flat_map(|(duration_stats, size_stats)| {
636 let dur_sum =
644 duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32()));
645 let size_sum = size_stats.map(|s| format!("{} B", s.sum));
646 [dur_sum, size_sum]
647 })
648 .collect();
649 let counts: StringArray = self
650 .iter()
651 .flat_map(|s| {
652 let count = s.count.to_string();
653 [Some(count.clone()), Some(count)]
654 })
655 .collect();
656
657 RecordBatch::try_from_iter(vec![
658 ("Operation", Arc::new(operations) as ArrayRef),
659 ("Metric", Arc::new(metrics) as ArrayRef),
660 ("min", Arc::new(mins) as ArrayRef),
661 ("max", Arc::new(maxs) as ArrayRef),
662 ("avg", Arc::new(avgs) as ArrayRef),
663 ("sum", Arc::new(sums) as ArrayRef),
664 ("count", Arc::new(counts) as ArrayRef),
665 ])
666 .expect("Created the batch correctly")
667 }
668
669 fn iter(&self) -> impl Iterator<Item = &RequestSummary> {
671 self.summaries.iter()
672 }
673
674 fn stats_iter(
677 &self,
678 ) -> impl Iterator<Item = (Option<&Stats<Duration>>, Option<&Stats<usize>>)> {
679 self.summaries
680 .iter()
681 .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref()))
682 }
683}
684
685pub struct RequestSummary {
688 operation: Operation,
689 count: usize,
690 duration_stats: Option<Stats<Duration>>,
691 size_stats: Option<Stats<usize>>,
692}
693
694impl RequestSummary {
695 fn new(operation: Operation) -> Self {
696 Self {
697 operation,
698 count: 0,
699 duration_stats: None,
700 size_stats: None,
701 }
702 }
703 fn push(&mut self, request: &RequestDetails) {
704 self.count += 1;
705 if let Some(dur) = request.duration {
706 self.duration_stats.get_or_insert_default().push(dur)
707 }
708 if let Some(size) = request.size {
709 self.size_stats.get_or_insert_default().push(size)
710 }
711 }
712}
713
714struct Stats<T: Copy + Ord + AddAssign<T>> {
715 min: T,
716 max: T,
717 sum: T,
718}
719
720impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
721 fn push(&mut self, val: T) {
722 self.min = cmp::min(val, self.min);
723 self.max = cmp::max(val, self.max);
724 self.sum += val;
725 }
726}
727
728impl Default for Stats<Duration> {
729 fn default() -> Self {
730 Self {
731 min: Duration::MAX,
732 max: Duration::ZERO,
733 sum: Duration::ZERO,
734 }
735 }
736}
737
738impl Default for Stats<usize> {
739 fn default() -> Self {
740 Self {
741 min: usize::MAX,
742 max: usize::MIN,
743 sum: 0,
744 }
745 }
746}
747
/// [`ObjectStoreRegistry`] that wraps every registered store in an
/// [`InstrumentedObjectStore`].
///
/// It keeps a handle to each wrapper so their mode can be changed and their recorded
/// requests collected.
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
    /// Registry that maps URLs to the (wrapped) stores
    inner: Arc<dyn ObjectStoreRegistry>,
    /// Mode given to newly registered stores, as a `u8` discriminant
    instrument_mode: AtomicU8,
    /// Every wrapper created by `register_store`, in registration order. Entries are
    /// never removed, even when a store is later replaced or deregistered.
    stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
}
755
756impl Default for InstrumentedObjectStoreRegistry {
757 fn default() -> Self {
758 Self::new()
759 }
760}
761
762impl InstrumentedObjectStoreRegistry {
763 pub fn new() -> Self {
766 Self {
767 inner: Arc::new(DefaultObjectStoreRegistry::new()),
768 instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
769 stores: RwLock::new(Vec::new()),
770 }
771 }
772
773 pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
774 self.instrument_mode.store(mode as u8, Ordering::Relaxed);
775 self
776 }
777
778 pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
781 self.stores.read().clone()
782 }
783
784 pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
787 self.instrument_mode.load(Ordering::Relaxed).into()
788 }
789
790 pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
792 self.instrument_mode.store(mode as u8, Ordering::Relaxed);
793 for s in self.stores.read().iter() {
794 s.set_instrument_mode(mode)
795 }
796 }
797}
798
799impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
800 fn register_store(
801 &self,
802 url: &Url,
803 store: Arc<dyn ObjectStore>,
804 ) -> Option<Arc<dyn ObjectStore>> {
805 let mode = self.instrument_mode.load(Ordering::Relaxed);
806 let instrumented =
807 Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
808 self.stores.write().push(Arc::clone(&instrumented));
809 self.inner.register_store(url, instrumented)
810 }
811
812 fn deregister_store(
813 &self,
814 url: &Url,
815 ) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
816 self.inner.deregister_store(url)
817 }
818
819 fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
820 self.inner.get_store(url)
821 }
822}
823
824#[cfg(test)]
825mod tests {
826 use futures::StreamExt;
827 use object_store::WriteMultipart;
828
829 use super::*;
830 use insta::assert_snapshot;
831
832 #[test]
833 fn instrumented_mode() {
834 assert!(matches!(
835 InstrumentedObjectStoreMode::default(),
836 InstrumentedObjectStoreMode::Disabled
837 ));
838
839 assert!(matches!(
840 "dIsABleD".parse().unwrap(),
841 InstrumentedObjectStoreMode::Disabled
842 ));
843 assert!(matches!(
844 "SUmMaRy".parse().unwrap(),
845 InstrumentedObjectStoreMode::Summary
846 ));
847 assert!(matches!(
848 "TRaCe".parse().unwrap(),
849 InstrumentedObjectStoreMode::Trace
850 ));
851 assert!(
852 "does_not_exist"
853 .parse::<InstrumentedObjectStoreMode>()
854 .is_err()
855 );
856
857 assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
858 assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary));
859 assert!(matches!(2.into(), InstrumentedObjectStoreMode::Trace));
860 assert!(matches!(3.into(), InstrumentedObjectStoreMode::Disabled));
861 }
862
863 #[test]
864 fn instrumented_registry() {
865 let mut reg = InstrumentedObjectStoreRegistry::new();
866 assert!(reg.stores().is_empty());
867 assert_eq!(
868 reg.instrument_mode(),
869 InstrumentedObjectStoreMode::default()
870 );
871
872 reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Trace);
873 assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Trace);
874
875 let store = object_store::memory::InMemory::new();
876 let url = "mem://test".parse().unwrap();
877 let registered = reg.register_store(&url, Arc::new(store));
878 assert!(registered.is_none());
879
880 let fetched = reg.get_store(&url);
881 assert!(fetched.is_ok());
882 assert_eq!(reg.stores().len(), 1);
883 }
884
885 async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
888 let store = Arc::new(object_store::memory::InMemory::new());
889 let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
890 let instrumented = InstrumentedObjectStore::new(store, mode);
891
892 let path = Path::from("test/data");
894 let payload = PutPayload::from_static(b"test_data");
895 instrumented.put(&path, payload).await.unwrap();
896
897 (instrumented, path)
898 }
899
900 #[tokio::test]
901 async fn instrumented_store_get() {
902 let (instrumented, path) = setup_test_store().await;
903
904 assert!(instrumented.requests.lock().is_empty());
906 let _ = instrumented.get(&path).await.unwrap();
907 assert!(instrumented.requests.lock().is_empty());
908
909 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
910 assert!(instrumented.requests.lock().is_empty());
911 let _ = instrumented.get(&path).await.unwrap();
912 assert_eq!(instrumented.requests.lock().len(), 1);
913
914 let mut requests = instrumented.take_requests();
915 assert_eq!(requests.len(), 1);
916 assert!(instrumented.requests.lock().is_empty());
917
918 let request = requests.pop().unwrap();
919 assert_eq!(request.op, Operation::Get);
920 assert_eq!(request.path, path);
921 assert!(request.duration.is_some());
922 assert_eq!(request.size, Some(9));
923 assert_eq!(request.range, None);
924 assert!(request.extra_display.is_none());
925 }
926
927 #[tokio::test]
928 async fn instrumented_store_delete() {
929 let (instrumented, path) = setup_test_store().await;
930
931 assert!(instrumented.requests.lock().is_empty());
933 instrumented.delete(&path).await.unwrap();
934 assert!(instrumented.requests.lock().is_empty());
935
936 let (instrumented, path) = setup_test_store().await;
938 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
939 assert!(instrumented.requests.lock().is_empty());
940 instrumented.delete(&path).await.unwrap();
941 assert_eq!(instrumented.requests.lock().len(), 1);
942
943 let mut requests = instrumented.take_requests();
944 assert_eq!(requests.len(), 1);
945 assert!(instrumented.requests.lock().is_empty());
946
947 let request = requests.pop().unwrap();
948 assert_eq!(request.op, Operation::Delete);
949 assert_eq!(request.path, path);
950 assert!(request.duration.is_some());
951 assert!(request.size.is_none());
952 assert!(request.range.is_none());
953 assert!(request.extra_display.is_none());
954 }
955
956 #[tokio::test]
957 async fn instrumented_store_list() {
958 let (instrumented, path) = setup_test_store().await;
959
960 assert!(instrumented.requests.lock().is_empty());
962 let _ = instrumented.list(Some(&path));
963 assert!(instrumented.requests.lock().is_empty());
964
965 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
966 assert!(instrumented.requests.lock().is_empty());
967 let mut stream = instrumented.list(Some(&path));
968 let _ = stream.next().await;
970 assert_eq!(instrumented.requests.lock().len(), 1);
971
972 let request = instrumented.take_requests().pop().unwrap();
973 assert_eq!(request.op, Operation::List);
974 assert_eq!(request.path, path);
975 assert!(request.duration.is_some());
976 assert!(request.size.is_none());
977 assert!(request.range.is_none());
978 assert!(request.extra_display.is_none());
979 }
980
981 #[tokio::test]
982 async fn instrumented_store_list_with_delimiter() {
983 let (instrumented, path) = setup_test_store().await;
984
985 assert!(instrumented.requests.lock().is_empty());
987 let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
988 assert!(instrumented.requests.lock().is_empty());
989
990 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
991 assert!(instrumented.requests.lock().is_empty());
992 let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
993 assert_eq!(instrumented.requests.lock().len(), 1);
994
995 let request = instrumented.take_requests().pop().unwrap();
996 assert_eq!(request.op, Operation::List);
997 assert_eq!(request.path, path);
998 assert!(request.duration.is_some());
999 assert!(request.size.is_none());
1000 assert!(request.range.is_none());
1001 assert!(request.extra_display.is_none());
1002 }
1003
1004 #[tokio::test]
1005 async fn instrumented_store_put_opts() {
1006 let store = Arc::new(object_store::memory::InMemory::new());
1009 let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1010 let instrumented = InstrumentedObjectStore::new(store, mode);
1011
1012 let path = Path::from("test/data");
1013 let payload = PutPayload::from_static(b"test_data");
1014 let size = payload.content_length();
1015
1016 assert!(instrumented.requests.lock().is_empty());
1018 instrumented.put(&path, payload.clone()).await.unwrap();
1019 assert!(instrumented.requests.lock().is_empty());
1020
1021 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1022 assert!(instrumented.requests.lock().is_empty());
1023 instrumented.put(&path, payload).await.unwrap();
1024 assert_eq!(instrumented.requests.lock().len(), 1);
1025
1026 let request = instrumented.take_requests().pop().unwrap();
1027 assert_eq!(request.op, Operation::Put);
1028 assert_eq!(request.path, path);
1029 assert!(request.duration.is_some());
1030 assert_eq!(request.size.unwrap(), size);
1031 assert!(request.range.is_none());
1032 assert!(request.extra_display.is_none());
1033 }
1034
1035 #[tokio::test]
1036 async fn instrumented_store_put_multipart() {
1037 let store = Arc::new(object_store::memory::InMemory::new());
1040 let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1041 let instrumented = InstrumentedObjectStore::new(store, mode);
1042
1043 let path = Path::from("test/data");
1044
1045 assert!(instrumented.requests.lock().is_empty());
1047 let mp = instrumented.put_multipart(&path).await.unwrap();
1048 let mut write = WriteMultipart::new(mp);
1049 write.write(b"test_data");
1050 write.finish().await.unwrap();
1051 assert!(instrumented.requests.lock().is_empty());
1052
1053 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1054 assert!(instrumented.requests.lock().is_empty());
1055 let mp = instrumented.put_multipart(&path).await.unwrap();
1056 let mut write = WriteMultipart::new(mp);
1057 write.write(b"test_data");
1058 write.finish().await.unwrap();
1059 assert_eq!(instrumented.requests.lock().len(), 1);
1060
1061 let request = instrumented.take_requests().pop().unwrap();
1062 assert_eq!(request.op, Operation::Put);
1063 assert_eq!(request.path, path);
1064 assert!(request.duration.is_some());
1065 assert!(request.size.is_none());
1066 assert!(request.range.is_none());
1067 assert!(request.extra_display.is_none());
1068 }
1069
1070 #[tokio::test]
1071 async fn instrumented_store_copy() {
1072 let (instrumented, path) = setup_test_store().await;
1073 let copy_to = Path::from("test/copied");
1074
1075 assert!(instrumented.requests.lock().is_empty());
1077 instrumented.copy(&path, ©_to).await.unwrap();
1078 assert!(instrumented.requests.lock().is_empty());
1079
1080 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1081 assert!(instrumented.requests.lock().is_empty());
1082 instrumented.copy(&path, ©_to).await.unwrap();
1083 assert_eq!(instrumented.requests.lock().len(), 1);
1084
1085 let mut requests = instrumented.take_requests();
1086 assert_eq!(requests.len(), 1);
1087 assert!(instrumented.requests.lock().is_empty());
1088
1089 let request = requests.pop().unwrap();
1090 assert_eq!(request.op, Operation::Copy);
1091 assert_eq!(request.path, path);
1092 assert!(request.duration.is_some());
1093 assert!(request.size.is_none());
1094 assert!(request.range.is_none());
1095 assert_eq!(
1096 request.extra_display.unwrap(),
1097 format!("copy_to: {copy_to}")
1098 );
1099 }
1100
1101 #[tokio::test]
1102 async fn instrumented_store_copy_if_not_exists() {
1103 let (instrumented, path) = setup_test_store().await;
1104 let mut copy_to = Path::from("test/copied");
1105
1106 assert!(instrumented.requests.lock().is_empty());
1108 instrumented
1109 .copy_if_not_exists(&path, ©_to)
1110 .await
1111 .unwrap();
1112 assert!(instrumented.requests.lock().is_empty());
1113
1114 copy_to = Path::from("test/copied_again");
1116 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1117 assert!(instrumented.requests.lock().is_empty());
1118 instrumented
1119 .copy_if_not_exists(&path, ©_to)
1120 .await
1121 .unwrap();
1122 assert_eq!(instrumented.requests.lock().len(), 1);
1123
1124 let mut requests = instrumented.take_requests();
1125 assert_eq!(requests.len(), 1);
1126 assert!(instrumented.requests.lock().is_empty());
1127
1128 let request = requests.pop().unwrap();
1129 assert_eq!(request.op, Operation::Copy);
1130 assert_eq!(request.path, path);
1131 assert!(request.duration.is_some());
1132 assert!(request.size.is_none());
1133 assert!(request.range.is_none());
1134 assert_eq!(
1135 request.extra_display.unwrap(),
1136 format!("copy_to: {copy_to}")
1137 );
1138 }
1139
1140 #[tokio::test]
1141 async fn instrumented_store_head() {
1142 let (instrumented, path) = setup_test_store().await;
1143
1144 assert!(instrumented.requests.lock().is_empty());
1146 let _ = instrumented.head(&path).await.unwrap();
1147 assert!(instrumented.requests.lock().is_empty());
1148
1149 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1150 assert!(instrumented.requests.lock().is_empty());
1151 let _ = instrumented.head(&path).await.unwrap();
1152 assert_eq!(instrumented.requests.lock().len(), 1);
1153
1154 let mut requests = instrumented.take_requests();
1155 assert_eq!(requests.len(), 1);
1156 assert!(instrumented.requests.lock().is_empty());
1157
1158 let request = requests.pop().unwrap();
1159 assert_eq!(request.op, Operation::Head);
1160 assert_eq!(request.path, path);
1161 assert!(request.duration.is_some());
1162 assert!(request.size.is_none());
1163 assert!(request.range.is_none());
1164 assert!(request.extra_display.is_none());
1165 }
1166
1167 #[test]
1168 fn request_details() {
1169 let rd = RequestDetails {
1170 op: Operation::Get,
1171 path: Path::from("test"),
1172 timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1173 duration: Some(Duration::new(5, 0)),
1174 size: Some(10),
1175 range: Some((..10).into()),
1176 extra_display: Some(String::from("extra info")),
1177 };
1178
1179 assert_eq!(
1180 format!("{rd}"),
1181 "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
1182 );
1183 }
1184
1185 #[test]
1186 fn request_summary() {
1187 let mut requests = Vec::new();
1189 assert_snapshot!(RequestSummaries::new(&requests), @r"
1190 +-----------+--------+-----+-----+-----+-----+-------+
1191 | Operation | Metric | min | max | avg | sum | count |
1192 +-----------+--------+-----+-----+-----+-----+-------+
1193 +-----------+--------+-----+-----+-----+-----+-------+
1194 ");
1195
1196 requests.push(RequestDetails {
1197 op: Operation::Get,
1198 path: Path::from("test1"),
1199 timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1200 duration: Some(Duration::from_secs(5)),
1201 size: Some(100),
1202 range: None,
1203 extra_display: None,
1204 });
1205
1206 assert_snapshot!(RequestSummaries::new(&requests), @r"
1207 +-----------+----------+-----------+-----------+-----------+-----------+-------+
1208 | Operation | Metric | min | max | avg | sum | count |
1209 +-----------+----------+-----------+-----------+-----------+-----------+-------+
1210 | Get | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1 |
1211 | Get | size | 100 B | 100 B | 100 B | 100 B | 1 |
1212 +-----------+----------+-----------+-----------+-----------+-----------+-------+
1213 ");
1214
1215 requests.push(RequestDetails {
1217 op: Operation::Get,
1218 path: Path::from("test2"),
1219 timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
1220 duration: Some(Duration::from_secs(8)),
1221 size: Some(150),
1222 range: None,
1223 extra_display: None,
1224 });
1225 requests.push(RequestDetails {
1226 op: Operation::Get,
1227 path: Path::from("test3"),
1228 timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
1229 duration: Some(Duration::from_secs(2)),
1230 size: Some(50),
1231 range: None,
1232 extra_display: None,
1233 });
1234 assert_snapshot!(RequestSummaries::new(&requests), @r"
1235 +-----------+----------+-----------+-----------+-----------+------------+-------+
1236 | Operation | Metric | min | max | avg | sum | count |
1237 +-----------+----------+-----------+-----------+-----------+------------+-------+
1238 | Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 |
1239 | Get | size | 50 B | 150 B | 100 B | 300 B | 3 |
1240 +-----------+----------+-----------+-----------+-----------+------------+-------+
1241 ");
1242
1243 requests.push(RequestDetails {
1245 op: Operation::Put,
1246 path: Path::from("test4"),
1247 timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
1248 duration: Some(Duration::from_millis(200)),
1249 size: Some(75),
1250 range: None,
1251 extra_display: None,
1252 });
1253
1254 assert_snapshot!(RequestSummaries::new(&requests), @r"
1255 +-----------+----------+-----------+-----------+-----------+------------+-------+
1256 | Operation | Metric | min | max | avg | sum | count |
1257 +-----------+----------+-----------+-----------+-----------+------------+-------+
1258 | Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 |
1259 | Get | size | 50 B | 150 B | 100 B | 300 B | 3 |
1260 | Put | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s | 1 |
1261 | Put | size | 75 B | 75 B | 75 B | 75 B | 1 |
1262 +-----------+----------+-----------+-----------+-----------+------------+-------+
1263 ");
1264 }
1265
1266 #[test]
1267 fn request_summary_only_duration() {
1268 let only_duration = vec![RequestDetails {
1270 op: Operation::Get,
1271 path: Path::from("test1"),
1272 timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1273 duration: Some(Duration::from_secs(3)),
1274 size: None,
1275 range: None,
1276 extra_display: None,
1277 }];
1278 assert_snapshot!(RequestSummaries::new(&only_duration), @r"
1279 +-----------+----------+-----------+-----------+-----------+-----------+-------+
1280 | Operation | Metric | min | max | avg | sum | count |
1281 +-----------+----------+-----------+-----------+-----------+-----------+-------+
1282 | Get | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1 |
1283 | Get | size | | | | | 1 |
1284 +-----------+----------+-----------+-----------+-----------+-----------+-------+
1285 ");
1286 }
1287
1288 #[test]
1289 fn request_summary_only_size() {
1290 let only_size = vec![RequestDetails {
1292 op: Operation::Get,
1293 path: Path::from("test1"),
1294 timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1295 duration: None,
1296 size: Some(200),
1297 range: None,
1298 extra_display: None,
1299 }];
1300 assert_snapshot!(RequestSummaries::new(&only_size), @r"
1301 +-----------+----------+-------+-------+-------+-------+-------+
1302 | Operation | Metric | min | max | avg | sum | count |
1303 +-----------+----------+-------+-------+-------+-------+-------+
1304 | Get | duration | | | | | 1 |
1305 | Get | size | 200 B | 200 B | 200 B | 200 B | 1 |
1306 +-----------+----------+-------+-------+-------+-------+-------+
1307 ");
1308 }
1309
1310 #[test]
1311 fn request_summary_neither_duration_or_size() {
1312 let no_stats = vec![RequestDetails {
1314 op: Operation::Get,
1315 path: Path::from("test1"),
1316 timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1317 duration: None,
1318 size: None,
1319 range: None,
1320 extra_display: None,
1321 }];
1322 assert_snapshot!(RequestSummaries::new(&no_stats), @r"
1323 +-----------+----------+-----+-----+-----+-----+-------+
1324 | Operation | Metric | min | max | avg | sum | count |
1325 +-----------+----------+-----+-----+-----+-----+-------+
1326 | Get | duration | | | | | 1 |
1327 | Get | size | | | | | 1 |
1328 +-----------+----------+-----+-----+-----+-----+-------+
1329 ");
1330 }
1331}