datafusion_physical_expr_common/metrics/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Builder for creating arbitrary metrics
19
20use std::{borrow::Cow, sync::Arc};
21
22use crate::metrics::{
23    MetricType,
24    value::{PruningMetrics, RatioMergeStrategy, RatioMetrics},
25};
26
27use super::{
28    Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
29};
30
31/// Structure for constructing metrics, counters, timers, etc.
32///
33/// Note the use of `Cow<..>` is to avoid allocations in the common
34/// case of constant strings
35///
36/// ```rust
37/// use datafusion_physical_expr_common::metrics::*;
38///
39/// let metrics = ExecutionPlanMetricsSet::new();
40/// let partition = 1;
41///
42/// // Create the standard output_rows metric
43/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
44///
45/// // Create a operator specific counter with some labels
46/// let num_bytes = MetricBuilder::new(&metrics)
47///     .with_new_label("filename", "my_awesome_file.parquet")
48///     .counter("num_bytes", partition);
49/// ```
50pub struct MetricBuilder<'a> {
51    /// Location that the metric created by this builder will be added do
52    metrics: &'a ExecutionPlanMetricsSet,
53
54    /// optional partition number
55    partition: Option<usize>,
56
57    /// arbitrary name=value pairs identifying this metric
58    labels: Vec<Label>,
59
60    /// The type controlling the verbosity/category for this builder
61    /// See comments in [`MetricType`] for details
62    metric_type: MetricType,
63}
64
65impl<'a> MetricBuilder<'a> {
66    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
67    ///
68    /// `self.metric_type` controls when such metric is displayed. See comments in
69    /// [`MetricType`] for details.
70    pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
71        Self {
72            metrics,
73            partition: None,
74            labels: vec![],
75            metric_type: MetricType::DEV,
76        }
77    }
78
79    /// Add a label to the metric being constructed
80    pub fn with_label(mut self, label: Label) -> Self {
81        self.labels.push(label);
82        self
83    }
84
85    /// Set the metric type to the metric being constructed
86    pub fn with_type(mut self, metric_type: MetricType) -> Self {
87        self.metric_type = metric_type;
88        self
89    }
90
91    /// Add a label to the metric being constructed
92    pub fn with_new_label(
93        self,
94        name: impl Into<Cow<'static, str>>,
95        value: impl Into<Cow<'static, str>>,
96    ) -> Self {
97        self.with_label(Label::new(name.into(), value.into()))
98    }
99
100    /// Set the partition of the metric being constructed
101    pub fn with_partition(mut self, partition: usize) -> Self {
102        self.partition = Some(partition);
103        self
104    }
105
106    /// Consume self and create a metric of the specified value
107    /// registered with the MetricsSet
108    pub fn build(self, value: MetricValue) {
109        let Self {
110            labels,
111            partition,
112            metrics,
113            metric_type,
114        } = self;
115        let metric = Arc::new(
116            Metric::new_with_labels(value, partition, labels).with_type(metric_type),
117        );
118        metrics.register(metric);
119    }
120
121    /// Consume self and create a new counter for recording output rows
122    pub fn output_rows(self, partition: usize) -> Count {
123        let count = Count::new();
124        self.with_partition(partition)
125            .build(MetricValue::OutputRows(count.clone()));
126        count
127    }
128
129    /// Consume self and create a new counter for recording the number of spills
130    /// triggered by an operator
131    pub fn spill_count(self, partition: usize) -> Count {
132        let count = Count::new();
133        self.with_partition(partition)
134            .build(MetricValue::SpillCount(count.clone()));
135        count
136    }
137
138    /// Consume self and create a new counter for recording the total spilled bytes
139    /// triggered by an operator
140    pub fn spilled_bytes(self, partition: usize) -> Count {
141        let count = Count::new();
142        self.with_partition(partition)
143            .build(MetricValue::SpilledBytes(count.clone()));
144        count
145    }
146
147    /// Consume self and create a new counter for recording the total spilled rows
148    /// triggered by an operator
149    pub fn spilled_rows(self, partition: usize) -> Count {
150        let count = Count::new();
151        self.with_partition(partition)
152            .build(MetricValue::SpilledRows(count.clone()));
153        count
154    }
155
156    /// Consume self and create a new counter for recording total output bytes
157    pub fn output_bytes(self, partition: usize) -> Count {
158        let count = Count::new();
159        self.with_partition(partition)
160            .build(MetricValue::OutputBytes(count.clone()));
161        count
162    }
163
164    /// Consume self and create a new counter for recording total output batches
165    pub fn output_batches(self, partition: usize) -> Count {
166        let count = Count::new();
167        self.with_partition(partition)
168            .build(MetricValue::OutputBatches(count.clone()));
169        count
170    }
171
172    /// Consume self and create a new gauge for reporting current memory usage
173    pub fn mem_used(self, partition: usize) -> Gauge {
174        let gauge = Gauge::new();
175        self.with_partition(partition)
176            .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
177        gauge
178    }
179
180    /// Consumes self and creates a new [`Count`] for recording some
181    /// arbitrary metric of an operator.
182    pub fn counter(
183        self,
184        counter_name: impl Into<Cow<'static, str>>,
185        partition: usize,
186    ) -> Count {
187        self.with_partition(partition).global_counter(counter_name)
188    }
189
190    /// Consumes self and creates a new [`Gauge`] for reporting some
191    /// arbitrary metric of an operator.
192    pub fn gauge(
193        self,
194        gauge_name: impl Into<Cow<'static, str>>,
195        partition: usize,
196    ) -> Gauge {
197        self.with_partition(partition).global_gauge(gauge_name)
198    }
199
200    /// Consumes self and creates a new [`Count`] for recording a
201    /// metric of an overall operator (not per partition)
202    pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
203        let count = Count::new();
204        self.build(MetricValue::Count {
205            name: counter_name.into(),
206            count: count.clone(),
207        });
208        count
209    }
210
211    /// Consumes self and creates a new [`Gauge`] for reporting a
212    /// metric of an overall operator (not per partition)
213    pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
214        let gauge = Gauge::new();
215        self.build(MetricValue::Gauge {
216            name: gauge_name.into(),
217            gauge: gauge.clone(),
218        });
219        gauge
220    }
221
222    /// Consume self and create a new Timer for recording the elapsed
223    /// CPU time spent by an operator
224    pub fn elapsed_compute(self, partition: usize) -> Time {
225        let time = Time::new();
226        self.with_partition(partition)
227            .build(MetricValue::ElapsedCompute(time.clone()));
228        time
229    }
230
231    /// Consumes self and creates a new Timer for recording some
232    /// subset of an operators execution time.
233    pub fn subset_time(
234        self,
235        subset_name: impl Into<Cow<'static, str>>,
236        partition: usize,
237    ) -> Time {
238        let time = Time::new();
239        self.with_partition(partition).build(MetricValue::Time {
240            name: subset_name.into(),
241            time: time.clone(),
242        });
243        time
244    }
245
246    /// Consumes self and creates a new Timestamp for recording the
247    /// starting time of execution for a partition
248    pub fn start_timestamp(self, partition: usize) -> Timestamp {
249        let timestamp = Timestamp::new();
250        self.with_partition(partition)
251            .build(MetricValue::StartTimestamp(timestamp.clone()));
252        timestamp
253    }
254
255    /// Consumes self and creates a new Timestamp for recording the
256    /// ending time of execution for a partition
257    pub fn end_timestamp(self, partition: usize) -> Timestamp {
258        let timestamp = Timestamp::new();
259        self.with_partition(partition)
260            .build(MetricValue::EndTimestamp(timestamp.clone()));
261        timestamp
262    }
263
264    /// Consumes self and creates a new `PruningMetrics`
265    pub fn pruning_metrics(
266        self,
267        name: impl Into<Cow<'static, str>>,
268        partition: usize,
269    ) -> PruningMetrics {
270        let pruning_metrics = PruningMetrics::new();
271        self.with_partition(partition)
272            .build(MetricValue::PruningMetrics {
273                name: name.into(),
274                // inner values will be `Arc::clone()`
275                pruning_metrics: pruning_metrics.clone(),
276            });
277        pruning_metrics
278    }
279
280    /// Consumes self and creates a new [`RatioMetrics`]
281    pub fn ratio_metrics(
282        self,
283        name: impl Into<Cow<'static, str>>,
284        partition: usize,
285    ) -> RatioMetrics {
286        self.ratio_metrics_with_strategy(name, partition, RatioMergeStrategy::default())
287    }
288
289    /// Consumes self and creates a new [`RatioMetrics`] with a specific merge strategy
290    pub fn ratio_metrics_with_strategy(
291        self,
292        name: impl Into<Cow<'static, str>>,
293        partition: usize,
294        merge_strategy: RatioMergeStrategy,
295    ) -> RatioMetrics {
296        let ratio_metrics = RatioMetrics::new().with_merge_strategy(merge_strategy);
297        self.with_partition(partition).build(MetricValue::Ratio {
298            name: name.into(),
299            ratio_metrics: ratio_metrics.clone(),
300        });
301        ratio_metrics
302    }
303}