1use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::instant::Instant;
23use datafusion_execution::memory_pool::human_readable_size;
24use parking_lot::Mutex;
25use std::{
26 borrow::{Borrow, Cow},
27 fmt::{Debug, Display},
28 sync::{
29 atomic::{AtomicUsize, Ordering},
30 Arc,
31 },
32 time::Duration,
33};
34
35#[derive(Debug, Clone)]
39pub struct Count {
40 value: Arc<AtomicUsize>,
42}
43
44impl PartialEq for Count {
45 fn eq(&self, other: &Self) -> bool {
46 self.value().eq(&other.value())
47 }
48}
49
50impl Display for Count {
51 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
52 write!(f, "{}", self.value())
53 }
54}
55
56impl Default for Count {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62impl Count {
63 pub fn new() -> Self {
65 Self {
66 value: Arc::new(AtomicUsize::new(0)),
67 }
68 }
69
70 pub fn add(&self, n: usize) {
72 self.value.fetch_add(n, Ordering::Relaxed);
75 }
76
77 pub fn value(&self) -> usize {
79 self.value.load(Ordering::Relaxed)
80 }
81}
82
83#[derive(Debug, Clone)]
88pub struct Gauge {
89 value: Arc<AtomicUsize>,
91}
92
93impl PartialEq for Gauge {
94 fn eq(&self, other: &Self) -> bool {
95 self.value().eq(&other.value())
96 }
97}
98
99impl Display for Gauge {
100 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
101 write!(f, "{}", self.value())
102 }
103}
104
105impl Default for Gauge {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl Gauge {
112 pub fn new() -> Self {
114 Self {
115 value: Arc::new(AtomicUsize::new(0)),
116 }
117 }
118
119 pub fn add(&self, n: usize) {
121 self.value.fetch_add(n, Ordering::Relaxed);
124 }
125
126 pub fn sub(&self, n: usize) {
128 self.value.fetch_sub(n, Ordering::Relaxed);
131 }
132
133 pub fn set_max(&self, n: usize) {
135 self.value.fetch_max(n, Ordering::Relaxed);
136 }
137
138 pub fn set(&self, n: usize) -> usize {
140 self.value.swap(n, Ordering::Relaxed)
143 }
144
145 pub fn value(&self) -> usize {
147 self.value.load(Ordering::Relaxed)
148 }
149}
150
151#[derive(Debug, Clone)]
153pub struct Time {
154 nanos: Arc<AtomicUsize>,
156}
157
158impl Default for Time {
159 fn default() -> Self {
160 Self::new()
161 }
162}
163
164impl PartialEq for Time {
165 fn eq(&self, other: &Self) -> bool {
166 self.value().eq(&other.value())
167 }
168}
169
170impl Display for Time {
171 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
172 let duration = Duration::from_nanos(self.value() as u64);
173 write!(f, "{duration:?}")
174 }
175}
176
177impl Time {
178 pub fn new() -> Self {
181 Self {
182 nanos: Arc::new(AtomicUsize::new(0)),
183 }
184 }
185
186 pub fn add_elapsed(&self, start: Instant) {
188 self.add_duration(start.elapsed());
189 }
190
191 pub fn add_duration(&self, duration: Duration) {
202 let more_nanos = duration.as_nanos() as usize;
203 self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204 }
205
206 pub fn add(&self, other: &Time) {
208 self.add_duration(Duration::from_nanos(other.value() as u64))
209 }
210
211 pub fn timer(&self) -> ScopedTimerGuard<'_> {
215 ScopedTimerGuard {
216 inner: self,
217 start: Some(Instant::now()),
218 }
219 }
220
221 pub fn value(&self) -> usize {
223 self.nanos.load(Ordering::Relaxed)
224 }
225
226 pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229 ScopedTimerGuard {
230 inner: self,
231 start: Some(now),
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
239pub struct Timestamp {
240 timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl Timestamp {
251 pub fn new() -> Self {
253 Self {
254 timestamp: Arc::new(Mutex::new(None)),
255 }
256 }
257
258 pub fn record(&self) {
260 self.set(Utc::now())
261 }
262
263 pub fn set(&self, now: DateTime<Utc>) {
265 *self.timestamp.lock() = Some(now);
266 }
267
268 pub fn value(&self) -> Option<DateTime<Utc>> {
273 *self.timestamp.lock()
274 }
275
276 pub fn update_to_min(&self, other: &Timestamp) {
278 let min = match (self.value(), other.value()) {
279 (None, None) => None,
280 (Some(v), None) => Some(v),
281 (None, Some(v)) => Some(v),
282 (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283 };
284
285 *self.timestamp.lock() = min;
286 }
287
288 pub fn update_to_max(&self, other: &Timestamp) {
290 let max = match (self.value(), other.value()) {
291 (None, None) => None,
292 (Some(v), None) => Some(v),
293 (None, Some(v)) => Some(v),
294 (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295 };
296
297 *self.timestamp.lock() = max;
298 }
299}
300
301impl PartialEq for Timestamp {
302 fn eq(&self, other: &Self) -> bool {
303 self.value().eq(&other.value())
304 }
305}
306
307impl Display for Timestamp {
308 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309 match self.value() {
310 None => write!(f, "NONE"),
311 Some(v) => {
312 write!(f, "{v}")
313 }
314 }
315 }
316}
317
318pub struct ScopedTimerGuard<'a> {
322 inner: &'a Time,
323 start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327 pub fn stop(&mut self) {
329 if let Some(start) = self.start.take() {
330 self.inner.add_elapsed(start)
331 }
332 }
333
334 pub fn restart(&mut self) {
336 self.start = Some(Instant::now())
337 }
338
339 pub fn done(mut self) {
341 self.stop()
342 }
343
344 pub fn stop_with(&mut self, end_time: Instant) {
346 if let Some(start) = self.start.take() {
347 let elapsed = end_time - start;
348 self.inner.add_duration(elapsed)
349 }
350 }
351
352 pub fn done_with(mut self, end_time: Instant) {
355 self.stop_with(end_time)
356 }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360 fn drop(&mut self) {
361 self.stop()
362 }
363}
364
365#[derive(Debug, Clone)]
372pub struct PruningMetrics {
373 pruned: Arc<AtomicUsize>,
374 matched: Arc<AtomicUsize>,
375}
376
377impl Display for PruningMetrics {
378 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
379 let matched = self.matched.load(Ordering::Relaxed);
380 let total = self.pruned.load(Ordering::Relaxed) + matched;
381
382 write!(f, "{total} total → {matched} matched")
383 }
384}
385
386impl Default for PruningMetrics {
387 fn default() -> Self {
388 Self::new()
389 }
390}
391
392impl PruningMetrics {
393 pub fn new() -> Self {
395 Self {
396 pruned: Arc::new(AtomicUsize::new(0)),
397 matched: Arc::new(AtomicUsize::new(0)),
398 }
399 }
400
401 pub fn add_pruned(&self, n: usize) {
403 self.pruned.fetch_add(n, Ordering::Relaxed);
406 }
407
408 pub fn add_matched(&self, n: usize) {
410 self.matched.fetch_add(n, Ordering::Relaxed);
413 }
414
415 pub fn subtract_matched(&self, n: usize) {
417 self.matched.fetch_sub(n, Ordering::Relaxed);
420 }
421
422 pub fn pruned(&self) -> usize {
424 self.pruned.load(Ordering::Relaxed)
425 }
426
427 pub fn matched(&self) -> usize {
429 self.matched.load(Ordering::Relaxed)
430 }
431}
432
433#[derive(Debug, Clone, Default)]
437pub struct RatioMetrics {
438 part: Arc<AtomicUsize>,
439 total: Arc<AtomicUsize>,
440}
441
442impl RatioMetrics {
443 pub fn new() -> Self {
445 Self {
446 part: Arc::new(AtomicUsize::new(0)),
447 total: Arc::new(AtomicUsize::new(0)),
448 }
449 }
450
451 pub fn add_part(&self, n: usize) {
453 self.part.fetch_add(n, Ordering::Relaxed);
454 }
455
456 pub fn add_total(&self, n: usize) {
458 self.total.fetch_add(n, Ordering::Relaxed);
459 }
460
461 pub fn merge(&self, other: &Self) {
463 self.add_part(other.part());
464 self.add_total(other.total());
465 }
466
467 pub fn part(&self) -> usize {
469 self.part.load(Ordering::Relaxed)
470 }
471
472 pub fn total(&self) -> usize {
474 self.total.load(Ordering::Relaxed)
475 }
476}
477
478impl PartialEq for RatioMetrics {
479 fn eq(&self, other: &Self) -> bool {
480 self.part() == other.part() && self.total() == other.total()
481 }
482}
483
484fn fmt_significant(mut x: f64, digits: usize) -> String {
490 if x == 0.0 {
491 return "0".to_string();
492 }
493
494 let exp = x.abs().log10().floor(); let scale = 10f64.powf(-(exp - (digits as f64 - 1.0)));
496 x = (x * scale).round() / scale; format!("{x}")
498}
499
500impl Display for RatioMetrics {
501 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
502 let part = self.part();
503 let total = self.total();
504
505 if total == 0 {
506 if part == 0 {
507 write!(f, "N/A (0/0)")
508 } else {
509 write!(f, "N/A ({part}/0)")
510 }
511 } else {
512 let percentage = (part as f64 / total as f64) * 100.0;
513
514 write!(f, "{}% ({part}/{total})", fmt_significant(percentage, 2))
515 }
516 }
517}
518
519#[derive(Debug, Clone)]
525pub enum MetricValue {
526 OutputRows(Count),
528 ElapsedCompute(Time),
548 SpillCount(Count),
550 SpilledBytes(Count),
552 OutputBytes(Count),
554 SpilledRows(Count),
556 CurrentMemoryUsage(Gauge),
558 Count {
560 name: Cow<'static, str>,
562 count: Count,
564 },
565 Gauge {
567 name: Cow<'static, str>,
569 gauge: Gauge,
571 },
572 Time {
574 name: Cow<'static, str>,
576 time: Time,
578 },
579 StartTimestamp(Timestamp),
581 EndTimestamp(Timestamp),
583 PruningMetrics {
585 name: Cow<'static, str>,
586 pruning_metrics: PruningMetrics,
587 },
588 Ratio {
590 name: Cow<'static, str>,
591 ratio_metrics: RatioMetrics,
592 },
593 Custom {
594 name: Cow<'static, str>,
596 value: Arc<dyn CustomMetricValue>,
598 },
599}
600
601impl PartialEq for MetricValue {
604 fn eq(&self, other: &Self) -> bool {
605 match (self, other) {
606 (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
607 count == other
608 }
609 (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
610 time == other
611 }
612 (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
613 count == other
614 }
615 (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
616 count == other
617 }
618 (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
619 count == other
620 }
621 (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
622 count == other
623 }
624 (
625 MetricValue::CurrentMemoryUsage(gauge),
626 MetricValue::CurrentMemoryUsage(other),
627 ) => gauge == other,
628 (
629 MetricValue::Count { name, count },
630 MetricValue::Count {
631 name: other_name,
632 count: other_count,
633 },
634 ) => name == other_name && count == other_count,
635 (
636 MetricValue::Gauge { name, gauge },
637 MetricValue::Gauge {
638 name: other_name,
639 gauge: other_gauge,
640 },
641 ) => name == other_name && gauge == other_gauge,
642 (
643 MetricValue::Time { name, time },
644 MetricValue::Time {
645 name: other_name,
646 time: other_time,
647 },
648 ) => name == other_name && time == other_time,
649
650 (
651 MetricValue::StartTimestamp(timestamp),
652 MetricValue::StartTimestamp(other),
653 ) => timestamp == other,
654 (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
655 timestamp == other
656 }
657 (
658 MetricValue::PruningMetrics {
659 name,
660 pruning_metrics,
661 },
662 MetricValue::PruningMetrics {
663 name: other_name,
664 pruning_metrics: other_pruning_metrics,
665 },
666 ) => {
667 name == other_name
668 && pruning_metrics.pruned() == other_pruning_metrics.pruned()
669 && pruning_metrics.matched() == other_pruning_metrics.matched()
670 }
671 (
672 MetricValue::Ratio {
673 name,
674 ratio_metrics,
675 },
676 MetricValue::Ratio {
677 name: other_name,
678 ratio_metrics: other_ratio_metrics,
679 },
680 ) => name == other_name && ratio_metrics == other_ratio_metrics,
681 (
682 MetricValue::Custom { name, value },
683 MetricValue::Custom {
684 name: other_name,
685 value: other_value,
686 },
687 ) => name == other_name && value.is_eq(other_value),
688 _ => false,
690 }
691 }
692}
693
694impl MetricValue {
695 pub fn name(&self) -> &str {
697 match self {
698 Self::OutputRows(_) => "output_rows",
699 Self::SpillCount(_) => "spill_count",
700 Self::SpilledBytes(_) => "spilled_bytes",
701 Self::OutputBytes(_) => "output_bytes",
702 Self::SpilledRows(_) => "spilled_rows",
703 Self::CurrentMemoryUsage(_) => "mem_used",
704 Self::ElapsedCompute(_) => "elapsed_compute",
705 Self::Count { name, .. } => name.borrow(),
706 Self::Gauge { name, .. } => name.borrow(),
707 Self::Time { name, .. } => name.borrow(),
708 Self::StartTimestamp(_) => "start_timestamp",
709 Self::EndTimestamp(_) => "end_timestamp",
710 Self::PruningMetrics { name, .. } => name.borrow(),
711 Self::Ratio { name, .. } => name.borrow(),
712 Self::Custom { name, .. } => name.borrow(),
713 }
714 }
715
716 pub fn as_usize(&self) -> usize {
719 match self {
720 Self::OutputRows(count) => count.value(),
721 Self::SpillCount(count) => count.value(),
722 Self::SpilledBytes(bytes) => bytes.value(),
723 Self::OutputBytes(bytes) => bytes.value(),
724 Self::SpilledRows(count) => count.value(),
725 Self::CurrentMemoryUsage(used) => used.value(),
726 Self::ElapsedCompute(time) => time.value(),
727 Self::Count { count, .. } => count.value(),
728 Self::Gauge { gauge, .. } => gauge.value(),
729 Self::Time { time, .. } => time.value(),
730 Self::StartTimestamp(timestamp) => timestamp
731 .value()
732 .and_then(|ts| ts.timestamp_nanos_opt())
733 .map(|nanos| nanos as usize)
734 .unwrap_or(0),
735 Self::EndTimestamp(timestamp) => timestamp
736 .value()
737 .and_then(|ts| ts.timestamp_nanos_opt())
738 .map(|nanos| nanos as usize)
739 .unwrap_or(0),
740 Self::PruningMetrics { .. } => 0,
744 Self::Ratio { .. } => 0,
746 Self::Custom { value, .. } => value.as_usize(),
747 }
748 }
749
750 pub fn new_empty(&self) -> Self {
753 match self {
754 Self::OutputRows(_) => Self::OutputRows(Count::new()),
755 Self::SpillCount(_) => Self::SpillCount(Count::new()),
756 Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
757 Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
758 Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
759 Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
760 Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
761 Self::Count { name, .. } => Self::Count {
762 name: name.clone(),
763 count: Count::new(),
764 },
765 Self::Gauge { name, .. } => Self::Gauge {
766 name: name.clone(),
767 gauge: Gauge::new(),
768 },
769 Self::Time { name, .. } => Self::Time {
770 name: name.clone(),
771 time: Time::new(),
772 },
773 Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
774 Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
775 Self::PruningMetrics { name, .. } => Self::PruningMetrics {
776 name: name.clone(),
777 pruning_metrics: PruningMetrics::new(),
778 },
779 Self::Ratio { name, .. } => Self::Ratio {
780 name: name.clone(),
781 ratio_metrics: RatioMetrics::new(),
782 },
783 Self::Custom { name, value } => Self::Custom {
784 name: name.clone(),
785 value: value.new_empty(),
786 },
787 }
788 }
789
790 pub fn aggregate(&mut self, other: &Self) {
800 match (self, other) {
801 (Self::OutputRows(count), Self::OutputRows(other_count))
802 | (Self::SpillCount(count), Self::SpillCount(other_count))
803 | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
804 | (Self::OutputBytes(count), Self::OutputBytes(other_count))
805 | (Self::SpilledRows(count), Self::SpilledRows(other_count))
806 | (
807 Self::Count { count, .. },
808 Self::Count {
809 count: other_count, ..
810 },
811 ) => count.add(other_count.value()),
812 (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
813 | (
814 Self::Gauge { gauge, .. },
815 Self::Gauge {
816 gauge: other_gauge, ..
817 },
818 ) => gauge.add(other_gauge.value()),
819 (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
820 | (
821 Self::Time { time, .. },
822 Self::Time {
823 time: other_time, ..
824 },
825 ) => time.add(other_time),
826 (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
828 timestamp.update_to_min(other_timestamp);
829 }
830 (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
832 timestamp.update_to_max(other_timestamp);
833 }
834 (
835 Self::PruningMetrics {
836 pruning_metrics, ..
837 },
838 Self::PruningMetrics {
839 pruning_metrics: other_pruning_metrics,
840 ..
841 },
842 ) => {
843 let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
844 let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
845 pruning_metrics.add_pruned(pruned);
846 pruning_metrics.add_matched(matched);
847 }
848 (
849 Self::Ratio { ratio_metrics, .. },
850 Self::Ratio {
851 ratio_metrics: other_ratio_metrics,
852 ..
853 },
854 ) => {
855 ratio_metrics.merge(other_ratio_metrics);
856 }
857 (
858 Self::Custom { value, .. },
859 Self::Custom {
860 value: other_value, ..
861 },
862 ) => {
863 value.aggregate(Arc::clone(other_value));
864 }
865 m @ (_, _) => {
866 panic!(
867 "Mismatched metric types. Can not aggregate {:?} with value {:?}",
868 m.0, m.1
869 )
870 }
871 }
872 }
873
874 pub fn display_sort_key(&self) -> u8 {
877 match self {
878 Self::OutputRows(_) => 0,
880 Self::ElapsedCompute(_) => 1,
881 Self::OutputBytes(_) => 2,
882 Self::PruningMetrics { name, .. } => match name.as_ref() {
884 "files_ranges_pruned_statistics" => 3,
892 "row_groups_pruned_statistics" => 4,
893 "row_groups_pruned_bloom_filter" => 5,
894 "page_index_rows_pruned" => 6,
895 _ => 7,
896 },
897 Self::SpillCount(_) => 8,
898 Self::SpilledBytes(_) => 9,
899 Self::SpilledRows(_) => 10,
900 Self::CurrentMemoryUsage(_) => 11,
901 Self::Count { .. } => 12,
902 Self::Gauge { .. } => 13,
903 Self::Time { .. } => 14,
904 Self::Ratio { .. } => 15,
905 Self::StartTimestamp(_) => 16, Self::EndTimestamp(_) => 17,
907 Self::Custom { .. } => 18,
908 }
909 }
910
911 pub fn is_timestamp(&self) -> bool {
913 matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
914 }
915}
916
917impl Display for MetricValue {
918 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
920 match self {
921 Self::OutputRows(count)
922 | Self::SpillCount(count)
923 | Self::SpilledRows(count)
924 | Self::Count { count, .. } => {
925 write!(f, "{count}")
926 }
927 Self::SpilledBytes(count) | Self::OutputBytes(count) => {
928 let readable_count = human_readable_size(count.value());
929 write!(f, "{readable_count}")
930 }
931 Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
932 write!(f, "{gauge}")
933 }
934 Self::ElapsedCompute(time) | Self::Time { time, .. } => {
935 if time.value() > 0 {
938 write!(f, "{time}")
939 } else {
940 write!(f, "NOT RECORDED")
941 }
942 }
943 Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
944 write!(f, "{timestamp}")
945 }
946 Self::PruningMetrics {
947 pruning_metrics, ..
948 } => {
949 write!(f, "{pruning_metrics}")
950 }
951 Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
952 Self::Custom { name, value } => {
953 write!(f, "name:{name} {value}")
954 }
955 }
956 }
957}
958
959#[cfg(test)]
960mod tests {
961 use std::any::Any;
962
963 use chrono::TimeZone;
964 use datafusion_execution::memory_pool::units::MB;
965
966 use super::*;
967
968 #[derive(Debug, Default)]
969 pub struct CustomCounter {
970 count: AtomicUsize,
971 }
972
973 impl Display for CustomCounter {
974 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
975 write!(f, "count: {}", self.count.load(Ordering::Relaxed))
976 }
977 }
978
979 impl CustomMetricValue for CustomCounter {
980 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
981 Arc::new(CustomCounter::default())
982 }
983
984 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
985 let other = other.as_any().downcast_ref::<Self>().unwrap();
986 self.count
987 .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
988 }
989
990 fn as_any(&self) -> &dyn Any {
991 self
992 }
993
994 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
995 let Some(other) = other.as_any().downcast_ref::<Self>() else {
996 return false;
997 };
998
999 self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
1000 }
1001 }
1002
1003 fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
1004 let custom_counter = CustomCounter::default();
1005 custom_counter.count.fetch_add(value, Ordering::Relaxed);
1006 let custom_val = MetricValue::Custom {
1007 name: Cow::Borrowed(name),
1008 value: Arc::new(custom_counter),
1009 };
1010
1011 custom_val
1012 }
1013
1014 #[test]
1015 fn test_custom_metric_with_mismatching_names() {
1016 let mut custom_val = new_custom_counter("Hi", 1);
1017 let other_custom_val = new_custom_counter("Hello", 1);
1018
1019 assert!(other_custom_val != custom_val);
1021
1022 custom_val.aggregate(&other_custom_val);
1024
1025 let expected_val = new_custom_counter("Hi", 2);
1026 assert!(expected_val == custom_val);
1027 }
1028
1029 #[test]
1030 fn test_custom_metric() {
1031 let mut custom_val = new_custom_counter("hi", 11);
1032 let other_custom_val = new_custom_counter("hi", 20);
1033
1034 custom_val.aggregate(&other_custom_val);
1035
1036 assert!(custom_val != other_custom_val);
1037
1038 if let MetricValue::Custom { value, .. } = custom_val {
1039 let counter = value
1040 .as_any()
1041 .downcast_ref::<CustomCounter>()
1042 .expect("Expected CustomCounter");
1043 assert_eq!(counter.count.load(Ordering::Relaxed), 31);
1044 } else {
1045 panic!("Unexpected value");
1046 }
1047 }
1048
1049 #[test]
1050 fn test_display_output_rows() {
1051 let count = Count::new();
1052 let values = vec![
1053 MetricValue::OutputRows(count.clone()),
1054 MetricValue::Count {
1055 name: "my_counter".into(),
1056 count: count.clone(),
1057 },
1058 ];
1059
1060 for value in &values {
1061 assert_eq!("0", value.to_string(), "value {value:?}");
1062 }
1063
1064 count.add(42);
1065 for value in &values {
1066 assert_eq!("42", value.to_string(), "value {value:?}");
1067 }
1068 }
1069
1070 #[test]
1071 fn test_display_spilled_bytes() {
1072 let count = Count::new();
1073 let spilled_byte = MetricValue::SpilledBytes(count.clone());
1074
1075 assert_eq!("0.0 B", spilled_byte.to_string());
1076
1077 count.add((100 * MB) as usize);
1078 assert_eq!("100.0 MB", spilled_byte.to_string());
1079
1080 count.add((0.5 * MB as f64) as usize);
1081 assert_eq!("100.5 MB", spilled_byte.to_string());
1082 }
1083
1084 #[test]
1085 fn test_display_time() {
1086 let time = Time::new();
1087 let values = vec![
1088 MetricValue::ElapsedCompute(time.clone()),
1089 MetricValue::Time {
1090 name: "my_time".into(),
1091 time: time.clone(),
1092 },
1093 ];
1094
1095 for value in &values {
1097 assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
1098 }
1099
1100 time.add_duration(Duration::from_nanos(1042));
1101 for value in &values {
1102 assert_eq!("1.042µs", value.to_string(), "value {value:?}");
1103 }
1104 }
1105
1106 #[test]
1107 fn test_display_ratio() {
1108 let ratio_metrics = RatioMetrics::new();
1109 let ratio = MetricValue::Ratio {
1110 name: Cow::Borrowed("ratio_metric"),
1111 ratio_metrics: ratio_metrics.clone(),
1112 };
1113
1114 assert_eq!("N/A (0/0)", ratio.to_string());
1115
1116 ratio_metrics.add_part(10);
1117 assert_eq!("N/A (10/0)", ratio.to_string());
1118
1119 ratio_metrics.add_total(40);
1120 assert_eq!("25% (10/40)", ratio.to_string());
1121
1122 let tiny_ratio_metrics = RatioMetrics::new();
1123 let tiny_ratio = MetricValue::Ratio {
1124 name: Cow::Borrowed("tiny_ratio_metric"),
1125 ratio_metrics: tiny_ratio_metrics.clone(),
1126 };
1127 tiny_ratio_metrics.add_part(1);
1128 tiny_ratio_metrics.add_total(3000);
1129 assert_eq!("0.033% (1/3000)", tiny_ratio.to_string());
1130 }
1131
1132 #[test]
1133 fn test_display_timestamp() {
1134 let timestamp = Timestamp::new();
1135 let values = vec![
1136 MetricValue::StartTimestamp(timestamp.clone()),
1137 MetricValue::EndTimestamp(timestamp.clone()),
1138 ];
1139
1140 for value in &values {
1142 assert_eq!("NONE", value.to_string(), "value {value:?}");
1143 }
1144
1145 timestamp.set(Utc.timestamp_nanos(1431648000000000));
1146 for value in &values {
1147 assert_eq!(
1148 "1970-01-17 13:40:48 UTC",
1149 value.to_string(),
1150 "value {value:?}"
1151 );
1152 }
1153 }
1154
1155 #[test]
1156 fn test_timer_with_custom_instant() {
1157 let time = Time::new();
1158 let start_time = Instant::now();
1159
1160 std::thread::sleep(Duration::from_millis(1));
1162
1163 let mut timer = time.timer_with(start_time);
1165
1166 std::thread::sleep(Duration::from_millis(1));
1168
1169 timer.stop();
1171
1172 assert!(
1174 time.value() >= 2_000_000,
1175 "Expected at least 2ms, got {} ns",
1176 time.value()
1177 );
1178 }
1179
1180 #[test]
1181 fn test_stop_with_custom_endpoint() {
1182 let time = Time::new();
1183 let start = Instant::now();
1184 let mut timer = time.timer_with(start);
1185
1186 let end = start + Duration::from_millis(10);
1188
1189 timer.stop_with(end);
1191
1192 let recorded = time.value();
1195 assert!(
1196 (10_000_000..=10_100_000).contains(&recorded),
1197 "Expected ~10ms, got {recorded} ns"
1198 );
1199
1200 timer.stop_with(end);
1202 assert_eq!(
1203 recorded,
1204 time.value(),
1205 "Time should not change after second stop"
1206 );
1207 }
1208
1209 #[test]
1210 fn test_done_with_custom_endpoint() {
1211 let time = Time::new();
1212 let start = Instant::now();
1213
1214 {
1216 let timer = time.timer_with(start);
1217
1218 let end = start + Duration::from_millis(5);
1220
1221 timer.done_with(end);
1223
1224 }
1226
1227 let recorded = time.value();
1229 assert!(
1230 (5_000_000..=5_100_000).contains(&recorded),
1231 "Expected ~5ms, got {recorded} ns",
1232 );
1233
1234 {
1236 let timer2 = time.timer_with(start);
1237 let end2 = start + Duration::from_millis(5);
1238 timer2.done_with(end2);
1239 }
1241
1242 let new_recorded = time.value();
1244 assert!(
1245 (10_000_000..=10_100_000).contains(&new_recorded),
1246 "Expected ~10ms total, got {new_recorded} ns",
1247 );
1248 }
1249}