1use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::{
23 human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
24};
25use parking_lot::Mutex;
26use std::{
27 borrow::{Borrow, Cow},
28 fmt::{Debug, Display},
29 sync::{
30 Arc,
31 atomic::{AtomicUsize, Ordering},
32 },
33 time::Duration,
34};
35
36#[derive(Debug, Clone)]
40pub struct Count {
41 value: Arc<AtomicUsize>,
43}
44
45impl PartialEq for Count {
46 fn eq(&self, other: &Self) -> bool {
47 self.value().eq(&other.value())
48 }
49}
50
51impl Display for Count {
52 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53 write!(f, "{}", human_readable_count(self.value()))
54 }
55}
56
57impl Default for Count {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl Count {
64 pub fn new() -> Self {
66 Self {
67 value: Arc::new(AtomicUsize::new(0)),
68 }
69 }
70
71 pub fn add(&self, n: usize) {
73 self.value.fetch_add(n, Ordering::Relaxed);
76 }
77
78 pub fn value(&self) -> usize {
80 self.value.load(Ordering::Relaxed)
81 }
82}
83
/// A measurement of a current value that can move both up and down, such as
/// the memory currently in use.
///
/// The value is stored in an `Arc<AtomicUsize>`, so every clone of a `Gauge`
/// reads and updates the same underlying value.
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Current reading, shared between all clones of this gauge.
    value: Arc<AtomicUsize>,
}

/// Two gauges are equal when their current readings are equal.
impl PartialEq for Gauge {
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (self.value(), other.value());
        mine == theirs
    }
}

/// Renders the raw reading as a plain integer.
impl Display for Gauge {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}

impl Default for Gauge {
    /// Same as [`Gauge::new`]: a gauge reading `0`.
    fn default() -> Self {
        Self::new()
    }
}

impl Gauge {
    /// Creates a gauge reading `0`.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Self { value }
    }

    /// Raises the reading by `n`.
    pub fn add(&self, n: usize) {
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Lowers the reading by `n`.
    ///
    /// The subtraction is the atomic `fetch_sub`, which wraps around on
    /// underflow rather than saturating at zero.
    pub fn sub(&self, n: usize) {
        let _previous = self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Raises the reading to `n` if `n` is larger than the current reading;
    /// otherwise leaves it unchanged.
    pub fn set_max(&self, n: usize) {
        let _previous = self.value.fetch_max(n, Ordering::Relaxed);
    }

    /// Overwrites the reading with `n` and returns the reading it replaced.
    pub fn set(&self, n: usize) -> usize {
        let replaced = self.value.swap(n, Ordering::Relaxed);
        replaced
    }

    /// Returns the current reading.
    pub fn value(&self) -> usize {
        let reading = self.value.load(Ordering::Relaxed);
        reading
    }
}
151
152#[derive(Debug, Clone)]
154pub struct Time {
155 nanos: Arc<AtomicUsize>,
157}
158
159impl Default for Time {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165impl PartialEq for Time {
166 fn eq(&self, other: &Self) -> bool {
167 self.value().eq(&other.value())
168 }
169}
170
171impl Display for Time {
172 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173 write!(f, "{}", human_readable_duration(self.value() as u64))
174 }
175}
176
177impl Time {
178 pub fn new() -> Self {
181 Self {
182 nanos: Arc::new(AtomicUsize::new(0)),
183 }
184 }
185
186 pub fn add_elapsed(&self, start: Instant) {
188 self.add_duration(start.elapsed());
189 }
190
191 pub fn add_duration(&self, duration: Duration) {
202 let more_nanos = duration.as_nanos() as usize;
203 self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204 }
205
206 pub fn add(&self, other: &Time) {
208 self.add_duration(Duration::from_nanos(other.value() as u64))
209 }
210
211 pub fn timer(&self) -> ScopedTimerGuard<'_> {
215 ScopedTimerGuard {
216 inner: self,
217 start: Some(Instant::now()),
218 }
219 }
220
221 pub fn value(&self) -> usize {
223 self.nanos.load(Ordering::Relaxed)
224 }
225
226 pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229 ScopedTimerGuard {
230 inner: self,
231 start: Some(now),
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
239pub struct Timestamp {
240 timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl Timestamp {
251 pub fn new() -> Self {
253 Self {
254 timestamp: Arc::new(Mutex::new(None)),
255 }
256 }
257
258 pub fn record(&self) {
260 self.set(Utc::now())
261 }
262
263 pub fn set(&self, now: DateTime<Utc>) {
265 *self.timestamp.lock() = Some(now);
266 }
267
268 pub fn value(&self) -> Option<DateTime<Utc>> {
273 *self.timestamp.lock()
274 }
275
276 pub fn update_to_min(&self, other: &Timestamp) {
278 let min = match (self.value(), other.value()) {
279 (None, None) => None,
280 (Some(v), None) => Some(v),
281 (None, Some(v)) => Some(v),
282 (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283 };
284
285 *self.timestamp.lock() = min;
286 }
287
288 pub fn update_to_max(&self, other: &Timestamp) {
290 let max = match (self.value(), other.value()) {
291 (None, None) => None,
292 (Some(v), None) => Some(v),
293 (None, Some(v)) => Some(v),
294 (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295 };
296
297 *self.timestamp.lock() = max;
298 }
299}
300
301impl PartialEq for Timestamp {
302 fn eq(&self, other: &Self) -> bool {
303 self.value().eq(&other.value())
304 }
305}
306
307impl Display for Timestamp {
308 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309 match self.value() {
310 None => write!(f, "NONE"),
311 Some(v) => {
312 write!(f, "{v}")
313 }
314 }
315 }
316}
317
318pub struct ScopedTimerGuard<'a> {
322 inner: &'a Time,
323 start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327 pub fn stop(&mut self) {
329 if let Some(start) = self.start.take() {
330 self.inner.add_elapsed(start)
331 }
332 }
333
334 pub fn restart(&mut self) {
336 self.start = Some(Instant::now())
337 }
338
339 pub fn done(mut self) {
341 self.stop()
342 }
343
344 pub fn stop_with(&mut self, end_time: Instant) {
346 if let Some(start) = self.start.take() {
347 let elapsed = end_time - start;
348 self.inner.add_duration(elapsed)
349 }
350 }
351
352 pub fn done_with(mut self, end_time: Instant) {
355 self.stop_with(end_time)
356 }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360 fn drop(&mut self) {
361 self.stop()
362 }
363}
364
365#[derive(Debug, Clone)]
372pub struct PruningMetrics {
373 pruned: Arc<AtomicUsize>,
374 matched: Arc<AtomicUsize>,
375 fully_matched: Arc<AtomicUsize>,
376}
377
378impl Display for PruningMetrics {
379 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
380 let matched = self.matched.load(Ordering::Relaxed);
381 let total = self.pruned.load(Ordering::Relaxed) + matched;
382 let fully_matched = self.fully_matched.load(Ordering::Relaxed);
383
384 if fully_matched != 0 {
385 write!(
386 f,
387 "{} total → {} matched -> {} fully matched",
388 human_readable_count(total),
389 human_readable_count(matched),
390 human_readable_count(fully_matched)
391 )
392 } else {
393 write!(
394 f,
395 "{} total → {} matched",
396 human_readable_count(total),
397 human_readable_count(matched)
398 )
399 }
400 }
401}
402
403impl Default for PruningMetrics {
404 fn default() -> Self {
405 Self::new()
406 }
407}
408
409impl PruningMetrics {
410 pub fn new() -> Self {
412 Self {
413 pruned: Arc::new(AtomicUsize::new(0)),
414 matched: Arc::new(AtomicUsize::new(0)),
415 fully_matched: Arc::new(AtomicUsize::new(0)),
416 }
417 }
418
419 pub fn add_pruned(&self, n: usize) {
421 self.pruned.fetch_add(n, Ordering::Relaxed);
424 }
425
426 pub fn add_matched(&self, n: usize) {
428 self.matched.fetch_add(n, Ordering::Relaxed);
431 }
432
433 pub fn add_fully_matched(&self, n: usize) {
435 self.fully_matched.fetch_add(n, Ordering::Relaxed);
438 }
439
440 pub fn subtract_matched(&self, n: usize) {
442 self.matched.fetch_sub(n, Ordering::Relaxed);
445 }
446
447 pub fn pruned(&self) -> usize {
449 self.pruned.load(Ordering::Relaxed)
450 }
451
452 pub fn matched(&self) -> usize {
454 self.matched.load(Ordering::Relaxed)
455 }
456
457 pub fn fully_matched(&self) -> usize {
459 self.fully_matched.load(Ordering::Relaxed)
460 }
461}
462
/// A `part / total` ratio, with both sides tracked as shared atomic counters.
///
/// `part` and `total` are each stored in an `Arc<AtomicUsize>`, so clones of
/// a `RatioMetrics` read and update the same underlying values.
#[derive(Debug, Clone)]
pub struct RatioMetrics {
    /// Numerator of the ratio.
    part: Arc<AtomicUsize>,
    /// Denominator of the ratio.
    total: Arc<AtomicUsize>,
    /// How [`RatioMetrics::merge`] combines another ratio into this one.
    merge_strategy: RatioMergeStrategy,
    /// Whether `Display` appends the raw ` (part/total)` values after the
    /// percentage.
    display_raw_values: bool,
}

impl Default for RatioMetrics {
    /// Same as [`RatioMetrics::new`].
    ///
    /// Implemented by hand rather than derived: a derived `Default` would set
    /// `display_raw_values` to `false` while `new()` sets it to `true`, so the
    /// two constructors would produce metrics that display differently.
    fn default() -> Self {
        Self::new()
    }
}

/// Selects, for each side of a ratio, whether merging adds the other value or
/// overwrites with it. See [`RatioMetrics::merge`].
#[derive(Debug, Clone, Default)]
pub enum RatioMergeStrategy {
    /// Add the other `part` and add the other `total` (the default).
    #[default]
    AddPartAddTotal,
    /// Add the other `part`, overwrite `total` with the other `total`.
    AddPartSetTotal,
    /// Overwrite `part` with the other `part`, add the other `total`.
    SetPartAddTotal,
}

impl RatioMetrics {
    /// Creates a `0/0` ratio that merges with
    /// [`RatioMergeStrategy::AddPartAddTotal`] and displays raw values.
    pub fn new() -> Self {
        Self {
            part: Arc::new(AtomicUsize::new(0)),
            total: Arc::new(AtomicUsize::new(0)),
            merge_strategy: RatioMergeStrategy::AddPartAddTotal,
            display_raw_values: true,
        }
    }

    /// Builder-style setter for the merge strategy.
    pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy) -> Self {
        self.merge_strategy = merge_strategy;
        self
    }

    /// Builder-style setter controlling whether `Display` appends the raw
    /// ` (part/total)` values.
    pub fn with_display_raw_values(mut self, display_raw_values: bool) -> Self {
        self.display_raw_values = display_raw_values;
        self
    }

    /// Adds `n` to the numerator.
    pub fn add_part(&self, n: usize) {
        self.part.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds `n` to the denominator.
    pub fn add_total(&self, n: usize) {
        self.total.fetch_add(n, Ordering::Relaxed);
    }

    /// Overwrites the numerator with `n`.
    pub fn set_part(&self, n: usize) {
        self.part.store(n, Ordering::Relaxed);
    }

    /// Overwrites the denominator with `n`.
    pub fn set_total(&self, n: usize) {
        self.total.store(n, Ordering::Relaxed);
    }

    /// Combines `other` into `self` according to `self`'s merge strategy
    /// (the strategy of `other` is not consulted).
    pub fn merge(&self, other: &Self) {
        match self.merge_strategy {
            RatioMergeStrategy::AddPartAddTotal => {
                self.add_part(other.part());
                self.add_total(other.total());
            }
            RatioMergeStrategy::AddPartSetTotal => {
                self.add_part(other.part());
                self.set_total(other.total());
            }
            RatioMergeStrategy::SetPartAddTotal => {
                self.set_part(other.part());
                self.add_total(other.total());
            }
        }
    }

    /// Returns the numerator.
    pub fn part(&self) -> usize {
        self.part.load(Ordering::Relaxed)
    }

    /// Returns the denominator.
    pub fn total(&self) -> usize {
        self.total.load(Ordering::Relaxed)
    }

    /// Returns the strategy used by [`RatioMetrics::merge`].
    pub fn merge_strategy(&self) -> &RatioMergeStrategy {
        &self.merge_strategy
    }

    /// Returns whether `Display` appends the raw ` (part/total)` values.
    pub fn display_raw_values(&self) -> bool {
        self.display_raw_values
    }
}

/// Two ratios are equal when `part`, `total` and `display_raw_values` agree.
/// The merge strategy does not take part in the comparison.
impl PartialEq for RatioMetrics {
    fn eq(&self, other: &Self) -> bool {
        self.part() == other.part()
            && self.total() == other.total()
            && self.display_raw_values == other.display_raw_values
    }
}
571
572impl Display for RatioMetrics {
573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
575 let part = self.part();
576 let total = self.total();
577
578 if total == 0 {
581 write!(f, "N/A")?;
582 } else {
583 let basis_points = (((part as u128 * 10_000) + (total as u128 / 2))
586 / total as u128) as usize;
587 let whole = basis_points / 100;
588 let fractional = basis_points % 100;
589
590 if fractional == 0 {
591 write!(f, "{whole}%")?;
592 } else if fractional.is_multiple_of(10) {
593 write!(f, "{whole}.{}%", fractional / 10)?;
594 } else {
595 write!(f, "{whole}.{fractional:02}%")?;
596 }
597 }
598
599 if !self.display_raw_values {
600 return Ok(());
601 }
602
603 if total == 0 {
604 if part == 0 {
605 write!(f, " (0/0)")
606 } else {
607 write!(f, " ({}/0)", human_readable_count(part))
608 }
609 } else {
610 write!(
611 f,
612 " ({}/{})",
613 human_readable_count(part),
614 human_readable_count(total)
615 )
616 }
617 }
618}
619
/// A single metric value, tagged with what it measures.
///
/// Tuple variants are built-in metrics reported under a fixed name; struct
/// variants carry their own `name`.
#[derive(Debug, Clone)]
pub enum MetricValue {
    /// Number of output rows; reported as `output_rows`.
    OutputRows(Count),
    /// Elapsed compute time; reported as `elapsed_compute`.
    ElapsedCompute(Time),
    /// Number of spills; reported as `spill_count`.
    SpillCount(Count),
    /// Number of spilled bytes; reported as `spilled_bytes` and displayed as
    /// a human-readable size.
    SpilledBytes(Count),
    /// Number of output bytes; reported as `output_bytes` and displayed as a
    /// human-readable size.
    OutputBytes(Count),
    /// Number of output batches; reported as `output_batches`.
    OutputBatches(Count),
    /// Number of spilled rows; reported as `spilled_rows`.
    SpilledRows(Count),
    /// Current memory usage; reported as `mem_used` and displayed as a
    /// human-readable size.
    CurrentMemoryUsage(Gauge),
    /// A named counter.
    Count {
        /// Name under which the counter is reported.
        name: Cow<'static, str>,
        /// The counter itself.
        count: Count,
    },
    /// A named gauge.
    Gauge {
        /// Name under which the gauge is reported.
        name: Cow<'static, str>,
        /// The gauge itself.
        gauge: Gauge,
    },
    /// A named time measurement.
    Time {
        /// Name under which the time is reported.
        name: Cow<'static, str>,
        /// The accumulated time.
        time: Time,
    },
    /// A start timestamp; reported as `start_timestamp`. Aggregation keeps
    /// the earliest value.
    StartTimestamp(Timestamp),
    /// An end timestamp; reported as `end_timestamp`. Aggregation keeps the
    /// latest value.
    EndTimestamp(Timestamp),
    /// Named pruning counters.
    PruningMetrics {
        /// Name under which the pruning counters are reported.
        name: Cow<'static, str>,
        /// The pruned / matched / fully matched counters.
        pruning_metrics: PruningMetrics,
    },
    /// A named ratio.
    Ratio {
        /// Name under which the ratio is reported.
        name: Cow<'static, str>,
        /// The `part / total` ratio.
        ratio_metrics: RatioMetrics,
    },
    /// A named, user-defined metric.
    Custom {
        /// Name under which the custom metric is reported.
        name: Cow<'static, str>,
        /// The custom value; comparison, aggregation, emptying and display
        /// are delegated to it.
        value: Arc<dyn CustomMetricValue>,
    },
}
703
704impl PartialEq for MetricValue {
707 fn eq(&self, other: &Self) -> bool {
708 match (self, other) {
709 (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
710 count == other
711 }
712 (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
713 time == other
714 }
715 (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
716 count == other
717 }
718 (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
719 count == other
720 }
721 (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
722 count == other
723 }
724 (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
725 count == other
726 }
727 (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
728 count == other
729 }
730 (
731 MetricValue::CurrentMemoryUsage(gauge),
732 MetricValue::CurrentMemoryUsage(other),
733 ) => gauge == other,
734 (
735 MetricValue::Count { name, count },
736 MetricValue::Count {
737 name: other_name,
738 count: other_count,
739 },
740 ) => name == other_name && count == other_count,
741 (
742 MetricValue::Gauge { name, gauge },
743 MetricValue::Gauge {
744 name: other_name,
745 gauge: other_gauge,
746 },
747 ) => name == other_name && gauge == other_gauge,
748 (
749 MetricValue::Time { name, time },
750 MetricValue::Time {
751 name: other_name,
752 time: other_time,
753 },
754 ) => name == other_name && time == other_time,
755
756 (
757 MetricValue::StartTimestamp(timestamp),
758 MetricValue::StartTimestamp(other),
759 ) => timestamp == other,
760 (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
761 timestamp == other
762 }
763 (
764 MetricValue::PruningMetrics {
765 name,
766 pruning_metrics,
767 },
768 MetricValue::PruningMetrics {
769 name: other_name,
770 pruning_metrics: other_pruning_metrics,
771 },
772 ) => {
773 name == other_name
774 && pruning_metrics.pruned() == other_pruning_metrics.pruned()
775 && pruning_metrics.matched() == other_pruning_metrics.matched()
776 }
777 (
778 MetricValue::Ratio {
779 name,
780 ratio_metrics,
781 },
782 MetricValue::Ratio {
783 name: other_name,
784 ratio_metrics: other_ratio_metrics,
785 },
786 ) => name == other_name && ratio_metrics == other_ratio_metrics,
787 (
788 MetricValue::Custom { name, value },
789 MetricValue::Custom {
790 name: other_name,
791 value: other_value,
792 },
793 ) => name == other_name && value.is_eq(other_value),
794 _ => false,
796 }
797 }
798}
799
800impl MetricValue {
801 pub fn name(&self) -> &str {
803 match self {
804 Self::OutputRows(_) => "output_rows",
805 Self::SpillCount(_) => "spill_count",
806 Self::SpilledBytes(_) => "spilled_bytes",
807 Self::OutputBytes(_) => "output_bytes",
808 Self::OutputBatches(_) => "output_batches",
809 Self::SpilledRows(_) => "spilled_rows",
810 Self::CurrentMemoryUsage(_) => "mem_used",
811 Self::ElapsedCompute(_) => "elapsed_compute",
812 Self::Count { name, .. } => name.borrow(),
813 Self::Gauge { name, .. } => name.borrow(),
814 Self::Time { name, .. } => name.borrow(),
815 Self::StartTimestamp(_) => "start_timestamp",
816 Self::EndTimestamp(_) => "end_timestamp",
817 Self::PruningMetrics { name, .. } => name.borrow(),
818 Self::Ratio { name, .. } => name.borrow(),
819 Self::Custom { name, .. } => name.borrow(),
820 }
821 }
822
823 pub fn as_usize(&self) -> usize {
826 match self {
827 Self::OutputRows(count) => count.value(),
828 Self::SpillCount(count) => count.value(),
829 Self::SpilledBytes(bytes) => bytes.value(),
830 Self::OutputBytes(bytes) => bytes.value(),
831 Self::OutputBatches(count) => count.value(),
832 Self::SpilledRows(count) => count.value(),
833 Self::CurrentMemoryUsage(used) => used.value(),
834 Self::ElapsedCompute(time) => time.value(),
835 Self::Count { count, .. } => count.value(),
836 Self::Gauge { gauge, .. } => gauge.value(),
837 Self::Time { time, .. } => time.value(),
838 Self::StartTimestamp(timestamp) => timestamp
839 .value()
840 .and_then(|ts| ts.timestamp_nanos_opt())
841 .map(|nanos| nanos as usize)
842 .unwrap_or(0),
843 Self::EndTimestamp(timestamp) => timestamp
844 .value()
845 .and_then(|ts| ts.timestamp_nanos_opt())
846 .map(|nanos| nanos as usize)
847 .unwrap_or(0),
848 Self::PruningMetrics { .. } => 0,
852 Self::Ratio { .. } => 0,
854 Self::Custom { value, .. } => value.as_usize(),
855 }
856 }
857
858 pub fn new_empty(&self) -> Self {
861 match self {
862 Self::OutputRows(_) => Self::OutputRows(Count::new()),
863 Self::SpillCount(_) => Self::SpillCount(Count::new()),
864 Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
865 Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
866 Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
867 Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
868 Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
869 Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
870 Self::Count { name, .. } => Self::Count {
871 name: name.clone(),
872 count: Count::new(),
873 },
874 Self::Gauge { name, .. } => Self::Gauge {
875 name: name.clone(),
876 gauge: Gauge::new(),
877 },
878 Self::Time { name, .. } => Self::Time {
879 name: name.clone(),
880 time: Time::new(),
881 },
882 Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
883 Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
884 Self::PruningMetrics { name, .. } => Self::PruningMetrics {
885 name: name.clone(),
886 pruning_metrics: PruningMetrics::new(),
887 },
888 Self::Ratio {
889 name,
890 ratio_metrics,
891 } => {
892 let merge_strategy = ratio_metrics.merge_strategy.clone();
893 Self::Ratio {
894 name: name.clone(),
895 ratio_metrics: RatioMetrics::new()
896 .with_merge_strategy(merge_strategy)
897 .with_display_raw_values(ratio_metrics.display_raw_values),
898 }
899 }
900 Self::Custom { name, value } => Self::Custom {
901 name: name.clone(),
902 value: value.new_empty(),
903 },
904 }
905 }
906
907 pub fn aggregate(&mut self, other: &Self) {
917 match (self, other) {
918 (Self::OutputRows(count), Self::OutputRows(other_count))
919 | (Self::SpillCount(count), Self::SpillCount(other_count))
920 | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
921 | (Self::OutputBytes(count), Self::OutputBytes(other_count))
922 | (Self::OutputBatches(count), Self::OutputBatches(other_count))
923 | (Self::SpilledRows(count), Self::SpilledRows(other_count))
924 | (
925 Self::Count { count, .. },
926 Self::Count {
927 count: other_count, ..
928 },
929 ) => count.add(other_count.value()),
930 (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
931 | (
932 Self::Gauge { gauge, .. },
933 Self::Gauge {
934 gauge: other_gauge, ..
935 },
936 ) => gauge.add(other_gauge.value()),
937 (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
938 | (
939 Self::Time { time, .. },
940 Self::Time {
941 time: other_time, ..
942 },
943 ) => time.add(other_time),
944 (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
946 timestamp.update_to_min(other_timestamp);
947 }
948 (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
950 timestamp.update_to_max(other_timestamp);
951 }
952 (
953 Self::PruningMetrics {
954 pruning_metrics, ..
955 },
956 Self::PruningMetrics {
957 pruning_metrics: other_pruning_metrics,
958 ..
959 },
960 ) => {
961 let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
962 let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
963 let fully_matched =
964 other_pruning_metrics.fully_matched.load(Ordering::Relaxed);
965 pruning_metrics.add_pruned(pruned);
966 pruning_metrics.add_matched(matched);
967 pruning_metrics.add_fully_matched(fully_matched);
968 }
969 (
970 Self::Ratio { ratio_metrics, .. },
971 Self::Ratio {
972 ratio_metrics: other_ratio_metrics,
973 ..
974 },
975 ) => {
976 ratio_metrics.merge(other_ratio_metrics);
977 }
978 (
979 Self::Custom { value, .. },
980 Self::Custom {
981 value: other_value, ..
982 },
983 ) => {
984 value.aggregate(Arc::clone(other_value));
985 }
986 m @ (_, _) => {
987 panic!(
988 "Mismatched metric types. Can not aggregate {:?} with value {:?}",
989 m.0, m.1
990 )
991 }
992 }
993 }
994
995 pub fn display_sort_key(&self) -> u8 {
998 match self {
999 Self::OutputRows(_) => 0,
1001 Self::ElapsedCompute(_) => 1,
1002 Self::OutputBytes(_) => 2,
1003 Self::OutputBatches(_) => 3,
1004 Self::PruningMetrics { name, .. } => match name.as_ref() {
1006 "files_ranges_pruned_statistics" => 4,
1014 "row_groups_pruned_statistics" => 5,
1015 "row_groups_pruned_bloom_filter" => 6,
1016 "page_index_pages_pruned" => 7,
1017 "page_index_rows_pruned" => 8,
1018 _ => 9,
1019 },
1020 Self::SpillCount(_) => 10,
1021 Self::SpilledBytes(_) => 11,
1022 Self::SpilledRows(_) => 12,
1023 Self::CurrentMemoryUsage(_) => 13,
1024 Self::Count { name, .. } => match name.as_ref() {
1025 "page_index_pages_skipped_by_fully_matched" => 8,
1030 _ => 14,
1031 },
1032 Self::Gauge { .. } => 15,
1033 Self::Time { .. } => 16,
1034 Self::Ratio { .. } => 17,
1035 Self::StartTimestamp(_) => 18, Self::EndTimestamp(_) => 19,
1037 Self::Custom { .. } => 20,
1038 }
1039 }
1040
1041 pub fn is_timestamp(&self) -> bool {
1043 matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
1044 }
1045}
1046
1047impl Display for MetricValue {
1048 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1050 match self {
1051 Self::OutputRows(count)
1052 | Self::OutputBatches(count)
1053 | Self::SpillCount(count)
1054 | Self::SpilledRows(count)
1055 | Self::Count { count, .. } => {
1056 write!(f, "{count}")
1057 }
1058 Self::SpilledBytes(count) | Self::OutputBytes(count) => {
1059 let readable_count = human_readable_size(count.value());
1060 write!(f, "{readable_count}")
1061 }
1062 Self::CurrentMemoryUsage(gauge) => {
1063 let readable_size = human_readable_size(gauge.value());
1065 write!(f, "{readable_size}")
1066 }
1067 Self::Gauge { gauge, .. } => {
1068 write!(f, "{}", human_readable_count(gauge.value()))
1070 }
1071 Self::ElapsedCompute(time) | Self::Time { time, .. } => {
1072 if time.value() > 0 {
1075 write!(f, "{time}")
1076 } else {
1077 write!(f, "NOT RECORDED")
1078 }
1079 }
1080 Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
1081 write!(f, "{timestamp}")
1082 }
1083 Self::PruningMetrics {
1084 pruning_metrics, ..
1085 } => {
1086 write!(f, "{pruning_metrics}")
1087 }
1088 Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
1089 Self::Custom { value, .. } => {
1090 write!(f, "{value}")
1091 }
1092 }
1093 }
1094}
1095
/// Unit tests for the metric value types: custom metrics, display formatting,
/// ratio merging, and the scoped timer guard.
#[cfg(test)]
mod tests {
    use std::any::Any;

    use chrono::TimeZone;
    use datafusion_common::units::MB;

    use super::*;

    /// A minimal `CustomMetricValue` implementation backed by one atomic
    /// counter, used to exercise `MetricValue::Custom`.
    #[derive(Debug, Default)]
    pub struct CustomCounter {
        count: AtomicUsize,
    }

    impl Display for CustomCounter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
        }
    }

    impl CustomMetricValue for CustomCounter {
        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
            Arc::new(CustomCounter::default())
        }

        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
            // Panics if `other` is not a `CustomCounter`.
            let other = other.as_any().downcast_ref::<Self>().unwrap();
            self.count
                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
            // A value of a different concrete type is never equal.
            let Some(other) = other.as_any().downcast_ref::<Self>() else {
                return false;
            };

            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
        }
    }

    /// Builds a `MetricValue::Custom` named `name` wrapping a `CustomCounter`
    /// that starts at `value`.
    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
        let custom_counter = CustomCounter::default();
        custom_counter.count.fetch_add(value, Ordering::Relaxed);

        MetricValue::Custom {
            name: Cow::Borrowed(name),
            value: Arc::new(custom_counter),
        }
    }

    /// Custom metrics with different names are unequal, yet can still be
    /// aggregated; the receiver keeps its own name.
    #[test]
    fn test_custom_metric_with_mismatching_names() {
        let mut custom_val = new_custom_counter("Hi", 1);
        let other_custom_val = new_custom_counter("Hello", 1);

        assert!(other_custom_val != custom_val);

        custom_val.aggregate(&other_custom_val);

        let expected_val = new_custom_counter("Hi", 2);
        assert!(expected_val == custom_val);
    }

    /// Aggregating custom metrics sums the wrapped counters.
    #[test]
    fn test_custom_metric() {
        let mut custom_val = new_custom_counter("hi", 11);
        let other_custom_val = new_custom_counter("hi", 20);

        custom_val.aggregate(&other_custom_val);

        assert!(custom_val != other_custom_val);

        if let MetricValue::Custom { value, .. } = custom_val {
            let counter = value
                .as_any()
                .downcast_ref::<CustomCounter>()
                .expect("Expected CustomCounter");
            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
        } else {
            panic!("Unexpected value");
        }
    }

    /// Custom metrics display through the wrapped value's `Display`.
    #[test]
    fn test_display_custom_metric() {
        let custom_val = new_custom_counter("hi", 11);
        assert_eq!(custom_val.to_string(), "count: 11");
    }

    /// Built-in and named counters display the shared count, including
    /// updates made after the metric values were created.
    #[test]
    fn test_display_output_rows() {
        let count = Count::new();
        let values = vec![
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];

        for value in &values {
            assert_eq!("0", value.to_string(), "value {value:?}");
        }

        count.add(42);
        for value in &values {
            assert_eq!("42", value.to_string(), "value {value:?}");
        }
    }

    /// Spilled bytes are displayed as a human-readable size.
    #[test]
    fn test_display_spilled_bytes() {
        let count = Count::new();
        let spilled_byte = MetricValue::SpilledBytes(count.clone());

        assert_eq!("0.0 B", spilled_byte.to_string());

        count.add((100 * MB) as usize);
        assert_eq!("100.0 MB", spilled_byte.to_string());

        count.add((0.5 * MB as f64) as usize);
        assert_eq!("100.5 MB", spilled_byte.to_string());
    }

    /// Times display as "NOT RECORDED" until something has been added.
    #[test]
    fn test_display_time() {
        let time = Time::new();
        let values = vec![
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];

        // Nothing recorded yet.
        for value in &values {
            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
        }

        time.add_duration(Duration::from_nanos(1042));
        for value in &values {
            assert_eq!("1.04µs", value.to_string(), "value {value:?}");
        }
    }

    /// Ratio display: "N/A" for a zero total, rounded percentages, and the
    /// optional raw `(part/total)` suffix.
    #[test]
    fn test_display_ratio() {
        let ratio_metrics = RatioMetrics::new();
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("ratio_metric"),
            ratio_metrics: ratio_metrics.clone(),
        };

        assert_eq!("N/A (0/0)", ratio.to_string());

        ratio_metrics.add_part(10);
        assert_eq!("N/A (10/0)", ratio.to_string());

        ratio_metrics.add_total(40);
        assert_eq!("25% (10/40)", ratio.to_string());

        // A very small ratio still shows two decimal places.
        let tiny_ratio_metrics = RatioMetrics::new();
        let tiny_ratio = MetricValue::Ratio {
            name: Cow::Borrowed("tiny_ratio_metric"),
            ratio_metrics: tiny_ratio_metrics.clone(),
        };
        tiny_ratio_metrics.add_part(1);
        tiny_ratio_metrics.add_total(3000);
        assert_eq!("0.03% (1/3.00 K)", tiny_ratio.to_string());

        ratio_metrics.set_part(6667);
        ratio_metrics.set_total(10_000);
        assert_eq!("66.67% (6.67 K/10.00 K)", ratio.to_string());

        // With raw values disabled only the percentage is shown.
        let percentage_only = RatioMetrics::new().with_display_raw_values(false);
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("percentage_only"),
            ratio_metrics: percentage_only.clone(),
        };
        assert_eq!("N/A", ratio.to_string());
        percentage_only.set_part(6667);
        percentage_only.set_total(10_000);
        assert_eq!("66.67%", ratio.to_string());
    }

    /// `set_part` / `set_total` overwrite rather than accumulate.
    #[test]
    fn test_ratio_set_methods() {
        let ratio_metrics = RatioMetrics::new();

        // Setting the same value twice leaves it unchanged.
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(10);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics.to_string());

        let ratio_metrics = RatioMetrics::new();

        // The last value set wins.
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(30);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(50);
        assert_eq!("60% (30/50)", ratio_metrics.to_string());
    }

    /// Each `RatioMergeStrategy` combines part and total as its name says.
    #[test]
    fn test_ratio_merge_strategy() {
        // AddPartSetTotal: parts are summed, total is overwritten.
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);

        ratio_metrics1.set_part(10);
        ratio_metrics1.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics1.to_string());
        let ratio_metrics2 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(40);
        assert_eq!("50% (20/40)", ratio_metrics2.to_string());

        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("75% (30/40)", ratio_metrics1.to_string());

        // SetPartAddTotal: part is overwritten, totals are summed.
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("20% (20/100)", ratio_metrics1.to_string());

        // Default strategy (AddPartAddTotal): both sides are summed.
        let ratio_metrics1 = RatioMetrics::new();
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("40% (40/100)", ratio_metrics1.to_string());
    }

    /// Timestamps display as "NONE" until set, then as the UTC time.
    #[test]
    fn test_display_timestamp() {
        let timestamp = Timestamp::new();
        let values = vec![
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];

        // Nothing set yet.
        for value in &values {
            assert_eq!("NONE", value.to_string(), "value {value:?}");
        }

        timestamp.set(Utc.timestamp_nanos(1431648000000000));
        for value in &values {
            assert_eq!(
                "1970-01-17 13:40:48 UTC",
                value.to_string(),
                "value {value:?}"
            );
        }
    }

    /// `timer_with` measures from the supplied instant, so time that passed
    /// before the guard was created is included.
    #[test]
    fn test_timer_with_custom_instant() {
        let time = Time::new();
        let start_time = Instant::now();

        // Time passes before the guard exists.
        std::thread::sleep(Duration::from_millis(1));

        let mut timer = time.timer_with(start_time);

        // More time passes while the guard is running.
        std::thread::sleep(Duration::from_millis(1));

        timer.stop();

        // Both sleeps must be covered.
        assert!(
            time.value() >= 2_000_000,
            "Expected at least 2ms, got {} ns",
            time.value()
        );
    }

    /// `stop_with` records up to the supplied end instant, and a second stop
    /// is a no-op.
    #[test]
    fn test_stop_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();
        let mut timer = time.timer_with(start);

        // A synthetic end point; no real waiting is needed.
        let end = start + Duration::from_millis(10);

        timer.stop_with(end);

        let recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&recorded),
            "Expected ~10ms, got {recorded} ns"
        );

        // The guard is already stopped, so nothing more is recorded.
        timer.stop_with(end);
        assert_eq!(
            recorded,
            time.value(),
            "Time should not change after second stop"
        );
    }

    /// `done_with` records up to the supplied end instant and consumes the
    /// guard; successive guards accumulate into the same `Time`.
    #[test]
    fn test_done_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();

        {
            let timer = time.timer_with(start);

            let end = start + Duration::from_millis(5);

            // Consumes the guard; its drop must not record a second time.
            timer.done_with(end);

        }

        let recorded = time.value();
        assert!(
            (5_000_000..=5_100_000).contains(&recorded),
            "Expected ~5ms, got {recorded} ns",
        );

        {
            let timer2 = time.timer_with(start);
            let end2 = start + Duration::from_millis(5);
            timer2.done_with(end2);
        }

        // The second measurement is added on top of the first.
        let new_recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&new_recorded),
            "Expected ~10ms total, got {new_recorded} ns",
        );
    }

    /// End-to-end check of the human-readable formatting of every metric kind.
    #[test]
    fn test_human_readable_metric_formatting() {
        // Counts: small values are printed as-is, larger ones get K/M/B units.
        let small_count = Count::new();
        small_count.add(42);
        assert_eq!(
            MetricValue::OutputRows(small_count.clone()).to_string(),
            "42"
        );

        let thousand_count = Count::new();
        thousand_count.add(10_100);
        assert_eq!(
            MetricValue::OutputRows(thousand_count.clone()).to_string(),
            "10.10 K"
        );

        let million_count = Count::new();
        million_count.add(1_532_000);
        assert_eq!(
            MetricValue::SpilledRows(million_count.clone()).to_string(),
            "1.53 M"
        );

        let billion_count = Count::new();
        billion_count.add(2_500_000_000);
        assert_eq!(
            MetricValue::OutputBatches(billion_count.clone()).to_string(),
            "2.50 B"
        );

        // Times: microseconds, milliseconds and seconds.
        let micros_time = Time::new();
        micros_time.add_duration(Duration::from_nanos(1_234));
        assert_eq!(
            MetricValue::ElapsedCompute(micros_time.clone()).to_string(),
            "1.23µs"
        );

        let millis_time = Time::new();
        millis_time.add_duration(Duration::from_nanos(11_295_377));
        assert_eq!(
            MetricValue::ElapsedCompute(millis_time.clone()).to_string(),
            "11.30ms"
        );

        let seconds_time = Time::new();
        seconds_time.add_duration(Duration::from_nanos(1_234_567_890));
        assert_eq!(
            MetricValue::ElapsedCompute(seconds_time.clone()).to_string(),
            "1.23s"
        );

        // Memory usage is shown as a size.
        let mem_gauge = Gauge::new();
        mem_gauge.add(100 * MB as usize);
        assert_eq!(
            MetricValue::CurrentMemoryUsage(mem_gauge.clone()).to_string(),
            "100.0 MB"
        );

        // A named gauge is shown as a count.
        let custom_gauge = Gauge::new();
        custom_gauge.add(50_000);
        assert_eq!(
            MetricValue::Gauge {
                name: "custom".into(),
                gauge: custom_gauge.clone()
            }
            .to_string(),
            "50.00 K"
        );

        // Pruning metrics: total is pruned + matched.
        let pruning = PruningMetrics::new();
        pruning.add_matched(500_000);
        pruning.add_pruned(500_000);
        assert_eq!(
            MetricValue::PruningMetrics {
                name: "test_pruning".into(),
                pruning_metrics: pruning.clone()
            }
            .to_string(),
            "1.00 M total → 500.0 K matched"
        );

        // Ratio metrics: percentage followed by the raw values.
        let ratio = RatioMetrics::new();
        ratio.add_part(250_000);
        ratio.add_total(1_000_000);
        assert_eq!(
            MetricValue::Ratio {
                name: "test_ratio".into(),
                ratio_metrics: ratio.clone()
            }
            .to_string(),
            "25% (250.0 K/1.00 M)"
        );
    }
}