datafusion_distributed/metrics/
bytes_metric.rs1use std::{
2 any::Any,
3 borrow::Cow,
4 fmt::{Display, Formatter},
5 sync::{Arc, atomic::AtomicUsize},
6};
7
8use datafusion::{
9 common::human_readable_size,
10 physical_plan::metrics::{CustomMetricValue, MetricBuilder, MetricValue},
11};
12use std::sync::atomic::Ordering::Relaxed;
13
14pub trait BytesMetricExt {
17 fn bytes_counter(self, name: impl Into<Cow<'static, str>>) -> BytesCounterMetric;
18}
19
20impl BytesMetricExt for MetricBuilder<'_> {
21 fn bytes_counter(self, name: impl Into<Cow<'static, str>>) -> BytesCounterMetric {
22 let value = BytesCounterMetric::default();
23 self.build(MetricValue::Custom {
24 name: name.into(),
25 value: Arc::new(value.clone()),
26 });
27 value
28 }
29}
/// A byte counter backed by a reference-counted atomic integer.
///
/// Cloning does not copy the count: every clone points at the same
/// `AtomicUsize`, so an update made through one handle is observed by all of
/// them.
#[derive(Clone, Debug)]
pub struct BytesCounterMetric {
    /// Shared running total, in bytes.
    bytes: Arc<AtomicUsize>,
}
43
44impl Default for BytesCounterMetric {
45 fn default() -> Self {
46 Self {
47 bytes: Arc::new(AtomicUsize::new(usize::MIN)),
48 }
49 }
50}
51
52impl BytesCounterMetric {
53 pub fn from_value(bytes: usize) -> Self {
54 Self {
55 bytes: Arc::new(AtomicUsize::new(bytes)),
56 }
57 }
58
59 pub fn value(&self) -> usize {
60 self.bytes.load(Relaxed)
61 }
62
63 pub fn add_bytes(&self, bytes: usize) -> usize {
64 self.bytes.fetch_add(bytes, Relaxed)
65 }
66}
67
68impl Display for BytesCounterMetric {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(f, "{}", human_readable_size(self.value()))
71 }
72}
73
74impl CustomMetricValue for BytesCounterMetric {
75 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
76 Arc::new(BytesCounterMetric::default())
77 }
78
79 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
80 let Some(other) = other.as_any().downcast_ref::<Self>() else {
81 return;
82 };
83 self.bytes.fetch_add(other.bytes.load(Relaxed), Relaxed);
84 }
85
86 fn as_any(&self) -> &dyn Any {
87 self
88 }
89
90 fn as_usize(&self) -> usize {
91 self.value()
92 }
93
94 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
95 let Some(other) = other.as_any().downcast_ref::<Self>() else {
96 return false;
97 };
98 other.value() == self.value()
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// A default counter starts at zero and successive additions accumulate.
    #[test]
    fn default_is_zero_and_add_accumulates() {
        let counter = BytesCounterMetric::default();
        assert_eq!(counter.value(), 0);

        for chunk in [1024, 2048] {
            counter.add_bytes(chunk);
        }
        assert_eq!(counter.value(), 3072);
    }

    /// `from_value` seeds the counter with the given total.
    #[test]
    fn from_value_constructs_correctly() {
        let seeded = BytesCounterMetric::from_value(1_000_000);
        assert_eq!(seeded.value(), 1_000_000);
    }

    /// Aggregating another counter adds its total into the receiver.
    #[test]
    fn aggregate_sums_values() {
        let target = BytesCounterMetric::from_value(500);
        let source = BytesCounterMetric::from_value(300);

        target.aggregate(Arc::new(source));

        assert_eq!(target.value(), 800);
    }

    /// `Display` output matches the human-readable size formatting.
    #[test]
    fn display_uses_human_readable_size() {
        let cases = [
            (0, "0.0 B"),
            (4 * 1024 * 1024, "4.0 MB"),
            (4 * 1024 * 1024 * 1024, "4.0 GB"),
        ];

        for (bytes, expected) in cases {
            let rendered = BytesCounterMetric::from_value(bytes).to_string();
            assert_eq!(rendered, expected);
        }
    }
}