datafusion_physical_expr_common/metrics/
baseline.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics common for almost all operators
19
20use std::task::Poll;
21
22use arrow::record_batch::RecordBatch;
23use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
24
25use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
26
27/// Helper for creating and tracking common "baseline" metrics for
28/// each operator
29///
30/// Example:
31/// ```
32/// use datafusion_physical_expr_common::metrics::{
33///     BaselineMetrics, ExecutionPlanMetricsSet,
34/// };
35/// let metrics = ExecutionPlanMetricsSet::new();
36///
37/// let partition = 2;
38/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
39///
40/// // during execution, in CPU intensive operation:
41/// let timer = baseline_metrics.elapsed_compute().timer();
42/// // .. do CPU intensive work
43/// timer.done();
44///
45/// // when operator is finished:
46/// baseline_metrics.done();
47/// ```
48#[derive(Debug, Clone)]
49pub struct BaselineMetrics {
50    /// end_time is set when `BaselineMetrics::done()` is called
51    end_time: Timestamp,
52
53    /// amount of time the operator was actively trying to use the CPU
54    elapsed_compute: Time,
55
56    /// output rows: the total output rows
57    output_rows: Count,
58
59    /// Memory usage of all output batches.
60    ///
61    /// Note: This value may be overestimated. If multiple output `RecordBatch`
62    /// instances share underlying memory buffers, their sizes will be counted
63    /// multiple times.
64    /// Issue: <https://github.com/apache/datafusion/issues/16841>
65    output_bytes: Count,
66
67    /// output batches: the total output batch count
68    output_batches: Count,
69    // Remember to update `docs/source/user-guide/metrics.md` when updating comments
70    // or adding new metrics
71}
72
73impl BaselineMetrics {
74    /// Create a new BaselineMetric structure, and set `start_time` to now
75    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
76        let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
77        start_time.record();
78
79        Self {
80            end_time: MetricBuilder::new(metrics)
81                .with_type(super::MetricType::SUMMARY)
82                .end_timestamp(partition),
83            elapsed_compute: MetricBuilder::new(metrics)
84                .with_type(super::MetricType::SUMMARY)
85                .elapsed_compute(partition),
86            output_rows: MetricBuilder::new(metrics)
87                .with_type(super::MetricType::SUMMARY)
88                .output_rows(partition),
89            output_bytes: MetricBuilder::new(metrics)
90                .with_type(super::MetricType::SUMMARY)
91                .output_bytes(partition),
92            output_batches: MetricBuilder::new(metrics)
93                .with_type(super::MetricType::DEV)
94                .output_batches(partition),
95        }
96    }
97
98    /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
99    /// all other metrics
100    ///
101    /// This is useful when an operator offloads some of its intermediate work to separate tasks
102    /// that as a result won't be recorded by [`Self::record_poll`]
103    pub fn intermediate(&self) -> BaselineMetrics {
104        Self {
105            end_time: Default::default(),
106            elapsed_compute: self.elapsed_compute.clone(),
107            output_rows: Default::default(),
108            output_bytes: Default::default(),
109            output_batches: Default::default(),
110        }
111    }
112
113    /// return the metric for cpu time spend in this operator
114    pub fn elapsed_compute(&self) -> &Time {
115        &self.elapsed_compute
116    }
117
118    /// return the metric for the total number of output rows produced
119    pub fn output_rows(&self) -> &Count {
120        &self.output_rows
121    }
122
123    /// return the metric for the total number of output batches produced
124    pub fn output_batches(&self) -> &Count {
125        &self.output_batches
126    }
127
128    /// Records the fact that this operator's execution is complete
129    /// (recording the `end_time` metric).
130    ///
131    /// Note care should be taken to call `done()` manually if
132    /// `BaselineMetrics` is not `drop`ped immediately upon operator
133    /// completion, as async streams may not be dropped immediately
134    /// depending on the consumer.
135    pub fn done(&self) {
136        self.end_time.record()
137    }
138
139    /// Record that some number of rows have been produced as output
140    ///
141    /// See the [`RecordOutput`] for conveniently recording record
142    /// batch output for other thing
143    pub fn record_output(&self, num_rows: usize) {
144        self.output_rows.add(num_rows);
145    }
146
147    /// If not previously recorded `done()`, record
148    pub fn try_done(&self) {
149        if self.end_time.value().is_none() {
150            self.end_time.record()
151        }
152    }
153
154    /// Process a poll result of a stream producing output for an operator.
155    ///
156    /// Note: this method only updates `output_rows` and `end_time` metrics.
157    /// Remember to update `elapsed_compute` and other metrics manually.
158    pub fn record_poll(
159        &self,
160        poll: Poll<Option<Result<RecordBatch>>>,
161    ) -> Poll<Option<Result<RecordBatch>>> {
162        if let Poll::Ready(maybe_batch) = &poll {
163            match maybe_batch {
164                Some(Ok(batch)) => {
165                    batch.record_output(self);
166                }
167                Some(Err(_)) => self.done(),
168                None => self.done(),
169            }
170        }
171        poll
172    }
173}
174
175impl Drop for BaselineMetrics {
176    fn drop(&mut self) {
177        self.try_done()
178    }
179}
180
181/// Helper for creating and tracking spill-related metrics for
182/// each operator
183#[derive(Debug, Clone)]
184pub struct SpillMetrics {
185    /// count of spills during the execution of the operator
186    pub spill_file_count: Count,
187
188    /// total bytes actually written to disk during the execution of the operator
189    pub spilled_bytes: Count,
190
191    /// total spilled rows during the execution of the operator
192    pub spilled_rows: Count,
193}
194
195impl SpillMetrics {
196    /// Create a new SpillMetrics structure
197    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
198        Self {
199            spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
200            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
201            spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
202        }
203    }
204}
205
206/// Metrics for tracking batch splitting activity
207#[derive(Debug, Clone)]
208pub struct SplitMetrics {
209    /// Number of times an input [`RecordBatch`] was split
210    pub batches_split: Count,
211}
212
213impl SplitMetrics {
214    /// Create a new [`SplitMetrics`]
215    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
216        Self {
217            batches_split: MetricBuilder::new(metrics)
218                .counter("batches_split", partition),
219        }
220    }
221}
222
223/// Trait for things that produce output rows as a result of execution.
224pub trait RecordOutput {
225    /// Record that some number of output rows have been produced
226    ///
227    /// Meant to be composable so that instead of returning `batch`
228    /// the operator can return `batch.record_output(baseline_metrics)`
229    fn record_output(self, bm: &BaselineMetrics) -> Self;
230}
231
232impl RecordOutput for usize {
233    fn record_output(self, bm: &BaselineMetrics) -> Self {
234        bm.record_output(self);
235        self
236    }
237}
238
239impl RecordOutput for RecordBatch {
240    fn record_output(self, bm: &BaselineMetrics) -> Self {
241        bm.record_output(self.num_rows());
242        let n_bytes = get_record_batch_memory_size(&self);
243        bm.output_bytes.add(n_bytes);
244        bm.output_batches.add(1);
245        self
246    }
247}
248
249impl RecordOutput for &RecordBatch {
250    fn record_output(self, bm: &BaselineMetrics) -> Self {
251        bm.record_output(self.num_rows());
252        let n_bytes = get_record_batch_memory_size(self);
253        bm.output_bytes.add(n_bytes);
254        bm.output_batches.add(1);
255        self
256    }
257}
258
259impl RecordOutput for Option<&RecordBatch> {
260    fn record_output(self, bm: &BaselineMetrics) -> Self {
261        if let Some(record_batch) = &self {
262            record_batch.record_output(bm);
263        }
264        self
265    }
266}
267
268impl RecordOutput for Option<RecordBatch> {
269    fn record_output(self, bm: &BaselineMetrics) -> Self {
270        if let Some(record_batch) = &self {
271            record_batch.record_output(bm);
272        }
273        self
274    }
275}
276
277impl RecordOutput for Result<RecordBatch> {
278    fn record_output(self, bm: &BaselineMetrics) -> Self {
279        if let Ok(record_batch) = &self {
280            record_batch.record_output(bm);
281        }
282        self
283    }
284}