datafusion_physical_expr_common/metrics/baseline.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics common for almost all operators
19
20use std::{borrow::Cow, collections::BTreeMap, sync::Arc, task::Poll};
21
22use arrow::record_batch::RecordBatch;
23use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
24
25use super::{
26 Count, ExecutionPlanMetricsSet, Metric, MetricBuilder, MetricsSet, Time, Timestamp,
27};
28
/// Name given to the derived ratio metric built by
/// [`BaselineMetrics::output_rows_skew_metric`].
const OUTPUT_ROWS_SKEW_METRIC_NAME: &str = "output_rows_skew";
30
31/// Helper for creating and tracking common "baseline" metrics for
32/// each operator
33///
34/// Example:
35/// ```
36/// use datafusion_physical_expr_common::metrics::{
37/// BaselineMetrics, ExecutionPlanMetricsSet,
38/// };
39/// let metrics = ExecutionPlanMetricsSet::new();
40///
41/// let partition = 2;
42/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
43///
44/// // during execution, in CPU intensive operation:
45/// let timer = baseline_metrics.elapsed_compute().timer();
46/// // .. do CPU intensive work
47/// timer.done();
48///
49/// // when operator is finished:
50/// baseline_metrics.done();
51/// ```
52#[derive(Debug, Clone)]
53pub struct BaselineMetrics {
54 /// end_time is set when `BaselineMetrics::done()` is called
55 end_time: Timestamp,
56
57 /// amount of time the operator was actively trying to use the CPU
58 elapsed_compute: Time,
59
60 /// output rows: the total output rows
61 output_rows: Count,
62
63 /// Memory usage of all output batches.
64 ///
65 /// Note: This value may be overestimated. If multiple output `RecordBatch`
66 /// instances share underlying memory buffers, their sizes will be counted
67 /// multiple times.
68 /// Issue: <https://github.com/apache/datafusion/issues/16841>
69 output_bytes: Count,
70
71 /// output batches: the total output batch count
72 output_batches: Count,
73 // Remember to update `docs/source/user-guide/metrics.md` when updating comments
74 // or adding new metrics
75}
76
77impl BaselineMetrics {
78 /// Create a new BaselineMetric structure, and set `start_time` to now
79 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
80 let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
81 start_time.record();
82
83 Self {
84 end_time: MetricBuilder::new(metrics)
85 .with_type(super::MetricType::Summary)
86 .end_timestamp(partition),
87 elapsed_compute: MetricBuilder::new(metrics)
88 .with_type(super::MetricType::Summary)
89 .elapsed_compute(partition),
90 output_rows: MetricBuilder::new(metrics)
91 .with_type(super::MetricType::Summary)
92 .output_rows(partition),
93 output_bytes: MetricBuilder::new(metrics)
94 .with_type(super::MetricType::Summary)
95 .output_bytes(partition),
96 output_batches: MetricBuilder::new(metrics)
97 .with_type(super::MetricType::Dev)
98 .output_batches(partition),
99 }
100 }
101
102 /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
103 /// all other metrics
104 ///
105 /// This is useful when an operator offloads some of its intermediate work to separate tasks
106 /// that as a result won't be recorded by [`Self::record_poll`]
107 pub fn intermediate(&self) -> BaselineMetrics {
108 Self {
109 end_time: Default::default(),
110 elapsed_compute: self.elapsed_compute.clone(),
111 output_rows: Default::default(),
112 output_bytes: Default::default(),
113 output_batches: Default::default(),
114 }
115 }
116
117 /// return the metric for cpu time spend in this operator
118 pub fn elapsed_compute(&self) -> &Time {
119 &self.elapsed_compute
120 }
121
122 /// return the metric for the total number of output rows produced
123 pub fn output_rows(&self) -> &Count {
124 &self.output_rows
125 }
126
127 /// return the metric for the total number of output batches produced
128 pub fn output_batches(&self) -> &Count {
129 &self.output_batches
130 }
131
132 /// Returns a derived metric that summarizes how unevenly `output_rows`
133 /// are distributed across partitions.
134 ///
135 /// The score is normalized to the range `[0%, 100%]`, where `0%`
136 /// indicates a perfectly balanced distribution and `100%` indicates the
137 /// most skewed distribution.
138 ///
139 /// The calculation is:
140 /// `effective_parallelism = square(sum(r_i)) / sum(square(r_i))`
141 /// `output_rows_skew = (1 - ((effective_parallelism - 1) / (partition_count - 1))) * 100%`
142 ///
143 /// Example: for 4 partitions with output rows `[10, 10, 10, 10]`,
144 /// `effective_parallelism = 40^2 / (10^2 + 10^2 + 10^2 + 10^2) = 4`,
145 /// so `output_rows_skew = 0%`. For `[40, 0, 0, 0]`, the score is `100%`.
146 pub fn output_rows_skew_metric(metrics: &MetricsSet) -> Option<Arc<Metric>> {
147 let output_rows = metrics
148 .iter()
149 .filter_map(|metric| match (metric.partition(), metric.value()) {
150 (Some(partition), super::MetricValue::OutputRows(count)) => {
151 Some((partition, count.value() as u128))
152 }
153 _ => None,
154 })
155 .fold(
156 BTreeMap::<usize, u128>::new(),
157 |mut output_rows, (partition, rows)| {
158 *output_rows.entry(partition).or_default() += rows;
159 output_rows
160 },
161 )
162 .into_values()
163 .collect::<Vec<_>>();
164
165 if output_rows.is_empty() {
166 return None;
167 }
168
169 let ratio_metrics = super::RatioMetrics::new().with_display_raw_values(false);
170 if let Some(score) = output_rows_skew_score(&output_rows) {
171 ratio_metrics.set_part((score * 10_000.0).round() as usize);
172 ratio_metrics.set_total(10_000);
173 }
174
175 Some(Arc::new(
176 Metric::new(
177 super::MetricValue::Ratio {
178 name: Cow::Borrowed(OUTPUT_ROWS_SKEW_METRIC_NAME),
179 ratio_metrics,
180 },
181 None,
182 )
183 .with_type(super::MetricType::Dev),
184 ))
185 }
186
187 /// Records the fact that this operator's execution is complete
188 /// (recording the `end_time` metric).
189 ///
190 /// Note care should be taken to call `done()` manually if
191 /// `BaselineMetrics` is not `drop`ped immediately upon operator
192 /// completion, as async streams may not be dropped immediately
193 /// depending on the consumer.
194 pub fn done(&self) {
195 self.end_time.record()
196 }
197
198 /// Record that some number of rows have been produced as output
199 ///
200 /// See the [`RecordOutput`] for conveniently recording record
201 /// batch output for other thing
202 pub fn record_output(&self, num_rows: usize) {
203 self.output_rows.add(num_rows);
204 }
205
206 /// If not previously recorded `done()`, record
207 pub fn try_done(&self) {
208 if self.end_time.value().is_none() {
209 self.end_time.record()
210 }
211 }
212
213 /// Process a poll result of a stream producing output for an operator.
214 ///
215 /// Note: this method only updates `output_rows` and `end_time` metrics.
216 /// Remember to update `elapsed_compute` and other metrics manually.
217 pub fn record_poll(
218 &self,
219 poll: Poll<Option<Result<RecordBatch>>>,
220 ) -> Poll<Option<Result<RecordBatch>>> {
221 if let Poll::Ready(maybe_batch) = &poll {
222 match maybe_batch {
223 Some(Ok(batch)) => {
224 batch.record_output(self);
225 }
226 Some(Err(_)) => self.done(),
227 None => self.done(),
228 }
229 }
230 poll
231 }
232}
233
234impl Drop for BaselineMetrics {
235 fn drop(&mut self) {
236 self.try_done()
237 }
238}
239
/// Computes the normalized skew score of per-partition output row counts.
///
/// See [`BaselineMetrics::output_rows_skew_metric`] for the algorithm.
///
/// `output_rows` holds one row count per partition. The result lies in
/// `[0.0, 1.0]`, where `0.0` is perfectly balanced and `1.0` is the most
/// skewed distribution.
///
/// Returns:
/// - `None` for an empty slice,
/// - `Some(0.0)` for a single partition (regardless of its row count),
/// - `None` when there are several partitions and all of them have zero
///   rows, because the score is undefined (division by zero),
/// - `Some(score)` otherwise.
fn output_rows_skew_score(output_rows: &[u128]) -> Option<f64> {
    let partition_count = output_rows.len();
    match partition_count {
        0 => return None,
        // A single partition cannot be skewed relative to anything else.
        1 => return Some(0.0),
        _ => {}
    }

    // Accumulate in `f64`: squaring large `u128` counts could overflow
    // integer arithmetic, while `f64` has ample range for it.
    let (total_rows, sum_of_squares) = output_rows.iter().fold(
        (0.0_f64, 0.0_f64),
        |(total_rows, sum_of_squares), &rows| {
            let rows = rows as f64;
            (total_rows + rows, sum_of_squares + rows.powi(2))
        },
    );

    // Row counts are non-negative integers, so the total is zero exactly
    // when the sum of squares is zero. This single check therefore also
    // guards the division below.
    if total_rows == 0.0 {
        return None;
    }

    let effective_parallelism = total_rows.powi(2) / sum_of_squares;
    let balanced_score = (effective_parallelism - 1.0) / (partition_count as f64 - 1.0);

    // Clamp away any floating-point drift outside the documented range.
    Some((1.0 - balanced_score).clamp(0.0, 1.0))
}
271
272/// Helper for creating and tracking spill-related metrics for
273/// each operator
274#[derive(Debug, Clone)]
275pub struct SpillMetrics {
276 /// count of spills during the execution of the operator
277 pub spill_file_count: Count,
278
279 /// total bytes actually written to disk during the execution of the operator
280 pub spilled_bytes: Count,
281
282 /// total spilled rows during the execution of the operator
283 pub spilled_rows: Count,
284}
285
286impl SpillMetrics {
287 /// Create a new SpillMetrics structure
288 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
289 Self {
290 spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
291 spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
292 spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
293 }
294 }
295}
296
297/// Metrics for tracking batch splitting activity
298#[derive(Debug, Clone)]
299pub struct SplitMetrics {
300 /// Number of times an input [`RecordBatch`] was split
301 pub batches_split: Count,
302}
303
304impl SplitMetrics {
305 /// Create a new [`SplitMetrics`]
306 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
307 Self {
308 batches_split: MetricBuilder::new(metrics)
309 .with_category(super::MetricCategory::Rows)
310 .counter("batches_split", partition),
311 }
312 }
313}
314
315/// Trait for things that produce output rows as a result of execution.
316pub trait RecordOutput {
317 /// Record that some number of output rows have been produced
318 ///
319 /// Meant to be composable so that instead of returning `batch`
320 /// the operator can return `batch.record_output(baseline_metrics)`
321 fn record_output(self, bm: &BaselineMetrics) -> Self;
322}
323
324impl RecordOutput for usize {
325 fn record_output(self, bm: &BaselineMetrics) -> Self {
326 bm.record_output(self);
327 self
328 }
329}
330
331impl RecordOutput for RecordBatch {
332 fn record_output(self, bm: &BaselineMetrics) -> Self {
333 bm.record_output(self.num_rows());
334 let n_bytes = get_record_batch_memory_size(&self);
335 bm.output_bytes.add(n_bytes);
336 bm.output_batches.add(1);
337 self
338 }
339}
340
341impl RecordOutput for &RecordBatch {
342 fn record_output(self, bm: &BaselineMetrics) -> Self {
343 bm.record_output(self.num_rows());
344 let n_bytes = get_record_batch_memory_size(self);
345 bm.output_bytes.add(n_bytes);
346 bm.output_batches.add(1);
347 self
348 }
349}
350
351impl RecordOutput for Option<&RecordBatch> {
352 fn record_output(self, bm: &BaselineMetrics) -> Self {
353 if let Some(record_batch) = &self {
354 record_batch.record_output(bm);
355 }
356 self
357 }
358}
359
360impl RecordOutput for Option<RecordBatch> {
361 fn record_output(self, bm: &BaselineMetrics) -> Self {
362 if let Some(record_batch) = &self {
363 record_batch.record_output(bm);
364 }
365 self
366 }
367}
368
369impl RecordOutput for Result<RecordBatch> {
370 fn record_output(self, bm: &BaselineMetrics) -> Self {
371 if let Ok(record_batch) = &self {
372 record_batch.record_output(bm);
373 }
374 self
375 }
376}