1use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::instant::Instant;
23use datafusion_execution::memory_pool::human_readable_size;
24use parking_lot::Mutex;
25use std::{
26 borrow::{Borrow, Cow},
27 fmt::{Debug, Display},
28 sync::{
29 atomic::{AtomicUsize, Ordering},
30 Arc,
31 },
32 time::Duration,
33};
34
/// A monotonically increasing counter, such as a number of rows or spills.
///
/// Cloning a `Count` does not copy the number: every clone points at the same
/// shared atomic, so an increment made through one clone is observed by all.
#[derive(Debug, Clone)]
pub struct Count {
    /// Shared storage for the current count.
    value: Arc<AtomicUsize>,
}

impl PartialEq for Count {
    /// Counters compare equal when their current values match, whether or not
    /// they share storage.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.value(), other.value());
        lhs == rhs
    }
}

impl Display for Count {
    /// Prints the current value as a plain integer.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}

impl Default for Count {
    fn default() -> Self {
        Count::new()
    }
}

impl Count {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::default());
        Self { value }
    }

    /// Increments the counter by `n` (atomic add; wraps on overflow).
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current value.
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
82
/// A value that can move up and down, such as current memory usage.
///
/// Clones share the same underlying atomic, so updates through any clone are
/// visible through all of them.
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Shared storage for the current value.
    value: Arc<AtomicUsize>,
}

impl PartialEq for Gauge {
    /// Gauges compare equal when their current values match.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.value(), other.value());
        lhs == rhs
    }
}

impl Display for Gauge {
    /// Prints the current value as a plain integer.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}

impl Default for Gauge {
    fn default() -> Self {
        Gauge::new()
    }
}

impl Gauge {
    /// Creates a gauge starting at zero.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::default());
        Self { value }
    }

    /// Increases the value by `n` (atomic add; wraps on overflow).
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Decreases the value by `n` (atomic subtract; wraps on underflow).
    pub fn sub(&self, n: usize) {
        self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Raises the value to `n` if `n` is larger than the current value.
    pub fn set_max(&self, n: usize) {
        self.value.fetch_max(n, Ordering::Relaxed);
    }

    /// Replaces the value with `n` and returns the previous value.
    pub fn set(&self, n: usize) -> usize {
        self.value.swap(n, Ordering::Relaxed)
    }

    /// Returns the current value.
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
150
151#[derive(Debug, Clone)]
153pub struct Time {
154 nanos: Arc<AtomicUsize>,
156}
157
158impl Default for Time {
159 fn default() -> Self {
160 Self::new()
161 }
162}
163
164impl PartialEq for Time {
165 fn eq(&self, other: &Self) -> bool {
166 self.value().eq(&other.value())
167 }
168}
169
170impl Display for Time {
171 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
172 let duration = Duration::from_nanos(self.value() as u64);
173 write!(f, "{duration:?}")
174 }
175}
176
177impl Time {
178 pub fn new() -> Self {
181 Self {
182 nanos: Arc::new(AtomicUsize::new(0)),
183 }
184 }
185
186 pub fn add_elapsed(&self, start: Instant) {
188 self.add_duration(start.elapsed());
189 }
190
191 pub fn add_duration(&self, duration: Duration) {
202 let more_nanos = duration.as_nanos() as usize;
203 self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204 }
205
206 pub fn add(&self, other: &Time) {
208 self.add_duration(Duration::from_nanos(other.value() as u64))
209 }
210
211 pub fn timer(&self) -> ScopedTimerGuard<'_> {
215 ScopedTimerGuard {
216 inner: self,
217 start: Some(Instant::now()),
218 }
219 }
220
221 pub fn value(&self) -> usize {
223 self.nanos.load(Ordering::Relaxed)
224 }
225
226 pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229 ScopedTimerGuard {
230 inner: self,
231 start: Some(now),
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
239pub struct Timestamp {
240 timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl Timestamp {
251 pub fn new() -> Self {
253 Self {
254 timestamp: Arc::new(Mutex::new(None)),
255 }
256 }
257
258 pub fn record(&self) {
260 self.set(Utc::now())
261 }
262
263 pub fn set(&self, now: DateTime<Utc>) {
265 *self.timestamp.lock() = Some(now);
266 }
267
268 pub fn value(&self) -> Option<DateTime<Utc>> {
273 *self.timestamp.lock()
274 }
275
276 pub fn update_to_min(&self, other: &Timestamp) {
278 let min = match (self.value(), other.value()) {
279 (None, None) => None,
280 (Some(v), None) => Some(v),
281 (None, Some(v)) => Some(v),
282 (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283 };
284
285 *self.timestamp.lock() = min;
286 }
287
288 pub fn update_to_max(&self, other: &Timestamp) {
290 let max = match (self.value(), other.value()) {
291 (None, None) => None,
292 (Some(v), None) => Some(v),
293 (None, Some(v)) => Some(v),
294 (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295 };
296
297 *self.timestamp.lock() = max;
298 }
299}
300
301impl PartialEq for Timestamp {
302 fn eq(&self, other: &Self) -> bool {
303 self.value().eq(&other.value())
304 }
305}
306
307impl Display for Timestamp {
308 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309 match self.value() {
310 None => write!(f, "NONE"),
311 Some(v) => {
312 write!(f, "{v}")
313 }
314 }
315 }
316}
317
318pub struct ScopedTimerGuard<'a> {
322 inner: &'a Time,
323 start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327 pub fn stop(&mut self) {
329 if let Some(start) = self.start.take() {
330 self.inner.add_elapsed(start)
331 }
332 }
333
334 pub fn restart(&mut self) {
336 self.start = Some(Instant::now())
337 }
338
339 pub fn done(mut self) {
341 self.stop()
342 }
343
344 pub fn stop_with(&mut self, end_time: Instant) {
346 if let Some(start) = self.start.take() {
347 let elapsed = end_time - start;
348 self.inner.add_duration(elapsed)
349 }
350 }
351
352 pub fn done_with(mut self, end_time: Instant) {
355 self.stop_with(end_time)
356 }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360 fn drop(&mut self) {
361 self.stop()
362 }
363}
364
365#[derive(Debug, Clone)]
371pub enum MetricValue {
372 OutputRows(Count),
374 ElapsedCompute(Time),
394 SpillCount(Count),
396 SpilledBytes(Count),
398 SpilledRows(Count),
400 CurrentMemoryUsage(Gauge),
402 Count {
404 name: Cow<'static, str>,
406 count: Count,
408 },
409 Gauge {
411 name: Cow<'static, str>,
413 gauge: Gauge,
415 },
416 Time {
418 name: Cow<'static, str>,
420 time: Time,
422 },
423 StartTimestamp(Timestamp),
425 EndTimestamp(Timestamp),
427 Custom {
428 name: Cow<'static, str>,
430 value: Arc<dyn CustomMetricValue>,
432 },
433}
434
435impl PartialEq for MetricValue {
438 fn eq(&self, other: &Self) -> bool {
439 match (self, other) {
440 (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
441 count == other
442 }
443 (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
444 time == other
445 }
446 (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
447 count == other
448 }
449 (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
450 count == other
451 }
452 (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
453 count == other
454 }
455 (
456 MetricValue::CurrentMemoryUsage(gauge),
457 MetricValue::CurrentMemoryUsage(other),
458 ) => gauge == other,
459 (
460 MetricValue::Count { name, count },
461 MetricValue::Count {
462 name: other_name,
463 count: other_count,
464 },
465 ) => name == other_name && count == other_count,
466 (
467 MetricValue::Gauge { name, gauge },
468 MetricValue::Gauge {
469 name: other_name,
470 gauge: other_gauge,
471 },
472 ) => name == other_name && gauge == other_gauge,
473 (
474 MetricValue::Time { name, time },
475 MetricValue::Time {
476 name: other_name,
477 time: other_time,
478 },
479 ) => name == other_name && time == other_time,
480
481 (
482 MetricValue::StartTimestamp(timestamp),
483 MetricValue::StartTimestamp(other),
484 ) => timestamp == other,
485 (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
486 timestamp == other
487 }
488 (
489 MetricValue::Custom { name, value },
490 MetricValue::Custom {
491 name: other_name,
492 value: other_value,
493 },
494 ) => name == other_name && value.is_eq(other_value),
495 _ => false,
497 }
498 }
499}
500
501impl MetricValue {
502 pub fn name(&self) -> &str {
504 match self {
505 Self::OutputRows(_) => "output_rows",
506 Self::SpillCount(_) => "spill_count",
507 Self::SpilledBytes(_) => "spilled_bytes",
508 Self::SpilledRows(_) => "spilled_rows",
509 Self::CurrentMemoryUsage(_) => "mem_used",
510 Self::ElapsedCompute(_) => "elapsed_compute",
511 Self::Count { name, .. } => name.borrow(),
512 Self::Gauge { name, .. } => name.borrow(),
513 Self::Time { name, .. } => name.borrow(),
514 Self::StartTimestamp(_) => "start_timestamp",
515 Self::EndTimestamp(_) => "end_timestamp",
516 Self::Custom { name, .. } => name.borrow(),
517 }
518 }
519
520 pub fn as_usize(&self) -> usize {
522 match self {
523 Self::OutputRows(count) => count.value(),
524 Self::SpillCount(count) => count.value(),
525 Self::SpilledBytes(bytes) => bytes.value(),
526 Self::SpilledRows(count) => count.value(),
527 Self::CurrentMemoryUsage(used) => used.value(),
528 Self::ElapsedCompute(time) => time.value(),
529 Self::Count { count, .. } => count.value(),
530 Self::Gauge { gauge, .. } => gauge.value(),
531 Self::Time { time, .. } => time.value(),
532 Self::StartTimestamp(timestamp) => timestamp
533 .value()
534 .and_then(|ts| ts.timestamp_nanos_opt())
535 .map(|nanos| nanos as usize)
536 .unwrap_or(0),
537 Self::EndTimestamp(timestamp) => timestamp
538 .value()
539 .and_then(|ts| ts.timestamp_nanos_opt())
540 .map(|nanos| nanos as usize)
541 .unwrap_or(0),
542 Self::Custom { value, .. } => value.as_usize(),
543 }
544 }
545
546 pub fn new_empty(&self) -> Self {
549 match self {
550 Self::OutputRows(_) => Self::OutputRows(Count::new()),
551 Self::SpillCount(_) => Self::SpillCount(Count::new()),
552 Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
553 Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
554 Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
555 Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
556 Self::Count { name, .. } => Self::Count {
557 name: name.clone(),
558 count: Count::new(),
559 },
560 Self::Gauge { name, .. } => Self::Gauge {
561 name: name.clone(),
562 gauge: Gauge::new(),
563 },
564 Self::Time { name, .. } => Self::Time {
565 name: name.clone(),
566 time: Time::new(),
567 },
568 Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
569 Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
570 Self::Custom { name, value } => Self::Custom {
571 name: name.clone(),
572 value: value.new_empty(),
573 },
574 }
575 }
576
577 pub fn aggregate(&mut self, other: &Self) {
587 match (self, other) {
588 (Self::OutputRows(count), Self::OutputRows(other_count))
589 | (Self::SpillCount(count), Self::SpillCount(other_count))
590 | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
591 | (Self::SpilledRows(count), Self::SpilledRows(other_count))
592 | (
593 Self::Count { count, .. },
594 Self::Count {
595 count: other_count, ..
596 },
597 ) => count.add(other_count.value()),
598 (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
599 | (
600 Self::Gauge { gauge, .. },
601 Self::Gauge {
602 gauge: other_gauge, ..
603 },
604 ) => gauge.add(other_gauge.value()),
605 (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
606 | (
607 Self::Time { time, .. },
608 Self::Time {
609 time: other_time, ..
610 },
611 ) => time.add(other_time),
612 (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
614 timestamp.update_to_min(other_timestamp);
615 }
616 (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
618 timestamp.update_to_max(other_timestamp);
619 }
620 (
621 Self::Custom { value, .. },
622 Self::Custom {
623 value: other_value, ..
624 },
625 ) => {
626 value.aggregate(Arc::clone(other_value));
627 }
628 m @ (_, _) => {
629 panic!(
630 "Mismatched metric types. Can not aggregate {:?} with value {:?}",
631 m.0, m.1
632 )
633 }
634 }
635 }
636
637 pub fn display_sort_key(&self) -> u8 {
640 match self {
641 Self::OutputRows(_) => 0, Self::ElapsedCompute(_) => 1, Self::SpillCount(_) => 2,
644 Self::SpilledBytes(_) => 3,
645 Self::SpilledRows(_) => 4,
646 Self::CurrentMemoryUsage(_) => 5,
647 Self::Count { .. } => 6,
648 Self::Gauge { .. } => 7,
649 Self::Time { .. } => 8,
650 Self::StartTimestamp(_) => 9, Self::EndTimestamp(_) => 10,
652 Self::Custom { .. } => 11,
653 }
654 }
655
656 pub fn is_timestamp(&self) -> bool {
658 matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
659 }
660}
661
662impl Display for MetricValue {
663 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
665 match self {
666 Self::OutputRows(count)
667 | Self::SpillCount(count)
668 | Self::SpilledRows(count)
669 | Self::Count { count, .. } => {
670 write!(f, "{count}")
671 }
672 Self::SpilledBytes(count) => {
673 let readable_count = human_readable_size(count.value());
674 write!(f, "{readable_count}")
675 }
676 Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
677 write!(f, "{gauge}")
678 }
679 Self::ElapsedCompute(time) | Self::Time { time, .. } => {
680 if time.value() > 0 {
683 write!(f, "{time}")
684 } else {
685 write!(f, "NOT RECORDED")
686 }
687 }
688 Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
689 write!(f, "{timestamp}")
690 }
691 Self::Custom { name, value } => {
692 write!(f, "name:{name} {value}")
693 }
694 }
695 }
696}
697
#[cfg(test)]
mod tests {
    use std::any::Any;

    use chrono::TimeZone;
    use datafusion_execution::memory_pool::units::MB;

    use super::*;

    /// Test-only [`CustomMetricValue`]: a counter whose aggregation adds the
    /// other counter's value into this one.
    #[derive(Debug, Default)]
    pub struct CustomCounter {
        count: AtomicUsize,
    }

    impl Display for CustomCounter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let current = self.count.load(Ordering::Relaxed);
            write!(f, "count: {current}")
        }
    }

    impl CustomMetricValue for CustomCounter {
        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
            Arc::new(Self::default())
        }

        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
            // These tests only ever combine `CustomCounter`s, so a failed
            // downcast would be a bug in the test itself.
            let incoming = other
                .as_any()
                .downcast_ref::<Self>()
                .unwrap()
                .count
                .load(Ordering::Relaxed);
            self.count.fetch_add(incoming, Ordering::Relaxed);
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
            // A value of any other concrete type is never equal.
            other.as_any().downcast_ref::<Self>().is_some_and(|that| {
                self.count.load(Ordering::Relaxed) == that.count.load(Ordering::Relaxed)
            })
        }
    }

    /// Builds a `MetricValue::Custom` named `name` whose counter starts at `value`.
    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
        let counter = CustomCounter {
            count: AtomicUsize::new(value),
        };
        MetricValue::Custom {
            name: Cow::Borrowed(name),
            value: Arc::new(counter),
        }
    }

    #[test]
    fn test_custom_metric_with_mismatching_names() {
        let mut merged = new_custom_counter("Hi", 1);
        let other = new_custom_counter("Hello", 1);

        // Names take part in equality...
        assert!(other != merged);

        // ...but aggregation only combines payloads and keeps the receiver's name.
        merged.aggregate(&other);
        assert!(new_custom_counter("Hi", 2) == merged);
    }

    #[test]
    fn test_custom_metric() {
        let mut merged = new_custom_counter("hi", 11);
        let addend = new_custom_counter("hi", 20);

        merged.aggregate(&addend);
        assert!(merged != addend);

        let MetricValue::Custom { value, .. } = merged else {
            panic!("Unexpected value");
        };
        let counter = value
            .as_any()
            .downcast_ref::<CustomCounter>()
            .expect("Expected CustomCounter");
        assert_eq!(counter.count.load(Ordering::Relaxed), 31);
    }

    #[test]
    fn test_display_output_rows() {
        let shared = Count::new();
        let values = [
            MetricValue::OutputRows(shared.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: shared.clone(),
            },
        ];

        // Both variants render the shared counter's current value.
        for (expected, increment) in [("0", 0), ("42", 42)] {
            shared.add(increment);
            for value in &values {
                assert_eq!(expected, value.to_string(), "value {value:?}");
            }
        }
    }

    #[test]
    fn test_display_spilled_bytes() {
        let bytes = Count::new();
        let spilled = MetricValue::SpilledBytes(bytes.clone());

        assert_eq!("0.0 B", spilled.to_string());

        bytes.add((100 * MB) as usize);
        assert_eq!("100.0 MB", spilled.to_string());

        bytes.add((0.5 * MB as f64) as usize);
        assert_eq!("100.5 MB", spilled.to_string());
    }

    #[test]
    fn test_display_time() {
        let elapsed = Time::new();
        let values = [
            MetricValue::ElapsedCompute(elapsed.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: elapsed.clone(),
            },
        ];

        // Nothing has been recorded yet.
        for value in &values {
            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
        }

        elapsed.add_duration(Duration::from_nanos(1042));
        for value in &values {
            assert_eq!("1.042µs", value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_timestamp() {
        let shared = Timestamp::new();
        let values = [
            MetricValue::StartTimestamp(shared.clone()),
            MetricValue::EndTimestamp(shared.clone()),
        ];

        // Unset timestamps render as NONE.
        for value in &values {
            assert_eq!("NONE", value.to_string(), "value {value:?}");
        }

        shared.set(Utc.timestamp_nanos(1431648000000000));
        for value in &values {
            assert_eq!(
                "1970-01-17 13:40:48 UTC",
                value.to_string(),
                "value {value:?}"
            );
        }
    }

    #[test]
    fn test_timer_with_custom_instant() {
        let time = Time::new();
        let start_time = Instant::now();

        // Time that passes before the guard is created still counts...
        std::thread::sleep(Duration::from_millis(1));
        let mut timer = time.timer_with(start_time);

        // ...as does the time while the guard is running.
        std::thread::sleep(Duration::from_millis(1));
        timer.stop();

        let recorded = time.value();
        assert!(
            recorded >= 2_000_000,
            "Expected at least 2ms, got {} ns",
            recorded
        );
    }

    #[test]
    fn test_stop_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();
        let end = start + Duration::from_millis(10);

        let mut timer = time.timer_with(start);
        timer.stop_with(end);

        let recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&recorded),
            "Expected ~10ms, got {recorded} ns"
        );

        // A stopped guard ignores any further stop.
        timer.stop_with(end);
        assert_eq!(
            recorded,
            time.value(),
            "Time should not change after second stop"
        );
    }

    #[test]
    fn test_done_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();
        let end = start + Duration::from_millis(5);

        time.timer_with(start).done_with(end);
        let recorded = time.value();
        assert!(
            (5_000_000..=5_100_000).contains(&recorded),
            "Expected ~5ms, got {recorded} ns",
        );

        // A second guard accumulates on top of the first.
        time.timer_with(start).done_with(end);
        let new_recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&new_recorded),
            "Expected ~10ms total, got {new_recorded} ns",
        );
    }
}