Skip to main content

datafusion_distributed/metrics/
max_gauge_metric.rs

1use datafusion::physical_plan::metrics::{CustomMetricValue, MetricBuilder, MetricValue};
2use std::sync::atomic::Ordering::Relaxed;
3use std::{
4    any::Any,
5    borrow::Cow,
6    fmt::{Display, Formatter},
7    sync::{Arc, atomic::AtomicUsize},
8};
9
10/// Extension trait for DataFusion's metric system that adds support for a Gauge metric that
11/// aggregates to others using `max` instead of `sum`
12pub trait GaugeMetricExt {
13    fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric;
14}
15
16impl GaugeMetricExt for MetricBuilder<'_> {
17    fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric {
18        let value = MaxGaugeMetric::default();
19        self.build(MetricValue::Custom {
20            name: name.into(),
21            value: Arc::new(value.clone()),
22        });
23        value
24    }
25}
26
/// Similar to DataFusion's Gauge metric, but aggregates between instances using `max` instead of
/// `sum`.
///
/// Cloning yields another handle to the same underlying counter, because the value lives behind
/// an [`Arc`]. The derived [`Default`] starts the gauge at `0`.
#[derive(Debug, Clone, Default)]
pub struct MaxGaugeMetric {
    /// Largest value recorded so far, shared between all clones of this handle.
    value: Arc<AtomicUsize>,
}
41
42impl MaxGaugeMetric {
43    pub fn from_value(bytes: usize) -> Self {
44        Self {
45            value: Arc::new(AtomicUsize::new(bytes)),
46        }
47    }
48
49    pub fn value(&self) -> usize {
50        self.value.load(Relaxed)
51    }
52
53    pub fn set_max(&self, n: usize) {
54        self.value.fetch_max(n, Relaxed);
55    }
56}
57
58impl Display for MaxGaugeMetric {
59    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.value())
61    }
62}
63
64impl CustomMetricValue for MaxGaugeMetric {
65    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
66        Arc::new(MaxGaugeMetric::default())
67    }
68
69    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
70        let Some(other) = other.as_any().downcast_ref::<Self>() else {
71            return;
72        };
73        self.value.fetch_max(other.value.load(Relaxed), Relaxed);
74    }
75
76    fn as_any(&self) -> &dyn Any {
77        self
78    }
79
80    fn as_usize(&self) -> usize {
81        self.value()
82    }
83
84    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
85        let Some(other) = other.as_any().downcast_ref::<Self>() else {
86            return false;
87        };
88        other.value() == self.value()
89    }
90}
91
#[cfg(test)]
mod tests {
    use super::*;

    /// A default gauge reads zero and only ever moves upwards through `set_max`.
    #[test]
    fn default_is_zero_and_set_max_updates() {
        let gauge = MaxGaugeMetric::default();
        assert_eq!(gauge.value(), 0);

        // Each step is (value offered to `set_max`, reading expected afterwards): a first
        // update, a lower value that must not decrease the gauge, and a higher one that must
        // increase it.
        let steps = [(1024, 1024), (512, 1024), (2048, 2048)];
        for (offered, expected) in steps {
            gauge.set_max(offered);
            assert_eq!(gauge.value(), expected);
        }
    }

    /// `from_value` seeds the gauge with exactly the given reading.
    #[test]
    fn from_value_constructs_correctly() {
        let seeded = MaxGaugeMetric::from_value(1_000_000);
        assert_eq!(seeded.value(), 1_000_000);
    }

    /// Aggregation keeps the larger reading regardless of which side holds it.
    #[test]
    fn aggregate_takes_max() {
        for (target_start, incoming) in [(500, 300), (300, 500)] {
            let target = MaxGaugeMetric::from_value(target_start);
            target.aggregate(Arc::new(MaxGaugeMetric::from_value(incoming)));
            assert_eq!(target.value(), 500);
        }
    }
}