datafusion_physical_plan/metrics/
baseline.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics common for almost all operators
19
20use std::task::Poll;
21
22use arrow::record_batch::RecordBatch;
23
24use crate::spill::get_record_batch_memory_size;
25
26use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
27use datafusion_common::Result;
28
29/// Helper for creating and tracking common "baseline" metrics for
30/// each operator
31///
32/// Example:
33/// ```
34/// use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
35/// let metrics = ExecutionPlanMetricsSet::new();
36///
37/// let partition = 2;
38/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
39///
40/// // during execution, in CPU intensive operation:
41/// let timer = baseline_metrics.elapsed_compute().timer();
42/// // .. do CPU intensive work
43/// timer.done();
44///
45/// // when operator is finished:
46/// baseline_metrics.done();
47/// ```
48#[derive(Debug, Clone)]
49pub struct BaselineMetrics {
50    /// end_time is set when `BaselineMetrics::done()` is called
51    end_time: Timestamp,
52
53    /// amount of time the operator was actively trying to use the CPU
54    elapsed_compute: Time,
55
56    /// output rows: the total output rows
57    output_rows: Count,
58
59    /// Memory usage of all output batches.
60    ///
61    /// Note: This value may be overestimated. If multiple output `RecordBatch`
62    /// instances share underlying memory buffers, their sizes will be counted
63    /// multiple times.
64    /// Issue: <https://github.com/apache/datafusion/issues/16841>
65    output_bytes: Count,
66    // Remember to update `docs/source/user-guide/metrics.md` when updating comments
67    // or adding new metrics
68}
69
70impl BaselineMetrics {
71    /// Create a new BaselineMetric structure, and set `start_time` to now
72    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
73        let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
74        start_time.record();
75
76        Self {
77            end_time: MetricBuilder::new(metrics)
78                .with_type(super::MetricType::SUMMARY)
79                .end_timestamp(partition),
80            elapsed_compute: MetricBuilder::new(metrics)
81                .with_type(super::MetricType::SUMMARY)
82                .elapsed_compute(partition),
83            output_rows: MetricBuilder::new(metrics)
84                .with_type(super::MetricType::SUMMARY)
85                .output_rows(partition),
86            output_bytes: MetricBuilder::new(metrics)
87                .with_type(super::MetricType::SUMMARY)
88                .output_bytes(partition),
89        }
90    }
91
92    /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
93    /// all other metrics
94    ///
95    /// This is useful when an operator offloads some of its intermediate work to separate tasks
96    /// that as a result won't be recorded by [`Self::record_poll`]
97    pub fn intermediate(&self) -> BaselineMetrics {
98        Self {
99            end_time: Default::default(),
100            elapsed_compute: self.elapsed_compute.clone(),
101            output_rows: Default::default(),
102            output_bytes: Default::default(),
103        }
104    }
105
106    /// return the metric for cpu time spend in this operator
107    pub fn elapsed_compute(&self) -> &Time {
108        &self.elapsed_compute
109    }
110
111    /// return the metric for the total number of output rows produced
112    pub fn output_rows(&self) -> &Count {
113        &self.output_rows
114    }
115
116    /// Records the fact that this operator's execution is complete
117    /// (recording the `end_time` metric).
118    ///
119    /// Note care should be taken to call `done()` manually if
120    /// `BaselineMetrics` is not `drop`ped immediately upon operator
121    /// completion, as async streams may not be dropped immediately
122    /// depending on the consumer.
123    pub fn done(&self) {
124        self.end_time.record()
125    }
126
127    /// Record that some number of rows have been produced as output
128    ///
129    /// See the [`RecordOutput`] for conveniently recording record
130    /// batch output for other thing
131    pub fn record_output(&self, num_rows: usize) {
132        self.output_rows.add(num_rows);
133    }
134
135    /// If not previously recorded `done()`, record
136    pub fn try_done(&self) {
137        if self.end_time.value().is_none() {
138            self.end_time.record()
139        }
140    }
141
142    /// Process a poll result of a stream producing output for an operator.
143    ///
144    /// Note: this method only updates `output_rows` and `end_time` metrics.
145    /// Remember to update `elapsed_compute` and other metrics manually.
146    pub fn record_poll(
147        &self,
148        poll: Poll<Option<Result<RecordBatch>>>,
149    ) -> Poll<Option<Result<RecordBatch>>> {
150        if let Poll::Ready(maybe_batch) = &poll {
151            match maybe_batch {
152                Some(Ok(batch)) => {
153                    batch.record_output(self);
154                }
155                Some(Err(_)) => self.done(),
156                None => self.done(),
157            }
158        }
159        poll
160    }
161}
162
163impl Drop for BaselineMetrics {
164    fn drop(&mut self) {
165        self.try_done()
166    }
167}
168
169/// Helper for creating and tracking spill-related metrics for
170/// each operator
171#[derive(Debug, Clone)]
172pub struct SpillMetrics {
173    /// count of spills during the execution of the operator
174    pub spill_file_count: Count,
175
176    /// total bytes actually written to disk during the execution of the operator
177    pub spilled_bytes: Count,
178
179    /// total spilled rows during the execution of the operator
180    pub spilled_rows: Count,
181}
182
183impl SpillMetrics {
184    /// Create a new SpillMetrics structure
185    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
186        Self {
187            spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
188            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
189            spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
190        }
191    }
192}
193
194/// Metrics for tracking [`crate::stream::BatchSplitStream`] activity
195#[derive(Debug, Clone)]
196pub struct SplitMetrics {
197    /// Number of times an input [`RecordBatch`] was split
198    pub batches_split: Count,
199}
200
201impl SplitMetrics {
202    /// Create a new [`SplitMetrics`]
203    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
204        Self {
205            batches_split: MetricBuilder::new(metrics)
206                .counter("batches_split", partition),
207        }
208    }
209}
210
211/// Trait for things that produce output rows as a result of execution.
212pub trait RecordOutput {
213    /// Record that some number of output rows have been produced
214    ///
215    /// Meant to be composable so that instead of returning `batch`
216    /// the operator can return `batch.record_output(baseline_metrics)`
217    fn record_output(self, bm: &BaselineMetrics) -> Self;
218}
219
220impl RecordOutput for usize {
221    fn record_output(self, bm: &BaselineMetrics) -> Self {
222        bm.record_output(self);
223        self
224    }
225}
226
227impl RecordOutput for RecordBatch {
228    fn record_output(self, bm: &BaselineMetrics) -> Self {
229        bm.record_output(self.num_rows());
230        let n_bytes = get_record_batch_memory_size(&self);
231        bm.output_bytes.add(n_bytes);
232        self
233    }
234}
235
236impl RecordOutput for &RecordBatch {
237    fn record_output(self, bm: &BaselineMetrics) -> Self {
238        bm.record_output(self.num_rows());
239        let n_bytes = get_record_batch_memory_size(self);
240        bm.output_bytes.add(n_bytes);
241        self
242    }
243}
244
245impl RecordOutput for Option<&RecordBatch> {
246    fn record_output(self, bm: &BaselineMetrics) -> Self {
247        if let Some(record_batch) = &self {
248            record_batch.record_output(bm);
249        }
250        self
251    }
252}
253
254impl RecordOutput for Option<RecordBatch> {
255    fn record_output(self, bm: &BaselineMetrics) -> Self {
256        if let Some(record_batch) = &self {
257            record_batch.record_output(bm);
258        }
259        self
260    }
261}
262
263impl RecordOutput for Result<RecordBatch> {
264    fn record_output(self, bm: &BaselineMetrics) -> Self {
265        if let Ok(record_batch) = &self {
266            record_batch.record_output(bm);
267        }
268        self
269    }
270}