datafusion_physical_plan/metrics/baseline.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics common for almost all operators
19
20use std::task::Poll;
21
22use arrow::record_batch::RecordBatch;
23
24use crate::spill::get_record_batch_memory_size;
25
26use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
27use datafusion_common::Result;
28
29/// Helper for creating and tracking common "baseline" metrics for
30/// each operator
31///
32/// Example:
33/// ```
34/// use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
35/// let metrics = ExecutionPlanMetricsSet::new();
36///
37/// let partition = 2;
38/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
39///
40/// // during execution, in CPU intensive operation:
41/// let timer = baseline_metrics.elapsed_compute().timer();
42/// // .. do CPU intensive work
43/// timer.done();
44///
45/// // when operator is finished:
46/// baseline_metrics.done();
47/// ```
48#[derive(Debug, Clone)]
49pub struct BaselineMetrics {
50 /// end_time is set when `BaselineMetrics::done()` is called
51 end_time: Timestamp,
52
53 /// amount of time the operator was actively trying to use the CPU
54 elapsed_compute: Time,
55
56 /// output rows: the total output rows
57 output_rows: Count,
58
59 /// Memory usage of all output batches.
60 ///
61 /// Note: This value may be overestimated. If multiple output `RecordBatch`
62 /// instances share underlying memory buffers, their sizes will be counted
63 /// multiple times.
64 /// Issue: <https://github.com/apache/datafusion/issues/16841>
65 output_bytes: Count,
66 // Remember to update `docs/source/user-guide/metrics.md` when updating comments
67 // or adding new metrics
68}
69
70impl BaselineMetrics {
71 /// Create a new BaselineMetric structure, and set `start_time` to now
72 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
73 let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
74 start_time.record();
75
76 Self {
77 end_time: MetricBuilder::new(metrics)
78 .with_type(super::MetricType::SUMMARY)
79 .end_timestamp(partition),
80 elapsed_compute: MetricBuilder::new(metrics)
81 .with_type(super::MetricType::SUMMARY)
82 .elapsed_compute(partition),
83 output_rows: MetricBuilder::new(metrics)
84 .with_type(super::MetricType::SUMMARY)
85 .output_rows(partition),
86 output_bytes: MetricBuilder::new(metrics)
87 .with_type(super::MetricType::SUMMARY)
88 .output_bytes(partition),
89 }
90 }
91
92 /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
93 /// all other metrics
94 ///
95 /// This is useful when an operator offloads some of its intermediate work to separate tasks
96 /// that as a result won't be recorded by [`Self::record_poll`]
97 pub fn intermediate(&self) -> BaselineMetrics {
98 Self {
99 end_time: Default::default(),
100 elapsed_compute: self.elapsed_compute.clone(),
101 output_rows: Default::default(),
102 output_bytes: Default::default(),
103 }
104 }
105
106 /// return the metric for cpu time spend in this operator
107 pub fn elapsed_compute(&self) -> &Time {
108 &self.elapsed_compute
109 }
110
111 /// return the metric for the total number of output rows produced
112 pub fn output_rows(&self) -> &Count {
113 &self.output_rows
114 }
115
116 /// Records the fact that this operator's execution is complete
117 /// (recording the `end_time` metric).
118 ///
119 /// Note care should be taken to call `done()` manually if
120 /// `BaselineMetrics` is not `drop`ped immediately upon operator
121 /// completion, as async streams may not be dropped immediately
122 /// depending on the consumer.
123 pub fn done(&self) {
124 self.end_time.record()
125 }
126
127 /// Record that some number of rows have been produced as output
128 ///
129 /// See the [`RecordOutput`] for conveniently recording record
130 /// batch output for other thing
131 pub fn record_output(&self, num_rows: usize) {
132 self.output_rows.add(num_rows);
133 }
134
135 /// If not previously recorded `done()`, record
136 pub fn try_done(&self) {
137 if self.end_time.value().is_none() {
138 self.end_time.record()
139 }
140 }
141
142 /// Process a poll result of a stream producing output for an operator.
143 ///
144 /// Note: this method only updates `output_rows` and `end_time` metrics.
145 /// Remember to update `elapsed_compute` and other metrics manually.
146 pub fn record_poll(
147 &self,
148 poll: Poll<Option<Result<RecordBatch>>>,
149 ) -> Poll<Option<Result<RecordBatch>>> {
150 if let Poll::Ready(maybe_batch) = &poll {
151 match maybe_batch {
152 Some(Ok(batch)) => {
153 batch.record_output(self);
154 }
155 Some(Err(_)) => self.done(),
156 None => self.done(),
157 }
158 }
159 poll
160 }
161}
162
163impl Drop for BaselineMetrics {
164 fn drop(&mut self) {
165 self.try_done()
166 }
167}
168
169/// Helper for creating and tracking spill-related metrics for
170/// each operator
171#[derive(Debug, Clone)]
172pub struct SpillMetrics {
173 /// count of spills during the execution of the operator
174 pub spill_file_count: Count,
175
176 /// total bytes actually written to disk during the execution of the operator
177 pub spilled_bytes: Count,
178
179 /// total spilled rows during the execution of the operator
180 pub spilled_rows: Count,
181}
182
183impl SpillMetrics {
184 /// Create a new SpillMetrics structure
185 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
186 Self {
187 spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
188 spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
189 spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
190 }
191 }
192}
193
194/// Metrics for tracking [`crate::stream::BatchSplitStream`] activity
195#[derive(Debug, Clone)]
196pub struct SplitMetrics {
197 /// Number of times an input [`RecordBatch`] was split
198 pub batches_split: Count,
199}
200
201impl SplitMetrics {
202 /// Create a new [`SplitMetrics`]
203 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
204 Self {
205 batches_split: MetricBuilder::new(metrics)
206 .counter("batches_split", partition),
207 }
208 }
209}
210
211/// Trait for things that produce output rows as a result of execution.
212pub trait RecordOutput {
213 /// Record that some number of output rows have been produced
214 ///
215 /// Meant to be composable so that instead of returning `batch`
216 /// the operator can return `batch.record_output(baseline_metrics)`
217 fn record_output(self, bm: &BaselineMetrics) -> Self;
218}
219
220impl RecordOutput for usize {
221 fn record_output(self, bm: &BaselineMetrics) -> Self {
222 bm.record_output(self);
223 self
224 }
225}
226
227impl RecordOutput for RecordBatch {
228 fn record_output(self, bm: &BaselineMetrics) -> Self {
229 bm.record_output(self.num_rows());
230 let n_bytes = get_record_batch_memory_size(&self);
231 bm.output_bytes.add(n_bytes);
232 self
233 }
234}
235
236impl RecordOutput for &RecordBatch {
237 fn record_output(self, bm: &BaselineMetrics) -> Self {
238 bm.record_output(self.num_rows());
239 let n_bytes = get_record_batch_memory_size(self);
240 bm.output_bytes.add(n_bytes);
241 self
242 }
243}
244
245impl RecordOutput for Option<&RecordBatch> {
246 fn record_output(self, bm: &BaselineMetrics) -> Self {
247 if let Some(record_batch) = &self {
248 record_batch.record_output(bm);
249 }
250 self
251 }
252}
253
254impl RecordOutput for Option<RecordBatch> {
255 fn record_output(self, bm: &BaselineMetrics) -> Self {
256 if let Some(record_batch) = &self {
257 record_batch.record_output(bm);
258 }
259 self
260 }
261}
262
263impl RecordOutput for Result<RecordBatch> {
264 fn record_output(self, bm: &BaselineMetrics) -> Self {
265 if let Ok(record_batch) = &self {
266 record_batch.record_output(bm);
267 }
268 self
269 }
270}