Skip to main content

datafusion_distributed/metrics/
bytes_metric.rs

1use std::{
2    any::Any,
3    borrow::Cow,
4    fmt::{Display, Formatter},
5    sync::{Arc, atomic::AtomicUsize},
6};
7
8use datafusion::{
9    common::human_readable_size,
10    physical_plan::metrics::{CustomMetricValue, MetricBuilder, MetricValue},
11};
12use std::sync::atomic::Ordering::Relaxed;
13
/// Extension trait for DataFusion's metric system that adds support for byte count metrics
/// that display using human-readable byte sizes (KB, MB, GB) instead of plain count notation.
pub trait BytesMetricExt {
    /// Consumes `self` and returns a [`BytesCounterMetric`] associated with `name`.
    ///
    /// `name` accepts anything convertible into a `Cow<'static, str>`, so both string
    /// literals and owned `String`s can be passed.
    fn bytes_counter(self, name: impl Into<Cow<'static, str>>) -> BytesCounterMetric;
}
19
20impl BytesMetricExt for MetricBuilder<'_> {
21    fn bytes_counter(self, name: impl Into<Cow<'static, str>>) -> BytesCounterMetric {
22        let value = BytesCounterMetric::default();
23        self.build(MetricValue::Custom {
24            name: name.into(),
25            value: Arc::new(value.clone()),
26        });
27        value
28    }
29}
/// A cumulative counter metric for tracking byte counts.
///
/// Unlike DataFusion's built-in [`Count`](datafusion::physical_plan::metrics::Count) which formats
/// large values using plain count notation (e.g., "1.91 B" meaning 1.91 billion), this metric
/// formats values using [`human_readable_size`] (e.g., "1.91 GB").
///
/// This avoids the confusing display where "B" (billions) looks like "bytes".
///
/// Aggregation sums values across partitions/tasks.
///
/// Cloning yields a handle to the *same* underlying counter: the total lives behind an
/// [`Arc`], so updates made through one clone are visible through every other clone.
/// [`Default`] produces a fresh, independent counter starting at zero.
#[derive(Debug, Clone, Default)]
pub struct BytesCounterMetric {
    /// Running total in bytes, shared between all clones of this metric.
    bytes: Arc<AtomicUsize>,
}

impl BytesCounterMetric {
    /// Creates a new, independent counter whose initial total is `bytes`.
    pub fn from_value(bytes: usize) -> Self {
        Self {
            bytes: Arc::new(AtomicUsize::new(bytes)),
        }
    }

    /// Returns the current total in bytes.
    pub fn value(&self) -> usize {
        self.bytes.load(Relaxed)
    }

    /// Adds `bytes` to the total and returns the total as it was *before* the addition.
    ///
    /// The addition wraps around on overflow, as documented for `AtomicUsize::fetch_add`.
    pub fn add_bytes(&self, bytes: usize) -> usize {
        self.bytes.fetch_add(bytes, Relaxed)
    }
}
67
68impl Display for BytesCounterMetric {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        write!(f, "{}", human_readable_size(self.value()))
71    }
72}
73
74impl CustomMetricValue for BytesCounterMetric {
75    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
76        Arc::new(BytesCounterMetric::default())
77    }
78
79    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
80        let Some(other) = other.as_any().downcast_ref::<Self>() else {
81            return;
82        };
83        self.bytes.fetch_add(other.bytes.load(Relaxed), Relaxed);
84    }
85
86    fn as_any(&self) -> &dyn Any {
87        self
88    }
89
90    fn as_usize(&self) -> usize {
91        self.value()
92    }
93
94    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
95        let Some(other) = other.as_any().downcast_ref::<Self>() else {
96            return false;
97        };
98        other.value() == self.value()
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// A default counter reads zero, and successive additions accumulate.
    #[test]
    fn default_is_zero_and_add_accumulates() {
        let counter = BytesCounterMetric::default();
        assert_eq!(counter.value(), 0);

        for chunk in [1024, 2048] {
            counter.add_bytes(chunk);
        }
        assert_eq!(counter.value(), 3072);
    }

    /// `from_value` seeds the counter with the given total.
    #[test]
    fn from_value_constructs_correctly() {
        let seeded = BytesCounterMetric::from_value(1_000_000);
        assert_eq!(seeded.value(), 1_000_000);
    }

    /// Aggregating another counter adds its total into the receiver.
    #[test]
    fn aggregate_sums_values() {
        let total = BytesCounterMetric::from_value(500);
        let partial = BytesCounterMetric::from_value(300);
        total.aggregate(Arc::new(partial));
        assert_eq!(total.value(), 800);
    }

    /// `Display` output uses byte-size units.
    #[test]
    fn display_uses_human_readable_size() {
        const MIB: usize = 1024 * 1024;
        const GIB: usize = 1024 * MIB;

        let cases = [
            // 0 bytes
            (0, "0.0 B"),
            // 4 MB (>= 2*MB threshold, so displays in MB)
            (4 * MIB, "4.0 MB"),
            // 4 GB (>= 2*GB threshold, so displays in GB)
            (4 * GIB, "4.0 GB"),
        ];

        for (bytes, expected) in cases {
            assert_eq!(BytesCounterMetric::from_value(bytes).to_string(), expected);
        }
    }
}