datafusion_physical_plan/metrics/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Builder for creating arbitrary metrics
19
20use std::{borrow::Cow, sync::Arc};
21
22use crate::metrics::{
23    value::{PruningMetrics, RatioMetrics},
24    MetricType,
25};
26
27use super::{
28    Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
29};
30
31/// Structure for constructing metrics, counters, timers, etc.
32///
33/// Note the use of `Cow<..>` is to avoid allocations in the common
34/// case of constant strings
35///
36/// ```rust
37/// use datafusion_physical_plan::metrics::*;
38///
39/// let metrics = ExecutionPlanMetricsSet::new();
40/// let partition = 1;
41///
42/// // Create the standard output_rows metric
43/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
44///
45/// // Create a operator specific counter with some labels
46/// let num_bytes = MetricBuilder::new(&metrics)
47///     .with_new_label("filename", "my_awesome_file.parquet")
48///     .counter("num_bytes", partition);
49/// ```
50pub struct MetricBuilder<'a> {
51    /// Location that the metric created by this builder will be added do
52    metrics: &'a ExecutionPlanMetricsSet,
53
54    /// optional partition number
55    partition: Option<usize>,
56
57    /// arbitrary name=value pairs identifying this metric
58    labels: Vec<Label>,
59
60    /// The type controlling the verbosity/category for this builder
61    /// See comments in [`MetricType`] for details
62    metric_type: MetricType,
63}
64
65impl<'a> MetricBuilder<'a> {
66    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
67    ///
68    /// `self.metric_type` controls when such metric is displayed. See comments in
69    /// [`MetricType`] for details.
70    pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
71        Self {
72            metrics,
73            partition: None,
74            labels: vec![],
75            metric_type: MetricType::DEV,
76        }
77    }
78
79    /// Add a label to the metric being constructed
80    pub fn with_label(mut self, label: Label) -> Self {
81        self.labels.push(label);
82        self
83    }
84
85    /// Set the metric type to the metric being constructed
86    pub fn with_type(mut self, metric_type: MetricType) -> Self {
87        self.metric_type = metric_type;
88        self
89    }
90
91    /// Add a label to the metric being constructed
92    pub fn with_new_label(
93        self,
94        name: impl Into<Cow<'static, str>>,
95        value: impl Into<Cow<'static, str>>,
96    ) -> Self {
97        self.with_label(Label::new(name.into(), value.into()))
98    }
99
100    /// Set the partition of the metric being constructed
101    pub fn with_partition(mut self, partition: usize) -> Self {
102        self.partition = Some(partition);
103        self
104    }
105
106    /// Consume self and create a metric of the specified value
107    /// registered with the MetricsSet
108    pub fn build(self, value: MetricValue) {
109        let Self {
110            labels,
111            partition,
112            metrics,
113            metric_type,
114        } = self;
115        let metric = Arc::new(
116            Metric::new_with_labels(value, partition, labels).with_type(metric_type),
117        );
118        metrics.register(metric);
119    }
120
121    /// Consume self and create a new counter for recording output rows
122    pub fn output_rows(self, partition: usize) -> Count {
123        let count = Count::new();
124        self.with_partition(partition)
125            .build(MetricValue::OutputRows(count.clone()));
126        count
127    }
128
129    /// Consume self and create a new counter for recording the number of spills
130    /// triggered by an operator
131    pub fn spill_count(self, partition: usize) -> Count {
132        let count = Count::new();
133        self.with_partition(partition)
134            .build(MetricValue::SpillCount(count.clone()));
135        count
136    }
137
138    /// Consume self and create a new counter for recording the total spilled bytes
139    /// triggered by an operator
140    pub fn spilled_bytes(self, partition: usize) -> Count {
141        let count = Count::new();
142        self.with_partition(partition)
143            .build(MetricValue::SpilledBytes(count.clone()));
144        count
145    }
146
147    /// Consume self and create a new counter for recording the total spilled rows
148    /// triggered by an operator
149    pub fn spilled_rows(self, partition: usize) -> Count {
150        let count = Count::new();
151        self.with_partition(partition)
152            .build(MetricValue::SpilledRows(count.clone()));
153        count
154    }
155
156    /// Consume self and create a new counter for recording total output bytes
157    pub fn output_bytes(self, partition: usize) -> Count {
158        let count = Count::new();
159        self.with_partition(partition)
160            .build(MetricValue::OutputBytes(count.clone()));
161        count
162    }
163
164    /// Consume self and create a new gauge for reporting current memory usage
165    pub fn mem_used(self, partition: usize) -> Gauge {
166        let gauge = Gauge::new();
167        self.with_partition(partition)
168            .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
169        gauge
170    }
171
172    /// Consumes self and creates a new [`Count`] for recording some
173    /// arbitrary metric of an operator.
174    pub fn counter(
175        self,
176        counter_name: impl Into<Cow<'static, str>>,
177        partition: usize,
178    ) -> Count {
179        self.with_partition(partition).global_counter(counter_name)
180    }
181
182    /// Consumes self and creates a new [`Gauge`] for reporting some
183    /// arbitrary metric of an operator.
184    pub fn gauge(
185        self,
186        gauge_name: impl Into<Cow<'static, str>>,
187        partition: usize,
188    ) -> Gauge {
189        self.with_partition(partition).global_gauge(gauge_name)
190    }
191
192    /// Consumes self and creates a new [`Count`] for recording a
193    /// metric of an overall operator (not per partition)
194    pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
195        let count = Count::new();
196        self.build(MetricValue::Count {
197            name: counter_name.into(),
198            count: count.clone(),
199        });
200        count
201    }
202
203    /// Consumes self and creates a new [`Gauge`] for reporting a
204    /// metric of an overall operator (not per partition)
205    pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
206        let gauge = Gauge::new();
207        self.build(MetricValue::Gauge {
208            name: gauge_name.into(),
209            gauge: gauge.clone(),
210        });
211        gauge
212    }
213
214    /// Consume self and create a new Timer for recording the elapsed
215    /// CPU time spent by an operator
216    pub fn elapsed_compute(self, partition: usize) -> Time {
217        let time = Time::new();
218        self.with_partition(partition)
219            .build(MetricValue::ElapsedCompute(time.clone()));
220        time
221    }
222
223    /// Consumes self and creates a new Timer for recording some
224    /// subset of an operators execution time.
225    pub fn subset_time(
226        self,
227        subset_name: impl Into<Cow<'static, str>>,
228        partition: usize,
229    ) -> Time {
230        let time = Time::new();
231        self.with_partition(partition).build(MetricValue::Time {
232            name: subset_name.into(),
233            time: time.clone(),
234        });
235        time
236    }
237
238    /// Consumes self and creates a new Timestamp for recording the
239    /// starting time of execution for a partition
240    pub fn start_timestamp(self, partition: usize) -> Timestamp {
241        let timestamp = Timestamp::new();
242        self.with_partition(partition)
243            .build(MetricValue::StartTimestamp(timestamp.clone()));
244        timestamp
245    }
246
247    /// Consumes self and creates a new Timestamp for recording the
248    /// ending time of execution for a partition
249    pub fn end_timestamp(self, partition: usize) -> Timestamp {
250        let timestamp = Timestamp::new();
251        self.with_partition(partition)
252            .build(MetricValue::EndTimestamp(timestamp.clone()));
253        timestamp
254    }
255
256    /// Consumes self and creates a new `PruningMetrics`
257    pub fn pruning_metrics(
258        self,
259        name: impl Into<Cow<'static, str>>,
260        partition: usize,
261    ) -> PruningMetrics {
262        let pruning_metrics = PruningMetrics::new();
263        self.with_partition(partition)
264            .build(MetricValue::PruningMetrics {
265                name: name.into(),
266                // inner values will be `Arc::clone()`
267                pruning_metrics: pruning_metrics.clone(),
268            });
269        pruning_metrics
270    }
271
272    /// Consumes self and creates a new [`RatioMetrics`]
273    pub fn ratio_metrics(
274        self,
275        name: impl Into<Cow<'static, str>>,
276        partition: usize,
277    ) -> RatioMetrics {
278        let ratio_metrics = RatioMetrics::new();
279        self.with_partition(partition).build(MetricValue::Ratio {
280            name: name.into(),
281            ratio_metrics: ratio_metrics.clone(),
282        });
283        ratio_metrics
284    }
285}