datafusion_distributed/metrics/
max_gauge_metric.rs1use datafusion::physical_plan::metrics::{CustomMetricValue, MetricBuilder, MetricValue};
2use std::sync::atomic::Ordering::Relaxed;
3use std::{
4 any::Any,
5 borrow::Cow,
6 fmt::{Display, Formatter},
7 sync::{Arc, atomic::AtomicUsize},
8};
9
10pub trait GaugeMetricExt {
13 fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric;
14}
15
16impl GaugeMetricExt for MetricBuilder<'_> {
17 fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric {
18 let value = MaxGaugeMetric::default();
19 self.build(MetricValue::Custom {
20 name: name.into(),
21 value: Arc::new(value.clone()),
22 });
23 value
24 }
25}
26
27#[derive(Debug, Clone)]
30pub struct MaxGaugeMetric {
31 value: Arc<AtomicUsize>,
32}
33
34impl Default for MaxGaugeMetric {
35 fn default() -> Self {
36 Self {
37 value: Arc::new(AtomicUsize::new(usize::MIN)),
38 }
39 }
40}
41
42impl MaxGaugeMetric {
43 pub fn from_value(bytes: usize) -> Self {
44 Self {
45 value: Arc::new(AtomicUsize::new(bytes)),
46 }
47 }
48
49 pub fn value(&self) -> usize {
50 self.value.load(Relaxed)
51 }
52
53 pub fn set_max(&self, n: usize) {
54 self.value.fetch_max(n, Relaxed);
55 }
56}
57
58impl Display for MaxGaugeMetric {
59 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.value())
61 }
62}
63
64impl CustomMetricValue for MaxGaugeMetric {
65 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
66 Arc::new(MaxGaugeMetric::default())
67 }
68
69 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
70 let Some(other) = other.as_any().downcast_ref::<Self>() else {
71 return;
72 };
73 self.value.fetch_max(other.value.load(Relaxed), Relaxed);
74 }
75
76 fn as_any(&self) -> &dyn Any {
77 self
78 }
79
80 fn as_usize(&self) -> usize {
81 self.value()
82 }
83
84 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
85 let Some(other) = other.as_any().downcast_ref::<Self>() else {
86 return false;
87 };
88 other.value() == self.value()
89 }
90}
91
92#[cfg(test)]
93mod tests {
94 use super::*;
95
96 #[test]
97 fn default_is_zero_and_set_max_updates() {
98 let m = MaxGaugeMetric::default();
99 assert_eq!(m.value(), 0);
100 m.set_max(1024);
101 assert_eq!(m.value(), 1024);
102 m.set_max(512);
104 assert_eq!(m.value(), 1024);
105 m.set_max(2048);
107 assert_eq!(m.value(), 2048);
108 }
109
110 #[test]
111 fn from_value_constructs_correctly() {
112 let m = MaxGaugeMetric::from_value(1_000_000);
113 assert_eq!(m.value(), 1_000_000);
114 }
115
116 #[test]
117 fn aggregate_takes_max() {
118 let a = MaxGaugeMetric::from_value(500);
119 let b = MaxGaugeMetric::from_value(300);
120 a.aggregate(Arc::new(b));
121 assert_eq!(a.value(), 500);
122
123 let a = MaxGaugeMetric::from_value(300);
124 let b = MaxGaugeMetric::from_value(500);
125 a.aggregate(Arc::new(b));
126 assert_eq!(a.value(), 500);
127 }
128}