Skip to main content

datafusion_physical_expr_common/metrics/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics for recording information about execution
19
20mod baseline;
21mod builder;
22mod custom;
23mod elapsed_compute;
24mod expression;
25mod value;
26
27use datafusion_common::HashMap;
28pub use datafusion_common::format::{MetricCategory, MetricType};
29use parking_lot::Mutex;
30use std::{
31    borrow::Cow,
32    fmt::{self, Debug, Display},
33    sync::Arc,
34    vec::IntoIter,
35};
36
37// public exports
38
39pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
40pub use builder::MetricBuilder;
41pub use custom::CustomMetricValue;
42pub use elapsed_compute::{ElapsedComputeFuture, ElapsedComputeFutureExt};
43pub use expression::ExpressionEvaluatorMetrics;
44pub use value::{
45    Count, Gauge, MetricValue, PruningMetrics, RatioMergeStrategy, RatioMetrics,
46    ScopedTimerGuard, Time, Timestamp,
47};
48
49/// Something that tracks a value of interest (metric) during execution.
50///
51/// Typically [`Metric`]s are not created directly, but instead
52/// are created using [`MetricBuilder`] or methods on
53/// [`ExecutionPlanMetricsSet`].
54///
55/// ```
56/// use datafusion_physical_expr_common::metrics::*;
57///
58/// let metrics = ExecutionPlanMetricsSet::new();
59/// assert!(metrics.clone_inner().output_rows().is_none());
60///
61/// // Create a counter to increment using the MetricBuilder
62/// let partition = 1;
63/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
64///
65/// // Counter can be incremented
66/// output_rows.add(13);
67///
68/// // The value can be retrieved directly:
69/// assert_eq!(output_rows.value(), 13);
70///
71/// // As well as from the metrics set
72/// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
73/// ```
74
75#[derive(Debug)]
76pub struct Metric {
77    /// The value of the metric
78    value: MetricValue,
79
80    /// arbitrary name=value pairs identifying this metric
81    labels: Vec<Label>,
82
83    /// To which partition of an operators output did this metric
84    /// apply? If `None` then means all partitions.
85    partition: Option<usize>,
86
87    metric_type: MetricType,
88
89    /// Optional semantic category (rows / bytes / timing).
90    ///
91    /// When `None` (the default for custom metrics), the metric is
92    /// **always included** unless the user sets
93    /// `analyze_categories = 'none'`.
94    metric_category: Option<MetricCategory>,
95}
96
97impl Display for Metric {
98    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99        write!(f, "{}", self.value.name())?;
100
101        let mut iter = self
102            .partition
103            .iter()
104            .map(|partition| Label::new("partition", partition.to_string()))
105            .chain(self.labels().iter().cloned())
106            .peekable();
107
108        // print out the labels specially
109        if iter.peek().is_some() {
110            write!(f, "{{")?;
111
112            let mut is_first = true;
113            for i in iter {
114                if !is_first {
115                    write!(f, ", ")?;
116                } else {
117                    is_first = false;
118                }
119
120                write!(f, "{i}")?;
121            }
122
123            write!(f, "}}")?;
124        }
125
126        // and now the value
127        write!(f, "={}", self.value)
128    }
129}
130
131impl Metric {
132    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
133    /// rather than this function directly.
134    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
135        Self {
136            value,
137            labels: vec![],
138            partition,
139            metric_type: MetricType::Dev,
140            metric_category: None,
141        }
142    }
143
144    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
145    /// rather than this function directly.
146    pub fn new_with_labels(
147        value: MetricValue,
148        partition: Option<usize>,
149        labels: Vec<Label>,
150    ) -> Self {
151        Self {
152            value,
153            labels,
154            partition,
155            metric_type: MetricType::Dev,
156            metric_category: None,
157        }
158    }
159
160    /// Set the type for this metric. Defaults to [`MetricType::Dev`]
161    pub fn with_type(mut self, metric_type: MetricType) -> Self {
162        self.metric_type = metric_type;
163        self
164    }
165
166    /// Set the semantic category for this metric.
167    ///
168    /// See [`MetricCategory`] for details on the determinism properties
169    /// of each category.
170    pub fn with_category(mut self, category: MetricCategory) -> Self {
171        self.metric_category = Some(category);
172        self
173    }
174
175    /// Add a new label to this metric
176    pub fn with_label(mut self, label: Label) -> Self {
177        self.labels.push(label);
178        self
179    }
180
181    /// What labels are present for this metric?
182    pub fn labels(&self) -> &[Label] {
183        &self.labels
184    }
185
186    /// Return a reference to the value of this metric
187    pub fn value(&self) -> &MetricValue {
188        &self.value
189    }
190
191    /// Return a mutable reference to the value of this metric
192    pub fn value_mut(&mut self) -> &mut MetricValue {
193        &mut self.value
194    }
195
196    /// Return a reference to the partition
197    pub fn partition(&self) -> Option<usize> {
198        self.partition
199    }
200
201    /// Return the metric type (verbosity level) associated with this metric
202    pub fn metric_type(&self) -> MetricType {
203        self.metric_type
204    }
205
206    /// Return the metric category, if one was declared.
207    ///
208    /// `None` means the metric is always included (except in `none` mode).
209    pub fn metric_category(&self) -> Option<MetricCategory> {
210        self.metric_category
211    }
212}
213
/// A snapshot of the metrics for a particular execution plan.
///
/// Metrics are held as shared [`Arc<Metric>`] handles, so cloning the
/// set clones the handles rather than the metrics themselves.
#[derive(Default, Debug, Clone)]
pub struct MetricsSet {
    /// The metrics contained in this set
    metrics: Vec<Arc<Metric>>,
}
219
220impl MetricsSet {
221    /// Create a new container of metrics
222    pub fn new() -> Self {
223        Default::default()
224    }
225
226    /// Add the specified metric
227    pub fn push(&mut self, metric: Arc<Metric>) {
228        self.metrics.push(metric)
229    }
230
231    /// Returns an iterator across all metrics
232    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
233        self.metrics.iter()
234    }
235
236    /// Convenience: return the number of rows produced, aggregated
237    /// across partitions or `None` if no metric is present
238    pub fn output_rows(&self) -> Option<usize> {
239        self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
240            .map(|v| v.as_usize())
241    }
242
243    /// Convenience: return the count of spills, aggregated
244    /// across partitions or `None` if no metric is present
245    pub fn spill_count(&self) -> Option<usize> {
246        self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
247            .map(|v| v.as_usize())
248    }
249
250    /// Convenience: return the total byte size of spills, aggregated
251    /// across partitions or `None` if no metric is present
252    pub fn spilled_bytes(&self) -> Option<usize> {
253        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
254            .map(|v| v.as_usize())
255    }
256
257    /// Convenience: return the total rows of spills, aggregated
258    /// across partitions or `None` if no metric is present
259    pub fn spilled_rows(&self) -> Option<usize> {
260        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
261            .map(|v| v.as_usize())
262    }
263
264    /// Convenience: return the amount of elapsed CPU time spent,
265    /// aggregated across partitions or `None` if no metric is present
266    pub fn elapsed_compute(&self) -> Option<usize> {
267        self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
268            .map(|v| v.as_usize())
269    }
270
271    /// Sums the values for metrics for which `f(metric)` returns
272    /// `true`, and returns the value. Returns `None` if no metrics match
273    /// the predicate.
274    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
275    where
276        F: FnMut(&Metric) -> bool,
277    {
278        let mut iter = self
279            .metrics
280            .iter()
281            .filter(|metric| f(metric.as_ref()))
282            .peekable();
283
284        let mut accum = match iter.peek() {
285            None => {
286                return None;
287            }
288            Some(metric) => metric.value().new_empty(),
289        };
290
291        iter.for_each(|metric| accum.aggregate(metric.value()));
292
293        Some(accum)
294    }
295
296    /// Returns the sum of all the metrics with the specified name
297    /// in the returned set.
298    pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
299        self.sum(|m| match m.value() {
300            MetricValue::Count { name, .. } => name == metric_name,
301            MetricValue::Time { name, .. } => name == metric_name,
302            MetricValue::OutputRows(_) => false,
303            MetricValue::ElapsedCompute(_) => false,
304            MetricValue::SpillCount(_) => false,
305            MetricValue::SpilledBytes(_) => false,
306            MetricValue::OutputBytes(_) => false,
307            MetricValue::OutputBatches(_) => false,
308            MetricValue::SpilledRows(_) => false,
309            MetricValue::CurrentMemoryUsage(_) => false,
310            MetricValue::Gauge { name, .. } => name == metric_name,
311            MetricValue::StartTimestamp(_) => false,
312            MetricValue::EndTimestamp(_) => false,
313            MetricValue::PruningMetrics { name, .. } => name == metric_name,
314            MetricValue::Ratio { name, .. } => name == metric_name,
315            MetricValue::Custom { .. } => false,
316        })
317    }
318
319    /// Returns a new derived `MetricsSet` where all metrics
320    /// that had the same name have been
321    /// aggregated together. The resulting `MetricsSet` has all
322    /// metrics with `Partition=None`
323    pub fn aggregate_by_name(&self) -> Self {
324        let mut map = HashMap::new();
325
326        // There are all sorts of ways to make this more efficient
327        for metric in &self.metrics {
328            let key = metric.value.name();
329            map.entry(key)
330                .and_modify(|accum: &mut Metric| {
331                    accum.value_mut().aggregate(metric.value());
332                })
333                .or_insert_with(|| {
334                    // accumulate with no partition
335                    let partition = None;
336                    let mut accum = Metric::new(metric.value().new_empty(), partition)
337                        .with_type(metric.metric_type());
338                    if let Some(cat) = metric.metric_category() {
339                        accum = accum.with_category(cat);
340                    }
341                    accum.value_mut().aggregate(metric.value());
342                    accum
343                });
344        }
345
346        let new_metrics = map
347            .into_iter()
348            .map(|(_k, v)| Arc::new(v))
349            .collect::<Vec<_>>();
350
351        Self {
352            metrics: new_metrics,
353        }
354    }
355
356    /// Sort the order of metrics so the "most useful" show up first
357    pub fn sorted_for_display(mut self) -> Self {
358        self.metrics.sort_unstable_by_key(|metric| {
359            (
360                metric.value().display_sort_key(),
361                metric.value().name().to_owned(),
362            )
363        });
364        self
365    }
366
367    /// Remove all timestamp metrics (for more compact display)
368    pub fn timestamps_removed(self) -> Self {
369        let Self { metrics } = self;
370
371        let metrics = metrics
372            .into_iter()
373            .filter(|m| !m.value.is_timestamp())
374            .collect::<Vec<_>>();
375
376        Self { metrics }
377    }
378
379    /// Returns a new derived `MetricsSet` containing only metrics whose
380    /// [`MetricType`] appears in `allowed`.
381    pub fn filter_by_metric_types(self, allowed: &[MetricType]) -> Self {
382        if allowed.is_empty() {
383            return Self { metrics: vec![] };
384        }
385
386        let metrics = self
387            .metrics
388            .into_iter()
389            .filter(|metric| allowed.contains(&metric.metric_type()))
390            .collect::<Vec<_>>();
391        Self { metrics }
392    }
393
394    /// Returns a new `MetricsSet` filtered by [`MetricCategory`].
395    ///
396    /// - Metrics that declared a category are kept only when that
397    ///   category appears in `allowed`.
398    /// - Metrics with **no** declared category are treated as
399    ///   [`Uncategorized`](MetricCategory::Uncategorized) for filtering.
400    /// - An **empty** `allowed` slice means "plan only": all metrics are
401    ///   removed.
402    pub fn filter_by_categories(self, allowed: &[MetricCategory]) -> Self {
403        if allowed.is_empty() {
404            return Self { metrics: vec![] };
405        }
406
407        let metrics = self
408            .metrics
409            .into_iter()
410            .filter(|metric| {
411                let cat = metric
412                    .metric_category()
413                    .unwrap_or(MetricCategory::Uncategorized);
414                allowed.contains(&cat)
415            })
416            .collect::<Vec<_>>();
417        Self { metrics }
418    }
419}
420
421impl Display for MetricsSet {
422    /// Format the [`MetricsSet`] as a single string
423    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
424        let mut is_first = true;
425        for i in self.metrics.iter() {
426            if !is_first {
427                write!(f, ", ")?;
428            } else {
429                is_first = false;
430            }
431
432            write!(f, "{i}")?;
433        }
434        Ok(())
435    }
436}
437
438impl IntoIterator for MetricsSet {
439    type Item = Arc<Metric>;
440    type IntoIter = IntoIter<Self::Item>;
441
442    fn into_iter(self) -> Self::IntoIter {
443        self.metrics.into_iter()
444    }
445}
446
447impl<'a> IntoIterator for &'a MetricsSet {
448    type Item = &'a Arc<Metric>;
449    type IntoIter = std::slice::Iter<'a, Arc<Metric>>;
450
451    fn into_iter(self) -> Self::IntoIter {
452        self.metrics.iter()
453    }
454}
455
456impl Extend<Arc<Metric>> for MetricsSet {
457    fn extend<I: IntoIterator<Item = Arc<Metric>>>(&mut self, iter: I) {
458        self.metrics.extend(iter);
459    }
460}
461
462impl FromIterator<Arc<Metric>> for MetricsSet {
463    fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
464        Self {
465            metrics: iter.into_iter().collect(),
466        }
467    }
468}
469
/// A set of [`Metric`]s for an individual operator.
///
/// This structure is intended as a convenience for execution plan
/// implementations so they can generate different streams for multiple
/// partitions but easily report them together.
///
/// Each `clone()` of this structure will add metrics to the same
/// underlying metrics set
#[derive(Default, Debug, Clone)]
pub struct ExecutionPlanMetricsSet {
    /// The shared, mutex-protected metrics set; clones of this struct
    /// clone the `Arc` and therefore point at the same set.
    inner: Arc<Mutex<MetricsSet>>,
}
482
483impl ExecutionPlanMetricsSet {
484    /// Create a new empty shared metrics set
485    pub fn new() -> Self {
486        Self {
487            inner: Arc::new(Mutex::new(MetricsSet::new())),
488        }
489    }
490
491    /// Add the specified metric to the underlying metric set
492    pub fn register(&self, metric: Arc<Metric>) {
493        self.inner.lock().push(metric)
494    }
495
496    /// Return a clone of the inner [`MetricsSet`]
497    pub fn clone_inner(&self) -> MetricsSet {
498        let guard = self.inner.lock();
499        (*guard).clone()
500    }
501}
502
503impl From<MetricsSet> for ExecutionPlanMetricsSet {
504    fn from(metrics: MetricsSet) -> Self {
505        Self {
506            inner: Arc::new(Mutex::new(metrics)),
507        }
508    }
509}
510
/// `name=value` pairs identifying a metric. This concept is called various things
/// in various different systems:
///
/// "labels" in
/// [prometheus](https://prometheus.io/docs/concepts/data_model/),
/// "tags" in
/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/),
/// "attributes" in
/// [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md),
/// etc.
///
/// As the name and value are expected to mostly be constant strings,
/// use a [`Cow`] to avoid copying / allocations in this common case.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    /// The label's name (the part before the `=`)
    name: Cow<'static, str>,
    /// The label's value (the part after the `=`)
    value: Cow<'static, str>,
}

impl Label {
    /// Create a new [`Label`]
    ///
    /// Both arguments accept anything convertible into a
    /// `Cow<'static, str>`, e.g. a `&'static str` (borrowed, no
    /// allocation) or an owned `String`.
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// Returns the name of this label
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the value of this label
    pub fn value(&self) -> &str {
        &self.value
    }
}

/// Formats the label as `name=value`.
impl Display for Label {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}={}", self.name, self.value)
    }
}
557
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    // A metric with neither labels nor a partition prints as `name=value`
    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    // The partition is rendered like a label inside the braces
    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    // Labels alone are rendered inside the braces
    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    // With both present, the partition is printed before the labels
    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    // `output_rows` is `None` until a metric is registered, then sums
    // the counters of all partitions
    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    // `elapsed_compute` is `None` until a metric is registered, then sums
    // the durations (added here as nanoseconds) of all partitions
    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    // `sum` returns `None` when nothing matches and otherwise adds up all
    // matching metrics, regardless of their labels or partition
    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    // Summing metrics of different kinds panics
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        // can not add different kinds of metrics
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // expect that this will panic
        metrics.clone_inner().sum(|_| true);
    }

    // `aggregate_by_name` merges same-named metrics into a single metric
    // with no partition
    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        // Note elapsed_compute1 has labels but it is still aggregated
        // with elapsed_compute2 and elapsed_compute3
        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1); // output rows
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        // elapsed compute should be aggregated into a single metric:
        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        // output rows should be reported as a single metric, with its
        // value unchanged and no partition
        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition.is_none())
    }

    // Aggregating same-named metrics of different kinds panics
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // can't aggregate time and count -- expect a panic
        metrics.clone_inner().aggregate_by_name();
    }

    // Aggregated start timestamps keep the earliest value and aggregated
    // end timestamps keep the latest value
    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        // 1431648000000000 == 1970-01-17 13:40:48 UTC
        let t1 = Utc.timestamp_nanos(1431648000000000);
        // 1531648000000000 == 1970-01-18 17:27:28 UTC
        let t2 = Utc.timestamp_nanos(1531648000000000);
        // 1631648000000000 == 1970-01-19 21:14:08 UTC
        let t3 = Utc.timestamp_nanos(1631648000000000);
        // 1731648000000000 == 1970-01-21 01:00:48 UTC
        let t4 = Utc.timestamp_nanos(1731648000000000);

        // NOTE(review): all four timestamps are registered for partition 0;
        // the `1` suffix suggests partition 1 may have been intended for
        // the second pair -- confirm. Aggregation by name is unaffected.
        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp1.set(t4);

        // aggregate
        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                // expect the earliest of the start timestamps t1 and t3
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                // expect the latest of the end timestamps t2 and t4
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    // `Extend` appends metrics from arrays and from arbitrary iterators
    #[test]
    fn test_extend() {
        let mut metrics = MetricsSet::new();
        let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
        let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

        metrics.extend([Arc::clone(&m1), Arc::clone(&m2)]);
        assert_eq!(metrics.iter().count(), 2);

        let m3 = Arc::new(Metric::new(MetricValue::SpilledBytes(Count::new()), None));
        metrics.extend(std::iter::once(Arc::clone(&m3)));
        assert_eq!(metrics.iter().count(), 3);
    }

    // `FromIterator` allows collecting into a `MetricsSet`, including
    // from an empty iterator
    #[test]
    fn test_collect() {
        let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
        let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

        let metrics: MetricsSet =
            vec![Arc::clone(&m1), Arc::clone(&m2)].into_iter().collect();
        assert_eq!(metrics.iter().count(), 2);

        let empty: MetricsSet = std::iter::empty().collect();
        assert_eq!(empty.iter().count(), 0);
    }

    // `&MetricsSet` can be used directly in a `for` loop
    #[test]
    fn test_into_iterator_by_ref() {
        let mut metrics = MetricsSet::new();
        metrics.push(Arc::new(Metric::new(
            MetricValue::OutputRows(Count::new()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::SpillCount(Count::new()),
            None,
        )));

        let mut count = 0;
        for _m in &metrics {
            count += 1;
        }
        assert_eq!(count, 2);
    }

    // `sorted_for_display` reorders metrics from registration order into
    // display order, breaking ties by name
    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        // Joins the metric names in their current order, for comparison
        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        // before sorting: registration order
        assert_eq!(
            "end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows",
            metric_names(&metrics)
        );

        // after sorting: display order
        let metrics = metrics.sorted_for_display();
        assert_eq!(
            "output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp",
            metric_names(&metrics)
        );
    }
}