1use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::{
23 human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
24};
25use parking_lot::Mutex;
26use std::{
27 borrow::{Borrow, Cow},
28 fmt::{Debug, Display},
29 sync::{
30 Arc,
31 atomic::{AtomicUsize, Ordering},
32 },
33 time::Duration,
34};
35
36#[derive(Debug, Clone)]
40pub struct Count {
41 value: Arc<AtomicUsize>,
43}
44
45impl PartialEq for Count {
46 fn eq(&self, other: &Self) -> bool {
47 self.value().eq(&other.value())
48 }
49}
50
51impl Display for Count {
52 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53 write!(f, "{}", human_readable_count(self.value()))
54 }
55}
56
57impl Default for Count {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl Count {
64 pub fn new() -> Self {
66 Self {
67 value: Arc::new(AtomicUsize::new(0)),
68 }
69 }
70
71 pub fn add(&self, n: usize) {
73 self.value.fetch_add(n, Ordering::Relaxed);
76 }
77
78 pub fn value(&self) -> usize {
80 self.value.load(Ordering::Relaxed)
81 }
82}
83
/// A shared value that can move up and down.
///
/// As with the other metric types in this file, clones share one atomic, so
/// an update through any clone is visible through all of them.
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Shared storage for the current reading.
    value: Arc<AtomicUsize>,
}

impl PartialEq for Gauge {
    /// Gauges compare equal when their current readings are equal.
    fn eq(&self, other: &Self) -> bool {
        self.value() == other.value()
    }
}

impl Display for Gauge {
    /// Prints the raw reading with no unit or scaling.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}

impl Default for Gauge {
    /// Same as [`Gauge::new`]: a gauge reading zero.
    fn default() -> Self {
        Gauge::new()
    }
}

impl Gauge {
    /// Creates a gauge reading zero.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Self { value }
    }

    /// Raises the reading by `n`.
    pub fn add(&self, n: usize) {
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Lowers the reading by `n`.
    ///
    /// The underlying atomic subtraction wraps, so subtracting more than the
    /// current reading does not panic.
    pub fn sub(&self, n: usize) {
        let _previous = self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Sets the reading to `n` if `n` is larger than the current reading.
    pub fn set_max(&self, n: usize) {
        let _previous = self.value.fetch_max(n, Ordering::Relaxed);
    }

    /// Overwrites the reading with `n` and returns the previous reading.
    pub fn set(&self, n: usize) -> usize {
        let previous = self.value.swap(n, Ordering::Relaxed);
        previous
    }

    /// Returns the current reading.
    pub fn value(&self) -> usize {
        let gauge: &AtomicUsize = &self.value;
        gauge.load(Ordering::Relaxed)
    }
}
151
152#[derive(Debug, Clone)]
154pub struct Time {
155 nanos: Arc<AtomicUsize>,
157}
158
159impl Default for Time {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165impl PartialEq for Time {
166 fn eq(&self, other: &Self) -> bool {
167 self.value().eq(&other.value())
168 }
169}
170
171impl Display for Time {
172 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173 write!(f, "{}", human_readable_duration(self.value() as u64))
174 }
175}
176
177impl Time {
178 pub fn new() -> Self {
181 Self {
182 nanos: Arc::new(AtomicUsize::new(0)),
183 }
184 }
185
186 pub fn add_elapsed(&self, start: Instant) {
188 self.add_duration(start.elapsed());
189 }
190
191 pub fn add_duration(&self, duration: Duration) {
202 let more_nanos = duration.as_nanos() as usize;
203 self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204 }
205
206 pub fn add(&self, other: &Time) {
208 self.add_duration(Duration::from_nanos(other.value() as u64))
209 }
210
211 pub fn timer(&self) -> ScopedTimerGuard<'_> {
215 ScopedTimerGuard {
216 inner: self,
217 start: Some(Instant::now()),
218 }
219 }
220
221 pub fn value(&self) -> usize {
223 self.nanos.load(Ordering::Relaxed)
224 }
225
226 pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229 ScopedTimerGuard {
230 inner: self,
231 start: Some(now),
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
239pub struct Timestamp {
240 timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl Timestamp {
251 pub fn new() -> Self {
253 Self {
254 timestamp: Arc::new(Mutex::new(None)),
255 }
256 }
257
258 pub fn record(&self) {
260 self.set(Utc::now())
261 }
262
263 pub fn set(&self, now: DateTime<Utc>) {
265 *self.timestamp.lock() = Some(now);
266 }
267
268 pub fn value(&self) -> Option<DateTime<Utc>> {
273 *self.timestamp.lock()
274 }
275
276 pub fn update_to_min(&self, other: &Timestamp) {
278 let min = match (self.value(), other.value()) {
279 (None, None) => None,
280 (Some(v), None) => Some(v),
281 (None, Some(v)) => Some(v),
282 (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283 };
284
285 *self.timestamp.lock() = min;
286 }
287
288 pub fn update_to_max(&self, other: &Timestamp) {
290 let max = match (self.value(), other.value()) {
291 (None, None) => None,
292 (Some(v), None) => Some(v),
293 (None, Some(v)) => Some(v),
294 (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295 };
296
297 *self.timestamp.lock() = max;
298 }
299}
300
301impl PartialEq for Timestamp {
302 fn eq(&self, other: &Self) -> bool {
303 self.value().eq(&other.value())
304 }
305}
306
307impl Display for Timestamp {
308 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309 match self.value() {
310 None => write!(f, "NONE"),
311 Some(v) => {
312 write!(f, "{v}")
313 }
314 }
315 }
316}
317
318pub struct ScopedTimerGuard<'a> {
322 inner: &'a Time,
323 start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327 pub fn stop(&mut self) {
329 if let Some(start) = self.start.take() {
330 self.inner.add_elapsed(start)
331 }
332 }
333
334 pub fn restart(&mut self) {
336 self.start = Some(Instant::now())
337 }
338
339 pub fn done(mut self) {
341 self.stop()
342 }
343
344 pub fn stop_with(&mut self, end_time: Instant) {
346 if let Some(start) = self.start.take() {
347 let elapsed = end_time - start;
348 self.inner.add_duration(elapsed)
349 }
350 }
351
352 pub fn done_with(mut self, end_time: Instant) {
355 self.stop_with(end_time)
356 }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360 fn drop(&mut self) {
361 self.stop()
362 }
363}
364
365#[derive(Debug, Clone)]
372pub struct PruningMetrics {
373 pruned: Arc<AtomicUsize>,
374 matched: Arc<AtomicUsize>,
375}
376
377impl Display for PruningMetrics {
378 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
379 let matched = self.matched.load(Ordering::Relaxed);
380 let total = self.pruned.load(Ordering::Relaxed) + matched;
381
382 write!(
383 f,
384 "{} total → {} matched",
385 human_readable_count(total),
386 human_readable_count(matched)
387 )
388 }
389}
390
391impl Default for PruningMetrics {
392 fn default() -> Self {
393 Self::new()
394 }
395}
396
397impl PruningMetrics {
398 pub fn new() -> Self {
400 Self {
401 pruned: Arc::new(AtomicUsize::new(0)),
402 matched: Arc::new(AtomicUsize::new(0)),
403 }
404 }
405
406 pub fn add_pruned(&self, n: usize) {
408 self.pruned.fetch_add(n, Ordering::Relaxed);
411 }
412
413 pub fn add_matched(&self, n: usize) {
415 self.matched.fetch_add(n, Ordering::Relaxed);
418 }
419
420 pub fn subtract_matched(&self, n: usize) {
422 self.matched.fetch_sub(n, Ordering::Relaxed);
425 }
426
427 pub fn pruned(&self) -> usize {
429 self.pruned.load(Ordering::Relaxed)
430 }
431
432 pub fn matched(&self) -> usize {
434 self.matched.load(Ordering::Relaxed)
435 }
436}
437
/// A `part / total` pair of shared counters, plus the rule used to combine
/// two such pairs in [`RatioMetrics::merge`].
#[derive(Debug, Clone, Default)]
pub struct RatioMetrics {
    /// Numerator of the ratio.
    part: Arc<AtomicUsize>,
    /// Denominator of the ratio.
    total: Arc<AtomicUsize>,
    /// How `merge` folds another instance into this one.
    merge_strategy: RatioMergeStrategy,
}

/// Selects, independently for `part` and `total`, whether merging adds the
/// other side's value or overwrites with it.
#[derive(Debug, Clone, Default)]
pub enum RatioMergeStrategy {
    /// Add the other `part` and add the other `total` (the default).
    #[default]
    AddPartAddTotal,
    /// Add the other `part`, overwrite `total` with the other `total`.
    AddPartSetTotal,
    /// Overwrite `part` with the other `part`, add the other `total`.
    SetPartAddTotal,
}

impl RatioMetrics {
    /// Creates a `0/0` ratio using [`RatioMergeStrategy::AddPartAddTotal`].
    pub fn new() -> Self {
        // The derived `Default` yields zeroed counters and the `#[default]`
        // strategy, which is exactly the state `new` promises.
        Self::default()
    }

    /// Builder-style setter for the merge strategy.
    pub fn with_merge_strategy(self, merge_strategy: RatioMergeStrategy) -> Self {
        Self {
            merge_strategy,
            ..self
        }
    }

    /// Adds `n` to `part`.
    pub fn add_part(&self, n: usize) {
        let _previous = self.part.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds `n` to `total`.
    pub fn add_total(&self, n: usize) {
        let _previous = self.total.fetch_add(n, Ordering::Relaxed);
    }

    /// Overwrites `part` with `n`.
    pub fn set_part(&self, n: usize) {
        let slot: &AtomicUsize = &self.part;
        slot.store(n, Ordering::Relaxed);
    }

    /// Overwrites `total` with `n`.
    pub fn set_total(&self, n: usize) {
        let slot: &AtomicUsize = &self.total;
        slot.store(n, Ordering::Relaxed);
    }

    /// Folds `other` into `self` according to `self`'s merge strategy
    /// (the strategy stored in `other` is not consulted).
    pub fn merge(&self, other: &Self) {
        use RatioMergeStrategy::{AddPartAddTotal, AddPartSetTotal, SetPartAddTotal};

        // `part` is updated first, then `total`, for every strategy.
        match self.merge_strategy {
            AddPartAddTotal | AddPartSetTotal => self.add_part(other.part()),
            SetPartAddTotal => self.set_part(other.part()),
        }
        match self.merge_strategy {
            AddPartAddTotal | SetPartAddTotal => self.add_total(other.total()),
            AddPartSetTotal => self.set_total(other.total()),
        }
    }

    /// Returns the current `part`.
    pub fn part(&self) -> usize {
        let slot: &AtomicUsize = &self.part;
        slot.load(Ordering::Relaxed)
    }

    /// Returns the current `total`.
    pub fn total(&self) -> usize {
        let slot: &AtomicUsize = &self.total;
        slot.load(Ordering::Relaxed)
    }
}

impl PartialEq for RatioMetrics {
    /// Ratios compare equal when both `part` and `total` are equal; the
    /// merge strategy does not take part in the comparison.
    fn eq(&self, other: &Self) -> bool {
        if self.part() != other.part() {
            return false;
        }
        self.total() == other.total()
    }
}
525
/// Formats `x` rounded to `digits` significant digits, without trailing
/// zeros (e.g. `25.0` with 2 digits becomes `"25"`, `0.0333…` becomes
/// `"0.033"`). Zero is always rendered as `"0"`.
fn fmt_significant(x: f64, digits: usize) -> String {
    if x == 0.0 {
        return String::from("0");
    }

    // Decimal exponent of the leading significant digit.
    let exp = x.abs().log10().floor();
    // Factor that moves the last digit to keep into the units position.
    let scale = 10f64.powf(-(exp - (digits as f64 - 1.0)));
    // Round there, then move the decimal point back.
    let rounded = (x * scale).round() / scale;

    rounded.to_string()
}
541
542impl Display for RatioMetrics {
543 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544 let part = self.part();
545 let total = self.total();
546
547 if total == 0 {
548 if part == 0 {
549 write!(f, "N/A (0/0)")
550 } else {
551 write!(f, "N/A ({}/0)", human_readable_count(part))
552 }
553 } else {
554 let percentage = (part as f64 / total as f64) * 100.0;
555
556 write!(
557 f,
558 "{}% ({}/{})",
559 fmt_significant(percentage, 2),
560 human_readable_count(part),
561 human_readable_count(total)
562 )
563 }
564 }
565}
566
/// A single metric value, tagged with what it measures.
///
/// Variants without a `name` field are well-known metrics with a fixed name
/// (see `MetricValue::name`); variants with a `name` field carry a
/// caller-chosen name alongside the value.
#[derive(Debug, Clone)]
pub enum MetricValue {
    /// Counter reported under the name `output_rows`.
    OutputRows(Count),
    /// Time reported under the name `elapsed_compute`.
    ElapsedCompute(Time),
    /// Counter reported under the name `spill_count`.
    SpillCount(Count),
    /// Counter reported under the name `spilled_bytes`; displayed as a size.
    SpilledBytes(Count),
    /// Counter reported under the name `output_bytes`; displayed as a size.
    OutputBytes(Count),
    /// Counter reported under the name `output_batches`.
    OutputBatches(Count),
    /// Counter reported under the name `spilled_rows`.
    SpilledRows(Count),
    /// Gauge reported under the name `mem_used`; displayed as a size.
    CurrentMemoryUsage(Gauge),
    /// A counter with a caller-chosen name.
    Count {
        /// Name the metric is reported under.
        name: Cow<'static, str>,
        /// The counter itself.
        count: Count,
    },
    /// A gauge with a caller-chosen name.
    Gauge {
        /// Name the metric is reported under.
        name: Cow<'static, str>,
        /// The gauge itself.
        gauge: Gauge,
    },
    /// A time measurement with a caller-chosen name.
    Time {
        /// Name the metric is reported under.
        name: Cow<'static, str>,
        /// The accumulated time.
        time: Time,
    },
    /// Timestamp reported under the name `start_timestamp`; aggregation keeps
    /// the earliest value.
    StartTimestamp(Timestamp),
    /// Timestamp reported under the name `end_timestamp`; aggregation keeps
    /// the latest value.
    EndTimestamp(Timestamp),
    /// Pruned/matched counters with a caller-chosen name.
    PruningMetrics {
        /// Name the metric is reported under.
        name: Cow<'static, str>,
        /// The pruned/matched counters.
        pruning_metrics: PruningMetrics,
    },
    /// A part/total ratio with a caller-chosen name.
    Ratio {
        /// Name the metric is reported under.
        name: Cow<'static, str>,
        /// The part/total counters and their merge strategy.
        ratio_metrics: RatioMetrics,
    },
    /// A user-defined metric with a caller-chosen name.
    Custom {
        /// Name the metric is reported under.
        name: Cow<'static, str>,
        /// The user-defined value; all operations are delegated to its
        /// `CustomMetricValue` implementation.
        value: Arc<dyn CustomMetricValue>,
    },
}
650
651impl PartialEq for MetricValue {
654 fn eq(&self, other: &Self) -> bool {
655 match (self, other) {
656 (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
657 count == other
658 }
659 (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
660 time == other
661 }
662 (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
663 count == other
664 }
665 (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
666 count == other
667 }
668 (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
669 count == other
670 }
671 (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
672 count == other
673 }
674 (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
675 count == other
676 }
677 (
678 MetricValue::CurrentMemoryUsage(gauge),
679 MetricValue::CurrentMemoryUsage(other),
680 ) => gauge == other,
681 (
682 MetricValue::Count { name, count },
683 MetricValue::Count {
684 name: other_name,
685 count: other_count,
686 },
687 ) => name == other_name && count == other_count,
688 (
689 MetricValue::Gauge { name, gauge },
690 MetricValue::Gauge {
691 name: other_name,
692 gauge: other_gauge,
693 },
694 ) => name == other_name && gauge == other_gauge,
695 (
696 MetricValue::Time { name, time },
697 MetricValue::Time {
698 name: other_name,
699 time: other_time,
700 },
701 ) => name == other_name && time == other_time,
702
703 (
704 MetricValue::StartTimestamp(timestamp),
705 MetricValue::StartTimestamp(other),
706 ) => timestamp == other,
707 (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
708 timestamp == other
709 }
710 (
711 MetricValue::PruningMetrics {
712 name,
713 pruning_metrics,
714 },
715 MetricValue::PruningMetrics {
716 name: other_name,
717 pruning_metrics: other_pruning_metrics,
718 },
719 ) => {
720 name == other_name
721 && pruning_metrics.pruned() == other_pruning_metrics.pruned()
722 && pruning_metrics.matched() == other_pruning_metrics.matched()
723 }
724 (
725 MetricValue::Ratio {
726 name,
727 ratio_metrics,
728 },
729 MetricValue::Ratio {
730 name: other_name,
731 ratio_metrics: other_ratio_metrics,
732 },
733 ) => name == other_name && ratio_metrics == other_ratio_metrics,
734 (
735 MetricValue::Custom { name, value },
736 MetricValue::Custom {
737 name: other_name,
738 value: other_value,
739 },
740 ) => name == other_name && value.is_eq(other_value),
741 _ => false,
743 }
744 }
745}
746
747impl MetricValue {
748 pub fn name(&self) -> &str {
750 match self {
751 Self::OutputRows(_) => "output_rows",
752 Self::SpillCount(_) => "spill_count",
753 Self::SpilledBytes(_) => "spilled_bytes",
754 Self::OutputBytes(_) => "output_bytes",
755 Self::OutputBatches(_) => "output_batches",
756 Self::SpilledRows(_) => "spilled_rows",
757 Self::CurrentMemoryUsage(_) => "mem_used",
758 Self::ElapsedCompute(_) => "elapsed_compute",
759 Self::Count { name, .. } => name.borrow(),
760 Self::Gauge { name, .. } => name.borrow(),
761 Self::Time { name, .. } => name.borrow(),
762 Self::StartTimestamp(_) => "start_timestamp",
763 Self::EndTimestamp(_) => "end_timestamp",
764 Self::PruningMetrics { name, .. } => name.borrow(),
765 Self::Ratio { name, .. } => name.borrow(),
766 Self::Custom { name, .. } => name.borrow(),
767 }
768 }
769
770 pub fn as_usize(&self) -> usize {
773 match self {
774 Self::OutputRows(count) => count.value(),
775 Self::SpillCount(count) => count.value(),
776 Self::SpilledBytes(bytes) => bytes.value(),
777 Self::OutputBytes(bytes) => bytes.value(),
778 Self::OutputBatches(count) => count.value(),
779 Self::SpilledRows(count) => count.value(),
780 Self::CurrentMemoryUsage(used) => used.value(),
781 Self::ElapsedCompute(time) => time.value(),
782 Self::Count { count, .. } => count.value(),
783 Self::Gauge { gauge, .. } => gauge.value(),
784 Self::Time { time, .. } => time.value(),
785 Self::StartTimestamp(timestamp) => timestamp
786 .value()
787 .and_then(|ts| ts.timestamp_nanos_opt())
788 .map(|nanos| nanos as usize)
789 .unwrap_or(0),
790 Self::EndTimestamp(timestamp) => timestamp
791 .value()
792 .and_then(|ts| ts.timestamp_nanos_opt())
793 .map(|nanos| nanos as usize)
794 .unwrap_or(0),
795 Self::PruningMetrics { .. } => 0,
799 Self::Ratio { .. } => 0,
801 Self::Custom { value, .. } => value.as_usize(),
802 }
803 }
804
805 pub fn new_empty(&self) -> Self {
808 match self {
809 Self::OutputRows(_) => Self::OutputRows(Count::new()),
810 Self::SpillCount(_) => Self::SpillCount(Count::new()),
811 Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
812 Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
813 Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
814 Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
815 Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
816 Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
817 Self::Count { name, .. } => Self::Count {
818 name: name.clone(),
819 count: Count::new(),
820 },
821 Self::Gauge { name, .. } => Self::Gauge {
822 name: name.clone(),
823 gauge: Gauge::new(),
824 },
825 Self::Time { name, .. } => Self::Time {
826 name: name.clone(),
827 time: Time::new(),
828 },
829 Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
830 Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
831 Self::PruningMetrics { name, .. } => Self::PruningMetrics {
832 name: name.clone(),
833 pruning_metrics: PruningMetrics::new(),
834 },
835 Self::Ratio {
836 name,
837 ratio_metrics,
838 } => {
839 let merge_strategy = ratio_metrics.merge_strategy.clone();
840 Self::Ratio {
841 name: name.clone(),
842 ratio_metrics: RatioMetrics::new()
843 .with_merge_strategy(merge_strategy),
844 }
845 }
846 Self::Custom { name, value } => Self::Custom {
847 name: name.clone(),
848 value: value.new_empty(),
849 },
850 }
851 }
852
853 pub fn aggregate(&mut self, other: &Self) {
863 match (self, other) {
864 (Self::OutputRows(count), Self::OutputRows(other_count))
865 | (Self::SpillCount(count), Self::SpillCount(other_count))
866 | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
867 | (Self::OutputBytes(count), Self::OutputBytes(other_count))
868 | (Self::OutputBatches(count), Self::OutputBatches(other_count))
869 | (Self::SpilledRows(count), Self::SpilledRows(other_count))
870 | (
871 Self::Count { count, .. },
872 Self::Count {
873 count: other_count, ..
874 },
875 ) => count.add(other_count.value()),
876 (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
877 | (
878 Self::Gauge { gauge, .. },
879 Self::Gauge {
880 gauge: other_gauge, ..
881 },
882 ) => gauge.add(other_gauge.value()),
883 (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
884 | (
885 Self::Time { time, .. },
886 Self::Time {
887 time: other_time, ..
888 },
889 ) => time.add(other_time),
890 (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
892 timestamp.update_to_min(other_timestamp);
893 }
894 (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
896 timestamp.update_to_max(other_timestamp);
897 }
898 (
899 Self::PruningMetrics {
900 pruning_metrics, ..
901 },
902 Self::PruningMetrics {
903 pruning_metrics: other_pruning_metrics,
904 ..
905 },
906 ) => {
907 let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
908 let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
909 pruning_metrics.add_pruned(pruned);
910 pruning_metrics.add_matched(matched);
911 }
912 (
913 Self::Ratio { ratio_metrics, .. },
914 Self::Ratio {
915 ratio_metrics: other_ratio_metrics,
916 ..
917 },
918 ) => {
919 ratio_metrics.merge(other_ratio_metrics);
920 }
921 (
922 Self::Custom { value, .. },
923 Self::Custom {
924 value: other_value, ..
925 },
926 ) => {
927 value.aggregate(Arc::clone(other_value));
928 }
929 m @ (_, _) => {
930 panic!(
931 "Mismatched metric types. Can not aggregate {:?} with value {:?}",
932 m.0, m.1
933 )
934 }
935 }
936 }
937
938 pub fn display_sort_key(&self) -> u8 {
941 match self {
942 Self::OutputRows(_) => 0,
944 Self::ElapsedCompute(_) => 1,
945 Self::OutputBytes(_) => 2,
946 Self::OutputBatches(_) => 3,
947 Self::PruningMetrics { name, .. } => match name.as_ref() {
949 "files_ranges_pruned_statistics" => 4,
957 "row_groups_pruned_statistics" => 5,
958 "row_groups_pruned_bloom_filter" => 6,
959 "page_index_rows_pruned" => 7,
960 _ => 8,
961 },
962 Self::SpillCount(_) => 9,
963 Self::SpilledBytes(_) => 10,
964 Self::SpilledRows(_) => 11,
965 Self::CurrentMemoryUsage(_) => 12,
966 Self::Count { .. } => 13,
967 Self::Gauge { .. } => 14,
968 Self::Time { .. } => 15,
969 Self::Ratio { .. } => 16,
970 Self::StartTimestamp(_) => 17, Self::EndTimestamp(_) => 18,
972 Self::Custom { .. } => 19,
973 }
974 }
975
976 pub fn is_timestamp(&self) -> bool {
978 matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
979 }
980}
981
982impl Display for MetricValue {
983 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
985 match self {
986 Self::OutputRows(count)
987 | Self::OutputBatches(count)
988 | Self::SpillCount(count)
989 | Self::SpilledRows(count)
990 | Self::Count { count, .. } => {
991 write!(f, "{count}")
992 }
993 Self::SpilledBytes(count) | Self::OutputBytes(count) => {
994 let readable_count = human_readable_size(count.value());
995 write!(f, "{readable_count}")
996 }
997 Self::CurrentMemoryUsage(gauge) => {
998 let readable_size = human_readable_size(gauge.value());
1000 write!(f, "{readable_size}")
1001 }
1002 Self::Gauge { gauge, .. } => {
1003 write!(f, "{}", human_readable_count(gauge.value()))
1005 }
1006 Self::ElapsedCompute(time) | Self::Time { time, .. } => {
1007 if time.value() > 0 {
1010 write!(f, "{time}")
1011 } else {
1012 write!(f, "NOT RECORDED")
1013 }
1014 }
1015 Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
1016 write!(f, "{timestamp}")
1017 }
1018 Self::PruningMetrics {
1019 pruning_metrics, ..
1020 } => {
1021 write!(f, "{pruning_metrics}")
1022 }
1023 Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
1024 Self::Custom { name, value } => {
1025 write!(f, "name:{name} {value}")
1026 }
1027 }
1028 }
1029}
1030
// Unit tests for the metric value types: display formatting, aggregation,
// ratio merge strategies and the scoped timer.
#[cfg(test)]
mod tests {
    use std::any::Any;

    use chrono::TimeZone;
    use datafusion_common::units::MB;

    use super::*;

    /// Minimal `CustomMetricValue` implementation: a single atomic counter.
    #[derive(Debug, Default)]
    pub struct CustomCounter {
        count: AtomicUsize,
    }

    impl Display for CustomCounter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
        }
    }

    impl CustomMetricValue for CustomCounter {
        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
            Arc::new(CustomCounter::default())
        }

        // Adds the other counter's value; panics if `other` is not a
        // `CustomCounter`.
        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
            let other = other.as_any().downcast_ref::<Self>().unwrap();
            self.count
                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        // A value of a different concrete type is never equal.
        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
            let Some(other) = other.as_any().downcast_ref::<Self>() else {
                return false;
            };

            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
        }
    }

    /// Builds a `MetricValue::Custom` wrapping a `CustomCounter` preset to
    /// `value`.
    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
        let custom_counter = CustomCounter::default();
        custom_counter.count.fetch_add(value, Ordering::Relaxed);

        MetricValue::Custom {
            name: Cow::Borrowed(name),
            value: Arc::new(custom_counter),
        }
    }

    #[test]
    fn test_custom_metric_with_mismatching_names() {
        let mut custom_val = new_custom_counter("Hi", 1);
        let other_custom_val = new_custom_counter("Hello", 1);

        // Same count, different names: not equal.
        assert!(other_custom_val != custom_val);

        // Aggregation does not look at names; the receiver keeps its own.
        custom_val.aggregate(&other_custom_val);

        let expected_val = new_custom_counter("Hi", 2);
        assert!(expected_val == custom_val);
    }

    #[test]
    fn test_custom_metric() {
        let mut custom_val = new_custom_counter("hi", 11);
        let other_custom_val = new_custom_counter("hi", 20);

        custom_val.aggregate(&other_custom_val);

        // Same name, but the counts now differ (31 vs 20).
        assert!(custom_val != other_custom_val);

        if let MetricValue::Custom { value, .. } = custom_val {
            let counter = value
                .as_any()
                .downcast_ref::<CustomCounter>()
                .expect("Expected CustomCounter");
            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
        } else {
            panic!("Unexpected value");
        }
    }

    #[test]
    fn test_display_output_rows() {
        // Both metric values share the same underlying counter.
        let count = Count::new();
        let values = vec![
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];

        for value in &values {
            assert_eq!("0", value.to_string(), "value {value:?}");
        }

        count.add(42);
        for value in &values {
            assert_eq!("42", value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_spilled_bytes() {
        let count = Count::new();
        let spilled_byte = MetricValue::SpilledBytes(count.clone());

        assert_eq!("0.0 B", spilled_byte.to_string());

        count.add((100 * MB) as usize);
        assert_eq!("100.0 MB", spilled_byte.to_string());

        count.add((0.5 * MB as f64) as usize);
        assert_eq!("100.5 MB", spilled_byte.to_string());
    }

    #[test]
    fn test_display_time() {
        // Both metric values share the same underlying time.
        let time = Time::new();
        let values = vec![
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];

        // Nothing recorded yet: a zero time is shown as "NOT RECORDED".
        for value in &values {
            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
        }

        time.add_duration(Duration::from_nanos(1042));
        for value in &values {
            assert_eq!("1.04µs", value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_ratio() {
        let ratio_metrics = RatioMetrics::new();
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("ratio_metric"),
            ratio_metrics: ratio_metrics.clone(),
        };

        // A zero total has no percentage.
        assert_eq!("N/A (0/0)", ratio.to_string());

        ratio_metrics.add_part(10);
        assert_eq!("N/A (10/0)", ratio.to_string());

        ratio_metrics.add_total(40);
        assert_eq!("25% (10/40)", ratio.to_string());

        // Small ratios keep two significant digits.
        let tiny_ratio_metrics = RatioMetrics::new();
        let tiny_ratio = MetricValue::Ratio {
            name: Cow::Borrowed("tiny_ratio_metric"),
            ratio_metrics: tiny_ratio_metrics.clone(),
        };
        tiny_ratio_metrics.add_part(1);
        tiny_ratio_metrics.add_total(3000);
        assert_eq!("0.033% (1/3.00 K)", tiny_ratio.to_string());
    }

    #[test]
    fn test_ratio_set_methods() {
        let ratio_metrics = RatioMetrics::new();

        // Setting the same value twice does not accumulate.
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(10);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics.to_string());

        let ratio_metrics = RatioMetrics::new();

        // A later set overwrites an earlier one.
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(30);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(50);
        assert_eq!("60% (30/50)", ratio_metrics.to_string());
    }

    #[test]
    fn test_ratio_merge_strategy() {
        // AddPartSetTotal: parts are summed, total is overwritten.
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);

        ratio_metrics1.set_part(10);
        ratio_metrics1.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics1.to_string());
        let ratio_metrics2 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(40);
        assert_eq!("50% (20/40)", ratio_metrics2.to_string());

        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("75% (30/40)", ratio_metrics1.to_string());

        // SetPartAddTotal: part is overwritten, totals are summed.
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("20% (20/100)", ratio_metrics1.to_string());

        // Default strategy (AddPartAddTotal): both are summed.
        let ratio_metrics1 = RatioMetrics::new();
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("40% (40/100)", ratio_metrics1.to_string());
    }

    #[test]
    fn test_display_timestamp() {
        // Both metric values share the same underlying timestamp.
        let timestamp = Timestamp::new();
        let values = vec![
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];

        // An unset timestamp is shown as "NONE".
        for value in &values {
            assert_eq!("NONE", value.to_string(), "value {value:?}");
        }

        timestamp.set(Utc.timestamp_nanos(1431648000000000));
        for value in &values {
            assert_eq!(
                "1970-01-17 13:40:48 UTC",
                value.to_string(),
                "value {value:?}"
            );
        }
    }

    #[test]
    fn test_timer_with_custom_instant() {
        let time = Time::new();
        let start_time = Instant::now();

        // Time passes before the timer is created...
        std::thread::sleep(Duration::from_millis(1));

        // ...but the timer is told to measure from `start_time`.
        let mut timer = time.timer_with(start_time);

        std::thread::sleep(Duration::from_millis(1));

        timer.stop();

        // Both sleeps must be included in the recorded time.
        assert!(
            time.value() >= 2_000_000,
            "Expected at least 2ms, got {} ns",
            time.value()
        );
    }

    #[test]
    fn test_stop_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();
        let mut timer = time.timer_with(start);

        // Synthetic end point: exactly 10ms after start, no sleeping needed.
        let end = start + Duration::from_millis(10);

        timer.stop_with(end);

        let recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&recorded),
            "Expected ~10ms, got {recorded} ns"
        );

        // Stopping an already stopped timer records nothing more.
        timer.stop_with(end);
        assert_eq!(
            recorded,
            time.value(),
            "Time should not change after second stop"
        );
    }

    #[test]
    fn test_done_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();

        {
            let timer = time.timer_with(start);

            let end = start + Duration::from_millis(5);

            // `done_with` consumes the timer; the subsequent drop must not
            // record the interval a second time.
            timer.done_with(end);
        }

        let recorded = time.value();
        assert!(
            (5_000_000..=5_100_000).contains(&recorded),
            "Expected ~5ms, got {recorded} ns",
        );

        // A second timer on the same `Time` accumulates on top.
        {
            let timer2 = time.timer_with(start);
            let end2 = start + Duration::from_millis(5);
            timer2.done_with(end2);
        }

        let new_recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&new_recorded),
            "Expected ~10ms total, got {new_recorded} ns",
        );
    }

    #[test]
    fn test_human_readable_metric_formatting() {
        // Counts: plain, thousands, millions, billions.
        let small_count = Count::new();
        small_count.add(42);
        assert_eq!(
            MetricValue::OutputRows(small_count.clone()).to_string(),
            "42"
        );

        let thousand_count = Count::new();
        thousand_count.add(10_100);
        assert_eq!(
            MetricValue::OutputRows(thousand_count.clone()).to_string(),
            "10.10 K"
        );

        let million_count = Count::new();
        million_count.add(1_532_000);
        assert_eq!(
            MetricValue::SpilledRows(million_count.clone()).to_string(),
            "1.53 M"
        );

        let billion_count = Count::new();
        billion_count.add(2_500_000_000);
        assert_eq!(
            MetricValue::OutputBatches(billion_count.clone()).to_string(),
            "2.50 B"
        );

        // Times: microseconds, milliseconds, seconds.
        let micros_time = Time::new();
        micros_time.add_duration(Duration::from_nanos(1_234));
        assert_eq!(
            MetricValue::ElapsedCompute(micros_time.clone()).to_string(),
            "1.23µs"
        );

        let millis_time = Time::new();
        millis_time.add_duration(Duration::from_nanos(11_295_377));
        assert_eq!(
            MetricValue::ElapsedCompute(millis_time.clone()).to_string(),
            "11.30ms"
        );

        let seconds_time = Time::new();
        seconds_time.add_duration(Duration::from_nanos(1_234_567_890));
        assert_eq!(
            MetricValue::ElapsedCompute(seconds_time.clone()).to_string(),
            "1.23s"
        );

        // Memory usage is formatted as a size.
        let mem_gauge = Gauge::new();
        mem_gauge.add(100 * MB as usize);
        assert_eq!(
            MetricValue::CurrentMemoryUsage(mem_gauge.clone()).to_string(),
            "100.0 MB"
        );

        // Named gauges are formatted as a count.
        let custom_gauge = Gauge::new();
        custom_gauge.add(50_000);
        assert_eq!(
            MetricValue::Gauge {
                name: "custom".into(),
                gauge: custom_gauge.clone()
            }
            .to_string(),
            "50.00 K"
        );

        // Pruning metrics show total (pruned + matched) and matched.
        let pruning = PruningMetrics::new();
        pruning.add_matched(500_000);
        pruning.add_pruned(500_000);
        assert_eq!(
            MetricValue::PruningMetrics {
                name: "test_pruning".into(),
                pruning_metrics: pruning.clone()
            }
            .to_string(),
            "1.00 M total → 500.0 K matched"
        );

        // Ratios show the percentage plus human-readable part/total.
        let ratio = RatioMetrics::new();
        ratio.add_part(250_000);
        ratio.add_total(1_000_000);
        assert_eq!(
            MetricValue::Ratio {
                name: "test_ratio".into(),
                ratio_metrics: ratio.clone()
            }
            .to_string(),
            "25% (250.0 K/1.00 M)"
        );
    }
}