1use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::{
23 human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
24};
25use parking_lot::Mutex;
26use std::{
27 borrow::{Borrow, Cow},
28 fmt::{Debug, Display},
29 sync::{
30 Arc,
31 atomic::{AtomicUsize, Ordering},
32 },
33 time::Duration,
34};
35
/// A `usize` counter backed by a shared atomic.
///
/// The value lives behind an [`Arc`], so clones produced by the derived
/// [`Clone`] impl all read and update the same underlying counter.
#[derive(Debug, Clone)]
pub struct Count {
    /// Shared storage for the current count.
    value: Arc<AtomicUsize>,
}
44
45impl PartialEq for Count {
46 fn eq(&self, other: &Self) -> bool {
47 self.value().eq(&other.value())
48 }
49}
50
51impl Display for Count {
52 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53 write!(f, "{}", human_readable_count(self.value()))
54 }
55}
56
57impl Default for Count {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl Count {
64 pub fn new() -> Self {
66 Self {
67 value: Arc::new(AtomicUsize::new(0)),
68 }
69 }
70
71 pub fn add(&self, n: usize) {
73 self.value.fetch_add(n, Ordering::Relaxed);
76 }
77
78 pub fn value(&self) -> usize {
80 self.value.load(Ordering::Relaxed)
81 }
82}
83
/// A `usize` gauge backed by a shared atomic.
///
/// Its methods allow the value to be increased, decreased, raised to a
/// maximum, or overwritten. Clones share the same underlying value because
/// it is stored behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Shared storage for the current gauge value.
    value: Arc<AtomicUsize>,
}
93
94impl PartialEq for Gauge {
95 fn eq(&self, other: &Self) -> bool {
96 self.value().eq(&other.value())
97 }
98}
99
100impl Display for Gauge {
101 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
102 write!(f, "{}", self.value())
103 }
104}
105
106impl Default for Gauge {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl Gauge {
113 pub fn new() -> Self {
115 Self {
116 value: Arc::new(AtomicUsize::new(0)),
117 }
118 }
119
120 pub fn add(&self, n: usize) {
122 self.value.fetch_add(n, Ordering::Relaxed);
125 }
126
127 pub fn sub(&self, n: usize) {
129 self.value.fetch_sub(n, Ordering::Relaxed);
132 }
133
134 pub fn set_max(&self, n: usize) {
136 self.value.fetch_max(n, Ordering::Relaxed);
137 }
138
139 pub fn set(&self, n: usize) -> usize {
141 self.value.swap(n, Ordering::Relaxed)
144 }
145
146 pub fn value(&self) -> usize {
148 self.value.load(Ordering::Relaxed)
149 }
150}
151
/// An accumulated amount of time, stored as a nanosecond total.
///
/// The total lives in a shared atomic behind an [`Arc`], so clones update
/// and observe the same value.
#[derive(Debug, Clone)]
pub struct Time {
    /// Accumulated nanoseconds.
    nanos: Arc<AtomicUsize>,
}
158
159impl Default for Time {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165impl PartialEq for Time {
166 fn eq(&self, other: &Self) -> bool {
167 self.value().eq(&other.value())
168 }
169}
170
171impl Display for Time {
172 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173 write!(f, "{}", human_readable_duration(self.value() as u64))
174 }
175}
176
177impl Time {
178 pub fn new() -> Self {
181 Self {
182 nanos: Arc::new(AtomicUsize::new(0)),
183 }
184 }
185
186 pub fn add_elapsed(&self, start: Instant) {
188 self.add_duration(start.elapsed());
189 }
190
191 pub fn add_duration(&self, duration: Duration) {
202 let more_nanos = duration.as_nanos() as usize;
203 self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204 }
205
206 pub fn add(&self, other: &Time) {
208 self.add_duration(Duration::from_nanos(other.value() as u64))
209 }
210
211 pub fn timer(&self) -> ScopedTimerGuard<'_> {
215 ScopedTimerGuard {
216 inner: self,
217 start: Some(Instant::now()),
218 }
219 }
220
221 pub fn value(&self) -> usize {
223 self.nanos.load(Ordering::Relaxed)
224 }
225
226 pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229 ScopedTimerGuard {
230 inner: self,
231 start: Some(now),
232 }
233 }
234}
235
/// An optional UTC timestamp stored behind a shared mutex.
///
/// The value is `None` until a time is recorded or set. Clones share the
/// same underlying slot because it is stored behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct Timestamp {
    /// The recorded time, if any.
    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
}
243
244impl Default for Timestamp {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl Timestamp {
251 pub fn new() -> Self {
253 Self {
254 timestamp: Arc::new(Mutex::new(None)),
255 }
256 }
257
258 pub fn record(&self) {
260 self.set(Utc::now())
261 }
262
263 pub fn set(&self, now: DateTime<Utc>) {
265 *self.timestamp.lock() = Some(now);
266 }
267
268 pub fn value(&self) -> Option<DateTime<Utc>> {
273 *self.timestamp.lock()
274 }
275
276 pub fn update_to_min(&self, other: &Timestamp) {
278 let min = match (self.value(), other.value()) {
279 (None, None) => None,
280 (Some(v), None) => Some(v),
281 (None, Some(v)) => Some(v),
282 (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283 };
284
285 *self.timestamp.lock() = min;
286 }
287
288 pub fn update_to_max(&self, other: &Timestamp) {
290 let max = match (self.value(), other.value()) {
291 (None, None) => None,
292 (Some(v), None) => Some(v),
293 (None, Some(v)) => Some(v),
294 (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295 };
296
297 *self.timestamp.lock() = max;
298 }
299}
300
301impl PartialEq for Timestamp {
302 fn eq(&self, other: &Self) -> bool {
303 self.value().eq(&other.value())
304 }
305}
306
307impl Display for Timestamp {
308 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309 match self.value() {
310 None => write!(f, "NONE"),
311 Some(v) => {
312 write!(f, "{v}")
313 }
314 }
315 }
316}
317
/// Guard that measures elapsed time and adds it to a [`Time`].
///
/// While `start` is `Some` the guard is running. Stopping it (explicitly or
/// when it is dropped) adds the elapsed time to `inner` and clears `start`.
pub struct ScopedTimerGuard<'a> {
    /// The time metric that receives the measured duration.
    inner: &'a Time,
    /// When the current measurement began; `None` once stopped.
    start: Option<Instant>,
}
325
326impl ScopedTimerGuard<'_> {
327 pub fn stop(&mut self) {
329 if let Some(start) = self.start.take() {
330 self.inner.add_elapsed(start)
331 }
332 }
333
334 pub fn restart(&mut self) {
336 self.start = Some(Instant::now())
337 }
338
339 pub fn done(mut self) {
341 self.stop()
342 }
343
344 pub fn stop_with(&mut self, end_time: Instant) {
346 if let Some(start) = self.start.take() {
347 let elapsed = end_time - start;
348 self.inner.add_duration(elapsed)
349 }
350 }
351
352 pub fn done_with(mut self, end_time: Instant) {
355 self.stop_with(end_time)
356 }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360 fn drop(&mut self) {
361 self.stop()
362 }
363}
364
/// Three shared atomic counters: `pruned`, `matched`, and `fully_matched`.
///
/// The `Display` impl reports `pruned + matched` as the total. Clones share
/// the same underlying counters because each is stored behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct PruningMetrics {
    /// Number of items counted as pruned.
    pruned: Arc<AtomicUsize>,
    /// Number of items counted as matched.
    matched: Arc<AtomicUsize>,
    /// Number of items counted as fully matched.
    fully_matched: Arc<AtomicUsize>,
}
377
378impl Display for PruningMetrics {
379 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
380 let matched = self.matched.load(Ordering::Relaxed);
381 let total = self.pruned.load(Ordering::Relaxed) + matched;
382 let fully_matched = self.fully_matched.load(Ordering::Relaxed);
383
384 if fully_matched != 0 {
385 write!(
386 f,
387 "{} total → {} matched -> {} fully matched",
388 human_readable_count(total),
389 human_readable_count(matched),
390 human_readable_count(fully_matched)
391 )
392 } else {
393 write!(
394 f,
395 "{} total → {} matched",
396 human_readable_count(total),
397 human_readable_count(matched)
398 )
399 }
400 }
401}
402
403impl Default for PruningMetrics {
404 fn default() -> Self {
405 Self::new()
406 }
407}
408
409impl PruningMetrics {
410 pub fn new() -> Self {
412 Self {
413 pruned: Arc::new(AtomicUsize::new(0)),
414 matched: Arc::new(AtomicUsize::new(0)),
415 fully_matched: Arc::new(AtomicUsize::new(0)),
416 }
417 }
418
419 pub fn add_pruned(&self, n: usize) {
421 self.pruned.fetch_add(n, Ordering::Relaxed);
424 }
425
426 pub fn add_matched(&self, n: usize) {
428 self.matched.fetch_add(n, Ordering::Relaxed);
431 }
432
433 pub fn add_fully_matched(&self, n: usize) {
435 self.fully_matched.fetch_add(n, Ordering::Relaxed);
438 }
439
440 pub fn subtract_matched(&self, n: usize) {
442 self.matched.fetch_sub(n, Ordering::Relaxed);
445 }
446
447 pub fn pruned(&self) -> usize {
449 self.pruned.load(Ordering::Relaxed)
450 }
451
452 pub fn matched(&self) -> usize {
454 self.matched.load(Ordering::Relaxed)
455 }
456
457 pub fn fully_matched(&self) -> usize {
459 self.fully_matched.load(Ordering::Relaxed)
460 }
461}
462
/// A ratio expressed as two shared atomic counters, `part` and `total`.
///
/// The `Display` impl renders `part / total` as a percentage. Clones share
/// the same counters because each is stored behind an [`Arc`].
#[derive(Debug, Clone, Default)]
pub struct RatioMetrics {
    /// Numerator of the ratio.
    part: Arc<AtomicUsize>,
    /// Denominator of the ratio.
    total: Arc<AtomicUsize>,
    /// How `merge` combines another `RatioMetrics` into this one.
    merge_strategy: RatioMergeStrategy,
}
472
/// Selects how `RatioMetrics::merge` folds another value's `part` and `total`
/// into the receiver.
#[derive(Debug, Clone, Default)]
pub enum RatioMergeStrategy {
    /// Add the other `part` and add the other `total` (the default).
    #[default]
    AddPartAddTotal,
    /// Add the other `part`, but overwrite `total` with the other `total`.
    AddPartSetTotal,
    /// Overwrite `part` with the other `part`, but add the other `total`.
    SetPartAddTotal,
}
480
481impl RatioMetrics {
482 pub fn new() -> Self {
484 Self {
485 part: Arc::new(AtomicUsize::new(0)),
486 total: Arc::new(AtomicUsize::new(0)),
487 merge_strategy: RatioMergeStrategy::AddPartAddTotal,
488 }
489 }
490
491 pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy) -> Self {
492 self.merge_strategy = merge_strategy;
493 self
494 }
495
496 pub fn add_part(&self, n: usize) {
498 self.part.fetch_add(n, Ordering::Relaxed);
499 }
500
501 pub fn add_total(&self, n: usize) {
503 self.total.fetch_add(n, Ordering::Relaxed);
504 }
505
506 pub fn set_part(&self, n: usize) {
508 self.part.store(n, Ordering::Relaxed);
509 }
510
511 pub fn set_total(&self, n: usize) {
513 self.total.store(n, Ordering::Relaxed);
514 }
515
516 pub fn merge(&self, other: &Self) {
518 match self.merge_strategy {
519 RatioMergeStrategy::AddPartAddTotal => {
520 self.add_part(other.part());
521 self.add_total(other.total());
522 }
523 RatioMergeStrategy::AddPartSetTotal => {
524 self.add_part(other.part());
525 self.set_total(other.total());
526 }
527 RatioMergeStrategy::SetPartAddTotal => {
528 self.set_part(other.part());
529 self.add_total(other.total());
530 }
531 }
532 }
533
534 pub fn part(&self) -> usize {
536 self.part.load(Ordering::Relaxed)
537 }
538
539 pub fn total(&self) -> usize {
541 self.total.load(Ordering::Relaxed)
542 }
543}
544
545impl PartialEq for RatioMetrics {
546 fn eq(&self, other: &Self) -> bool {
547 self.part() == other.part() && self.total() == other.total()
548 }
549}
550
/// Formats `x` rounded to `digits` significant digits.
///
/// Zero is rendered as `"0"`. Otherwise `x` is scaled so that `digits`
/// significant digits sit left of the decimal point, rounded to the nearest
/// integer, scaled back, and printed with `f64`'s `Display`.
fn fmt_significant(x: f64, digits: usize) -> String {
    if x == 0.0 {
        return String::from("0");
    }

    // Decimal exponent of the leading digit of `x`.
    let magnitude = x.abs().log10().floor();
    let scale = 10f64.powf(-(magnitude - (digits as f64 - 1.0)));
    let rounded = (x * scale).round() / scale;
    rounded.to_string()
}
566
567impl Display for RatioMetrics {
568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569 let part = self.part();
570 let total = self.total();
571
572 if total == 0 {
573 if part == 0 {
574 write!(f, "N/A (0/0)")
575 } else {
576 write!(f, "N/A ({}/0)", human_readable_count(part))
577 }
578 } else {
579 let percentage = (part as f64 / total as f64) * 100.0;
580
581 write!(
582 f,
583 "{}% ({}/{})",
584 fmt_significant(percentage, 2),
585 human_readable_count(part),
586 human_readable_count(total)
587 )
588 }
589 }
590}
591
/// The possible kinds of metric value, each paired with the storage type
/// that holds it.
///
/// Built-in variants have a fixed name (returned by `MetricValue::name`);
/// the struct-like variants carry their own `name`.
#[derive(Debug, Clone)]
pub enum MetricValue {
    /// Count reported under the name `output_rows`.
    OutputRows(Count),
    /// Time reported under the name `elapsed_compute`.
    ElapsedCompute(Time),
    /// Count reported under the name `spill_count`.
    SpillCount(Count),
    /// Count reported under the name `spilled_bytes`; displayed as a size.
    SpilledBytes(Count),
    /// Count reported under the name `output_bytes`; displayed as a size.
    OutputBytes(Count),
    /// Count reported under the name `output_batches`.
    OutputBatches(Count),
    /// Count reported under the name `spilled_rows`.
    SpilledRows(Count),
    /// Gauge reported under the name `mem_used`; displayed as a size.
    CurrentMemoryUsage(Gauge),
    /// A counter with a caller-chosen name.
    Count {
        /// The name this counter is reported under.
        name: Cow<'static, str>,
        /// The counter's value.
        count: Count,
    },
    /// A gauge with a caller-chosen name.
    Gauge {
        /// The name this gauge is reported under.
        name: Cow<'static, str>,
        /// The gauge's value.
        gauge: Gauge,
    },
    /// A time with a caller-chosen name.
    Time {
        /// The name this time is reported under.
        name: Cow<'static, str>,
        /// The accumulated time.
        time: Time,
    },
    /// Timestamp reported under the name `start_timestamp`; aggregation
    /// keeps the minimum.
    StartTimestamp(Timestamp),
    /// Timestamp reported under the name `end_timestamp`; aggregation
    /// keeps the maximum.
    EndTimestamp(Timestamp),
    /// Pruning counters with a caller-chosen name.
    PruningMetrics {
        /// The name these pruning metrics are reported under.
        name: Cow<'static, str>,
        /// The pruned / matched / fully-matched counters.
        pruning_metrics: PruningMetrics,
    },
    /// A part/total ratio with a caller-chosen name.
    Ratio {
        /// The name this ratio is reported under.
        name: Cow<'static, str>,
        /// The ratio's part and total counters.
        ratio_metrics: RatioMetrics,
    },
    /// A user-defined metric value with a caller-chosen name.
    Custom {
        /// The name this metric is reported under.
        name: Cow<'static, str>,
        /// The user-provided value implementing `CustomMetricValue`.
        value: Arc<dyn CustomMetricValue>,
    },
}
675
676impl PartialEq for MetricValue {
679 fn eq(&self, other: &Self) -> bool {
680 match (self, other) {
681 (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
682 count == other
683 }
684 (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
685 time == other
686 }
687 (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
688 count == other
689 }
690 (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
691 count == other
692 }
693 (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
694 count == other
695 }
696 (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
697 count == other
698 }
699 (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
700 count == other
701 }
702 (
703 MetricValue::CurrentMemoryUsage(gauge),
704 MetricValue::CurrentMemoryUsage(other),
705 ) => gauge == other,
706 (
707 MetricValue::Count { name, count },
708 MetricValue::Count {
709 name: other_name,
710 count: other_count,
711 },
712 ) => name == other_name && count == other_count,
713 (
714 MetricValue::Gauge { name, gauge },
715 MetricValue::Gauge {
716 name: other_name,
717 gauge: other_gauge,
718 },
719 ) => name == other_name && gauge == other_gauge,
720 (
721 MetricValue::Time { name, time },
722 MetricValue::Time {
723 name: other_name,
724 time: other_time,
725 },
726 ) => name == other_name && time == other_time,
727
728 (
729 MetricValue::StartTimestamp(timestamp),
730 MetricValue::StartTimestamp(other),
731 ) => timestamp == other,
732 (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
733 timestamp == other
734 }
735 (
736 MetricValue::PruningMetrics {
737 name,
738 pruning_metrics,
739 },
740 MetricValue::PruningMetrics {
741 name: other_name,
742 pruning_metrics: other_pruning_metrics,
743 },
744 ) => {
745 name == other_name
746 && pruning_metrics.pruned() == other_pruning_metrics.pruned()
747 && pruning_metrics.matched() == other_pruning_metrics.matched()
748 }
749 (
750 MetricValue::Ratio {
751 name,
752 ratio_metrics,
753 },
754 MetricValue::Ratio {
755 name: other_name,
756 ratio_metrics: other_ratio_metrics,
757 },
758 ) => name == other_name && ratio_metrics == other_ratio_metrics,
759 (
760 MetricValue::Custom { name, value },
761 MetricValue::Custom {
762 name: other_name,
763 value: other_value,
764 },
765 ) => name == other_name && value.is_eq(other_value),
766 _ => false,
768 }
769 }
770}
771
772impl MetricValue {
773 pub fn name(&self) -> &str {
775 match self {
776 Self::OutputRows(_) => "output_rows",
777 Self::SpillCount(_) => "spill_count",
778 Self::SpilledBytes(_) => "spilled_bytes",
779 Self::OutputBytes(_) => "output_bytes",
780 Self::OutputBatches(_) => "output_batches",
781 Self::SpilledRows(_) => "spilled_rows",
782 Self::CurrentMemoryUsage(_) => "mem_used",
783 Self::ElapsedCompute(_) => "elapsed_compute",
784 Self::Count { name, .. } => name.borrow(),
785 Self::Gauge { name, .. } => name.borrow(),
786 Self::Time { name, .. } => name.borrow(),
787 Self::StartTimestamp(_) => "start_timestamp",
788 Self::EndTimestamp(_) => "end_timestamp",
789 Self::PruningMetrics { name, .. } => name.borrow(),
790 Self::Ratio { name, .. } => name.borrow(),
791 Self::Custom { name, .. } => name.borrow(),
792 }
793 }
794
795 pub fn as_usize(&self) -> usize {
798 match self {
799 Self::OutputRows(count) => count.value(),
800 Self::SpillCount(count) => count.value(),
801 Self::SpilledBytes(bytes) => bytes.value(),
802 Self::OutputBytes(bytes) => bytes.value(),
803 Self::OutputBatches(count) => count.value(),
804 Self::SpilledRows(count) => count.value(),
805 Self::CurrentMemoryUsage(used) => used.value(),
806 Self::ElapsedCompute(time) => time.value(),
807 Self::Count { count, .. } => count.value(),
808 Self::Gauge { gauge, .. } => gauge.value(),
809 Self::Time { time, .. } => time.value(),
810 Self::StartTimestamp(timestamp) => timestamp
811 .value()
812 .and_then(|ts| ts.timestamp_nanos_opt())
813 .map(|nanos| nanos as usize)
814 .unwrap_or(0),
815 Self::EndTimestamp(timestamp) => timestamp
816 .value()
817 .and_then(|ts| ts.timestamp_nanos_opt())
818 .map(|nanos| nanos as usize)
819 .unwrap_or(0),
820 Self::PruningMetrics { .. } => 0,
824 Self::Ratio { .. } => 0,
826 Self::Custom { value, .. } => value.as_usize(),
827 }
828 }
829
830 pub fn new_empty(&self) -> Self {
833 match self {
834 Self::OutputRows(_) => Self::OutputRows(Count::new()),
835 Self::SpillCount(_) => Self::SpillCount(Count::new()),
836 Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
837 Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
838 Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
839 Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
840 Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
841 Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
842 Self::Count { name, .. } => Self::Count {
843 name: name.clone(),
844 count: Count::new(),
845 },
846 Self::Gauge { name, .. } => Self::Gauge {
847 name: name.clone(),
848 gauge: Gauge::new(),
849 },
850 Self::Time { name, .. } => Self::Time {
851 name: name.clone(),
852 time: Time::new(),
853 },
854 Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
855 Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
856 Self::PruningMetrics { name, .. } => Self::PruningMetrics {
857 name: name.clone(),
858 pruning_metrics: PruningMetrics::new(),
859 },
860 Self::Ratio {
861 name,
862 ratio_metrics,
863 } => {
864 let merge_strategy = ratio_metrics.merge_strategy.clone();
865 Self::Ratio {
866 name: name.clone(),
867 ratio_metrics: RatioMetrics::new()
868 .with_merge_strategy(merge_strategy),
869 }
870 }
871 Self::Custom { name, value } => Self::Custom {
872 name: name.clone(),
873 value: value.new_empty(),
874 },
875 }
876 }
877
878 pub fn aggregate(&mut self, other: &Self) {
888 match (self, other) {
889 (Self::OutputRows(count), Self::OutputRows(other_count))
890 | (Self::SpillCount(count), Self::SpillCount(other_count))
891 | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
892 | (Self::OutputBytes(count), Self::OutputBytes(other_count))
893 | (Self::OutputBatches(count), Self::OutputBatches(other_count))
894 | (Self::SpilledRows(count), Self::SpilledRows(other_count))
895 | (
896 Self::Count { count, .. },
897 Self::Count {
898 count: other_count, ..
899 },
900 ) => count.add(other_count.value()),
901 (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
902 | (
903 Self::Gauge { gauge, .. },
904 Self::Gauge {
905 gauge: other_gauge, ..
906 },
907 ) => gauge.add(other_gauge.value()),
908 (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
909 | (
910 Self::Time { time, .. },
911 Self::Time {
912 time: other_time, ..
913 },
914 ) => time.add(other_time),
915 (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
917 timestamp.update_to_min(other_timestamp);
918 }
919 (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
921 timestamp.update_to_max(other_timestamp);
922 }
923 (
924 Self::PruningMetrics {
925 pruning_metrics, ..
926 },
927 Self::PruningMetrics {
928 pruning_metrics: other_pruning_metrics,
929 ..
930 },
931 ) => {
932 let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
933 let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
934 let fully_matched =
935 other_pruning_metrics.fully_matched.load(Ordering::Relaxed);
936 pruning_metrics.add_pruned(pruned);
937 pruning_metrics.add_matched(matched);
938 pruning_metrics.add_fully_matched(fully_matched);
939 }
940 (
941 Self::Ratio { ratio_metrics, .. },
942 Self::Ratio {
943 ratio_metrics: other_ratio_metrics,
944 ..
945 },
946 ) => {
947 ratio_metrics.merge(other_ratio_metrics);
948 }
949 (
950 Self::Custom { value, .. },
951 Self::Custom {
952 value: other_value, ..
953 },
954 ) => {
955 value.aggregate(Arc::clone(other_value));
956 }
957 m @ (_, _) => {
958 panic!(
959 "Mismatched metric types. Can not aggregate {:?} with value {:?}",
960 m.0, m.1
961 )
962 }
963 }
964 }
965
966 pub fn display_sort_key(&self) -> u8 {
969 match self {
970 Self::OutputRows(_) => 0,
972 Self::ElapsedCompute(_) => 1,
973 Self::OutputBytes(_) => 2,
974 Self::OutputBatches(_) => 3,
975 Self::PruningMetrics { name, .. } => match name.as_ref() {
977 "files_ranges_pruned_statistics" => 4,
985 "row_groups_pruned_statistics" => 5,
986 "row_groups_pruned_bloom_filter" => 6,
987 "page_index_pages_pruned" => 7,
988 "page_index_rows_pruned" => 8,
989 _ => 9,
990 },
991 Self::SpillCount(_) => 10,
992 Self::SpilledBytes(_) => 11,
993 Self::SpilledRows(_) => 12,
994 Self::CurrentMemoryUsage(_) => 13,
995 Self::Count { .. } => 14,
996 Self::Gauge { .. } => 15,
997 Self::Time { .. } => 16,
998 Self::Ratio { .. } => 17,
999 Self::StartTimestamp(_) => 18, Self::EndTimestamp(_) => 19,
1001 Self::Custom { .. } => 20,
1002 }
1003 }
1004
1005 pub fn is_timestamp(&self) -> bool {
1007 matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
1008 }
1009}
1010
1011impl Display for MetricValue {
1012 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1014 match self {
1015 Self::OutputRows(count)
1016 | Self::OutputBatches(count)
1017 | Self::SpillCount(count)
1018 | Self::SpilledRows(count)
1019 | Self::Count { count, .. } => {
1020 write!(f, "{count}")
1021 }
1022 Self::SpilledBytes(count) | Self::OutputBytes(count) => {
1023 let readable_count = human_readable_size(count.value());
1024 write!(f, "{readable_count}")
1025 }
1026 Self::CurrentMemoryUsage(gauge) => {
1027 let readable_size = human_readable_size(gauge.value());
1029 write!(f, "{readable_size}")
1030 }
1031 Self::Gauge { gauge, .. } => {
1032 write!(f, "{}", human_readable_count(gauge.value()))
1034 }
1035 Self::ElapsedCompute(time) | Self::Time { time, .. } => {
1036 if time.value() > 0 {
1039 write!(f, "{time}")
1040 } else {
1041 write!(f, "NOT RECORDED")
1042 }
1043 }
1044 Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
1045 write!(f, "{timestamp}")
1046 }
1047 Self::PruningMetrics {
1048 pruning_metrics, ..
1049 } => {
1050 write!(f, "{pruning_metrics}")
1051 }
1052 Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
1053 Self::Custom { name, value } => {
1054 write!(f, "name:{name} {value}")
1055 }
1056 }
1057 }
1058}
1059
/// Unit tests for the metric value types: display formatting, aggregation,
/// ratio merging, and scoped timers.
#[cfg(test)]
mod tests {
    use std::any::Any;

    use chrono::TimeZone;
    use datafusion_common::units::MB;

    use super::*;

    /// Minimal `CustomMetricValue` implementation used to exercise the
    /// `MetricValue::Custom` variant.
    #[derive(Debug, Default)]
    pub struct CustomCounter {
        count: AtomicUsize,
    }

    impl Display for CustomCounter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
        }
    }

    impl CustomMetricValue for CustomCounter {
        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
            Arc::new(CustomCounter::default())
        }

        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
            // Only aggregation with another `CustomCounter` is supported here.
            let other = other.as_any().downcast_ref::<Self>().unwrap();
            self.count
                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
            // A value of a different concrete type is never equal.
            let Some(other) = other.as_any().downcast_ref::<Self>() else {
                return false;
            };

            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
        }
    }

    /// Builds a `MetricValue::Custom` wrapping a `CustomCounter` preset to `value`.
    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
        let custom_counter = CustomCounter::default();
        custom_counter.count.fetch_add(value, Ordering::Relaxed);

        MetricValue::Custom {
            name: Cow::Borrowed(name),
            value: Arc::new(custom_counter),
        }
    }

    // Equality takes the name into account, but aggregation does not.
    #[test]
    fn test_custom_metric_with_mismatching_names() {
        let mut custom_val = new_custom_counter("Hi", 1);
        let other_custom_val = new_custom_counter("Hello", 1);

        // Same value, different names: not equal.
        assert!(other_custom_val != custom_val);

        // Aggregation still succeeds across differing names.
        custom_val.aggregate(&other_custom_val);

        let expected_val = new_custom_counter("Hi", 2);
        assert!(expected_val == custom_val);
    }

    // Aggregating custom metrics sums the wrapped counters.
    #[test]
    fn test_custom_metric() {
        let mut custom_val = new_custom_counter("hi", 11);
        let other_custom_val = new_custom_counter("hi", 20);

        custom_val.aggregate(&other_custom_val);

        assert!(custom_val != other_custom_val);

        if let MetricValue::Custom { value, .. } = custom_val {
            let counter = value
                .as_any()
                .downcast_ref::<CustomCounter>()
                .expect("Expected CustomCounter");
            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
        } else {
            panic!("Unexpected value");
        }
    }

    // Both values wrap clones of the same `Count`, so one `add` shows in both.
    #[test]
    fn test_display_output_rows() {
        let count = Count::new();
        let values = vec![
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];

        for value in &values {
            assert_eq!("0", value.to_string(), "value {value:?}");
        }

        count.add(42);
        for value in &values {
            assert_eq!("42", value.to_string(), "value {value:?}");
        }
    }

    // Spilled bytes are displayed as a human-readable size.
    #[test]
    fn test_display_spilled_bytes() {
        let count = Count::new();
        let spilled_byte = MetricValue::SpilledBytes(count.clone());

        assert_eq!("0.0 B", spilled_byte.to_string());

        count.add((100 * MB) as usize);
        assert_eq!("100.0 MB", spilled_byte.to_string());

        count.add((0.5 * MB as f64) as usize);
        assert_eq!("100.5 MB", spilled_byte.to_string());
    }

    // A zero time displays as NOT RECORDED; a non-zero one as a duration.
    #[test]
    fn test_display_time() {
        let time = Time::new();
        let values = vec![
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];

        for value in &values {
            // Nothing has been added yet.
            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
        }

        time.add_duration(Duration::from_nanos(1042));
        for value in &values {
            assert_eq!("1.04µs", value.to_string(), "value {value:?}");
        }
    }

    // Ratio display, including the zero-total cases and a tiny ratio.
    #[test]
    fn test_display_ratio() {
        let ratio_metrics = RatioMetrics::new();
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("ratio_metric"),
            ratio_metrics: ratio_metrics.clone(),
        };

        assert_eq!("N/A (0/0)", ratio.to_string());

        ratio_metrics.add_part(10);
        assert_eq!("N/A (10/0)", ratio.to_string());

        ratio_metrics.add_total(40);
        assert_eq!("25% (10/40)", ratio.to_string());

        let tiny_ratio_metrics = RatioMetrics::new();
        let tiny_ratio = MetricValue::Ratio {
            name: Cow::Borrowed("tiny_ratio_metric"),
            ratio_metrics: tiny_ratio_metrics.clone(),
        };
        tiny_ratio_metrics.add_part(1);
        tiny_ratio_metrics.add_total(3000);
        assert_eq!("0.033% (1/3.00 K)", tiny_ratio.to_string());
    }

    // `set_part` / `set_total` overwrite rather than accumulate.
    #[test]
    fn test_ratio_set_methods() {
        let ratio_metrics = RatioMetrics::new();

        // Setting the same value twice leaves it unchanged.
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(10);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics.to_string());

        let ratio_metrics = RatioMetrics::new();

        // The last value set wins.
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(30);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(50);
        assert_eq!("60% (30/50)", ratio_metrics.to_string());
    }

    // Exercises each merge strategy in turn.
    #[test]
    fn test_ratio_merge_strategy() {
        // AddPartSetTotal: parts are summed, total is overwritten.
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);

        ratio_metrics1.set_part(10);
        ratio_metrics1.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics1.to_string());
        let ratio_metrics2 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(40);
        assert_eq!("50% (20/40)", ratio_metrics2.to_string());

        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("75% (30/40)", ratio_metrics1.to_string());

        // SetPartAddTotal: part is overwritten, totals are summed.
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("20% (20/100)", ratio_metrics1.to_string());

        // Default strategy (AddPartAddTotal): both are summed.
        let ratio_metrics1 = RatioMetrics::new();
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("40% (40/100)", ratio_metrics1.to_string());
    }

    // An unset timestamp displays as NONE; a set one as a UTC date-time.
    #[test]
    fn test_display_timestamp() {
        let timestamp = Timestamp::new();
        let values = vec![
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];

        for value in &values {
            // Nothing has been recorded yet.
            assert_eq!("NONE", value.to_string(), "value {value:?}");
        }

        timestamp.set(Utc.timestamp_nanos(1431648000000000));
        for value in &values {
            assert_eq!(
                "1970-01-17 13:40:48 UTC",
                value.to_string(),
                "value {value:?}"
            );
        }
    }

    // `timer_with` measures from the supplied start instant, not from the
    // moment the guard is created.
    #[test]
    fn test_timer_with_custom_instant() {
        let time = Time::new();
        let start_time = Instant::now();

        // Time passes before the guard exists.
        std::thread::sleep(Duration::from_millis(1));

        let mut timer = time.timer_with(start_time);

        // More time passes while the guard is running.
        std::thread::sleep(Duration::from_millis(1));

        timer.stop();

        // Both sleeps must be included in the recorded time.
        assert!(
            time.value() >= 2_000_000,
            "Expected at least 2ms, got {} ns",
            time.value()
        );
    }

    // `stop_with` records up to the supplied end instant and is a no-op
    // when called a second time.
    #[test]
    fn test_stop_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();
        let mut timer = time.timer_with(start);

        // A synthetic end point exactly 10ms after the start.
        let end = start + Duration::from_millis(10);

        timer.stop_with(end);

        let recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&recorded),
            "Expected ~10ms, got {recorded} ns"
        );

        // The guard is already stopped, so this must not add anything.
        timer.stop_with(end);
        assert_eq!(
            recorded,
            time.value(),
            "Time should not change after second stop"
        );
    }

    // `done_with` records up to the supplied end instant and consumes the
    // guard; a second guard accumulates on top of the first.
    #[test]
    fn test_done_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();

        {
            let timer = time.timer_with(start);

            // A synthetic end point exactly 5ms after the start.
            let end = start + Duration::from_millis(5);

            timer.done_with(end);

        }

        let recorded = time.value();
        assert!(
            (5_000_000..=5_100_000).contains(&recorded),
            "Expected ~5ms, got {recorded} ns",
        );

        {
            // A second 5ms measurement on the same `Time`.
            let timer2 = time.timer_with(start);
            let end2 = start + Duration::from_millis(5);
            timer2.done_with(end2);
        }

        let new_recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&new_recorded),
            "Expected ~10ms total, got {new_recorded} ns",
        );
    }

    // Checks the human-readable rendering of each metric kind.
    #[test]
    fn test_human_readable_metric_formatting() {
        // Counts
        let small_count = Count::new();
        small_count.add(42);
        assert_eq!(
            MetricValue::OutputRows(small_count.clone()).to_string(),
            "42"
        );

        let thousand_count = Count::new();
        thousand_count.add(10_100);
        assert_eq!(
            MetricValue::OutputRows(thousand_count.clone()).to_string(),
            "10.10 K"
        );

        let million_count = Count::new();
        million_count.add(1_532_000);
        assert_eq!(
            MetricValue::SpilledRows(million_count.clone()).to_string(),
            "1.53 M"
        );

        let billion_count = Count::new();
        billion_count.add(2_500_000_000);
        assert_eq!(
            MetricValue::OutputBatches(billion_count.clone()).to_string(),
            "2.50 B"
        );

        // Times
        let micros_time = Time::new();
        micros_time.add_duration(Duration::from_nanos(1_234));
        assert_eq!(
            MetricValue::ElapsedCompute(micros_time.clone()).to_string(),
            "1.23µs"
        );

        let millis_time = Time::new();
        millis_time.add_duration(Duration::from_nanos(11_295_377));
        assert_eq!(
            MetricValue::ElapsedCompute(millis_time.clone()).to_string(),
            "11.30ms"
        );

        let seconds_time = Time::new();
        seconds_time.add_duration(Duration::from_nanos(1_234_567_890));
        assert_eq!(
            MetricValue::ElapsedCompute(seconds_time.clone()).to_string(),
            "1.23s"
        );

        // Memory usage is rendered as a size.
        let mem_gauge = Gauge::new();
        mem_gauge.add(100 * MB as usize);
        assert_eq!(
            MetricValue::CurrentMemoryUsage(mem_gauge.clone()).to_string(),
            "100.0 MB"
        );

        // A named gauge is rendered as a count.
        let custom_gauge = Gauge::new();
        custom_gauge.add(50_000);
        assert_eq!(
            MetricValue::Gauge {
                name: "custom".into(),
                gauge: custom_gauge.clone()
            }
            .to_string(),
            "50.00 K"
        );

        // Pruning metrics
        let pruning = PruningMetrics::new();
        pruning.add_matched(500_000);
        pruning.add_pruned(500_000);
        assert_eq!(
            MetricValue::PruningMetrics {
                name: "test_pruning".into(),
                pruning_metrics: pruning.clone()
            }
            .to_string(),
            "1.00 M total → 500.0 K matched"
        );

        // Ratio metrics
        let ratio = RatioMetrics::new();
        ratio.add_part(250_000);
        ratio.add_total(1_000_000);
        assert_eq!(
            MetricValue::Ratio {
                name: "test_ratio".into(),
                ratio_metrics: ratio.clone()
            }
            .to_string(),
            "25% (250.0 K/1.00 M)"
        );
    }
}