datafusion_physical_expr_common/metrics/
builder.rs1use std::{borrow::Cow, sync::Arc};
21
22use crate::metrics::{
23 MetricType,
24 value::{PruningMetrics, RatioMergeStrategy, RatioMetrics},
25};
26
27use super::{
28 Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
29};
30
31pub struct MetricBuilder<'a> {
51 metrics: &'a ExecutionPlanMetricsSet,
53
54 partition: Option<usize>,
56
57 labels: Vec<Label>,
59
60 metric_type: MetricType,
63}
64
65impl<'a> MetricBuilder<'a> {
66 pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
71 Self {
72 metrics,
73 partition: None,
74 labels: vec![],
75 metric_type: MetricType::DEV,
76 }
77 }
78
79 pub fn with_label(mut self, label: Label) -> Self {
81 self.labels.push(label);
82 self
83 }
84
85 pub fn with_type(mut self, metric_type: MetricType) -> Self {
87 self.metric_type = metric_type;
88 self
89 }
90
91 pub fn with_new_label(
93 self,
94 name: impl Into<Cow<'static, str>>,
95 value: impl Into<Cow<'static, str>>,
96 ) -> Self {
97 self.with_label(Label::new(name.into(), value.into()))
98 }
99
100 pub fn with_partition(mut self, partition: usize) -> Self {
102 self.partition = Some(partition);
103 self
104 }
105
106 pub fn build(self, value: MetricValue) {
109 let Self {
110 labels,
111 partition,
112 metrics,
113 metric_type,
114 } = self;
115 let metric = Arc::new(
116 Metric::new_with_labels(value, partition, labels).with_type(metric_type),
117 );
118 metrics.register(metric);
119 }
120
121 pub fn output_rows(self, partition: usize) -> Count {
123 let count = Count::new();
124 self.with_partition(partition)
125 .build(MetricValue::OutputRows(count.clone()));
126 count
127 }
128
129 pub fn spill_count(self, partition: usize) -> Count {
132 let count = Count::new();
133 self.with_partition(partition)
134 .build(MetricValue::SpillCount(count.clone()));
135 count
136 }
137
138 pub fn spilled_bytes(self, partition: usize) -> Count {
141 let count = Count::new();
142 self.with_partition(partition)
143 .build(MetricValue::SpilledBytes(count.clone()));
144 count
145 }
146
147 pub fn spilled_rows(self, partition: usize) -> Count {
150 let count = Count::new();
151 self.with_partition(partition)
152 .build(MetricValue::SpilledRows(count.clone()));
153 count
154 }
155
156 pub fn output_bytes(self, partition: usize) -> Count {
158 let count = Count::new();
159 self.with_partition(partition)
160 .build(MetricValue::OutputBytes(count.clone()));
161 count
162 }
163
164 pub fn output_batches(self, partition: usize) -> Count {
166 let count = Count::new();
167 self.with_partition(partition)
168 .build(MetricValue::OutputBatches(count.clone()));
169 count
170 }
171
172 pub fn mem_used(self, partition: usize) -> Gauge {
174 let gauge = Gauge::new();
175 self.with_partition(partition)
176 .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
177 gauge
178 }
179
180 pub fn counter(
183 self,
184 counter_name: impl Into<Cow<'static, str>>,
185 partition: usize,
186 ) -> Count {
187 self.with_partition(partition).global_counter(counter_name)
188 }
189
190 pub fn gauge(
193 self,
194 gauge_name: impl Into<Cow<'static, str>>,
195 partition: usize,
196 ) -> Gauge {
197 self.with_partition(partition).global_gauge(gauge_name)
198 }
199
200 pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
203 let count = Count::new();
204 self.build(MetricValue::Count {
205 name: counter_name.into(),
206 count: count.clone(),
207 });
208 count
209 }
210
211 pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
214 let gauge = Gauge::new();
215 self.build(MetricValue::Gauge {
216 name: gauge_name.into(),
217 gauge: gauge.clone(),
218 });
219 gauge
220 }
221
222 pub fn elapsed_compute(self, partition: usize) -> Time {
225 let time = Time::new();
226 self.with_partition(partition)
227 .build(MetricValue::ElapsedCompute(time.clone()));
228 time
229 }
230
231 pub fn subset_time(
234 self,
235 subset_name: impl Into<Cow<'static, str>>,
236 partition: usize,
237 ) -> Time {
238 let time = Time::new();
239 self.with_partition(partition).build(MetricValue::Time {
240 name: subset_name.into(),
241 time: time.clone(),
242 });
243 time
244 }
245
246 pub fn start_timestamp(self, partition: usize) -> Timestamp {
249 let timestamp = Timestamp::new();
250 self.with_partition(partition)
251 .build(MetricValue::StartTimestamp(timestamp.clone()));
252 timestamp
253 }
254
255 pub fn end_timestamp(self, partition: usize) -> Timestamp {
258 let timestamp = Timestamp::new();
259 self.with_partition(partition)
260 .build(MetricValue::EndTimestamp(timestamp.clone()));
261 timestamp
262 }
263
264 pub fn pruning_metrics(
266 self,
267 name: impl Into<Cow<'static, str>>,
268 partition: usize,
269 ) -> PruningMetrics {
270 let pruning_metrics = PruningMetrics::new();
271 self.with_partition(partition)
272 .build(MetricValue::PruningMetrics {
273 name: name.into(),
274 pruning_metrics: pruning_metrics.clone(),
276 });
277 pruning_metrics
278 }
279
280 pub fn ratio_metrics(
282 self,
283 name: impl Into<Cow<'static, str>>,
284 partition: usize,
285 ) -> RatioMetrics {
286 self.ratio_metrics_with_strategy(name, partition, RatioMergeStrategy::default())
287 }
288
289 pub fn ratio_metrics_with_strategy(
291 self,
292 name: impl Into<Cow<'static, str>>,
293 partition: usize,
294 merge_strategy: RatioMergeStrategy,
295 ) -> RatioMetrics {
296 let ratio_metrics = RatioMetrics::new().with_merge_strategy(merge_strategy);
297 self.with_partition(partition).build(MetricValue::Ratio {
298 name: name.into(),
299 ratio_metrics: ratio_metrics.clone(),
300 });
301 ratio_metrics
302 }
303}