datafusion_physical_plan/metrics/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics for recording information about execution
19
20mod baseline;
21mod builder;
22mod custom;
23mod value;
24
25use parking_lot::Mutex;
26use std::{
27    borrow::Cow,
28    fmt::{Debug, Display},
29    sync::Arc,
30};
31
32use datafusion_common::HashMap;
33
34// public exports
35pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
36pub use builder::MetricBuilder;
37pub use custom::CustomMetricValue;
38pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
39
40/// Something that tracks a value of interest (metric) of a DataFusion
41/// [`ExecutionPlan`] execution.
42///
43/// Typically [`Metric`]s are not created directly, but instead
44/// are created using [`MetricBuilder`] or methods on
45/// [`ExecutionPlanMetricsSet`].
46///
47/// ```
48///  use datafusion_physical_plan::metrics::*;
49///
50///  let metrics = ExecutionPlanMetricsSet::new();
51///  assert!(metrics.clone_inner().output_rows().is_none());
52///
53///  // Create a counter to increment using the MetricBuilder
54///  let partition = 1;
55///  let output_rows = MetricBuilder::new(&metrics)
56///      .output_rows(partition);
57///
58///  // Counter can be incremented
59///  output_rows.add(13);
60///
61///  // The value can be retrieved directly:
62///  assert_eq!(output_rows.value(), 13);
63///
64///  // As well as from the metrics set
65///  assert_eq!(metrics.clone_inner().output_rows(), Some(13));
66/// ```
67///
68/// [`ExecutionPlan`]: super::ExecutionPlan
69
70#[derive(Debug)]
71pub struct Metric {
72    /// The value of the metric
73    value: MetricValue,
74
75    /// arbitrary name=value pairs identifying this metric
76    labels: Vec<Label>,
77
78    /// To which partition of an operators output did this metric
79    /// apply? If `None` then means all partitions.
80    partition: Option<usize>,
81}
82
83impl Display for Metric {
84    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
85        write!(f, "{}", self.value.name())?;
86
87        let mut iter = self
88            .partition
89            .iter()
90            .map(|partition| Label::new("partition", partition.to_string()))
91            .chain(self.labels().iter().cloned())
92            .peekable();
93
94        // print out the labels specially
95        if iter.peek().is_some() {
96            write!(f, "{{")?;
97
98            let mut is_first = true;
99            for i in iter {
100                if !is_first {
101                    write!(f, ", ")?;
102                } else {
103                    is_first = false;
104                }
105
106                write!(f, "{i}")?;
107            }
108
109            write!(f, "}}")?;
110        }
111
112        // and now the value
113        write!(f, "={}", self.value)
114    }
115}
116
117impl Metric {
118    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
119    /// rather than this function directly.
120    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
121        Self {
122            value,
123            labels: vec![],
124            partition,
125        }
126    }
127
128    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
129    /// rather than this function directly.
130    pub fn new_with_labels(
131        value: MetricValue,
132        partition: Option<usize>,
133        labels: Vec<Label>,
134    ) -> Self {
135        Self {
136            value,
137            labels,
138            partition,
139        }
140    }
141
142    /// Add a new label to this metric
143    pub fn with_label(mut self, label: Label) -> Self {
144        self.labels.push(label);
145        self
146    }
147
148    /// What labels are present for this metric?
149    pub fn labels(&self) -> &[Label] {
150        &self.labels
151    }
152
153    /// Return a reference to the value of this metric
154    pub fn value(&self) -> &MetricValue {
155        &self.value
156    }
157
158    /// Return a mutable reference to the value of this metric
159    pub fn value_mut(&mut self) -> &mut MetricValue {
160        &mut self.value
161    }
162
163    /// Return a reference to the partition
164    pub fn partition(&self) -> Option<usize> {
165        self.partition
166    }
167}
168
169/// A snapshot of the metrics for a particular ([`ExecutionPlan`]).
170///
171/// [`ExecutionPlan`]: super::ExecutionPlan
172#[derive(Default, Debug, Clone)]
173pub struct MetricsSet {
174    metrics: Vec<Arc<Metric>>,
175}
176
177impl MetricsSet {
178    /// Create a new container of metrics
179    pub fn new() -> Self {
180        Default::default()
181    }
182
183    /// Add the specified metric
184    pub fn push(&mut self, metric: Arc<Metric>) {
185        self.metrics.push(metric)
186    }
187
188    /// Returns an iterator across all metrics
189    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
190        self.metrics.iter()
191    }
192
193    /// Convenience: return the number of rows produced, aggregated
194    /// across partitions or `None` if no metric is present
195    pub fn output_rows(&self) -> Option<usize> {
196        self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
197            .map(|v| v.as_usize())
198    }
199
200    /// Convenience: return the count of spills, aggregated
201    /// across partitions or `None` if no metric is present
202    pub fn spill_count(&self) -> Option<usize> {
203        self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
204            .map(|v| v.as_usize())
205    }
206
207    /// Convenience: return the total byte size of spills, aggregated
208    /// across partitions or `None` if no metric is present
209    pub fn spilled_bytes(&self) -> Option<usize> {
210        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
211            .map(|v| v.as_usize())
212    }
213
214    /// Convenience: return the total rows of spills, aggregated
215    /// across partitions or `None` if no metric is present
216    pub fn spilled_rows(&self) -> Option<usize> {
217        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
218            .map(|v| v.as_usize())
219    }
220
221    /// Convenience: return the amount of elapsed CPU time spent,
222    /// aggregated across partitions or `None` if no metric is present
223    pub fn elapsed_compute(&self) -> Option<usize> {
224        self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
225            .map(|v| v.as_usize())
226    }
227
228    /// Sums the values for metrics for which `f(metric)` returns
229    /// `true`, and returns the value. Returns `None` if no metrics match
230    /// the predicate.
231    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
232    where
233        F: FnMut(&Metric) -> bool,
234    {
235        let mut iter = self
236            .metrics
237            .iter()
238            .filter(|metric| f(metric.as_ref()))
239            .peekable();
240
241        let mut accum = match iter.peek() {
242            None => {
243                return None;
244            }
245            Some(metric) => metric.value().new_empty(),
246        };
247
248        iter.for_each(|metric| accum.aggregate(metric.value()));
249
250        Some(accum)
251    }
252
253    /// Returns the sum of all the metrics with the specified name
254    /// in the returned set.
255    pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
256        self.sum(|m| match m.value() {
257            MetricValue::Count { name, .. } => name == metric_name,
258            MetricValue::Time { name, .. } => name == metric_name,
259            MetricValue::OutputRows(_) => false,
260            MetricValue::ElapsedCompute(_) => false,
261            MetricValue::SpillCount(_) => false,
262            MetricValue::SpilledBytes(_) => false,
263            MetricValue::SpilledRows(_) => false,
264            MetricValue::CurrentMemoryUsage(_) => false,
265            MetricValue::Gauge { name, .. } => name == metric_name,
266            MetricValue::StartTimestamp(_) => false,
267            MetricValue::EndTimestamp(_) => false,
268            MetricValue::Custom { .. } => false,
269        })
270    }
271
272    /// Returns a new derived `MetricsSet` where all metrics
273    /// that had the same name have been
274    /// aggregated together. The resulting `MetricsSet` has all
275    /// metrics with `Partition=None`
276    pub fn aggregate_by_name(&self) -> Self {
277        let mut map = HashMap::new();
278
279        // There are all sorts of ways to make this more efficient
280        for metric in &self.metrics {
281            let key = metric.value.name();
282            map.entry(key)
283                .and_modify(|accum: &mut Metric| {
284                    accum.value_mut().aggregate(metric.value());
285                })
286                .or_insert_with(|| {
287                    // accumulate with no partition
288                    let partition = None;
289                    let mut accum = Metric::new(metric.value().new_empty(), partition);
290                    accum.value_mut().aggregate(metric.value());
291                    accum
292                });
293        }
294
295        let new_metrics = map
296            .into_iter()
297            .map(|(_k, v)| Arc::new(v))
298            .collect::<Vec<_>>();
299
300        Self {
301            metrics: new_metrics,
302        }
303    }
304
305    /// Sort the order of metrics so the "most useful" show up first
306    pub fn sorted_for_display(mut self) -> Self {
307        self.metrics.sort_unstable_by_key(|metric| {
308            (
309                metric.value().display_sort_key(),
310                metric.value().name().to_owned(),
311            )
312        });
313        self
314    }
315
316    /// Remove all timestamp metrics (for more compact display)
317    pub fn timestamps_removed(self) -> Self {
318        let Self { metrics } = self;
319
320        let metrics = metrics
321            .into_iter()
322            .filter(|m| !m.value.is_timestamp())
323            .collect::<Vec<_>>();
324
325        Self { metrics }
326    }
327}
328
329impl Display for MetricsSet {
330    /// Format the [`MetricsSet`] as a single string
331    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
332        let mut is_first = true;
333        for i in self.metrics.iter() {
334            if !is_first {
335                write!(f, ", ")?;
336            } else {
337                is_first = false;
338            }
339
340            write!(f, "{i}")?;
341        }
342        Ok(())
343    }
344}
345
346/// A set of [`Metric`]s for an individual "operator" (e.g. `&dyn
347/// ExecutionPlan`).
348///
349/// This structure is intended as a convenience for [`ExecutionPlan`]
350/// implementations so they can generate different streams for multiple
351/// partitions but easily report them together.
352///
353/// Each `clone()` of this structure will add metrics to the same
354/// underlying metrics set
355///
356/// [`ExecutionPlan`]: super::ExecutionPlan
357#[derive(Default, Debug, Clone)]
358pub struct ExecutionPlanMetricsSet {
359    inner: Arc<Mutex<MetricsSet>>,
360}
361
362impl ExecutionPlanMetricsSet {
363    /// Create a new empty shared metrics set
364    pub fn new() -> Self {
365        Self {
366            inner: Arc::new(Mutex::new(MetricsSet::new())),
367        }
368    }
369
370    /// Add the specified metric to the underlying metric set
371    pub fn register(&self, metric: Arc<Metric>) {
372        self.inner.lock().push(metric)
373    }
374
375    /// Return a clone of the inner [`MetricsSet`]
376    pub fn clone_inner(&self) -> MetricsSet {
377        let guard = self.inner.lock();
378        (*guard).clone()
379    }
380}
381
/// `name=value` pairs identifying a metric. This concept is called various things
/// in various different systems:
///
/// * "labels" in [Prometheus](https://prometheus.io/docs/concepts/data_model/)
/// * "tags" in [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
/// * "attributes" in [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md)
/// * etc.
///
/// As the name and value are expected to mostly be constant strings,
/// use a [`Cow`] to avoid copying / allocations in this common case.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Create a new [`Label`].
    ///
    /// Both arguments accept anything convertible into `Cow<'static, str>`,
    /// e.g. a `&'static str` (stored without allocating) or a `String`.
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// Returns the name of this label
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the value of this label
    pub fn value(&self) -> &str {
        &self.value
    }
}

impl Display for Label {
    /// Formats the label as `name=value`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}={}", self.name, self.value)
    }
}
428
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        // can not add different kinds of metrics
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // expect that this will error out
        metrics.clone_inner().sum(|_| true);
    }

    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        // Note elapsed_compute1 has labels but it is still aggregated with metrics 2 and 3
        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1); // output rows
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        // cpu time should be aggregated:
        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        // output rows should be carried over unchanged, but with no partition
        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition().is_none())
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // can't aggregate time and count -- expect a panic
        metrics.clone_inner().aggregate_by_name();
    }

    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        // 1431648000000000 == 1970-01-17 13:40:48 UTC
        let t1 = Utc.timestamp_nanos(1431648000000000);
        // 1531648000000000 == 1970-01-18 17:27:28 UTC
        let t2 = Utc.timestamp_nanos(1531648000000000);
        // 1631648000000000 == 1970-01-19 21:14:08 UTC
        let t3 = Utc.timestamp_nanos(1631648000000000);
        // 1731648000000000 == 1970-01-21 01:00:48 UTC
        let t4 = Utc.timestamp_nanos(1731648000000000);

        // partition 0 runs from t1 to t2, partition 1 runs from t3 to t4
        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(1);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(1);
        end_timestamp1.set(t4);

        // aggregate across both partitions
        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                // expect earliest of the start timestamps t1, t3
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                // expect latest of the end timestamps t2, t4
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        assert_eq!("end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows", metric_names(&metrics));

        let metrics = metrics.sorted_for_display();
        assert_eq!("output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp", metric_names(&metrics));
    }
}