datafusion_physical_expr_common/metrics/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics for recording information about execution
19
20mod baseline;
21mod builder;
22mod custom;
23mod expression;
24mod value;
25
26use datafusion_common::HashMap;
27use parking_lot::Mutex;
28use std::{
29    borrow::Cow,
30    fmt::{Debug, Display},
31    sync::Arc,
32};
33
34// public exports
35
36pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
37pub use builder::MetricBuilder;
38pub use custom::CustomMetricValue;
39pub use expression::ExpressionEvaluatorMetrics;
40pub use value::{
41    Count, Gauge, MetricValue, PruningMetrics, RatioMergeStrategy, RatioMetrics,
42    ScopedTimerGuard, Time, Timestamp,
43};
44
45/// Something that tracks a value of interest (metric) during execution.
46///
47/// Typically [`Metric`]s are not created directly, but instead
48/// are created using [`MetricBuilder`] or methods on
49/// [`ExecutionPlanMetricsSet`].
50///
51/// ```
52/// use datafusion_physical_expr_common::metrics::*;
53///
54/// let metrics = ExecutionPlanMetricsSet::new();
55/// assert!(metrics.clone_inner().output_rows().is_none());
56///
57/// // Create a counter to increment using the MetricBuilder
58/// let partition = 1;
59/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
60///
61/// // Counter can be incremented
62/// output_rows.add(13);
63///
64/// // The value can be retrieved directly:
65/// assert_eq!(output_rows.value(), 13);
66///
67/// // As well as from the metrics set
68/// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
69/// ```
70
71#[derive(Debug)]
72pub struct Metric {
73    /// The value of the metric
74    value: MetricValue,
75
76    /// arbitrary name=value pairs identifying this metric
77    labels: Vec<Label>,
78
79    /// To which partition of an operators output did this metric
80    /// apply? If `None` then means all partitions.
81    partition: Option<usize>,
82
83    metric_type: MetricType,
84}
85
/// Categorizes metrics so the display layer can choose the desired verbosity.
///
/// # How is it used:
/// The `datafusion.explain.analyze_level` configuration controls which category is shown.
/// - When set to `dev`, all metrics with type [`MetricType::SUMMARY`] or
///   [`MetricType::DEV`] will be shown.
/// - When set to `summary`, only metrics with type [`MetricType::SUMMARY`] are shown.
///
/// # Difference from `EXPLAIN ANALYZE VERBOSE`:
/// The `VERBOSE` keyword controls whether per-partition metrics are shown (when specified),
/// or aggregated metrics are displayed (when omitted).
/// In contrast, the `analyze_level` configuration determines which categories or
/// levels of metrics are displayed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetricType {
    /// Common metrics for high-level insights (answering which operator is slow)
    SUMMARY,
    /// For deep operator-level introspection for developers
    DEV,
}
106
107impl Display for Metric {
108    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
109        write!(f, "{}", self.value.name())?;
110
111        let mut iter = self
112            .partition
113            .iter()
114            .map(|partition| Label::new("partition", partition.to_string()))
115            .chain(self.labels().iter().cloned())
116            .peekable();
117
118        // print out the labels specially
119        if iter.peek().is_some() {
120            write!(f, "{{")?;
121
122            let mut is_first = true;
123            for i in iter {
124                if !is_first {
125                    write!(f, ", ")?;
126                } else {
127                    is_first = false;
128                }
129
130                write!(f, "{i}")?;
131            }
132
133            write!(f, "}}")?;
134        }
135
136        // and now the value
137        write!(f, "={}", self.value)
138    }
139}
140
141impl Metric {
142    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
143    /// rather than this function directly.
144    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
145        Self {
146            value,
147            labels: vec![],
148            partition,
149            metric_type: MetricType::DEV,
150        }
151    }
152
153    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
154    /// rather than this function directly.
155    pub fn new_with_labels(
156        value: MetricValue,
157        partition: Option<usize>,
158        labels: Vec<Label>,
159    ) -> Self {
160        Self {
161            value,
162            labels,
163            partition,
164            metric_type: MetricType::DEV,
165        }
166    }
167
168    /// Set the type for this metric. Defaults to [`MetricType::DEV`]
169    pub fn with_type(mut self, metric_type: MetricType) -> Self {
170        self.metric_type = metric_type;
171        self
172    }
173
174    /// Add a new label to this metric
175    pub fn with_label(mut self, label: Label) -> Self {
176        self.labels.push(label);
177        self
178    }
179
180    /// What labels are present for this metric?
181    pub fn labels(&self) -> &[Label] {
182        &self.labels
183    }
184
185    /// Return a reference to the value of this metric
186    pub fn value(&self) -> &MetricValue {
187        &self.value
188    }
189
190    /// Return a mutable reference to the value of this metric
191    pub fn value_mut(&mut self) -> &mut MetricValue {
192        &mut self.value
193    }
194
195    /// Return a reference to the partition
196    pub fn partition(&self) -> Option<usize> {
197        self.partition
198    }
199
200    /// Return the metric type (verbosity level) associated with this metric
201    pub fn metric_type(&self) -> MetricType {
202        self.metric_type
203    }
204}
205
206/// A snapshot of the metrics for a particular execution plan.
207#[derive(Default, Debug, Clone)]
208pub struct MetricsSet {
209    metrics: Vec<Arc<Metric>>,
210}
211
212impl MetricsSet {
213    /// Create a new container of metrics
214    pub fn new() -> Self {
215        Default::default()
216    }
217
218    /// Add the specified metric
219    pub fn push(&mut self, metric: Arc<Metric>) {
220        self.metrics.push(metric)
221    }
222
223    /// Returns an iterator across all metrics
224    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
225        self.metrics.iter()
226    }
227
228    /// Convenience: return the number of rows produced, aggregated
229    /// across partitions or `None` if no metric is present
230    pub fn output_rows(&self) -> Option<usize> {
231        self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
232            .map(|v| v.as_usize())
233    }
234
235    /// Convenience: return the count of spills, aggregated
236    /// across partitions or `None` if no metric is present
237    pub fn spill_count(&self) -> Option<usize> {
238        self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
239            .map(|v| v.as_usize())
240    }
241
242    /// Convenience: return the total byte size of spills, aggregated
243    /// across partitions or `None` if no metric is present
244    pub fn spilled_bytes(&self) -> Option<usize> {
245        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
246            .map(|v| v.as_usize())
247    }
248
249    /// Convenience: return the total rows of spills, aggregated
250    /// across partitions or `None` if no metric is present
251    pub fn spilled_rows(&self) -> Option<usize> {
252        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
253            .map(|v| v.as_usize())
254    }
255
256    /// Convenience: return the amount of elapsed CPU time spent,
257    /// aggregated across partitions or `None` if no metric is present
258    pub fn elapsed_compute(&self) -> Option<usize> {
259        self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
260            .map(|v| v.as_usize())
261    }
262
263    /// Sums the values for metrics for which `f(metric)` returns
264    /// `true`, and returns the value. Returns `None` if no metrics match
265    /// the predicate.
266    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
267    where
268        F: FnMut(&Metric) -> bool,
269    {
270        let mut iter = self
271            .metrics
272            .iter()
273            .filter(|metric| f(metric.as_ref()))
274            .peekable();
275
276        let mut accum = match iter.peek() {
277            None => {
278                return None;
279            }
280            Some(metric) => metric.value().new_empty(),
281        };
282
283        iter.for_each(|metric| accum.aggregate(metric.value()));
284
285        Some(accum)
286    }
287
288    /// Returns the sum of all the metrics with the specified name
289    /// in the returned set.
290    pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
291        self.sum(|m| match m.value() {
292            MetricValue::Count { name, .. } => name == metric_name,
293            MetricValue::Time { name, .. } => name == metric_name,
294            MetricValue::OutputRows(_) => false,
295            MetricValue::ElapsedCompute(_) => false,
296            MetricValue::SpillCount(_) => false,
297            MetricValue::SpilledBytes(_) => false,
298            MetricValue::OutputBytes(_) => false,
299            MetricValue::OutputBatches(_) => false,
300            MetricValue::SpilledRows(_) => false,
301            MetricValue::CurrentMemoryUsage(_) => false,
302            MetricValue::Gauge { name, .. } => name == metric_name,
303            MetricValue::StartTimestamp(_) => false,
304            MetricValue::EndTimestamp(_) => false,
305            MetricValue::PruningMetrics { name, .. } => name == metric_name,
306            MetricValue::Ratio { name, .. } => name == metric_name,
307            MetricValue::Custom { .. } => false,
308        })
309    }
310
311    /// Returns a new derived `MetricsSet` where all metrics
312    /// that had the same name have been
313    /// aggregated together. The resulting `MetricsSet` has all
314    /// metrics with `Partition=None`
315    pub fn aggregate_by_name(&self) -> Self {
316        let mut map = HashMap::new();
317
318        // There are all sorts of ways to make this more efficient
319        for metric in &self.metrics {
320            let key = metric.value.name();
321            map.entry(key)
322                .and_modify(|accum: &mut Metric| {
323                    accum.value_mut().aggregate(metric.value());
324                })
325                .or_insert_with(|| {
326                    // accumulate with no partition
327                    let partition = None;
328                    let mut accum = Metric::new(metric.value().new_empty(), partition)
329                        .with_type(metric.metric_type());
330                    accum.value_mut().aggregate(metric.value());
331                    accum
332                });
333        }
334
335        let new_metrics = map
336            .into_iter()
337            .map(|(_k, v)| Arc::new(v))
338            .collect::<Vec<_>>();
339
340        Self {
341            metrics: new_metrics,
342        }
343    }
344
345    /// Sort the order of metrics so the "most useful" show up first
346    pub fn sorted_for_display(mut self) -> Self {
347        self.metrics.sort_unstable_by_key(|metric| {
348            (
349                metric.value().display_sort_key(),
350                metric.value().name().to_owned(),
351            )
352        });
353        self
354    }
355
356    /// Remove all timestamp metrics (for more compact display)
357    pub fn timestamps_removed(self) -> Self {
358        let Self { metrics } = self;
359
360        let metrics = metrics
361            .into_iter()
362            .filter(|m| !m.value.is_timestamp())
363            .collect::<Vec<_>>();
364
365        Self { metrics }
366    }
367
368    /// Returns a new derived `MetricsSet` containing only metrics whose
369    /// [`MetricType`] appears in `allowed`.
370    pub fn filter_by_metric_types(self, allowed: &[MetricType]) -> Self {
371        if allowed.is_empty() {
372            return Self { metrics: vec![] };
373        }
374
375        let metrics = self
376            .metrics
377            .into_iter()
378            .filter(|metric| allowed.contains(&metric.metric_type()))
379            .collect::<Vec<_>>();
380        Self { metrics }
381    }
382}
383
384impl Display for MetricsSet {
385    /// Format the [`MetricsSet`] as a single string
386    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
387        let mut is_first = true;
388        for i in self.metrics.iter() {
389            if !is_first {
390                write!(f, ", ")?;
391            } else {
392                is_first = false;
393            }
394
395            write!(f, "{i}")?;
396        }
397        Ok(())
398    }
399}
400
401/// A set of [`Metric`]s for an individual operator.
402///
403/// This structure is intended as a convenience for execution plan
404/// implementations so they can generate different streams for multiple
405/// partitions but easily report them together.
406///
407/// Each `clone()` of this structure will add metrics to the same
408/// underlying metrics set
409#[derive(Default, Debug, Clone)]
410pub struct ExecutionPlanMetricsSet {
411    inner: Arc<Mutex<MetricsSet>>,
412}
413
414impl ExecutionPlanMetricsSet {
415    /// Create a new empty shared metrics set
416    pub fn new() -> Self {
417        Self {
418            inner: Arc::new(Mutex::new(MetricsSet::new())),
419        }
420    }
421
422    /// Add the specified metric to the underlying metric set
423    pub fn register(&self, metric: Arc<Metric>) {
424        self.inner.lock().push(metric)
425    }
426
427    /// Return a clone of the inner [`MetricsSet`]
428    pub fn clone_inner(&self) -> MetricsSet {
429        let guard = self.inner.lock();
430        (*guard).clone()
431    }
432}
433
/// `name=value` pairs identifying a metric. This concept is called various things
/// in various different systems:
///
/// "labels" in
/// [prometheus](https://prometheus.io/docs/concepts/data_model/),
/// "tags" in
/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/),
/// "attributes" in
/// [open telemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md),
/// etc.
///
/// As the name and value are expected to mostly be constant strings,
/// use a [`Cow`] to avoid copying / allocations in this common case.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    /// The name (key) of the label
    name: Cow<'static, str>,
    /// The value of the label
    value: Cow<'static, str>,
}
452
453impl Label {
454    /// Create a new [`Label`]
455    pub fn new(
456        name: impl Into<Cow<'static, str>>,
457        value: impl Into<Cow<'static, str>>,
458    ) -> Self {
459        let name = name.into();
460        let value = value.into();
461        Self { name, value }
462    }
463
464    /// Returns the name of this label
465    pub fn name(&self) -> &str {
466        self.name.as_ref()
467    }
468
469    /// Returns the value of this label
470    pub fn value(&self) -> &str {
471        self.value.as_ref()
472    }
473}
474
475impl Display for Label {
476    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
477        write!(f, "{}={}", self.name, self.value)
478    }
479}
480
// Unit tests for the `Display` output of `Metric`, the convenience
// accessors and `sum`, and the `aggregate_by_name` / `sorted_for_display`
// transformations of `MetricsSet`.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    /// A metric with neither labels nor a partition displays as `name=value`.
    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    /// The partition alone is displayed as a `partition=<n>` label.
    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    /// Labels are displayed inside braces when there is no partition.
    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    /// The partition is displayed before the metric's own labels.
    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    /// `output_rows()` is `None` for an empty set and otherwise sums the
    /// output rows across partitions.
    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    /// `elapsed_compute()` is `None` for an empty set and otherwise sums
    /// the elapsed compute time (observed here in nanoseconds) across
    /// partitions.
    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    /// `sum` returns `None` when nothing matches and otherwise adds up all
    /// matching metrics, regardless of their labels or partitions.
    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    /// Summing metrics of different kinds (a count and a time) panics.
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        // can not add different kinds of metrics
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // expect that this will panic
        metrics.clone_inner().sum(|_| true);
    }

    /// `aggregate_by_name` merges same-named metrics from all partitions
    /// into a single metric without a partition.
    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        // Note elapsed_compute1 has labels but it is still aggregated with
        // elapsed_compute2 and elapsed_compute3
        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1); // a single output rows metric
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        // elapsed compute should be aggregated into one metric:
        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        // output rows should be reported as a single metric with its value
        // unchanged and no partition
        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition.is_none())
    }

    /// Aggregating by name panics when two metrics share a name but are of
    /// different kinds.
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // can't aggregate time and count -- expect a panic
        metrics.clone_inner().aggregate_by_name();
    }

    /// Aggregated start timestamps keep the earliest value, aggregated end
    /// timestamps keep the latest value.
    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        // 1431648000000000 == 1970-01-17 13:40:48 UTC
        let t1 = Utc.timestamp_nanos(1431648000000000);
        // 1531648000000000 == 1970-01-18 17:27:28 UTC
        let t2 = Utc.timestamp_nanos(1531648000000000);
        // 1631648000000000 == 1970-01-19 21:14:08 UTC
        let t3 = Utc.timestamp_nanos(1631648000000000);
        // 1731648000000000 == 1970-01-21 01:00:48 UTC
        let t4 = Utc.timestamp_nanos(1731648000000000);

        // Two start/end pairs: (t1, t2) and (t3, t4)
        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp1.set(t4);

        // aggregate
        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                // expect earliest of the start timestamps t1, t3
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                // expect latest of the end timestamps t2, t4
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    /// `sorted_for_display` reorders metrics from registration order into
    /// display order (same-kind metrics end up sorted by name).
    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        // Helper: the metric names of `metrics`, in order, joined by ", "
        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        // Before sorting: registration order
        assert_eq!(
            "end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows",
            metric_names(&metrics)
        );

        // After sorting: display order
        let metrics = metrics.sorted_for_display();
        assert_eq!(
            "output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp",
            metric_names(&metrics)
        );
    }
}