Skip to main content

datafusion_physical_expr_common/metrics/
baseline.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics common for almost all operators
19
20use std::{borrow::Cow, collections::BTreeMap, sync::Arc, task::Poll};
21
22use arrow::record_batch::RecordBatch;
23use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
24
25use super::{
26    Count, ExecutionPlanMetricsSet, Metric, MetricBuilder, MetricsSet, Time, Timestamp,
27};
28
/// Name given to the derived ratio metric built by
/// `BaselineMetrics::output_rows_skew_metric`.
const OUTPUT_ROWS_SKEW_METRIC_NAME: &str = "output_rows_skew";
30
/// Helper for creating and tracking common "baseline" metrics for
/// each operator
///
/// One instance tracks the metrics of a single partition of an operator.
/// The end time is recorded either explicitly via [`BaselineMetrics::done`]
/// or, if that was never called, when the value is dropped.
///
/// Example:
/// ```
/// use datafusion_physical_expr_common::metrics::{
///     BaselineMetrics, ExecutionPlanMetricsSet,
/// };
/// let metrics = ExecutionPlanMetricsSet::new();
///
/// let partition = 2;
/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
///
/// // during execution, in CPU intensive operation:
/// let timer = baseline_metrics.elapsed_compute().timer();
/// // .. do CPU intensive work
/// timer.done();
///
/// // when operator is finished:
/// baseline_metrics.done();
/// ```
#[derive(Debug, Clone)]
pub struct BaselineMetrics {
    /// end_time is set when `BaselineMetrics::done()` is called (or on drop,
    /// if it has not been recorded by then)
    end_time: Timestamp,

    /// amount of time the operator was actively trying to use the CPU
    elapsed_compute: Time,

    /// output rows: the total output rows
    output_rows: Count,

    /// Memory usage of all output batches.
    ///
    /// Note: This value may be overestimated. If multiple output `RecordBatch`
    /// instances share underlying memory buffers, their sizes will be counted
    /// multiple times.
    /// Issue: <https://github.com/apache/datafusion/issues/16841>
    output_bytes: Count,

    /// output batches: the total output batch count
    output_batches: Count,
    // Remember to update `docs/source/user-guide/metrics.md` when updating comments
    // or adding new metrics
}
76
77impl BaselineMetrics {
78    /// Create a new BaselineMetric structure, and set `start_time` to now
79    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
80        let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
81        start_time.record();
82
83        Self {
84            end_time: MetricBuilder::new(metrics)
85                .with_type(super::MetricType::Summary)
86                .end_timestamp(partition),
87            elapsed_compute: MetricBuilder::new(metrics)
88                .with_type(super::MetricType::Summary)
89                .elapsed_compute(partition),
90            output_rows: MetricBuilder::new(metrics)
91                .with_type(super::MetricType::Summary)
92                .output_rows(partition),
93            output_bytes: MetricBuilder::new(metrics)
94                .with_type(super::MetricType::Summary)
95                .output_bytes(partition),
96            output_batches: MetricBuilder::new(metrics)
97                .with_type(super::MetricType::Dev)
98                .output_batches(partition),
99        }
100    }
101
102    /// Returns a [`BaselineMetrics`] that updates the same `elapsed_compute` ignoring
103    /// all other metrics
104    ///
105    /// This is useful when an operator offloads some of its intermediate work to separate tasks
106    /// that as a result won't be recorded by [`Self::record_poll`]
107    pub fn intermediate(&self) -> BaselineMetrics {
108        Self {
109            end_time: Default::default(),
110            elapsed_compute: self.elapsed_compute.clone(),
111            output_rows: Default::default(),
112            output_bytes: Default::default(),
113            output_batches: Default::default(),
114        }
115    }
116
117    /// return the metric for cpu time spend in this operator
118    pub fn elapsed_compute(&self) -> &Time {
119        &self.elapsed_compute
120    }
121
122    /// return the metric for the total number of output rows produced
123    pub fn output_rows(&self) -> &Count {
124        &self.output_rows
125    }
126
127    /// return the metric for the total number of output batches produced
128    pub fn output_batches(&self) -> &Count {
129        &self.output_batches
130    }
131
132    /// Returns a derived metric that summarizes how unevenly `output_rows`
133    /// are distributed across partitions.
134    ///
135    /// The score is normalized to the range `[0%, 100%]`, where `0%`
136    /// indicates a perfectly balanced distribution and `100%` indicates the
137    /// most skewed distribution.
138    ///
139    /// The calculation is:
140    /// `effective_parallelism = square(sum(r_i)) / sum(square(r_i))`
141    /// `output_rows_skew = (1 - ((effective_parallelism - 1) / (partition_count - 1))) * 100%`
142    ///
143    /// Example: for 4 partitions with output rows `[10, 10, 10, 10]`,
144    /// `effective_parallelism = 40^2 / (10^2 + 10^2 + 10^2 + 10^2) = 4`,
145    /// so `output_rows_skew = 0%`. For `[40, 0, 0, 0]`, the score is `100%`.
146    pub fn output_rows_skew_metric(metrics: &MetricsSet) -> Option<Arc<Metric>> {
147        let output_rows = metrics
148            .iter()
149            .filter_map(|metric| match (metric.partition(), metric.value()) {
150                (Some(partition), super::MetricValue::OutputRows(count)) => {
151                    Some((partition, count.value() as u128))
152                }
153                _ => None,
154            })
155            .fold(
156                BTreeMap::<usize, u128>::new(),
157                |mut output_rows, (partition, rows)| {
158                    *output_rows.entry(partition).or_default() += rows;
159                    output_rows
160                },
161            )
162            .into_values()
163            .collect::<Vec<_>>();
164
165        if output_rows.is_empty() {
166            return None;
167        }
168
169        let ratio_metrics = super::RatioMetrics::new().with_display_raw_values(false);
170        if let Some(score) = output_rows_skew_score(&output_rows) {
171            ratio_metrics.set_part((score * 10_000.0).round() as usize);
172            ratio_metrics.set_total(10_000);
173        }
174
175        Some(Arc::new(
176            Metric::new(
177                super::MetricValue::Ratio {
178                    name: Cow::Borrowed(OUTPUT_ROWS_SKEW_METRIC_NAME),
179                    ratio_metrics,
180                },
181                None,
182            )
183            .with_type(super::MetricType::Dev),
184        ))
185    }
186
187    /// Records the fact that this operator's execution is complete
188    /// (recording the `end_time` metric).
189    ///
190    /// Note care should be taken to call `done()` manually if
191    /// `BaselineMetrics` is not `drop`ped immediately upon operator
192    /// completion, as async streams may not be dropped immediately
193    /// depending on the consumer.
194    pub fn done(&self) {
195        self.end_time.record()
196    }
197
198    /// Record that some number of rows have been produced as output
199    ///
200    /// See the [`RecordOutput`] for conveniently recording record
201    /// batch output for other thing
202    pub fn record_output(&self, num_rows: usize) {
203        self.output_rows.add(num_rows);
204    }
205
206    /// If not previously recorded `done()`, record
207    pub fn try_done(&self) {
208        if self.end_time.value().is_none() {
209            self.end_time.record()
210        }
211    }
212
213    /// Process a poll result of a stream producing output for an operator.
214    ///
215    /// Note: this method only updates `output_rows` and `end_time` metrics.
216    /// Remember to update `elapsed_compute` and other metrics manually.
217    pub fn record_poll(
218        &self,
219        poll: Poll<Option<Result<RecordBatch>>>,
220    ) -> Poll<Option<Result<RecordBatch>>> {
221        if let Poll::Ready(maybe_batch) = &poll {
222            match maybe_batch {
223                Some(Ok(batch)) => {
224                    batch.record_output(self);
225                }
226                Some(Err(_)) => self.done(),
227                None => self.done(),
228            }
229        }
230        poll
231    }
232}
233
234impl Drop for BaselineMetrics {
235    fn drop(&mut self) {
236        self.try_done()
237    }
238}
239
/// Computes the output-row skew score for per-partition row counts.
///
/// See [`BaselineMetrics::output_rows_skew_metric`] for the algorithm.
///
/// The result is a fraction in `[0.0, 1.0]`: `0.0` means the rows are spread
/// evenly over all partitions, `1.0` means a single partition produced every
/// row.
///
/// Returns:
/// - `None` when `output_rows` is empty,
/// - `Some(0.0)` when there is exactly one partition (regardless of its row
///   count),
/// - `None` when there are several partitions and none produced any rows,
/// - `Some(score)` otherwise.
fn output_rows_skew_score(output_rows: &[u128]) -> Option<f64> {
    let partition_count = match output_rows.len() {
        0 => return None,
        1 => return Some(0.0),
        n => n,
    };

    let mut total_rows = 0.0_f64;
    let mut sum_of_squares = 0.0_f64;
    for &rows in output_rows {
        let rows = rows as f64;
        total_rows += rows;
        sum_of_squares += rows.powi(2);
    }

    // Row counts are non-negative, so a non-zero total implies a non-zero sum
    // of squares: this single check is enough to guard the division below.
    if total_rows == 0.0 {
        return None;
    }

    let effective_parallelism = total_rows.powi(2) / sum_of_squares;
    let balanced_score = (effective_parallelism - 1.0) / (partition_count as f64 - 1.0);

    Some((1.0 - balanced_score).clamp(0.0, 1.0))
}
271
/// Helper for creating and tracking spill-related metrics for
/// each operator
///
/// All fields are public so callers can update the counters directly.
#[derive(Debug, Clone)]
pub struct SpillMetrics {
    /// count of spills during the execution of the operator
    pub spill_file_count: Count,

    /// total bytes actually written to disk during the execution of the operator
    pub spilled_bytes: Count,

    /// total spilled rows during the execution of the operator
    pub spilled_rows: Count,
}
285
286impl SpillMetrics {
287    /// Create a new SpillMetrics structure
288    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
289        Self {
290            spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
291            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
292            spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
293        }
294    }
295}
296
/// Metrics for tracking batch splitting activity
///
/// The single counter is public so callers can update it directly.
#[derive(Debug, Clone)]
pub struct SplitMetrics {
    /// Number of times an input [`RecordBatch`] was split
    pub batches_split: Count,
}
303
304impl SplitMetrics {
305    /// Create a new [`SplitMetrics`]
306    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
307        Self {
308            batches_split: MetricBuilder::new(metrics)
309                .with_category(super::MetricCategory::Rows)
310                .counter("batches_split", partition),
311        }
312    }
313}
314
/// Trait for things that produce output rows as a result of execution.
///
/// Implementations update the given [`BaselineMetrics`] and then hand the
/// value back unchanged.
pub trait RecordOutput {
    /// Record that some number of output rows have been produced
    ///
    /// Meant to be composable so that instead of returning `batch`
    /// the operator can return `batch.record_output(baseline_metrics)`
    fn record_output(self, bm: &BaselineMetrics) -> Self;
}
323
324impl RecordOutput for usize {
325    fn record_output(self, bm: &BaselineMetrics) -> Self {
326        bm.record_output(self);
327        self
328    }
329}
330
331impl RecordOutput for RecordBatch {
332    fn record_output(self, bm: &BaselineMetrics) -> Self {
333        bm.record_output(self.num_rows());
334        let n_bytes = get_record_batch_memory_size(&self);
335        bm.output_bytes.add(n_bytes);
336        bm.output_batches.add(1);
337        self
338    }
339}
340
341impl RecordOutput for &RecordBatch {
342    fn record_output(self, bm: &BaselineMetrics) -> Self {
343        bm.record_output(self.num_rows());
344        let n_bytes = get_record_batch_memory_size(self);
345        bm.output_bytes.add(n_bytes);
346        bm.output_batches.add(1);
347        self
348    }
349}
350
351impl RecordOutput for Option<&RecordBatch> {
352    fn record_output(self, bm: &BaselineMetrics) -> Self {
353        if let Some(record_batch) = &self {
354            record_batch.record_output(bm);
355        }
356        self
357    }
358}
359
360impl RecordOutput for Option<RecordBatch> {
361    fn record_output(self, bm: &BaselineMetrics) -> Self {
362        if let Some(record_batch) = &self {
363            record_batch.record_output(bm);
364        }
365        self
366    }
367}
368
369impl RecordOutput for Result<RecordBatch> {
370    fn record_output(self, bm: &BaselineMetrics) -> Self {
371        if let Ok(record_batch) = &self {
372            record_batch.record_output(bm);
373        }
374        self
375    }
376}