datafusion_physical_plan/metrics/builder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Builder for creating arbitrary metrics
19
20use std::{borrow::Cow, sync::Arc};
21
22use crate::metrics::{
23 value::{PruningMetrics, RatioMetrics},
24 MetricType,
25};
26
27use super::{
28 Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
29};
30
/// Structure for constructing metrics, counters, timers, etc.
///
/// A builder borrows the [`ExecutionPlanMetricsSet`] it was created from and
/// collects an optional partition, a list of labels and a [`MetricType`].
/// Calling `build` (or one of the convenience methods such as `output_rows`
/// or `counter`) consumes the builder and registers the resulting metric
/// with that set.
///
/// Note the use of `Cow<..>` is to avoid allocations in the common
/// case of constant strings
///
/// ```rust
/// use datafusion_physical_plan::metrics::*;
///
/// let metrics = ExecutionPlanMetricsSet::new();
/// let partition = 1;
///
/// // Create the standard output_rows metric
/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
///
/// // Create an operator-specific counter with some labels
/// let num_bytes = MetricBuilder::new(&metrics)
///     .with_new_label("filename", "my_awesome_file.parquet")
///     .counter("num_bytes", partition);
/// ```
pub struct MetricBuilder<'a> {
    /// Location that the metric created by this builder will be added to
    metrics: &'a ExecutionPlanMetricsSet,

    /// Optional partition number; `None` until a partition is set
    /// (for example via `with_partition`)
    partition: Option<usize>,

    /// Arbitrary name=value pairs identifying this metric
    labels: Vec<Label>,

    /// The type controlling the verbosity/category for this builder.
    /// See comments in [`MetricType`] for details
    metric_type: MetricType,
}
64
65impl<'a> MetricBuilder<'a> {
66 /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
67 ///
68 /// `self.metric_type` controls when such metric is displayed. See comments in
69 /// [`MetricType`] for details.
70 pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
71 Self {
72 metrics,
73 partition: None,
74 labels: vec![],
75 metric_type: MetricType::DEV,
76 }
77 }
78
79 /// Add a label to the metric being constructed
80 pub fn with_label(mut self, label: Label) -> Self {
81 self.labels.push(label);
82 self
83 }
84
85 /// Set the metric type to the metric being constructed
86 pub fn with_type(mut self, metric_type: MetricType) -> Self {
87 self.metric_type = metric_type;
88 self
89 }
90
91 /// Add a label to the metric being constructed
92 pub fn with_new_label(
93 self,
94 name: impl Into<Cow<'static, str>>,
95 value: impl Into<Cow<'static, str>>,
96 ) -> Self {
97 self.with_label(Label::new(name.into(), value.into()))
98 }
99
100 /// Set the partition of the metric being constructed
101 pub fn with_partition(mut self, partition: usize) -> Self {
102 self.partition = Some(partition);
103 self
104 }
105
106 /// Consume self and create a metric of the specified value
107 /// registered with the MetricsSet
108 pub fn build(self, value: MetricValue) {
109 let Self {
110 labels,
111 partition,
112 metrics,
113 metric_type,
114 } = self;
115 let metric = Arc::new(
116 Metric::new_with_labels(value, partition, labels).with_type(metric_type),
117 );
118 metrics.register(metric);
119 }
120
121 /// Consume self and create a new counter for recording output rows
122 pub fn output_rows(self, partition: usize) -> Count {
123 let count = Count::new();
124 self.with_partition(partition)
125 .build(MetricValue::OutputRows(count.clone()));
126 count
127 }
128
129 /// Consume self and create a new counter for recording the number of spills
130 /// triggered by an operator
131 pub fn spill_count(self, partition: usize) -> Count {
132 let count = Count::new();
133 self.with_partition(partition)
134 .build(MetricValue::SpillCount(count.clone()));
135 count
136 }
137
138 /// Consume self and create a new counter for recording the total spilled bytes
139 /// triggered by an operator
140 pub fn spilled_bytes(self, partition: usize) -> Count {
141 let count = Count::new();
142 self.with_partition(partition)
143 .build(MetricValue::SpilledBytes(count.clone()));
144 count
145 }
146
147 /// Consume self and create a new counter for recording the total spilled rows
148 /// triggered by an operator
149 pub fn spilled_rows(self, partition: usize) -> Count {
150 let count = Count::new();
151 self.with_partition(partition)
152 .build(MetricValue::SpilledRows(count.clone()));
153 count
154 }
155
156 /// Consume self and create a new counter for recording total output bytes
157 pub fn output_bytes(self, partition: usize) -> Count {
158 let count = Count::new();
159 self.with_partition(partition)
160 .build(MetricValue::OutputBytes(count.clone()));
161 count
162 }
163
164 /// Consume self and create a new gauge for reporting current memory usage
165 pub fn mem_used(self, partition: usize) -> Gauge {
166 let gauge = Gauge::new();
167 self.with_partition(partition)
168 .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
169 gauge
170 }
171
172 /// Consumes self and creates a new [`Count`] for recording some
173 /// arbitrary metric of an operator.
174 pub fn counter(
175 self,
176 counter_name: impl Into<Cow<'static, str>>,
177 partition: usize,
178 ) -> Count {
179 self.with_partition(partition).global_counter(counter_name)
180 }
181
182 /// Consumes self and creates a new [`Gauge`] for reporting some
183 /// arbitrary metric of an operator.
184 pub fn gauge(
185 self,
186 gauge_name: impl Into<Cow<'static, str>>,
187 partition: usize,
188 ) -> Gauge {
189 self.with_partition(partition).global_gauge(gauge_name)
190 }
191
192 /// Consumes self and creates a new [`Count`] for recording a
193 /// metric of an overall operator (not per partition)
194 pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
195 let count = Count::new();
196 self.build(MetricValue::Count {
197 name: counter_name.into(),
198 count: count.clone(),
199 });
200 count
201 }
202
203 /// Consumes self and creates a new [`Gauge`] for reporting a
204 /// metric of an overall operator (not per partition)
205 pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
206 let gauge = Gauge::new();
207 self.build(MetricValue::Gauge {
208 name: gauge_name.into(),
209 gauge: gauge.clone(),
210 });
211 gauge
212 }
213
214 /// Consume self and create a new Timer for recording the elapsed
215 /// CPU time spent by an operator
216 pub fn elapsed_compute(self, partition: usize) -> Time {
217 let time = Time::new();
218 self.with_partition(partition)
219 .build(MetricValue::ElapsedCompute(time.clone()));
220 time
221 }
222
223 /// Consumes self and creates a new Timer for recording some
224 /// subset of an operators execution time.
225 pub fn subset_time(
226 self,
227 subset_name: impl Into<Cow<'static, str>>,
228 partition: usize,
229 ) -> Time {
230 let time = Time::new();
231 self.with_partition(partition).build(MetricValue::Time {
232 name: subset_name.into(),
233 time: time.clone(),
234 });
235 time
236 }
237
238 /// Consumes self and creates a new Timestamp for recording the
239 /// starting time of execution for a partition
240 pub fn start_timestamp(self, partition: usize) -> Timestamp {
241 let timestamp = Timestamp::new();
242 self.with_partition(partition)
243 .build(MetricValue::StartTimestamp(timestamp.clone()));
244 timestamp
245 }
246
247 /// Consumes self and creates a new Timestamp for recording the
248 /// ending time of execution for a partition
249 pub fn end_timestamp(self, partition: usize) -> Timestamp {
250 let timestamp = Timestamp::new();
251 self.with_partition(partition)
252 .build(MetricValue::EndTimestamp(timestamp.clone()));
253 timestamp
254 }
255
256 /// Consumes self and creates a new `PruningMetrics`
257 pub fn pruning_metrics(
258 self,
259 name: impl Into<Cow<'static, str>>,
260 partition: usize,
261 ) -> PruningMetrics {
262 let pruning_metrics = PruningMetrics::new();
263 self.with_partition(partition)
264 .build(MetricValue::PruningMetrics {
265 name: name.into(),
266 // inner values will be `Arc::clone()`
267 pruning_metrics: pruning_metrics.clone(),
268 });
269 pruning_metrics
270 }
271
272 /// Consumes self and creates a new [`RatioMetrics`]
273 pub fn ratio_metrics(
274 self,
275 name: impl Into<Cow<'static, str>>,
276 partition: usize,
277 ) -> RatioMetrics {
278 let ratio_metrics = RatioMetrics::new();
279 self.with_partition(partition).build(MetricValue::Ratio {
280 name: name.into(),
281 ratio_metrics: ratio_metrics.clone(),
282 });
283 ratio_metrics
284 }
285}