datafusion_physical_plan/metrics/baseline.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Metrics common for almost all operators
19
20use std::task::Poll;
21
22use arrow::record_batch::RecordBatch;
23
24use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
25use datafusion_common::Result;
26
27/// Helper for creating and tracking common "baseline" metrics for
28/// each operator
29///
30/// Example:
31/// ```
32/// use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
33/// let metrics = ExecutionPlanMetricsSet::new();
34///
35/// let partition = 2;
36/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
37///
38/// // during execution, in CPU intensive operation:
39/// let timer = baseline_metrics.elapsed_compute().timer();
40/// // .. do CPU intensive work
41/// timer.done();
42///
43/// // when operator is finished:
44/// baseline_metrics.done();
45/// ```
46#[derive(Debug, Clone)]
47pub struct BaselineMetrics {
48 /// end_time is set when `BaselineMetrics::done()` is called
49 end_time: Timestamp,
50
51 /// amount of time the operator was actively trying to use the CPU
52 elapsed_compute: Time,
53
54 /// output rows: the total output rows
55 output_rows: Count,
56}
57
58impl BaselineMetrics {
59 /// Create a new BaselineMetric structure, and set `start_time` to now
60 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
61 let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
62 start_time.record();
63
64 Self {
65 end_time: MetricBuilder::new(metrics).end_timestamp(partition),
66 elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
67 output_rows: MetricBuilder::new(metrics).output_rows(partition),
68 }
69 }
70
71 /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
72 /// all other metrics
73 ///
74 /// This is useful when an operator offloads some of its intermediate work to separate tasks
75 /// that as a result won't be recorded by [`Self::record_poll`]
76 pub fn intermediate(&self) -> BaselineMetrics {
77 Self {
78 end_time: Default::default(),
79 elapsed_compute: self.elapsed_compute.clone(),
80 output_rows: Default::default(),
81 }
82 }
83
84 /// return the metric for cpu time spend in this operator
85 pub fn elapsed_compute(&self) -> &Time {
86 &self.elapsed_compute
87 }
88
89 /// return the metric for the total number of output rows produced
90 pub fn output_rows(&self) -> &Count {
91 &self.output_rows
92 }
93
94 /// Records the fact that this operator's execution is complete
95 /// (recording the `end_time` metric).
96 ///
97 /// Note care should be taken to call `done()` manually if
98 /// `BaselineMetrics` is not `drop`ped immediately upon operator
99 /// completion, as async streams may not be dropped immediately
100 /// depending on the consumer.
101 pub fn done(&self) {
102 self.end_time.record()
103 }
104
105 /// Record that some number of rows have been produced as output
106 ///
107 /// See the [`RecordOutput`] for conveniently recording record
108 /// batch output for other thing
109 pub fn record_output(&self, num_rows: usize) {
110 self.output_rows.add(num_rows);
111 }
112
113 /// If not previously recorded `done()`, record
114 pub fn try_done(&self) {
115 if self.end_time.value().is_none() {
116 self.end_time.record()
117 }
118 }
119
120 /// Process a poll result of a stream producing output for an operator.
121 ///
122 /// Note: this method only updates `output_rows` and `end_time` metrics.
123 /// Remember to update `elapsed_compute` and other metrics manually.
124 pub fn record_poll(
125 &self,
126 poll: Poll<Option<Result<RecordBatch>>>,
127 ) -> Poll<Option<Result<RecordBatch>>> {
128 if let Poll::Ready(maybe_batch) = &poll {
129 match maybe_batch {
130 Some(Ok(batch)) => {
131 batch.record_output(self);
132 }
133 Some(Err(_)) => self.done(),
134 None => self.done(),
135 }
136 }
137 poll
138 }
139}
140
141impl Drop for BaselineMetrics {
142 fn drop(&mut self) {
143 self.try_done()
144 }
145}
146
147/// Helper for creating and tracking spill-related metrics for
148/// each operator
149#[derive(Debug, Clone)]
150pub struct SpillMetrics {
151 /// count of spills during the execution of the operator
152 pub spill_file_count: Count,
153
154 /// total bytes actually written to disk during the execution of the operator
155 pub spilled_bytes: Count,
156
157 /// total spilled rows during the execution of the operator
158 pub spilled_rows: Count,
159}
160
161impl SpillMetrics {
162 /// Create a new SpillMetrics structure
163 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
164 Self {
165 spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
166 spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
167 spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
168 }
169 }
170}
171
172/// Metrics for tracking [`crate::stream::BatchSplitStream`] activity
173#[derive(Debug, Clone)]
174pub struct SplitMetrics {
175 /// Number of times an input [`RecordBatch`] was split
176 pub batches_split: Count,
177}
178
179impl SplitMetrics {
180 /// Create a new [`SplitMetrics`]
181 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
182 Self {
183 batches_split: MetricBuilder::new(metrics)
184 .counter("batches_split", partition),
185 }
186 }
187}
188
189/// Trait for things that produce output rows as a result of execution.
190pub trait RecordOutput {
191 /// Record that some number of output rows have been produced
192 ///
193 /// Meant to be composable so that instead of returning `batch`
194 /// the operator can return `batch.record_output(baseline_metrics)`
195 fn record_output(self, bm: &BaselineMetrics) -> Self;
196}
197
198impl RecordOutput for usize {
199 fn record_output(self, bm: &BaselineMetrics) -> Self {
200 bm.record_output(self);
201 self
202 }
203}
204
205impl RecordOutput for RecordBatch {
206 fn record_output(self, bm: &BaselineMetrics) -> Self {
207 bm.record_output(self.num_rows());
208 self
209 }
210}
211
212impl RecordOutput for &RecordBatch {
213 fn record_output(self, bm: &BaselineMetrics) -> Self {
214 bm.record_output(self.num_rows());
215 self
216 }
217}
218
219impl RecordOutput for Option<&RecordBatch> {
220 fn record_output(self, bm: &BaselineMetrics) -> Self {
221 if let Some(record_batch) = &self {
222 record_batch.record_output(bm);
223 }
224 self
225 }
226}
227
228impl RecordOutput for Option<RecordBatch> {
229 fn record_output(self, bm: &BaselineMetrics) -> Self {
230 if let Some(record_batch) = &self {
231 record_batch.record_output(bm);
232 }
233 self
234 }
235}
236
237impl RecordOutput for Result<RecordBatch> {
238 fn record_output(self, bm: &BaselineMetrics) -> Self {
239 if let Ok(record_batch) = &self {
240 record_batch.record_output(bm);
241 }
242 self
243 }
244}