1use std::{
19 cmp, fmt,
20 ops::AddAssign,
21 str::FromStr,
22 sync::{
23 Arc,
24 atomic::{AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use arrow::array::{ArrayRef, RecordBatch, StringArray};
30use arrow::util::pretty::pretty_format_batches;
31use async_trait::async_trait;
32use chrono::Utc;
33use datafusion::{
34 common::{HashMap, instant::Instant},
35 error::DataFusionError,
36 execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
37};
38use futures::stream::{BoxStream, Stream};
39use futures::{StreamExt, TryStreamExt};
40use object_store::{
41 CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload,
42 ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload,
43 PutResult, Result, path::Path,
44};
45use parking_lot::{Mutex, RwLock};
46use url::Url;
47
/// Stream adapter that passes `inner`'s items through unchanged and, the first
/// time `inner` returns `Poll::Ready` (an item, an error, or end of stream),
/// stores the time elapsed since `start` as the `duration` of the entry at
/// `request_index` in `requests`.
///
/// NOTE(review): `request_index` is a position in the shared `Vec`, and
/// `InstrumentedObjectStore::take_requests` empties that `Vec`. If it is called
/// before the first poll completes, the index may point at a different, later
/// entry (whose duration would then be overwritten) or at nothing at all.
struct TimeToFirstItemStream<S> {
    /// Wrapped stream whose items are forwarded.
    inner: S,
    /// When the tracked request was issued.
    start: Instant,
    /// Index of the tracked entry in `requests`.
    request_index: usize,
    /// Shared request log containing the tracked entry.
    requests: Arc<Mutex<Vec<RequestDetails>>>,
    /// Set once the elapsed time has been written, so it is written only once.
    first_item_yielded: bool,
}
56
57impl<S> TimeToFirstItemStream<S> {
58 fn new(
59 inner: S,
60 start: Instant,
61 request_index: usize,
62 requests: Arc<Mutex<Vec<RequestDetails>>>,
63 ) -> Self {
64 Self {
65 inner,
66 start,
67 request_index,
68 requests,
69 first_item_yielded: false,
70 }
71 }
72}
73
74impl<S> Stream for TimeToFirstItemStream<S>
75where
76 S: Stream<Item = Result<ObjectMeta>> + Unpin,
77{
78 type Item = Result<ObjectMeta>;
79
80 fn poll_next(
81 mut self: std::pin::Pin<&mut Self>,
82 cx: &mut std::task::Context<'_>,
83 ) -> std::task::Poll<Option<Self::Item>> {
84 let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
85
86 if !self.first_item_yielded && poll_result.is_ready() {
87 self.first_item_yielded = true;
88 let elapsed = self.start.elapsed();
89
90 let mut requests = self.requests.lock();
91 if let Some(request) = requests.get_mut(self.request_index) {
92 request.duration = Some(elapsed);
93 }
94 }
95
96 poll_result
97 }
98}
99
/// How much an instrumented object store records about its requests.
///
/// The discriminants are written out because the mode is kept in an
/// `AtomicU8` (via `mode as u8`) and decoded again with `From<u8>`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum InstrumentedObjectStoreMode {
    /// Nothing is recorded (the default).
    #[default]
    Disabled = 0,
    /// Requests are recorded; presumably reported as aggregates — NOTE(review): confirm with callers.
    Summary = 1,
    /// Requests are recorded; presumably reported individually — NOTE(review): confirm with callers.
    Trace = 2,
}

impl fmt::Display for InstrumentedObjectStoreMode {
    /// Writes the variant name, e.g. `Summary`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Disabled => "Disabled",
            Self::Summary => "Summary",
            Self::Trace => "Trace",
        };
        f.write_str(name)
    }
}
118
119impl FromStr for InstrumentedObjectStoreMode {
120 type Err = DataFusionError;
121
122 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
123 match s.to_lowercase().as_str() {
124 "disabled" => Ok(Self::Disabled),
125 "summary" => Ok(Self::Summary),
126 "trace" => Ok(Self::Trace),
127 _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
128 }
129 }
130}
131
132impl From<u8> for InstrumentedObjectStoreMode {
133 fn from(value: u8) -> Self {
134 match value {
135 1 => InstrumentedObjectStoreMode::Summary,
136 2 => InstrumentedObjectStoreMode::Trace,
137 _ => InstrumentedObjectStoreMode::Disabled,
138 }
139 }
140}
141
/// An `ObjectStore` wrapper that forwards every call to `inner` and, while its
/// mode is not `Disabled`, records details about each request.
#[derive(Debug)]
pub struct InstrumentedObjectStore {
    /// The wrapped store that performs the actual operations.
    inner: Arc<dyn ObjectStore>,
    /// Current `InstrumentedObjectStoreMode`, held as its `u8` discriminant so
    /// it can be changed through `&self`.
    instrument_mode: AtomicU8,
    /// Recorded requests. Shared via `Arc` so the streams returned by `list`
    /// and `delete_stream` can update it after the call has returned.
    requests: Arc<Mutex<Vec<RequestDetails>>>,
}
150
151impl InstrumentedObjectStore {
152 fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
154 Self {
155 inner: object_store,
156 instrument_mode,
157 requests: Arc::new(Mutex::new(Vec::new())),
158 }
159 }
160
161 fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
162 self.instrument_mode.store(mode as u8, Ordering::Relaxed)
163 }
164
165 pub fn take_requests(&self) -> Vec<RequestDetails> {
168 let mut req = self.requests.lock();
169
170 req.drain(..).collect()
171 }
172
173 fn enabled(&self) -> bool {
174 self.instrument_mode.load(Ordering::Relaxed)
175 != InstrumentedObjectStoreMode::Disabled as u8
176 }
177
178 async fn instrumented_put_opts(
179 &self,
180 location: &Path,
181 payload: PutPayload,
182 opts: PutOptions,
183 ) -> Result<PutResult> {
184 let timestamp = Utc::now();
185 let start = Instant::now();
186 let size = payload.content_length();
187 let ret = self.inner.put_opts(location, payload, opts).await?;
188 let elapsed = start.elapsed();
189
190 self.requests.lock().push(RequestDetails {
191 op: Operation::Put,
192 path: location.clone(),
193 timestamp,
194 duration: Some(elapsed),
195 size: Some(size),
196 range: None,
197 extra_display: None,
198 });
199
200 Ok(ret)
201 }
202
203 async fn instrumented_put_multipart(
204 &self,
205 location: &Path,
206 opts: PutMultipartOptions,
207 ) -> Result<Box<dyn MultipartUpload>> {
208 let timestamp = Utc::now();
209 let start = Instant::now();
210 let ret = self.inner.put_multipart_opts(location, opts).await?;
211 let elapsed = start.elapsed();
212
213 self.requests.lock().push(RequestDetails {
214 op: Operation::Put,
215 path: location.clone(),
216 timestamp,
217 duration: Some(elapsed),
218 size: None,
219 range: None,
220 extra_display: None,
221 });
222
223 Ok(ret)
224 }
225
226 async fn instrumented_get_opts(
227 &self,
228 location: &Path,
229 options: GetOptions,
230 ) -> Result<GetResult> {
231 let timestamp = Utc::now();
232 let range = options.range.clone();
233
234 let head = options.head;
235 let start = Instant::now();
236 let ret = self.inner.get_opts(location, options).await?;
237 let elapsed = start.elapsed();
238
239 let (op, size) = if head {
240 (Operation::Head, None)
241 } else {
242 (
243 Operation::Get,
244 Some((ret.range.end - ret.range.start) as usize),
245 )
246 };
247
248 self.requests.lock().push(RequestDetails {
249 op,
250 path: location.clone(),
251 timestamp,
252 duration: Some(elapsed),
253 size,
254 range,
255 extra_display: None,
256 });
257
258 Ok(ret)
259 }
260
261 fn instrumented_delete_stream(
262 &self,
263 locations: BoxStream<'static, Result<Path>>,
264 ) -> BoxStream<'static, Result<Path>> {
265 let requests_captured = Arc::clone(&self.requests);
266
267 let timestamp = Utc::now();
268 let start = Instant::now();
269 self.inner
270 .delete_stream(locations)
271 .and_then(move |location| {
272 let elapsed = start.elapsed();
273 requests_captured.lock().push(RequestDetails {
274 op: Operation::Delete,
275 path: location.clone(),
276 timestamp,
277 duration: Some(elapsed),
278 size: None,
279 range: None,
280 extra_display: None,
281 });
282 futures::future::ok(location)
283 })
284 .boxed()
285 }
286
287 fn instrumented_list(
288 &self,
289 prefix: Option<&Path>,
290 ) -> BoxStream<'static, Result<ObjectMeta>> {
291 let timestamp = Utc::now();
292 let start = Instant::now();
293 let inner_stream = self.inner.list(prefix);
294
295 let request_index = {
296 let mut requests = self.requests.lock();
297 requests.push(RequestDetails {
298 op: Operation::List,
299 path: prefix.cloned().unwrap_or_else(|| Path::from("")),
300 timestamp,
301 duration: None,
302 size: None,
303 range: None,
304 extra_display: None,
305 });
306 requests.len() - 1
307 };
308
309 let wrapped_stream = TimeToFirstItemStream::new(
310 inner_stream,
311 start,
312 request_index,
313 Arc::clone(&self.requests),
314 );
315
316 Box::pin(wrapped_stream)
317 }
318
319 async fn instrumented_list_with_delimiter(
320 &self,
321 prefix: Option<&Path>,
322 ) -> Result<ListResult> {
323 let timestamp = Utc::now();
324 let start = Instant::now();
325 let ret = self.inner.list_with_delimiter(prefix).await?;
326 let elapsed = start.elapsed();
327
328 self.requests.lock().push(RequestDetails {
329 op: Operation::List,
330 path: prefix.cloned().unwrap_or_else(|| Path::from("")),
331 timestamp,
332 duration: Some(elapsed),
333 size: None,
334 range: None,
335 extra_display: None,
336 });
337
338 Ok(ret)
339 }
340
341 async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
342 let timestamp = Utc::now();
343 let start = Instant::now();
344 self.inner.copy(from, to).await?;
345 let elapsed = start.elapsed();
346
347 self.requests.lock().push(RequestDetails {
348 op: Operation::Copy,
349 path: from.clone(),
350 timestamp,
351 duration: Some(elapsed),
352 size: None,
353 range: None,
354 extra_display: Some(format!("copy_to: {to}")),
355 });
356
357 Ok(())
358 }
359
360 async fn instrumented_copy_if_not_exists(
361 &self,
362 from: &Path,
363 to: &Path,
364 ) -> Result<()> {
365 let timestamp = Utc::now();
366 let start = Instant::now();
367 self.inner.copy_if_not_exists(from, to).await?;
368 let elapsed = start.elapsed();
369
370 self.requests.lock().push(RequestDetails {
371 op: Operation::Copy,
372 path: from.clone(),
373 timestamp,
374 duration: Some(elapsed),
375 size: None,
376 range: None,
377 extra_display: Some(format!("copy_to: {to}")),
378 });
379
380 Ok(())
381 }
382}
383
384impl fmt::Display for InstrumentedObjectStore {
385 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386 let mode: InstrumentedObjectStoreMode =
387 self.instrument_mode.load(Ordering::Relaxed).into();
388 write!(
389 f,
390 "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
391 self.inner
392 )
393 }
394}
395
396#[async_trait]
397impl ObjectStore for InstrumentedObjectStore {
398 async fn put_opts(
399 &self,
400 location: &Path,
401 payload: PutPayload,
402 opts: PutOptions,
403 ) -> Result<PutResult> {
404 if self.enabled() {
405 return self.instrumented_put_opts(location, payload, opts).await;
406 }
407
408 self.inner.put_opts(location, payload, opts).await
409 }
410
411 async fn put_multipart_opts(
412 &self,
413 location: &Path,
414 opts: PutMultipartOptions,
415 ) -> Result<Box<dyn MultipartUpload>> {
416 if self.enabled() {
417 return self.instrumented_put_multipart(location, opts).await;
418 }
419
420 self.inner.put_multipart_opts(location, opts).await
421 }
422
423 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
424 if self.enabled() {
425 return self.instrumented_get_opts(location, options).await;
426 }
427
428 self.inner.get_opts(location, options).await
429 }
430
431 fn delete_stream(
432 &self,
433 locations: BoxStream<'static, Result<Path>>,
434 ) -> BoxStream<'static, Result<Path>> {
435 if self.enabled() {
436 return self.instrumented_delete_stream(locations);
437 }
438
439 self.inner.delete_stream(locations)
440 }
441
442 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
443 if self.enabled() {
444 return self.instrumented_list(prefix);
445 }
446
447 self.inner.list(prefix)
448 }
449
450 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
451 if self.enabled() {
452 return self.instrumented_list_with_delimiter(prefix).await;
453 }
454
455 self.inner.list_with_delimiter(prefix).await
456 }
457
458 async fn copy_opts(
459 &self,
460 from: &Path,
461 to: &Path,
462 options: CopyOptions,
463 ) -> Result<()> {
464 if self.enabled() {
465 return match options.mode {
466 object_store::CopyMode::Create => {
467 self.instrumented_copy_if_not_exists(from, to).await
468 }
469 object_store::CopyMode::Overwrite => {
470 self.instrumented_copy(from, to).await
471 }
472 };
473 }
474
475 self.inner.copy_opts(from, to, options).await
476 }
477}
478
/// The kind of object store request that was recorded.
///
/// Variant order matters: summaries are sorted by the derived `Ord`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
    Copy,
    Delete,
    Get,
    Head,
    List,
    Put,
}

impl fmt::Display for Operation {
    /// Writes the variant name, e.g. `Get`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Operation::Copy => "Copy",
            Operation::Delete => "Delete",
            Operation::Get => "Get",
            Operation::Head => "Head",
            Operation::List => "List",
            Operation::Put => "Put",
        })
    }
}
495
/// One request recorded by an instrumented object store.
#[derive(Debug)]
pub struct RequestDetails {
    /// Kind of request.
    op: Operation,
    /// Target path: the prefix for `List` (empty when none), the source for `Copy`.
    path: Path,
    /// UTC wall-clock time taken just before the request was issued.
    timestamp: chrono::DateTime<Utc>,
    /// How long the request took; `None` when not measured (yet), e.g. a
    /// `list` stream that has not produced its first result.
    duration: Option<Duration>,
    /// Bytes sent (`put_opts`) or returned (`Get`), when known.
    size: Option<usize>,
    /// Byte range requested by a `Get`/`Head`, if any.
    range: Option<GetRange>,
    /// Extra free-form text appended to the `Display` output (e.g. `copy_to: ...`).
    extra_display: Option<String>,
}
507
508impl fmt::Display for RequestDetails {
509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510 let mut output_parts = vec![format!(
511 "{} operation={:?}",
512 self.timestamp.to_rfc3339(),
513 self.op
514 )];
515
516 if let Some(d) = self.duration {
517 output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
518 }
519 if let Some(s) = self.size {
520 output_parts.push(format!("size={s}"));
521 }
522 if let Some(r) = &self.range {
523 output_parts.push(format!("range: {r}"));
524 }
525 output_parts.push(format!("path={}", self.path));
526
527 if let Some(ed) = &self.extra_display {
528 output_parts.push(ed.clone());
529 }
530
531 write!(f, "{}", output_parts.join(" "))
532 }
533}
534
/// Per-operation aggregates over a set of [`RequestDetails`]; its `Display`
/// output is a pretty-printed table.
#[derive(Default)]
pub struct RequestSummaries {
    /// One summary per distinct operation, sorted by operation when built by `new`.
    summaries: Vec<RequestSummary>,
}
540
541impl fmt::Display for RequestSummaries {
543 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544 match pretty_format_batches(&[self.to_batch()]) {
546 Err(e) => {
547 write!(f, "Error formatting summary: {e}")
548 }
549 Ok(displayable) => {
550 write!(f, "{displayable}")
551 }
552 }
553 }
554}
555
556impl RequestSummaries {
557 pub fn new(requests: &[RequestDetails]) -> Self {
559 let mut summaries: HashMap<Operation, RequestSummary> = HashMap::new();
560 for rd in requests {
561 match summaries.get_mut(&rd.op) {
562 Some(rs) => rs.push(rd),
563 None => {
564 let mut rs = RequestSummary::new(rd.op);
565 rs.push(rd);
566 summaries.insert(rd.op, rs);
567 }
568 }
569 }
570 let mut summaries: Vec<RequestSummary> = summaries.into_values().collect();
572 summaries.sort_by_key(|s| s.operation);
573 Self { summaries }
574 }
575
576 pub fn to_batch(&self) -> RecordBatch {
588 let operations: StringArray = self
589 .iter()
590 .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2))
591 .collect();
592 let metrics: StringArray = self
593 .iter()
594 .flat_map(|_s| [Some("duration"), Some("size")])
595 .collect();
596 let mins: StringArray = self
597 .stats_iter()
598 .flat_map(|(duration_stats, size_stats)| {
599 let dur_min =
600 duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32()));
601 let size_min = size_stats.map(|s| format!("{} B", s.min));
602 [dur_min, size_min]
603 })
604 .collect();
605 let maxs: StringArray = self
606 .stats_iter()
607 .flat_map(|(duration_stats, size_stats)| {
608 let dur_max =
609 duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32()));
610 let size_max = size_stats.map(|s| format!("{} B", s.max));
611 [dur_max, size_max]
612 })
613 .collect();
614 let avgs: StringArray = self
615 .iter()
616 .flat_map(|s| {
617 let count = s.count as f32;
618 let duration_stats = s.duration_stats.as_ref();
619 let size_stats = s.size_stats.as_ref();
620 let dur_avg = duration_stats.map(|d| {
621 let avg = d.sum.as_secs_f32() / count;
622 format!("{avg:.6}s")
623 });
624 let size_avg = size_stats.map(|s| {
625 let avg = s.sum as f32 / count;
626 format!("{avg} B")
627 });
628 [dur_avg, size_avg]
629 })
630 .collect();
631 let sums: StringArray = self
632 .stats_iter()
633 .flat_map(|(duration_stats, size_stats)| {
634 let dur_sum =
642 duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32()));
643 let size_sum = size_stats.map(|s| format!("{} B", s.sum));
644 [dur_sum, size_sum]
645 })
646 .collect();
647 let counts: StringArray = self
648 .iter()
649 .flat_map(|s| {
650 let count = s.count.to_string();
651 [Some(count.clone()), Some(count)]
652 })
653 .collect();
654
655 RecordBatch::try_from_iter(vec![
656 ("Operation", Arc::new(operations) as ArrayRef),
657 ("Metric", Arc::new(metrics) as ArrayRef),
658 ("min", Arc::new(mins) as ArrayRef),
659 ("max", Arc::new(maxs) as ArrayRef),
660 ("avg", Arc::new(avgs) as ArrayRef),
661 ("sum", Arc::new(sums) as ArrayRef),
662 ("count", Arc::new(counts) as ArrayRef),
663 ])
664 .expect("Created the batch correctly")
665 }
666
667 fn iter(&self) -> impl Iterator<Item = &RequestSummary> {
669 self.summaries.iter()
670 }
671
672 fn stats_iter(
675 &self,
676 ) -> impl Iterator<Item = (Option<&Stats<Duration>>, Option<&Stats<usize>>)> {
677 self.summaries
678 .iter()
679 .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref()))
680 }
681}
682
683pub struct RequestSummary {
686 operation: Operation,
687 count: usize,
688 duration_stats: Option<Stats<Duration>>,
689 size_stats: Option<Stats<usize>>,
690}
691
692impl RequestSummary {
693 fn new(operation: Operation) -> Self {
694 Self {
695 operation,
696 count: 0,
697 duration_stats: None,
698 size_stats: None,
699 }
700 }
701 fn push(&mut self, request: &RequestDetails) {
702 self.count += 1;
703 if let Some(dur) = request.duration {
704 self.duration_stats.get_or_insert_default().push(dur)
705 }
706 if let Some(size) = request.size {
707 self.size_stats.get_or_insert_default().push(size)
708 }
709 }
710}
711
712struct Stats<T: Copy + Ord + AddAssign<T>> {
713 min: T,
714 max: T,
715 sum: T,
716}
717
718impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
719 fn push(&mut self, val: T) {
720 self.min = cmp::min(val, self.min);
721 self.max = cmp::max(val, self.max);
722 self.sum += val;
723 }
724}
725
726impl Default for Stats<Duration> {
727 fn default() -> Self {
728 Self {
729 min: Duration::MAX,
730 max: Duration::ZERO,
731 sum: Duration::ZERO,
732 }
733 }
734}
735
736impl Default for Stats<usize> {
737 fn default() -> Self {
738 Self {
739 min: usize::MAX,
740 max: usize::MIN,
741 sum: 0,
742 }
743 }
744}
745
/// An `ObjectStoreRegistry` that wraps every store registered through it in an
/// [`InstrumentedObjectStore`] before handing the wrapper to `inner`.
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
    /// Registry that holds and resolves the URL → store mappings.
    inner: Arc<dyn ObjectStoreRegistry>,
    /// Mode (as its `u8` discriminant) given to newly registered stores.
    instrument_mode: AtomicU8,
    /// Every instrumented wrapper created by `register_store`.
    stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
}
753
754impl Default for InstrumentedObjectStoreRegistry {
755 fn default() -> Self {
756 Self::new()
757 }
758}
759
760impl InstrumentedObjectStoreRegistry {
761 pub fn new() -> Self {
764 Self {
765 inner: Arc::new(DefaultObjectStoreRegistry::new()),
766 instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
767 stores: RwLock::new(Vec::new()),
768 }
769 }
770
771 pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
772 self.instrument_mode.store(mode as u8, Ordering::Relaxed);
773 self
774 }
775
776 pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
779 self.stores.read().clone()
780 }
781
782 pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
785 self.instrument_mode.load(Ordering::Relaxed).into()
786 }
787
788 pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
790 self.instrument_mode.store(mode as u8, Ordering::Relaxed);
791 for s in self.stores.read().iter() {
792 s.set_instrument_mode(mode)
793 }
794 }
795}
796
797impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
798 fn register_store(
799 &self,
800 url: &Url,
801 store: Arc<dyn ObjectStore>,
802 ) -> Option<Arc<dyn ObjectStore>> {
803 let mode = self.instrument_mode.load(Ordering::Relaxed);
804 let instrumented =
805 Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
806 self.stores.write().push(Arc::clone(&instrumented));
807 self.inner.register_store(url, instrumented)
808 }
809
810 fn deregister_store(
811 &self,
812 url: &Url,
813 ) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
814 self.inner.deregister_store(url)
815 }
816
817 fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
818 self.inner.get_store(url)
819 }
820}
821
822#[cfg(test)]
823mod tests {
824 use futures::StreamExt;
825 use object_store::WriteMultipart;
826
827 use super::*;
828 use insta::assert_snapshot;
829
830 #[test]
831 fn instrumented_mode() {
832 assert!(matches!(
833 InstrumentedObjectStoreMode::default(),
834 InstrumentedObjectStoreMode::Disabled
835 ));
836
837 assert!(matches!(
838 "dIsABleD".parse().unwrap(),
839 InstrumentedObjectStoreMode::Disabled
840 ));
841 assert!(matches!(
842 "SUmMaRy".parse().unwrap(),
843 InstrumentedObjectStoreMode::Summary
844 ));
845 assert!(matches!(
846 "TRaCe".parse().unwrap(),
847 InstrumentedObjectStoreMode::Trace
848 ));
849 assert!(
850 "does_not_exist"
851 .parse::<InstrumentedObjectStoreMode>()
852 .is_err()
853 );
854
855 assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
856 assert!(matches!(1.into(), InstrumentedObjectStoreMode::Summary));
857 assert!(matches!(2.into(), InstrumentedObjectStoreMode::Trace));
858 assert!(matches!(3.into(), InstrumentedObjectStoreMode::Disabled));
859 }
860
861 #[test]
862 fn instrumented_registry() {
863 let mut reg = InstrumentedObjectStoreRegistry::new();
864 assert!(reg.stores().is_empty());
865 assert_eq!(
866 reg.instrument_mode(),
867 InstrumentedObjectStoreMode::default()
868 );
869
870 reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Trace);
871 assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Trace);
872
873 let store = object_store::memory::InMemory::new();
874 let url = "mem://test".parse().unwrap();
875 let registered = reg.register_store(&url, Arc::new(store));
876 assert!(registered.is_none());
877
878 let fetched = reg.get_store(&url);
879 assert!(fetched.is_ok());
880 assert_eq!(reg.stores().len(), 1);
881 }
882
883 async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
886 let store = Arc::new(object_store::memory::InMemory::new());
887 let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
888 let instrumented = InstrumentedObjectStore::new(store, mode);
889
890 let path = Path::from("test/data");
892 let payload = PutPayload::from_static(b"test_data");
893 instrumented.put(&path, payload).await.unwrap();
894
895 (instrumented, path)
896 }
897
898 #[tokio::test]
899 async fn instrumented_store_get() {
900 let (instrumented, path) = setup_test_store().await;
901
902 assert!(instrumented.requests.lock().is_empty());
904 let _ = instrumented.get(&path).await.unwrap();
905 assert!(instrumented.requests.lock().is_empty());
906
907 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
908 assert!(instrumented.requests.lock().is_empty());
909 let _ = instrumented.get(&path).await.unwrap();
910 assert_eq!(instrumented.requests.lock().len(), 1);
911
912 let mut requests = instrumented.take_requests();
913 assert_eq!(requests.len(), 1);
914 assert!(instrumented.requests.lock().is_empty());
915
916 let request = requests.pop().unwrap();
917 assert_eq!(request.op, Operation::Get);
918 assert_eq!(request.path, path);
919 assert!(request.duration.is_some());
920 assert_eq!(request.size, Some(9));
921 assert_eq!(request.range, None);
922 assert!(request.extra_display.is_none());
923 }
924
925 #[tokio::test]
926 async fn instrumented_store_delete() {
927 let (instrumented, path) = setup_test_store().await;
928
929 assert!(instrumented.requests.lock().is_empty());
931 instrumented.delete(&path).await.unwrap();
932 assert!(instrumented.requests.lock().is_empty());
933
934 let (instrumented, path) = setup_test_store().await;
936 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
937 assert!(instrumented.requests.lock().is_empty());
938 instrumented.delete(&path).await.unwrap();
939 assert_eq!(instrumented.requests.lock().len(), 1);
940
941 let mut requests = instrumented.take_requests();
942 assert_eq!(requests.len(), 1);
943 assert!(instrumented.requests.lock().is_empty());
944
945 let request = requests.pop().unwrap();
946 assert_eq!(request.op, Operation::Delete);
947 assert_eq!(request.path, path);
948 assert!(request.duration.is_some());
949 assert!(request.size.is_none());
950 assert!(request.range.is_none());
951 assert!(request.extra_display.is_none());
952 }
953
954 #[tokio::test]
955 async fn instrumented_store_list() {
956 let (instrumented, path) = setup_test_store().await;
957
958 assert!(instrumented.requests.lock().is_empty());
960 let _ = instrumented.list(Some(&path));
961 assert!(instrumented.requests.lock().is_empty());
962
963 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
964 assert!(instrumented.requests.lock().is_empty());
965 let mut stream = instrumented.list(Some(&path));
966 let _ = stream.next().await;
968 assert_eq!(instrumented.requests.lock().len(), 1);
969
970 let request = instrumented.take_requests().pop().unwrap();
971 assert_eq!(request.op, Operation::List);
972 assert_eq!(request.path, path);
973 assert!(request.duration.is_some());
974 assert!(request.size.is_none());
975 assert!(request.range.is_none());
976 assert!(request.extra_display.is_none());
977 }
978
979 #[tokio::test]
980 async fn instrumented_store_list_with_delimiter() {
981 let (instrumented, path) = setup_test_store().await;
982
983 assert!(instrumented.requests.lock().is_empty());
985 let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
986 assert!(instrumented.requests.lock().is_empty());
987
988 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
989 assert!(instrumented.requests.lock().is_empty());
990 let _ = instrumented.list_with_delimiter(Some(&path)).await.unwrap();
991 assert_eq!(instrumented.requests.lock().len(), 1);
992
993 let request = instrumented.take_requests().pop().unwrap();
994 assert_eq!(request.op, Operation::List);
995 assert_eq!(request.path, path);
996 assert!(request.duration.is_some());
997 assert!(request.size.is_none());
998 assert!(request.range.is_none());
999 assert!(request.extra_display.is_none());
1000 }
1001
1002 #[tokio::test]
1003 async fn instrumented_store_put_opts() {
1004 let store = Arc::new(object_store::memory::InMemory::new());
1007 let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1008 let instrumented = InstrumentedObjectStore::new(store, mode);
1009
1010 let path = Path::from("test/data");
1011 let payload = PutPayload::from_static(b"test_data");
1012 let size = payload.content_length();
1013
1014 assert!(instrumented.requests.lock().is_empty());
1016 instrumented.put(&path, payload.clone()).await.unwrap();
1017 assert!(instrumented.requests.lock().is_empty());
1018
1019 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1020 assert!(instrumented.requests.lock().is_empty());
1021 instrumented.put(&path, payload).await.unwrap();
1022 assert_eq!(instrumented.requests.lock().len(), 1);
1023
1024 let request = instrumented.take_requests().pop().unwrap();
1025 assert_eq!(request.op, Operation::Put);
1026 assert_eq!(request.path, path);
1027 assert!(request.duration.is_some());
1028 assert_eq!(request.size.unwrap(), size);
1029 assert!(request.range.is_none());
1030 assert!(request.extra_display.is_none());
1031 }
1032
1033 #[tokio::test]
1034 async fn instrumented_store_put_multipart() {
1035 let store = Arc::new(object_store::memory::InMemory::new());
1038 let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
1039 let instrumented = InstrumentedObjectStore::new(store, mode);
1040
1041 let path = Path::from("test/data");
1042
1043 assert!(instrumented.requests.lock().is_empty());
1045 let mp = instrumented.put_multipart(&path).await.unwrap();
1046 let mut write = WriteMultipart::new(mp);
1047 write.write(b"test_data");
1048 write.finish().await.unwrap();
1049 assert!(instrumented.requests.lock().is_empty());
1050
1051 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1052 assert!(instrumented.requests.lock().is_empty());
1053 let mp = instrumented.put_multipart(&path).await.unwrap();
1054 let mut write = WriteMultipart::new(mp);
1055 write.write(b"test_data");
1056 write.finish().await.unwrap();
1057 assert_eq!(instrumented.requests.lock().len(), 1);
1058
1059 let request = instrumented.take_requests().pop().unwrap();
1060 assert_eq!(request.op, Operation::Put);
1061 assert_eq!(request.path, path);
1062 assert!(request.duration.is_some());
1063 assert!(request.size.is_none());
1064 assert!(request.range.is_none());
1065 assert!(request.extra_display.is_none());
1066 }
1067
1068 #[tokio::test]
1069 async fn instrumented_store_copy() {
1070 let (instrumented, path) = setup_test_store().await;
1071 let copy_to = Path::from("test/copied");
1072
1073 assert!(instrumented.requests.lock().is_empty());
1075 instrumented.copy(&path, ©_to).await.unwrap();
1076 assert!(instrumented.requests.lock().is_empty());
1077
1078 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1079 assert!(instrumented.requests.lock().is_empty());
1080 instrumented.copy(&path, ©_to).await.unwrap();
1081 assert_eq!(instrumented.requests.lock().len(), 1);
1082
1083 let mut requests = instrumented.take_requests();
1084 assert_eq!(requests.len(), 1);
1085 assert!(instrumented.requests.lock().is_empty());
1086
1087 let request = requests.pop().unwrap();
1088 assert_eq!(request.op, Operation::Copy);
1089 assert_eq!(request.path, path);
1090 assert!(request.duration.is_some());
1091 assert!(request.size.is_none());
1092 assert!(request.range.is_none());
1093 assert_eq!(
1094 request.extra_display.unwrap(),
1095 format!("copy_to: {copy_to}")
1096 );
1097 }
1098
1099 #[tokio::test]
1100 async fn instrumented_store_copy_if_not_exists() {
1101 let (instrumented, path) = setup_test_store().await;
1102 let mut copy_to = Path::from("test/copied");
1103
1104 assert!(instrumented.requests.lock().is_empty());
1106 instrumented
1107 .copy_if_not_exists(&path, ©_to)
1108 .await
1109 .unwrap();
1110 assert!(instrumented.requests.lock().is_empty());
1111
1112 copy_to = Path::from("test/copied_again");
1114 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1115 assert!(instrumented.requests.lock().is_empty());
1116 instrumented
1117 .copy_if_not_exists(&path, ©_to)
1118 .await
1119 .unwrap();
1120 assert_eq!(instrumented.requests.lock().len(), 1);
1121
1122 let mut requests = instrumented.take_requests();
1123 assert_eq!(requests.len(), 1);
1124 assert!(instrumented.requests.lock().is_empty());
1125
1126 let request = requests.pop().unwrap();
1127 assert_eq!(request.op, Operation::Copy);
1128 assert_eq!(request.path, path);
1129 assert!(request.duration.is_some());
1130 assert!(request.size.is_none());
1131 assert!(request.range.is_none());
1132 assert_eq!(
1133 request.extra_display.unwrap(),
1134 format!("copy_to: {copy_to}")
1135 );
1136 }
1137
1138 #[tokio::test]
1139 async fn instrumented_store_head() {
1140 let (instrumented, path) = setup_test_store().await;
1141
1142 assert!(instrumented.requests.lock().is_empty());
1144 let _ = instrumented.head(&path).await.unwrap();
1145 assert!(instrumented.requests.lock().is_empty());
1146
1147 instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1148 assert!(instrumented.requests.lock().is_empty());
1149 let _ = instrumented.head(&path).await.unwrap();
1150 assert_eq!(instrumented.requests.lock().len(), 1);
1151
1152 let mut requests = instrumented.take_requests();
1153 assert_eq!(requests.len(), 1);
1154 assert!(instrumented.requests.lock().is_empty());
1155
1156 let request = requests.pop().unwrap();
1157 assert_eq!(request.op, Operation::Head);
1158 assert_eq!(request.path, path);
1159 assert!(request.duration.is_some());
1160 assert!(request.size.is_none());
1161 assert!(request.range.is_none());
1162 assert!(request.extra_display.is_none());
1163 }
1164
1165 #[test]
1166 fn request_details() {
1167 let rd = RequestDetails {
1168 op: Operation::Get,
1169 path: Path::from("test"),
1170 timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1171 duration: Some(Duration::new(5, 0)),
1172 size: Some(10),
1173 range: Some((..10).into()),
1174 extra_display: Some(String::from("extra info")),
1175 };
1176
1177 assert_eq!(
1178 format!("{rd}"),
1179 "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
1180 );
1181 }
1182
#[test]
fn request_summary() {
    // Builds one record. Only op, path, timestamp, duration and size vary
    // between the records in this test; range and extra display stay empty.
    let record = |op: Operation, path: &str, epoch_secs: i64, duration: Duration, size| {
        RequestDetails {
            op,
            path: Path::from(path),
            timestamp: chrono::DateTime::from_timestamp(epoch_secs, 0).unwrap(),
            duration: Some(duration),
            size: Some(size),
            range: None,
            extra_display: None,
        }
    };

    // Nothing recorded yet: only the header row is rendered.
    let mut requests = Vec::new();
    assert_snapshot!(RequestSummaries::new(&requests), @r"
    +-----------+--------+-----+-----+-----+-----+-------+
    | Operation | Metric | min | max | avg | sum | count |
    +-----------+--------+-----+-----+-----+-----+-------+
    +-----------+--------+-----+-----+-----+-----+-------+
    ");

    // A single GET: min, max, avg and sum all equal the one sample.
    requests.push(record(Operation::Get, "test1", 0, Duration::from_secs(5), 100));
    assert_snapshot!(RequestSummaries::new(&requests), @r"
    +-----------+----------+-----------+-----------+-----------+-----------+-------+
    | Operation | Metric   | min       | max       | avg       | sum       | count |
    +-----------+----------+-----------+-----------+-----------+-----------+-------+
    | Get       | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1     |
    | Get       | size     | 100 B     | 100 B     | 100 B     | 100 B     | 1     |
    +-----------+----------+-----------+-----------+-----------+-----------+-------+
    ");

    // Two more GETs on either side of the first, so the aggregates diverge.
    requests.extend([
        record(Operation::Get, "test2", 1, Duration::from_secs(8), 150),
        record(Operation::Get, "test3", 2, Duration::from_secs(2), 50),
    ]);
    assert_snapshot!(RequestSummaries::new(&requests), @r"
    +-----------+----------+-----------+-----------+-----------+------------+-------+
    | Operation | Metric   | min       | max       | avg       | sum        | count |
    +-----------+----------+-----------+-----------+-----------+------------+-------+
    | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
    | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
    +-----------+----------+-----------+-----------+-----------+------------+-------+
    ");

    // A PUT appears in its own rows, separate from the GET aggregates.
    requests.push(record(Operation::Put, "test4", 3, Duration::from_millis(200), 75));
    assert_snapshot!(RequestSummaries::new(&requests), @r"
    +-----------+----------+-----------+-----------+-----------+------------+-------+
    | Operation | Metric   | min       | max       | avg       | sum        | count |
    +-----------+----------+-----------+-----------+-----------+------------+-------+
    | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
    | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
    | Put       | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s  | 1     |
    | Put       | size     | 75 B      | 75 B      | 75 B      | 75 B       | 1     |
    +-----------+----------+-----------+-----------+-----------+------------+-------+
    ");
}
1263
#[test]
fn request_summary_only_duration() {
    // A request with a recorded duration but no size. Per the snapshot, the
    // size row still counts the request while its statistics stay blank.
    let timed_only = RequestDetails {
        extra_display: None,
        range: None,
        size: None,
        duration: Some(Duration::from_millis(3_000)),
        timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
        path: Path::from("test1"),
        op: Operation::Get,
    };
    let records = vec![timed_only];

    assert_snapshot!(RequestSummaries::new(&records), @r"
    +-----------+----------+-----------+-----------+-----------+-----------+-------+
    | Operation | Metric   | min       | max       | avg       | sum       | count |
    +-----------+----------+-----------+-----------+-----------+-----------+-------+
    | Get       | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1     |
    | Get       | size     |           |           |           |           | 1     |
    +-----------+----------+-----------+-----------+-----------+-----------+-------+
    ");
}
1285
#[test]
fn request_summary_only_size() {
    // The mirror case: a size was recorded but no duration. Per the snapshot,
    // the duration row still counts the request with blank statistics.
    let sized_only = RequestDetails {
        extra_display: None,
        range: None,
        size: Some(200),
        duration: None,
        timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
        path: Path::from("test1"),
        op: Operation::Get,
    };
    let records = vec![sized_only];

    assert_snapshot!(RequestSummaries::new(&records), @r"
    +-----------+----------+-------+-------+-------+-------+-------+
    | Operation | Metric   | min   | max   | avg   | sum   | count |
    +-----------+----------+-------+-------+-------+-------+-------+
    | Get       | duration |       |       |       |       | 1     |
    | Get       | size     | 200 B | 200 B | 200 B | 200 B | 1     |
    +-----------+----------+-------+-------+-------+-------+-------+
    ");
}
1307
#[test]
fn request_summary_neither_duration_or_size() {
    // A request with no measurements at all. Per the snapshot, both metric
    // rows are still emitted and counted, with every statistic left empty.
    let unmeasured = RequestDetails {
        extra_display: None,
        range: None,
        size: None,
        duration: None,
        timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
        path: Path::from("test1"),
        op: Operation::Get,
    };
    let records = vec![unmeasured];

    assert_snapshot!(RequestSummaries::new(&records), @r"
    +-----------+----------+-----+-----+-----+-----+-------+
    | Operation | Metric   | min | max | avg | sum | count |
    +-----------+----------+-----+-----+-----+-----+-------+
    | Get       | duration |     |     |     |     | 1     |
    | Get       | size     |     |     |     |     | 1     |
    +-----------+----------+-----+-----+-----+-----+-------+
    ");
}
1329}