datafusion_physical_plan/metrics/
baseline.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics common for almost all operators
19
20use std::task::Poll;
21
22use arrow::record_batch::RecordBatch;
23
24use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
25use datafusion_common::Result;
26
27/// Helper for creating and tracking common "baseline" metrics for
28/// each operator
29///
30/// Example:
31/// ```
32/// use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
33/// let metrics = ExecutionPlanMetricsSet::new();
34///
35/// let partition = 2;
36/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
37///
38/// // during execution, in CPU intensive operation:
39/// let timer = baseline_metrics.elapsed_compute().timer();
40/// // .. do CPU intensive work
41/// timer.done();
42///
43/// // when operator is finished:
44/// baseline_metrics.done();
45/// ```
46#[derive(Debug, Clone)]
47pub struct BaselineMetrics {
48    /// end_time is set when `BaselineMetrics::done()` is called
49    end_time: Timestamp,
50
51    /// amount of time the operator was actively trying to use the CPU
52    elapsed_compute: Time,
53
54    /// output rows: the total output rows
55    output_rows: Count,
56}
57
58impl BaselineMetrics {
59    /// Create a new BaselineMetric structure, and set `start_time` to now
60    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
61        let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
62        start_time.record();
63
64        Self {
65            end_time: MetricBuilder::new(metrics).end_timestamp(partition),
66            elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
67            output_rows: MetricBuilder::new(metrics).output_rows(partition),
68        }
69    }
70
71    /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
72    /// all other metrics
73    ///
74    /// This is useful when an operator offloads some of its intermediate work to separate tasks
75    /// that as a result won't be recorded by [`Self::record_poll`]
76    pub fn intermediate(&self) -> BaselineMetrics {
77        Self {
78            end_time: Default::default(),
79            elapsed_compute: self.elapsed_compute.clone(),
80            output_rows: Default::default(),
81        }
82    }
83
84    /// return the metric for cpu time spend in this operator
85    pub fn elapsed_compute(&self) -> &Time {
86        &self.elapsed_compute
87    }
88
89    /// return the metric for the total number of output rows produced
90    pub fn output_rows(&self) -> &Count {
91        &self.output_rows
92    }
93
94    /// Records the fact that this operator's execution is complete
95    /// (recording the `end_time` metric).
96    ///
97    /// Note care should be taken to call `done()` manually if
98    /// `BaselineMetrics` is not `drop`ped immediately upon operator
99    /// completion, as async streams may not be dropped immediately
100    /// depending on the consumer.
101    pub fn done(&self) {
102        self.end_time.record()
103    }
104
105    /// Record that some number of rows have been produced as output
106    ///
107    /// See the [`RecordOutput`] for conveniently recording record
108    /// batch output for other thing
109    pub fn record_output(&self, num_rows: usize) {
110        self.output_rows.add(num_rows);
111    }
112
113    /// If not previously recorded `done()`, record
114    pub fn try_done(&self) {
115        if self.end_time.value().is_none() {
116            self.end_time.record()
117        }
118    }
119
120    /// Process a poll result of a stream producing output for an operator.
121    ///
122    /// Note: this method only updates `output_rows` and `end_time` metrics.
123    /// Remember to update `elapsed_compute` and other metrics manually.
124    pub fn record_poll(
125        &self,
126        poll: Poll<Option<Result<RecordBatch>>>,
127    ) -> Poll<Option<Result<RecordBatch>>> {
128        if let Poll::Ready(maybe_batch) = &poll {
129            match maybe_batch {
130                Some(Ok(batch)) => {
131                    batch.record_output(self);
132                }
133                Some(Err(_)) => self.done(),
134                None => self.done(),
135            }
136        }
137        poll
138    }
139}
140
141impl Drop for BaselineMetrics {
142    fn drop(&mut self) {
143        self.try_done()
144    }
145}
146
147/// Helper for creating and tracking spill-related metrics for
148/// each operator
149#[derive(Debug, Clone)]
150pub struct SpillMetrics {
151    /// count of spills during the execution of the operator
152    pub spill_file_count: Count,
153
154    /// total bytes actually written to disk during the execution of the operator
155    pub spilled_bytes: Count,
156
157    /// total spilled rows during the execution of the operator
158    pub spilled_rows: Count,
159}
160
161impl SpillMetrics {
162    /// Create a new SpillMetrics structure
163    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
164        Self {
165            spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
166            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
167            spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
168        }
169    }
170}
171
172/// Metrics for tracking [`crate::stream::BatchSplitStream`] activity
173#[derive(Debug, Clone)]
174pub struct SplitMetrics {
175    /// Number of times an input [`RecordBatch`] was split
176    pub batches_split: Count,
177}
178
179impl SplitMetrics {
180    /// Create a new [`SplitMetrics`]
181    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
182        Self {
183            batches_split: MetricBuilder::new(metrics)
184                .counter("batches_split", partition),
185        }
186    }
187}
188
189/// Trait for things that produce output rows as a result of execution.
190pub trait RecordOutput {
191    /// Record that some number of output rows have been produced
192    ///
193    /// Meant to be composable so that instead of returning `batch`
194    /// the operator can return `batch.record_output(baseline_metrics)`
195    fn record_output(self, bm: &BaselineMetrics) -> Self;
196}
197
198impl RecordOutput for usize {
199    fn record_output(self, bm: &BaselineMetrics) -> Self {
200        bm.record_output(self);
201        self
202    }
203}
204
205impl RecordOutput for RecordBatch {
206    fn record_output(self, bm: &BaselineMetrics) -> Self {
207        bm.record_output(self.num_rows());
208        self
209    }
210}
211
212impl RecordOutput for &RecordBatch {
213    fn record_output(self, bm: &BaselineMetrics) -> Self {
214        bm.record_output(self.num_rows());
215        self
216    }
217}
218
219impl RecordOutput for Option<&RecordBatch> {
220    fn record_output(self, bm: &BaselineMetrics) -> Self {
221        if let Some(record_batch) = &self {
222            record_batch.record_output(bm);
223        }
224        self
225    }
226}
227
228impl RecordOutput for Option<RecordBatch> {
229    fn record_output(self, bm: &BaselineMetrics) -> Self {
230        if let Some(record_batch) = &self {
231            record_batch.record_output(bm);
232        }
233        self
234    }
235}
236
237impl RecordOutput for Result<RecordBatch> {
238    fn record_output(self, bm: &BaselineMetrics) -> Self {
239        if let Ok(record_batch) = &self {
240            record_batch.record_output(bm);
241        }
242        self
243    }
244}