datafusion_physical_expr_common/metrics/baseline.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics common for almost all operators
19
20use std::task::Poll;
21
22use arrow::record_batch::RecordBatch;
23use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
24
25use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
26
/// Helper for creating and tracking the common "baseline" metrics that
/// almost every operator reports: end time, compute time, and the
/// rows / bytes / batches it has produced.
///
/// One instance is created per operator partition (see
/// [`BaselineMetrics::new`]).
///
/// Example:
/// ```
/// use datafusion_physical_expr_common::metrics::{
///     BaselineMetrics, ExecutionPlanMetricsSet,
/// };
/// let metrics = ExecutionPlanMetricsSet::new();
///
/// let partition = 2;
/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
///
/// // during execution, in CPU intensive operation:
/// let timer = baseline_metrics.elapsed_compute().timer();
/// // .. do CPU intensive work
/// timer.done();
///
/// // when operator is finished:
/// baseline_metrics.done();
/// ```
#[derive(Debug, Clone)]
pub struct BaselineMetrics {
    /// Timestamp recorded when `BaselineMetrics::done()` is called (or by
    /// `try_done()`, which the `Drop` impl invokes, if nothing was recorded
    /// before).
    end_time: Timestamp,

    /// Amount of time the operator was actively trying to use the CPU.
    elapsed_compute: Time,

    /// Total number of rows the operator has produced as output.
    output_rows: Count,

    /// Memory usage of all output batches.
    ///
    /// Note: This value may be overestimated. If multiple output `RecordBatch`
    /// instances share underlying memory buffers, their sizes will be counted
    /// multiple times.
    /// Issue: <https://github.com/apache/datafusion/issues/16841>
    output_bytes: Count,

    /// Total number of batches the operator has produced as output.
    output_batches: Count,
    // Remember to update `docs/source/user-guide/metrics.md` when updating comments
    // or adding new metrics
}
72
73impl BaselineMetrics {
74 /// Create a new BaselineMetric structure, and set `start_time` to now
75 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
76 let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
77 start_time.record();
78
79 Self {
80 end_time: MetricBuilder::new(metrics)
81 .with_type(super::MetricType::SUMMARY)
82 .end_timestamp(partition),
83 elapsed_compute: MetricBuilder::new(metrics)
84 .with_type(super::MetricType::SUMMARY)
85 .elapsed_compute(partition),
86 output_rows: MetricBuilder::new(metrics)
87 .with_type(super::MetricType::SUMMARY)
88 .output_rows(partition),
89 output_bytes: MetricBuilder::new(metrics)
90 .with_type(super::MetricType::SUMMARY)
91 .output_bytes(partition),
92 output_batches: MetricBuilder::new(metrics)
93 .with_type(super::MetricType::DEV)
94 .output_batches(partition),
95 }
96 }
97
98 /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
99 /// all other metrics
100 ///
101 /// This is useful when an operator offloads some of its intermediate work to separate tasks
102 /// that as a result won't be recorded by [`Self::record_poll`]
103 pub fn intermediate(&self) -> BaselineMetrics {
104 Self {
105 end_time: Default::default(),
106 elapsed_compute: self.elapsed_compute.clone(),
107 output_rows: Default::default(),
108 output_bytes: Default::default(),
109 output_batches: Default::default(),
110 }
111 }
112
113 /// return the metric for cpu time spend in this operator
114 pub fn elapsed_compute(&self) -> &Time {
115 &self.elapsed_compute
116 }
117
118 /// return the metric for the total number of output rows produced
119 pub fn output_rows(&self) -> &Count {
120 &self.output_rows
121 }
122
123 /// return the metric for the total number of output batches produced
124 pub fn output_batches(&self) -> &Count {
125 &self.output_batches
126 }
127
128 /// Records the fact that this operator's execution is complete
129 /// (recording the `end_time` metric).
130 ///
131 /// Note care should be taken to call `done()` manually if
132 /// `BaselineMetrics` is not `drop`ped immediately upon operator
133 /// completion, as async streams may not be dropped immediately
134 /// depending on the consumer.
135 pub fn done(&self) {
136 self.end_time.record()
137 }
138
139 /// Record that some number of rows have been produced as output
140 ///
141 /// See the [`RecordOutput`] for conveniently recording record
142 /// batch output for other thing
143 pub fn record_output(&self, num_rows: usize) {
144 self.output_rows.add(num_rows);
145 }
146
147 /// If not previously recorded `done()`, record
148 pub fn try_done(&self) {
149 if self.end_time.value().is_none() {
150 self.end_time.record()
151 }
152 }
153
154 /// Process a poll result of a stream producing output for an operator.
155 ///
156 /// Note: this method only updates `output_rows` and `end_time` metrics.
157 /// Remember to update `elapsed_compute` and other metrics manually.
158 pub fn record_poll(
159 &self,
160 poll: Poll<Option<Result<RecordBatch>>>,
161 ) -> Poll<Option<Result<RecordBatch>>> {
162 if let Poll::Ready(maybe_batch) = &poll {
163 match maybe_batch {
164 Some(Ok(batch)) => {
165 batch.record_output(self);
166 }
167 Some(Err(_)) => self.done(),
168 None => self.done(),
169 }
170 }
171 poll
172 }
173}
174
175impl Drop for BaselineMetrics {
176 fn drop(&mut self) {
177 self.try_done()
178 }
179}
180
/// Helper for creating and tracking the spill-related metrics of an
/// operator: how often it spilled, and how many bytes and rows were spilled.
#[derive(Debug, Clone)]
pub struct SpillMetrics {
    /// Count of spills during the execution of the operator.
    pub spill_file_count: Count,

    /// Total bytes actually written to disk during the execution of the
    /// operator.
    pub spilled_bytes: Count,

    /// Total number of rows spilled during the execution of the operator.
    pub spilled_rows: Count,
}
194
195impl SpillMetrics {
196 /// Create a new SpillMetrics structure
197 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
198 Self {
199 spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
200 spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
201 spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
202 }
203 }
204}
205
/// Metrics for tracking batch splitting activity.
#[derive(Debug, Clone)]
pub struct SplitMetrics {
    /// Number of times an input [`RecordBatch`] was split.
    pub batches_split: Count,
}
212
213impl SplitMetrics {
214 /// Create a new [`SplitMetrics`]
215 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
216 Self {
217 batches_split: MetricBuilder::new(metrics)
218 .counter("batches_split", partition),
219 }
220 }
221}
222
/// Trait for things that produce output rows as a result of execution.
///
/// This file implements it for `usize` (a plain row count), for
/// `RecordBatch` and `&RecordBatch`, and for `Option` / `Result` wrappers
/// around record batches.
pub trait RecordOutput {
    /// Records in `bm` that some number of output rows have been produced,
    /// then returns `self`.
    ///
    /// Meant to be composable, so that instead of returning `batch`
    /// the operator can return `batch.record_output(baseline_metrics)`.
    fn record_output(self, bm: &BaselineMetrics) -> Self;
}
231
232impl RecordOutput for usize {
233 fn record_output(self, bm: &BaselineMetrics) -> Self {
234 bm.record_output(self);
235 self
236 }
237}
238
239impl RecordOutput for RecordBatch {
240 fn record_output(self, bm: &BaselineMetrics) -> Self {
241 bm.record_output(self.num_rows());
242 let n_bytes = get_record_batch_memory_size(&self);
243 bm.output_bytes.add(n_bytes);
244 bm.output_batches.add(1);
245 self
246 }
247}
248
249impl RecordOutput for &RecordBatch {
250 fn record_output(self, bm: &BaselineMetrics) -> Self {
251 bm.record_output(self.num_rows());
252 let n_bytes = get_record_batch_memory_size(self);
253 bm.output_bytes.add(n_bytes);
254 bm.output_batches.add(1);
255 self
256 }
257}
258
259impl RecordOutput for Option<&RecordBatch> {
260 fn record_output(self, bm: &BaselineMetrics) -> Self {
261 if let Some(record_batch) = &self {
262 record_batch.record_output(bm);
263 }
264 self
265 }
266}
267
268impl RecordOutput for Option<RecordBatch> {
269 fn record_output(self, bm: &BaselineMetrics) -> Self {
270 if let Some(record_batch) = &self {
271 record_batch.record_output(bm);
272 }
273 self
274 }
275}
276
277impl RecordOutput for Result<RecordBatch> {
278 fn record_output(self, bm: &BaselineMetrics) -> Self {
279 if let Ok(record_batch) = &self {
280 record_batch.record_output(bm);
281 }
282 self
283 }
284}