datafusion_physical_plan/metrics/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics for recording information about execution
19
20mod baseline;
21mod builder;
22mod custom;
23mod value;
24
25use parking_lot::Mutex;
26use std::{
27    borrow::Cow,
28    fmt::{Debug, Display},
29    sync::Arc,
30};
31
32use datafusion_common::HashMap;
33
34// public exports
35pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
36pub use builder::MetricBuilder;
37pub use custom::CustomMetricValue;
38pub use value::{
39    Count, Gauge, MetricValue, PruningMetrics, RatioMetrics, ScopedTimerGuard, Time,
40    Timestamp,
41};
42
43/// Something that tracks a value of interest (metric) of a DataFusion
44/// [`ExecutionPlan`] execution.
45///
46/// Typically [`Metric`]s are not created directly, but instead
47/// are created using [`MetricBuilder`] or methods on
48/// [`ExecutionPlanMetricsSet`].
49///
50/// ```
51/// use datafusion_physical_plan::metrics::*;
52///
53/// let metrics = ExecutionPlanMetricsSet::new();
54/// assert!(metrics.clone_inner().output_rows().is_none());
55///
56/// // Create a counter to increment using the MetricBuilder
57/// let partition = 1;
58/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
59///
60/// // Counter can be incremented
61/// output_rows.add(13);
62///
63/// // The value can be retrieved directly:
64/// assert_eq!(output_rows.value(), 13);
65///
66/// // As well as from the metrics set
67/// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
68/// ```
69///
70/// [`ExecutionPlan`]: super::ExecutionPlan
71
72#[derive(Debug)]
73pub struct Metric {
74    /// The value of the metric
75    value: MetricValue,
76
77    /// arbitrary name=value pairs identifying this metric
78    labels: Vec<Label>,
79
80    /// To which partition of an operators output did this metric
81    /// apply? If `None` then means all partitions.
82    partition: Option<usize>,
83
84    metric_type: MetricType,
85}
86
87/// Categorizes metrics so the display layer can choose the desired verbosity.
88///
89/// # How is it used:
90/// The `datafusion.explain.analyze_level` configuration controls which category is shown.
91/// - When set to `dev`, all metrics with type `MetricType::Summary` or `MetricType::DEV`
92///   will be shown.
93/// - When set to `summary`, only metrics with type `MetricType::Summary` are shown.
94///
95/// # Difference from `EXPLAIN ANALYZE VERBOSE`:  
96/// The `VERBOSE` keyword controls whether per-partition metrics are shown (when specified),  
97/// or aggregated metrics are displayed (when omitted).  
98/// In contrast, the `analyze_level` configuration determines which categories or
99/// levels of metrics are displayed.
100#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
101pub enum MetricType {
102    /// Common metrics for high-level insights (answering which operator is slow)
103    SUMMARY,
104    /// For deep operator-level introspection for developers
105    DEV,
106}
107
108impl Display for Metric {
109    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
110        write!(f, "{}", self.value.name())?;
111
112        let mut iter = self
113            .partition
114            .iter()
115            .map(|partition| Label::new("partition", partition.to_string()))
116            .chain(self.labels().iter().cloned())
117            .peekable();
118
119        // print out the labels specially
120        if iter.peek().is_some() {
121            write!(f, "{{")?;
122
123            let mut is_first = true;
124            for i in iter {
125                if !is_first {
126                    write!(f, ", ")?;
127                } else {
128                    is_first = false;
129                }
130
131                write!(f, "{i}")?;
132            }
133
134            write!(f, "}}")?;
135        }
136
137        // and now the value
138        write!(f, "={}", self.value)
139    }
140}
141
142impl Metric {
143    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
144    /// rather than this function directly.
145    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
146        Self {
147            value,
148            labels: vec![],
149            partition,
150            metric_type: MetricType::DEV,
151        }
152    }
153
154    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
155    /// rather than this function directly.
156    pub fn new_with_labels(
157        value: MetricValue,
158        partition: Option<usize>,
159        labels: Vec<Label>,
160    ) -> Self {
161        Self {
162            value,
163            labels,
164            partition,
165            metric_type: MetricType::DEV,
166        }
167    }
168
169    /// Set the type for this metric. Defaults to [`MetricType::DEV`]
170    pub fn with_type(mut self, metric_type: MetricType) -> Self {
171        self.metric_type = metric_type;
172        self
173    }
174
175    /// Add a new label to this metric
176    pub fn with_label(mut self, label: Label) -> Self {
177        self.labels.push(label);
178        self
179    }
180
181    /// What labels are present for this metric?
182    pub fn labels(&self) -> &[Label] {
183        &self.labels
184    }
185
186    /// Return a reference to the value of this metric
187    pub fn value(&self) -> &MetricValue {
188        &self.value
189    }
190
191    /// Return a mutable reference to the value of this metric
192    pub fn value_mut(&mut self) -> &mut MetricValue {
193        &mut self.value
194    }
195
196    /// Return a reference to the partition
197    pub fn partition(&self) -> Option<usize> {
198        self.partition
199    }
200
201    /// Return the metric type (verbosity level) associated with this metric
202    pub fn metric_type(&self) -> MetricType {
203        self.metric_type
204    }
205}
206
207/// A snapshot of the metrics for a particular ([`ExecutionPlan`]).
208///
209/// [`ExecutionPlan`]: super::ExecutionPlan
210#[derive(Default, Debug, Clone)]
211pub struct MetricsSet {
212    metrics: Vec<Arc<Metric>>,
213}
214
215impl MetricsSet {
216    /// Create a new container of metrics
217    pub fn new() -> Self {
218        Default::default()
219    }
220
221    /// Add the specified metric
222    pub fn push(&mut self, metric: Arc<Metric>) {
223        self.metrics.push(metric)
224    }
225
226    /// Returns an iterator across all metrics
227    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
228        self.metrics.iter()
229    }
230
231    /// Convenience: return the number of rows produced, aggregated
232    /// across partitions or `None` if no metric is present
233    pub fn output_rows(&self) -> Option<usize> {
234        self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
235            .map(|v| v.as_usize())
236    }
237
238    /// Convenience: return the count of spills, aggregated
239    /// across partitions or `None` if no metric is present
240    pub fn spill_count(&self) -> Option<usize> {
241        self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
242            .map(|v| v.as_usize())
243    }
244
245    /// Convenience: return the total byte size of spills, aggregated
246    /// across partitions or `None` if no metric is present
247    pub fn spilled_bytes(&self) -> Option<usize> {
248        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
249            .map(|v| v.as_usize())
250    }
251
252    /// Convenience: return the total rows of spills, aggregated
253    /// across partitions or `None` if no metric is present
254    pub fn spilled_rows(&self) -> Option<usize> {
255        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
256            .map(|v| v.as_usize())
257    }
258
259    /// Convenience: return the amount of elapsed CPU time spent,
260    /// aggregated across partitions or `None` if no metric is present
261    pub fn elapsed_compute(&self) -> Option<usize> {
262        self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
263            .map(|v| v.as_usize())
264    }
265
266    /// Sums the values for metrics for which `f(metric)` returns
267    /// `true`, and returns the value. Returns `None` if no metrics match
268    /// the predicate.
269    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
270    where
271        F: FnMut(&Metric) -> bool,
272    {
273        let mut iter = self
274            .metrics
275            .iter()
276            .filter(|metric| f(metric.as_ref()))
277            .peekable();
278
279        let mut accum = match iter.peek() {
280            None => {
281                return None;
282            }
283            Some(metric) => metric.value().new_empty(),
284        };
285
286        iter.for_each(|metric| accum.aggregate(metric.value()));
287
288        Some(accum)
289    }
290
291    /// Returns the sum of all the metrics with the specified name
292    /// in the returned set.
293    pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
294        self.sum(|m| match m.value() {
295            MetricValue::Count { name, .. } => name == metric_name,
296            MetricValue::Time { name, .. } => name == metric_name,
297            MetricValue::OutputRows(_) => false,
298            MetricValue::ElapsedCompute(_) => false,
299            MetricValue::SpillCount(_) => false,
300            MetricValue::SpilledBytes(_) => false,
301            MetricValue::OutputBytes(_) => false,
302            MetricValue::SpilledRows(_) => false,
303            MetricValue::CurrentMemoryUsage(_) => false,
304            MetricValue::Gauge { name, .. } => name == metric_name,
305            MetricValue::StartTimestamp(_) => false,
306            MetricValue::EndTimestamp(_) => false,
307            MetricValue::PruningMetrics { name, .. } => name == metric_name,
308            MetricValue::Ratio { name, .. } => name == metric_name,
309            MetricValue::Custom { .. } => false,
310        })
311    }
312
313    /// Returns a new derived `MetricsSet` where all metrics
314    /// that had the same name have been
315    /// aggregated together. The resulting `MetricsSet` has all
316    /// metrics with `Partition=None`
317    pub fn aggregate_by_name(&self) -> Self {
318        let mut map = HashMap::new();
319
320        // There are all sorts of ways to make this more efficient
321        for metric in &self.metrics {
322            let key = metric.value.name();
323            map.entry(key)
324                .and_modify(|accum: &mut Metric| {
325                    accum.value_mut().aggregate(metric.value());
326                })
327                .or_insert_with(|| {
328                    // accumulate with no partition
329                    let partition = None;
330                    let mut accum = Metric::new(metric.value().new_empty(), partition)
331                        .with_type(metric.metric_type());
332                    accum.value_mut().aggregate(metric.value());
333                    accum
334                });
335        }
336
337        let new_metrics = map
338            .into_iter()
339            .map(|(_k, v)| Arc::new(v))
340            .collect::<Vec<_>>();
341
342        Self {
343            metrics: new_metrics,
344        }
345    }
346
347    /// Sort the order of metrics so the "most useful" show up first
348    pub fn sorted_for_display(mut self) -> Self {
349        self.metrics.sort_unstable_by_key(|metric| {
350            (
351                metric.value().display_sort_key(),
352                metric.value().name().to_owned(),
353            )
354        });
355        self
356    }
357
358    /// Remove all timestamp metrics (for more compact display)
359    pub fn timestamps_removed(self) -> Self {
360        let Self { metrics } = self;
361
362        let metrics = metrics
363            .into_iter()
364            .filter(|m| !m.value.is_timestamp())
365            .collect::<Vec<_>>();
366
367        Self { metrics }
368    }
369
370    /// Returns a new derived `MetricsSet` containing only metrics whose
371    /// [`MetricType`] appears in `allowed`.
372    pub fn filter_by_metric_types(self, allowed: &[MetricType]) -> Self {
373        if allowed.is_empty() {
374            return Self { metrics: vec![] };
375        }
376
377        let metrics = self
378            .metrics
379            .into_iter()
380            .filter(|metric| allowed.contains(&metric.metric_type()))
381            .collect::<Vec<_>>();
382        Self { metrics }
383    }
384}
385
386impl Display for MetricsSet {
387    /// Format the [`MetricsSet`] as a single string
388    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
389        let mut is_first = true;
390        for i in self.metrics.iter() {
391            if !is_first {
392                write!(f, ", ")?;
393            } else {
394                is_first = false;
395            }
396
397            write!(f, "{i}")?;
398        }
399        Ok(())
400    }
401}
402
403/// A set of [`Metric`]s for an individual "operator" (e.g. `&dyn
404/// ExecutionPlan`).
405///
406/// This structure is intended as a convenience for [`ExecutionPlan`]
407/// implementations so they can generate different streams for multiple
408/// partitions but easily report them together.
409///
410/// Each `clone()` of this structure will add metrics to the same
411/// underlying metrics set
412///
413/// [`ExecutionPlan`]: super::ExecutionPlan
414#[derive(Default, Debug, Clone)]
415pub struct ExecutionPlanMetricsSet {
416    inner: Arc<Mutex<MetricsSet>>,
417}
418
419impl ExecutionPlanMetricsSet {
420    /// Create a new empty shared metrics set
421    pub fn new() -> Self {
422        Self {
423            inner: Arc::new(Mutex::new(MetricsSet::new())),
424        }
425    }
426
427    /// Add the specified metric to the underlying metric set
428    pub fn register(&self, metric: Arc<Metric>) {
429        self.inner.lock().push(metric)
430    }
431
432    /// Return a clone of the inner [`MetricsSet`]
433    pub fn clone_inner(&self) -> MetricsSet {
434        let guard = self.inner.lock();
435        (*guard).clone()
436    }
437}
438
439/// `name=value` pairs identifying a metric. This concept is called various things
440/// in various different systems:
441///
442/// "labels" in
443/// [prometheus](https://prometheus.io/docs/concepts/data_model/) and
444/// "tags" in
445/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
446/// , "attributes" in [open
447/// telemetry]<https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md>,
448/// etc.
449///
450/// As the name and value are expected to mostly be constant strings,
451/// use a [`Cow`] to avoid copying / allocations in this common case.
452#[derive(Debug, Clone, PartialEq, Eq, Hash)]
453pub struct Label {
454    name: Cow<'static, str>,
455    value: Cow<'static, str>,
456}
457
458impl Label {
459    /// Create a new [`Label`]
460    pub fn new(
461        name: impl Into<Cow<'static, str>>,
462        value: impl Into<Cow<'static, str>>,
463    ) -> Self {
464        let name = name.into();
465        let value = value.into();
466        Self { name, value }
467    }
468
469    /// Returns the name of this label
470    pub fn name(&self) -> &str {
471        self.name.as_ref()
472    }
473
474    /// Returns the value of this label
475    pub fn value(&self) -> &str {
476        self.value.as_ref()
477    }
478}
479
480impl Display for Label {
481    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
482        write!(f, "{}={}", self.name, self.value)
483    }
484}
485
486#[cfg(test)]
487mod tests {
488    use std::time::Duration;
489
490    use chrono::{TimeZone, Utc};
491
492    use super::*;
493
494    #[test]
495    fn test_display_no_labels_no_partition() {
496        let count = Count::new();
497        count.add(33);
498        let value = MetricValue::OutputRows(count);
499        let partition = None;
500        let metric = Metric::new(value, partition);
501
502        assert_eq!("output_rows=33", metric.to_string())
503    }
504
505    #[test]
506    fn test_display_no_labels_with_partition() {
507        let count = Count::new();
508        count.add(44);
509        let value = MetricValue::OutputRows(count);
510        let partition = Some(1);
511        let metric = Metric::new(value, partition);
512
513        assert_eq!("output_rows{partition=1}=44", metric.to_string())
514    }
515
516    #[test]
517    fn test_display_labels_no_partition() {
518        let count = Count::new();
519        count.add(55);
520        let value = MetricValue::OutputRows(count);
521        let partition = None;
522        let label = Label::new("foo", "bar");
523        let metric = Metric::new_with_labels(value, partition, vec![label]);
524
525        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
526    }
527
528    #[test]
529    fn test_display_labels_and_partition() {
530        let count = Count::new();
531        count.add(66);
532        let value = MetricValue::OutputRows(count);
533        let partition = Some(2);
534        let label = Label::new("foo", "bar");
535        let metric = Metric::new_with_labels(value, partition, vec![label]);
536
537        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
538    }
539
540    #[test]
541    fn test_output_rows() {
542        let metrics = ExecutionPlanMetricsSet::new();
543        assert!(metrics.clone_inner().output_rows().is_none());
544
545        let partition = 1;
546        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
547        output_rows.add(13);
548
549        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
550        output_rows.add(7);
551        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
552    }
553
554    #[test]
555    fn test_elapsed_compute() {
556        let metrics = ExecutionPlanMetricsSet::new();
557        assert!(metrics.clone_inner().elapsed_compute().is_none());
558
559        let partition = 1;
560        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
561        elapsed_compute.add_duration(Duration::from_nanos(1234));
562
563        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
564        elapsed_compute.add_duration(Duration::from_nanos(6));
565        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
566    }
567
568    #[test]
569    fn test_sum() {
570        let metrics = ExecutionPlanMetricsSet::new();
571
572        let count1 = MetricBuilder::new(&metrics)
573            .with_new_label("foo", "bar")
574            .counter("my_counter", 1);
575        count1.add(1);
576
577        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
578        count2.add(2);
579
580        let metrics = metrics.clone_inner();
581        assert!(metrics.sum(|_| false).is_none());
582
583        let expected_count = Count::new();
584        expected_count.add(3);
585        let expected_sum = MetricValue::Count {
586            name: "my_counter".into(),
587            count: expected_count,
588        };
589
590        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
591    }
592
593    #[test]
594    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
595    fn test_bad_sum() {
596        // can not add different kinds of metrics
597        let metrics = ExecutionPlanMetricsSet::new();
598
599        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
600        count.add(1);
601
602        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
603        time.add_duration(Duration::from_nanos(10));
604
605        // expect that this will error out
606        metrics.clone_inner().sum(|_| true);
607    }
608
609    #[test]
610    fn test_aggregate_by_name() {
611        let metrics = ExecutionPlanMetricsSet::new();
612
613        // Note cpu_time1 has labels but it is still aggregated with metrics 2 and 3
614        let elapsed_compute1 = MetricBuilder::new(&metrics)
615            .with_new_label("foo", "bar")
616            .elapsed_compute(1);
617        elapsed_compute1.add_duration(Duration::from_nanos(12));
618
619        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
620        elapsed_compute2.add_duration(Duration::from_nanos(34));
621
622        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
623        elapsed_compute3.add_duration(Duration::from_nanos(56));
624
625        let output_rows = MetricBuilder::new(&metrics).output_rows(1); // output rows
626        output_rows.add(56);
627
628        let aggregated = metrics.clone_inner().aggregate_by_name();
629
630        // cpu time should be aggregated:
631        let elapsed_computes = aggregated
632            .iter()
633            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
634            .collect::<Vec<_>>();
635        assert_eq!(elapsed_computes.len(), 1);
636        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
637        assert!(elapsed_computes[0].partition().is_none());
638
639        // output rows should
640        let output_rows = aggregated
641            .iter()
642            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
643            .collect::<Vec<_>>();
644        assert_eq!(output_rows.len(), 1);
645        assert_eq!(output_rows[0].value().as_usize(), 56);
646        assert!(output_rows[0].partition.is_none())
647    }
648
649    #[test]
650    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
651    fn test_aggregate_partition_bad_sum() {
652        let metrics = ExecutionPlanMetricsSet::new();
653
654        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
655        count.add(1);
656
657        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
658        time.add_duration(Duration::from_nanos(10));
659
660        // can't aggregate time and count -- expect a panic
661        metrics.clone_inner().aggregate_by_name();
662    }
663
664    #[test]
665    fn test_aggregate_partition_timestamps() {
666        let metrics = ExecutionPlanMetricsSet::new();
667
668        // 1431648000000000 == 1970-01-17 13:40:48 UTC
669        let t1 = Utc.timestamp_nanos(1431648000000000);
670        // 1531648000000000 == 1970-01-18 17:27:28 UTC
671        let t2 = Utc.timestamp_nanos(1531648000000000);
672        // 1631648000000000 == 1970-01-19 21:14:08 UTC
673        let t3 = Utc.timestamp_nanos(1631648000000000);
674        // 1731648000000000 == 1970-01-21 01:00:48 UTC
675        let t4 = Utc.timestamp_nanos(1731648000000000);
676
677        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
678        start_timestamp0.set(t1);
679        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
680        end_timestamp0.set(t2);
681        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
682        start_timestamp1.set(t3);
683        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
684        end_timestamp1.set(t4);
685
686        // aggregate
687        let aggregated = metrics.clone_inner().aggregate_by_name();
688
689        let mut ts = aggregated
690            .iter()
691            .filter(|metric| {
692                matches!(metric.value(), MetricValue::StartTimestamp(_))
693                    && metric.labels().is_empty()
694            })
695            .collect::<Vec<_>>();
696        assert_eq!(ts.len(), 1);
697        match ts.remove(0).value() {
698            MetricValue::StartTimestamp(ts) => {
699                // expect earliest of t1, t2
700                assert_eq!(ts.value(), Some(t1));
701            }
702            _ => {
703                panic!("Not a timestamp");
704            }
705        };
706
707        let mut ts = aggregated
708            .iter()
709            .filter(|metric| {
710                matches!(metric.value(), MetricValue::EndTimestamp(_))
711                    && metric.labels().is_empty()
712            })
713            .collect::<Vec<_>>();
714        assert_eq!(ts.len(), 1);
715        match ts.remove(0).value() {
716            MetricValue::EndTimestamp(ts) => {
717                // expect latest of t3, t4
718                assert_eq!(ts.value(), Some(t4));
719            }
720            _ => {
721                panic!("Not a timestamp");
722            }
723        };
724    }
725
726    #[test]
727    fn test_sorted_for_display() {
728        let metrics = ExecutionPlanMetricsSet::new();
729        MetricBuilder::new(&metrics).end_timestamp(0);
730        MetricBuilder::new(&metrics).start_timestamp(0);
731        MetricBuilder::new(&metrics).elapsed_compute(0);
732        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
733        MetricBuilder::new(&metrics).counter("the_counter", 0);
734        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
735        MetricBuilder::new(&metrics).subset_time("the_time", 0);
736        MetricBuilder::new(&metrics).output_rows(0);
737        let metrics = metrics.clone_inner();
738
739        fn metric_names(metrics: &MetricsSet) -> String {
740            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
741            n.join(", ")
742        }
743
744        assert_eq!("end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows", metric_names(&metrics));
745
746        let metrics = metrics.sorted_for_display();
747        assert_eq!("output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp", metric_names(&metrics));
748    }
749}