Skip to main content

datafusion_physical_expr_common/metrics/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Value representation of metrics
19
20use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::{
23    human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
24};
25use parking_lot::Mutex;
26use std::{
27    borrow::{Borrow, Cow},
28    fmt::{Debug, Display},
29    sync::{
30        Arc,
31        atomic::{AtomicUsize, Ordering},
32    },
33    time::Duration,
34};
35
36/// A counter to record things such as number of input or output rows
37///
38/// Note `clone`ing counters update the same underlying metrics
39#[derive(Debug, Clone)]
40pub struct Count {
41    /// value of the metric counter
42    value: Arc<AtomicUsize>,
43}
44
45impl PartialEq for Count {
46    fn eq(&self, other: &Self) -> bool {
47        self.value().eq(&other.value())
48    }
49}
50
51impl Display for Count {
52    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53        write!(f, "{}", human_readable_count(self.value()))
54    }
55}
56
57impl Default for Count {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl Count {
64    /// create a new counter
65    pub fn new() -> Self {
66        Self {
67            value: Arc::new(AtomicUsize::new(0)),
68        }
69    }
70
71    /// Add `n` to the metric's value
72    pub fn add(&self, n: usize) {
73        // relaxed ordering for operations on `value` poses no issues
74        // we're purely using atomic ops with no associated memory ops
75        self.value.fetch_add(n, Ordering::Relaxed);
76    }
77
78    /// Get the current value
79    pub fn value(&self) -> usize {
80        self.value.load(Ordering::Relaxed)
81    }
82}
83
/// A gauge is the simplest metrics type. It just returns a value.
/// For example, you can easily expose current memory consumption with a gauge.
///
/// The value is stored in an atomic behind an [`Arc`], so `clone`ing a gauge
/// produces another handle that updates the same underlying metric.
#[derive(Debug, Clone, Default)]
pub struct Gauge {
    /// value of the metric gauge, shared by every clone
    value: Arc<AtomicUsize>,
}

/// Two gauges are equal when their current values are equal.
impl PartialEq for Gauge {
    fn eq(&self, other: &Self) -> bool {
        self.value() == other.value()
    }
}

/// Renders the raw current value.
impl Display for Gauge {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}

impl Gauge {
    /// create a new gauge, starting at zero
    pub fn new() -> Self {
        Self::default()
    }

    /// Add `n` to the metric's value
    pub fn add(&self, n: usize) {
        // Relaxed is sufficient for every operation on `value`: only atomic
        // operations touch it and no other memory is published through it.
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Sub `n` from the metric's value
    ///
    /// This is an atomic `fetch_sub`, which wraps around on underflow rather
    /// than stopping at zero.
    pub fn sub(&self, n: usize) {
        self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Set metric's value to maximum of `n` and current value
    pub fn set_max(&self, n: usize) {
        self.value.fetch_max(n, Ordering::Relaxed);
    }

    /// Set the metric's value to `n` and return the previous value
    pub fn set(&self, n: usize) -> usize {
        self.value.swap(n, Ordering::Relaxed)
    }

    /// Get the current value
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
151
152/// Measure a potentially non contiguous duration of time
153#[derive(Debug, Clone)]
154pub struct Time {
155    /// elapsed time, in nanoseconds
156    nanos: Arc<AtomicUsize>,
157}
158
159impl Default for Time {
160    fn default() -> Self {
161        Self::new()
162    }
163}
164
165impl PartialEq for Time {
166    fn eq(&self, other: &Self) -> bool {
167        self.value().eq(&other.value())
168    }
169}
170
171impl Display for Time {
172    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173        write!(f, "{}", human_readable_duration(self.value() as u64))
174    }
175}
176
177impl Time {
178    /// Create a new [`Time`] wrapper suitable for recording elapsed
179    /// times for operations.
180    pub fn new() -> Self {
181        Self {
182            nanos: Arc::new(AtomicUsize::new(0)),
183        }
184    }
185
186    /// Add elapsed nanoseconds since `start`to self
187    pub fn add_elapsed(&self, start: Instant) {
188        self.add_duration(start.elapsed());
189    }
190
191    /// Add duration of time to self
192    ///
193    /// Note: this will always increment the recorded time by at least 1 nanosecond
194    /// to distinguish between the scenario of no values recorded, in which
195    /// case the value will be 0, and no measurable amount of time having passed,
196    /// in which case the value will be small but not 0.
197    ///
198    /// This is based on the assumption that the timing logic in most cases is likely
199    /// to take at least a nanosecond, and so this is reasonable mechanism to avoid
200    /// ambiguity, especially on systems with low-resolution monotonic clocks
201    pub fn add_duration(&self, duration: Duration) {
202        let more_nanos = duration.as_nanos() as usize;
203        self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204    }
205
206    /// Add the number of nanoseconds of other `Time` to self
207    pub fn add(&self, other: &Time) {
208        self.add_duration(Duration::from_nanos(other.value() as u64))
209    }
210
211    /// return a scoped guard that adds the amount of time elapsed
212    /// between its creation and its drop or call to `stop` to the
213    /// underlying metric.
214    pub fn timer(&self) -> ScopedTimerGuard<'_> {
215        ScopedTimerGuard {
216            inner: self,
217            start: Some(Instant::now()),
218        }
219    }
220
221    /// Get the number of nanoseconds record by this Time metric
222    pub fn value(&self) -> usize {
223        self.nanos.load(Ordering::Relaxed)
224    }
225
226    /// Return a scoped guard that adds the amount of time elapsed between the
227    /// given instant and its drop (or the call to `stop`) to the underlying metric
228    pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229        ScopedTimerGuard {
230            inner: self,
231            start: Some(now),
232        }
233    }
234}
235
236/// Stores a single timestamp, stored as the number of nanoseconds
237/// elapsed from Jan 1, 1970 UTC
238#[derive(Debug, Clone)]
239pub struct Timestamp {
240    /// Time thing started
241    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250impl Timestamp {
251    /// Create a new timestamp and sets its value to 0
252    pub fn new() -> Self {
253        Self {
254            timestamp: Arc::new(Mutex::new(None)),
255        }
256    }
257
258    /// Sets the timestamps value to the current time
259    pub fn record(&self) {
260        self.set(Utc::now())
261    }
262
263    /// Sets the timestamps value to a specified time
264    pub fn set(&self, now: DateTime<Utc>) {
265        *self.timestamp.lock() = Some(now);
266    }
267
268    /// return the timestamps value at the last time `record()` was
269    /// called.
270    ///
271    /// Returns `None` if `record()` has not been called
272    pub fn value(&self) -> Option<DateTime<Utc>> {
273        *self.timestamp.lock()
274    }
275
276    /// sets the value of this timestamp to the minimum of this and other
277    pub fn update_to_min(&self, other: &Timestamp) {
278        let min = match (self.value(), other.value()) {
279            (None, None) => None,
280            (Some(v), None) => Some(v),
281            (None, Some(v)) => Some(v),
282            (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283        };
284
285        *self.timestamp.lock() = min;
286    }
287
288    /// sets the value of this timestamp to the maximum of this and other
289    pub fn update_to_max(&self, other: &Timestamp) {
290        let max = match (self.value(), other.value()) {
291            (None, None) => None,
292            (Some(v), None) => Some(v),
293            (None, Some(v)) => Some(v),
294            (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295        };
296
297        *self.timestamp.lock() = max;
298    }
299}
300
301impl PartialEq for Timestamp {
302    fn eq(&self, other: &Self) -> bool {
303        self.value().eq(&other.value())
304    }
305}
306
307impl Display for Timestamp {
308    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309        match self.value() {
310            None => write!(f, "NONE"),
311            Some(v) => {
312                write!(f, "{v}")
313            }
314        }
315    }
316}
317
318/// RAAI structure that adds all time between its construction and
319/// destruction to the CPU time or the first call to `stop` whichever
320/// comes first
321pub struct ScopedTimerGuard<'a> {
322    inner: &'a Time,
323    start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327    /// Stop the timer timing and record the time taken
328    pub fn stop(&mut self) {
329        if let Some(start) = self.start.take() {
330            self.inner.add_elapsed(start)
331        }
332    }
333
334    /// Restarts the timer recording from the current time
335    pub fn restart(&mut self) {
336        self.start = Some(Instant::now())
337    }
338
339    /// Stop the timer, record the time taken and consume self
340    pub fn done(mut self) {
341        self.stop()
342    }
343
344    /// Stop the timer timing and record the time taken since the given endpoint.
345    pub fn stop_with(&mut self, end_time: Instant) {
346        if let Some(start) = self.start.take() {
347            let elapsed = end_time - start;
348            self.inner.add_duration(elapsed)
349        }
350    }
351
352    /// Stop the timer, record the time taken since `end_time` endpoint, and
353    /// consume self.
354    pub fn done_with(mut self, end_time: Instant) {
355        self.stop_with(end_time)
356    }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360    fn drop(&mut self) {
361        self.stop()
362    }
363}
364
365/// Counters tracking pruning metrics
366///
367/// For example, a file scanner initially is planned to scan 10 files, but skipped
368/// 8 of them using statistics, the pruning metrics would look like: 10 total -> 2 matched
369///
370/// Note `clone`ing update the same underlying metrics
371#[derive(Debug, Clone)]
372pub struct PruningMetrics {
373    pruned: Arc<AtomicUsize>,
374    matched: Arc<AtomicUsize>,
375    fully_matched: Arc<AtomicUsize>,
376}
377
378impl Display for PruningMetrics {
379    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
380        let matched = self.matched.load(Ordering::Relaxed);
381        let total = self.pruned.load(Ordering::Relaxed) + matched;
382        let fully_matched = self.fully_matched.load(Ordering::Relaxed);
383
384        if fully_matched != 0 {
385            write!(
386                f,
387                "{} total → {} matched -> {} fully matched",
388                human_readable_count(total),
389                human_readable_count(matched),
390                human_readable_count(fully_matched)
391            )
392        } else {
393            write!(
394                f,
395                "{} total → {} matched",
396                human_readable_count(total),
397                human_readable_count(matched)
398            )
399        }
400    }
401}
402
403impl Default for PruningMetrics {
404    fn default() -> Self {
405        Self::new()
406    }
407}
408
409impl PruningMetrics {
410    /// create a new PruningMetrics
411    pub fn new() -> Self {
412        Self {
413            pruned: Arc::new(AtomicUsize::new(0)),
414            matched: Arc::new(AtomicUsize::new(0)),
415            fully_matched: Arc::new(AtomicUsize::new(0)),
416        }
417    }
418
419    /// Add `n` to the metric's pruned value
420    pub fn add_pruned(&self, n: usize) {
421        // relaxed ordering for operations on `value` poses no issues
422        // we're purely using atomic ops with no associated memory ops
423        self.pruned.fetch_add(n, Ordering::Relaxed);
424    }
425
426    /// Add `n` to the metric's matched value
427    pub fn add_matched(&self, n: usize) {
428        // relaxed ordering for operations on `value` poses no issues
429        // we're purely using atomic ops with no associated memory ops
430        self.matched.fetch_add(n, Ordering::Relaxed);
431    }
432
433    /// Add `n` to the metric's fully matched value
434    pub fn add_fully_matched(&self, n: usize) {
435        // relaxed ordering for operations on `value` poses no issues
436        // we're purely using atomic ops with no associated memory ops
437        self.fully_matched.fetch_add(n, Ordering::Relaxed);
438    }
439
440    /// Subtract `n` to the metric's matched value.
441    pub fn subtract_matched(&self, n: usize) {
442        // relaxed ordering for operations on `value` poses no issues
443        // we're purely using atomic ops with no associated memory ops
444        self.matched.fetch_sub(n, Ordering::Relaxed);
445    }
446
447    /// Number of items pruned
448    pub fn pruned(&self) -> usize {
449        self.pruned.load(Ordering::Relaxed)
450    }
451
452    /// Number of items matched (not pruned)
453    pub fn matched(&self) -> usize {
454        self.matched.load(Ordering::Relaxed)
455    }
456
457    /// Number of items fully matched
458    pub fn fully_matched(&self) -> usize {
459        self.fully_matched.load(Ordering::Relaxed)
460    }
461}
462
/// Counters tracking ratio metrics (e.g. matched vs total)
///
/// Both counters are atomics behind an [`Arc`], so they are shared across
/// clones. The merge strategy is a plain value copied into each clone.
#[derive(Debug, Clone, Default)]
pub struct RatioMetrics {
    /// Numerator of the ratio
    part: Arc<AtomicUsize>,
    /// Denominator of the ratio
    total: Arc<AtomicUsize>,
    /// How [`RatioMetrics::merge`] combines another instance into this one
    merge_strategy: RatioMergeStrategy,
}

/// Selects, for each of `part` and `total`, whether [`RatioMetrics::merge`]
/// adds the other instance's value or overwrites with it.
#[derive(Debug, Clone, Default)]
pub enum RatioMergeStrategy {
    /// Add the other `part` and add the other `total` (the default)
    #[default]
    AddPartAddTotal,
    /// Add the other `part`, overwrite `total` with the other `total`
    AddPartSetTotal,
    /// Overwrite `part` with the other `part`, add the other `total`
    SetPartAddTotal,
}

impl RatioMetrics {
    /// Create a new [`RatioMetrics`] with both counters at zero and the
    /// default merge strategy
    pub fn new() -> Self {
        Self::default()
    }

    /// Return `self` with its merge strategy replaced by `merge_strategy`
    pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy) -> Self {
        self.merge_strategy = merge_strategy;
        self
    }

    /// Add `n` to the numerator (`part`) value
    pub fn add_part(&self, n: usize) {
        self.part.fetch_add(n, Ordering::Relaxed);
    }

    /// Add `n` to the denominator (`total`) value
    pub fn add_total(&self, n: usize) {
        self.total.fetch_add(n, Ordering::Relaxed);
    }

    /// Set the numerator (`part`) value to `n`, overwriting any existing value
    pub fn set_part(&self, n: usize) {
        self.part.store(n, Ordering::Relaxed);
    }

    /// Set the denominator (`total`) value to `n`, overwriting any existing value
    pub fn set_total(&self, n: usize) {
        self.total.store(n, Ordering::Relaxed);
    }

    /// Merge the value from `other` into `self`
    ///
    /// The strategy used is the one configured on `self`; the strategy of
    /// `other` is not consulted.
    pub fn merge(&self, other: &Self) {
        let strategy = &self.merge_strategy;

        // Numerator first ...
        match strategy {
            RatioMergeStrategy::SetPartAddTotal => self.set_part(other.part()),
            RatioMergeStrategy::AddPartAddTotal | RatioMergeStrategy::AddPartSetTotal => {
                self.add_part(other.part())
            }
        }

        // ... then the denominator.
        match strategy {
            RatioMergeStrategy::AddPartSetTotal => self.set_total(other.total()),
            RatioMergeStrategy::AddPartAddTotal | RatioMergeStrategy::SetPartAddTotal => {
                self.add_total(other.total())
            }
        }
    }

    /// Return the numerator (`part`) value
    pub fn part(&self) -> usize {
        self.part.load(Ordering::Relaxed)
    }

    /// Return the denominator (`total`) value
    pub fn total(&self) -> usize {
        self.total.load(Ordering::Relaxed)
    }
}

/// Two ratios are equal when both `part` and `total` are equal; the merge
/// strategy does not take part in the comparison.
impl PartialEq for RatioMetrics {
    fn eq(&self, other: &Self) -> bool {
        if self.part() != other.part() {
            return false;
        }
        self.total() == other.total()
    }
}
550
/// Format a float number with `digits` most significant numbers.
///
/// The value is rounded with [`f64::round`], i.e. halves round away from
/// zero. With `digits == 2`:
///
/// fmt_significant(12.5, 2) -> "13"
/// fmt_significant(0.0543, 2) -> "0.054"
/// fmt_significant(0.000123, 2) -> "0.00012"
///
/// Zero is rendered as "0". Non-finite inputs (NaN, ±infinity) cannot be
/// rounded to significant digits and are rendered unchanged.
fn fmt_significant(x: f64, digits: usize) -> String {
    if x == 0.0 {
        return "0".to_string();
    }

    // For ±infinity the scale below collapses to 0 and `inf * 0` is NaN, so
    // the result would misleadingly read "NaN"; format such values directly.
    if !x.is_finite() {
        return format!("{x}");
    }

    let exp = x.abs().log10().floor(); // exponent of first significant digit
    let scale = 10f64.powf(-(exp - (digits as f64 - 1.0)));
    let rounded = (x * scale).round() / scale; // round to N significant digits
    format!("{rounded}")
}
566
567impl Display for RatioMetrics {
568    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569        let part = self.part();
570        let total = self.total();
571
572        if total == 0 {
573            if part == 0 {
574                write!(f, "N/A (0/0)")
575            } else {
576                write!(f, "N/A ({}/0)", human_readable_count(part))
577            }
578        } else {
579            let percentage = (part as f64 / total as f64) * 100.0;
580
581            write!(
582                f,
583                "{}% ({}/{})",
584                fmt_significant(percentage, 2),
585                human_readable_count(part),
586                human_readable_count(total)
587            )
588        }
589    }
590}
591
/// Possible values for a [super::Metric].
///
/// Among other differences, the metric types have different ways to
/// logically interpret their underlying values and some metrics are
/// so common they are given special treatment.
#[derive(Debug, Clone)]
pub enum MetricValue {
    /// Number of output rows produced: "output_rows" metric
    OutputRows(Count),
    /// Elapsed Compute Time: the wall clock time spent in "cpu
    /// intensive" work.
    ///
    /// This measurement represents, roughly:
    /// ```
    /// use std::time::Instant;
    /// let start = Instant::now();
    /// // ...CPU intensive work here...
    /// let elapsed_compute = (Instant::now() - start).as_nanos();
    /// ```
    ///
    /// Note 1: Does *not* include time other operators spend
    /// computing input.
    ///
    /// Note 2: *Does* include time when the thread could have made
    /// progress but the OS did not schedule it (e.g. due to CPU
    /// contention), thus making this value different than the
    /// classical definition of "cpu_time", which is the time reported
    /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
    ElapsedCompute(Time),
    /// Number of spills produced: "spill_count" metric
    SpillCount(Count),
    /// Total size of spilled bytes produced: "spilled_bytes" metric
    SpilledBytes(Count),
    /// Total size of output bytes produced: "output_bytes" metric
    OutputBytes(Count),
    /// Total number of output batches produced: "output_batches" metric
    OutputBatches(Count),
    /// Total number of spilled rows produced: "spilled_rows" metric
    SpilledRows(Count),
    /// Current memory used
    CurrentMemoryUsage(Gauge),
    /// Operator defined count.
    Count {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The value of the metric
        count: Count,
    },
    /// Operator defined gauge.
    Gauge {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The value of the metric
        gauge: Gauge,
    },
    /// Operator defined time
    Time {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The value of the metric
        time: Time,
    },
    /// The time at which execution started
    StartTimestamp(Timestamp),
    /// The time at which execution ended
    EndTimestamp(Timestamp),
    /// Metrics related to scan pruning
    PruningMetrics {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The pruned / matched / fully matched counters
        pruning_metrics: PruningMetrics,
    },
    /// Metrics that should be displayed as ratio like (42%)
    Ratio {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The part / total counters the ratio is computed from
        ratio_metrics: RatioMetrics,
    },
    /// Operator defined metric whose value is supplied by a
    /// [`CustomMetricValue`] implementation.
    Custom {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// A custom implementation of the metric value.
        value: Arc<dyn CustomMetricValue>,
    },
}
675
676// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its
677// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq.
678impl PartialEq for MetricValue {
679    fn eq(&self, other: &Self) -> bool {
680        match (self, other) {
681            (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
682                count == other
683            }
684            (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
685                time == other
686            }
687            (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
688                count == other
689            }
690            (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
691                count == other
692            }
693            (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
694                count == other
695            }
696            (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
697                count == other
698            }
699            (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
700                count == other
701            }
702            (
703                MetricValue::CurrentMemoryUsage(gauge),
704                MetricValue::CurrentMemoryUsage(other),
705            ) => gauge == other,
706            (
707                MetricValue::Count { name, count },
708                MetricValue::Count {
709                    name: other_name,
710                    count: other_count,
711                },
712            ) => name == other_name && count == other_count,
713            (
714                MetricValue::Gauge { name, gauge },
715                MetricValue::Gauge {
716                    name: other_name,
717                    gauge: other_gauge,
718                },
719            ) => name == other_name && gauge == other_gauge,
720            (
721                MetricValue::Time { name, time },
722                MetricValue::Time {
723                    name: other_name,
724                    time: other_time,
725                },
726            ) => name == other_name && time == other_time,
727
728            (
729                MetricValue::StartTimestamp(timestamp),
730                MetricValue::StartTimestamp(other),
731            ) => timestamp == other,
732            (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
733                timestamp == other
734            }
735            (
736                MetricValue::PruningMetrics {
737                    name,
738                    pruning_metrics,
739                },
740                MetricValue::PruningMetrics {
741                    name: other_name,
742                    pruning_metrics: other_pruning_metrics,
743                },
744            ) => {
745                name == other_name
746                    && pruning_metrics.pruned() == other_pruning_metrics.pruned()
747                    && pruning_metrics.matched() == other_pruning_metrics.matched()
748            }
749            (
750                MetricValue::Ratio {
751                    name,
752                    ratio_metrics,
753                },
754                MetricValue::Ratio {
755                    name: other_name,
756                    ratio_metrics: other_ratio_metrics,
757                },
758            ) => name == other_name && ratio_metrics == other_ratio_metrics,
759            (
760                MetricValue::Custom { name, value },
761                MetricValue::Custom {
762                    name: other_name,
763                    value: other_value,
764                },
765            ) => name == other_name && value.is_eq(other_value),
766            // Default case when the two sides do not have the same type.
767            _ => false,
768        }
769    }
770}
771
772impl MetricValue {
773    /// Return the name of this SQL metric
774    pub fn name(&self) -> &str {
775        match self {
776            Self::OutputRows(_) => "output_rows",
777            Self::SpillCount(_) => "spill_count",
778            Self::SpilledBytes(_) => "spilled_bytes",
779            Self::OutputBytes(_) => "output_bytes",
780            Self::OutputBatches(_) => "output_batches",
781            Self::SpilledRows(_) => "spilled_rows",
782            Self::CurrentMemoryUsage(_) => "mem_used",
783            Self::ElapsedCompute(_) => "elapsed_compute",
784            Self::Count { name, .. } => name.borrow(),
785            Self::Gauge { name, .. } => name.borrow(),
786            Self::Time { name, .. } => name.borrow(),
787            Self::StartTimestamp(_) => "start_timestamp",
788            Self::EndTimestamp(_) => "end_timestamp",
789            Self::PruningMetrics { name, .. } => name.borrow(),
790            Self::Ratio { name, .. } => name.borrow(),
791            Self::Custom { name, .. } => name.borrow(),
792        }
793    }
794
795    /// Return the value of the metric as a usize value, used to aggregate metric
796    /// value across partitions.
797    pub fn as_usize(&self) -> usize {
798        match self {
799            Self::OutputRows(count) => count.value(),
800            Self::SpillCount(count) => count.value(),
801            Self::SpilledBytes(bytes) => bytes.value(),
802            Self::OutputBytes(bytes) => bytes.value(),
803            Self::OutputBatches(count) => count.value(),
804            Self::SpilledRows(count) => count.value(),
805            Self::CurrentMemoryUsage(used) => used.value(),
806            Self::ElapsedCompute(time) => time.value(),
807            Self::Count { count, .. } => count.value(),
808            Self::Gauge { gauge, .. } => gauge.value(),
809            Self::Time { time, .. } => time.value(),
810            Self::StartTimestamp(timestamp) => timestamp
811                .value()
812                .and_then(|ts| ts.timestamp_nanos_opt())
813                .map(|nanos| nanos as usize)
814                .unwrap_or(0),
815            Self::EndTimestamp(timestamp) => timestamp
816                .value()
817                .and_then(|ts| ts.timestamp_nanos_opt())
818                .map(|nanos| nanos as usize)
819                .unwrap_or(0),
820            // This function is a utility for aggregating metrics, for complex metric
821            // like `PruningMetrics`, this function is not supposed to get called.
822            // Metrics aggregation for them are implemented inside `MetricsSet` directly.
823            Self::PruningMetrics { .. } => 0,
824            // Should not be used. See comments in `PruningMetrics` for details.
825            Self::Ratio { .. } => 0,
826            Self::Custom { value, .. } => value.as_usize(),
827        }
828    }
829
830    /// create a new MetricValue with the same type as `self` suitable
831    /// for accumulating
832    pub fn new_empty(&self) -> Self {
833        match self {
834            Self::OutputRows(_) => Self::OutputRows(Count::new()),
835            Self::SpillCount(_) => Self::SpillCount(Count::new()),
836            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
837            Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
838            Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
839            Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
840            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
841            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
842            Self::Count { name, .. } => Self::Count {
843                name: name.clone(),
844                count: Count::new(),
845            },
846            Self::Gauge { name, .. } => Self::Gauge {
847                name: name.clone(),
848                gauge: Gauge::new(),
849            },
850            Self::Time { name, .. } => Self::Time {
851                name: name.clone(),
852                time: Time::new(),
853            },
854            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
855            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
856            Self::PruningMetrics { name, .. } => Self::PruningMetrics {
857                name: name.clone(),
858                pruning_metrics: PruningMetrics::new(),
859            },
860            Self::Ratio {
861                name,
862                ratio_metrics,
863            } => {
864                let merge_strategy = ratio_metrics.merge_strategy.clone();
865                Self::Ratio {
866                    name: name.clone(),
867                    ratio_metrics: RatioMetrics::new()
868                        .with_merge_strategy(merge_strategy),
869                }
870            }
871            Self::Custom { name, value } => Self::Custom {
872                name: name.clone(),
873                value: value.new_empty(),
874            },
875        }
876    }
877
878    /// Aggregates the value of other to `self`. panic's if the types
879    /// are mismatched or aggregating does not make sense for this
880    /// value
881    ///
882    /// Note this is purposely marked `mut` (even though atomics are
883    /// used) so Rust's type system can be used to ensure the
884    /// appropriate API access. `MetricValues` should be modified
885    /// using the original [`Count`] or [`Time`] they were created
886    /// from.
887    pub fn aggregate(&mut self, other: &Self) {
888        match (self, other) {
889            (Self::OutputRows(count), Self::OutputRows(other_count))
890            | (Self::SpillCount(count), Self::SpillCount(other_count))
891            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
892            | (Self::OutputBytes(count), Self::OutputBytes(other_count))
893            | (Self::OutputBatches(count), Self::OutputBatches(other_count))
894            | (Self::SpilledRows(count), Self::SpilledRows(other_count))
895            | (
896                Self::Count { count, .. },
897                Self::Count {
898                    count: other_count, ..
899                },
900            ) => count.add(other_count.value()),
901            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
902            | (
903                Self::Gauge { gauge, .. },
904                Self::Gauge {
905                    gauge: other_gauge, ..
906                },
907            ) => gauge.add(other_gauge.value()),
908            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
909            | (
910                Self::Time { time, .. },
911                Self::Time {
912                    time: other_time, ..
913                },
914            ) => time.add(other_time),
915            // timestamps are aggregated by min/max
916            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
917                timestamp.update_to_min(other_timestamp);
918            }
919            // timestamps are aggregated by min/max
920            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
921                timestamp.update_to_max(other_timestamp);
922            }
923            (
924                Self::PruningMetrics {
925                    pruning_metrics, ..
926                },
927                Self::PruningMetrics {
928                    pruning_metrics: other_pruning_metrics,
929                    ..
930                },
931            ) => {
932                let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
933                let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
934                let fully_matched =
935                    other_pruning_metrics.fully_matched.load(Ordering::Relaxed);
936                pruning_metrics.add_pruned(pruned);
937                pruning_metrics.add_matched(matched);
938                pruning_metrics.add_fully_matched(fully_matched);
939            }
940            (
941                Self::Ratio { ratio_metrics, .. },
942                Self::Ratio {
943                    ratio_metrics: other_ratio_metrics,
944                    ..
945                },
946            ) => {
947                ratio_metrics.merge(other_ratio_metrics);
948            }
949            (
950                Self::Custom { value, .. },
951                Self::Custom {
952                    value: other_value, ..
953                },
954            ) => {
955                value.aggregate(Arc::clone(other_value));
956            }
957            m @ (_, _) => {
958                panic!(
959                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
960                    m.0, m.1
961                )
962            }
963        }
964    }
965
966    /// Returns a number by which to sort metrics by display. Lower
967    /// numbers are "more useful" (and displayed first)
968    pub fn display_sort_key(&self) -> u8 {
969        match self {
970            // `BaselineMetrics` that is common for most operators
971            Self::OutputRows(_) => 0,
972            Self::ElapsedCompute(_) => 1,
973            Self::OutputBytes(_) => 2,
974            Self::OutputBatches(_) => 3,
975            // Other metrics
976            Self::PruningMetrics { name, .. } => match name.as_ref() {
977                // The following metrics belong to `DataSourceExec` with a Parquet data source.
978                // They are displayed in a specific order that reflects the actual pruning process,
979                // from coarse-grained to fine-grained pruning levels.
980                //
981                // You may update these metrics as long as their relative order remains unchanged.
982                //
983                // Reference PR: <https://github.com/apache/datafusion/pull/18379>
984                "files_ranges_pruned_statistics" => 4,
985                "row_groups_pruned_statistics" => 5,
986                "row_groups_pruned_bloom_filter" => 6,
987                "page_index_pages_pruned" => 7,
988                "page_index_rows_pruned" => 8,
989                _ => 9,
990            },
991            Self::SpillCount(_) => 10,
992            Self::SpilledBytes(_) => 11,
993            Self::SpilledRows(_) => 12,
994            Self::CurrentMemoryUsage(_) => 13,
995            Self::Count { .. } => 14,
996            Self::Gauge { .. } => 15,
997            Self::Time { .. } => 16,
998            Self::Ratio { .. } => 17,
999            Self::StartTimestamp(_) => 18, // show timestamps last
1000            Self::EndTimestamp(_) => 19,
1001            Self::Custom { .. } => 20,
1002        }
1003    }
1004
1005    /// returns true if this metric has a timestamp value
1006    pub fn is_timestamp(&self) -> bool {
1007        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
1008    }
1009}
1010
1011impl Display for MetricValue {
1012    /// Prints the value of this metric
1013    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1014        match self {
1015            Self::OutputRows(count)
1016            | Self::OutputBatches(count)
1017            | Self::SpillCount(count)
1018            | Self::SpilledRows(count)
1019            | Self::Count { count, .. } => {
1020                write!(f, "{count}")
1021            }
1022            Self::SpilledBytes(count) | Self::OutputBytes(count) => {
1023                let readable_count = human_readable_size(count.value());
1024                write!(f, "{readable_count}")
1025            }
1026            Self::CurrentMemoryUsage(gauge) => {
1027                // CurrentMemoryUsage is in bytes, format like SpilledBytes
1028                let readable_size = human_readable_size(gauge.value());
1029                write!(f, "{readable_size}")
1030            }
1031            Self::Gauge { gauge, .. } => {
1032                // Generic gauge metrics - format with human-readable count
1033                write!(f, "{}", human_readable_count(gauge.value()))
1034            }
1035            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
1036                // distinguish between no time recorded and very small
1037                // amount of time recorded
1038                if time.value() > 0 {
1039                    write!(f, "{time}")
1040                } else {
1041                    write!(f, "NOT RECORDED")
1042                }
1043            }
1044            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
1045                write!(f, "{timestamp}")
1046            }
1047            Self::PruningMetrics {
1048                pruning_metrics, ..
1049            } => {
1050                write!(f, "{pruning_metrics}")
1051            }
1052            Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
1053            Self::Custom { name, value } => {
1054                write!(f, "name:{name} {value}")
1055            }
1056        }
1057    }
1058}
1059
1060#[cfg(test)]
1061mod tests {
1062    use std::any::Any;
1063
1064    use chrono::TimeZone;
1065    use datafusion_common::units::MB;
1066
1067    use super::*;
1068
1069    #[derive(Debug, Default)]
1070    pub struct CustomCounter {
1071        count: AtomicUsize,
1072    }
1073
1074    impl Display for CustomCounter {
1075        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1076            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
1077        }
1078    }
1079
1080    impl CustomMetricValue for CustomCounter {
1081        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
1082            Arc::new(CustomCounter::default())
1083        }
1084
1085        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
1086            let other = other.as_any().downcast_ref::<Self>().unwrap();
1087            self.count
1088                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
1089        }
1090
1091        fn as_any(&self) -> &dyn Any {
1092            self
1093        }
1094
1095        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
1096            let Some(other) = other.as_any().downcast_ref::<Self>() else {
1097                return false;
1098            };
1099
1100            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
1101        }
1102    }
1103
1104    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
1105        let custom_counter = CustomCounter::default();
1106        custom_counter.count.fetch_add(value, Ordering::Relaxed);
1107
1108        MetricValue::Custom {
1109            name: Cow::Borrowed(name),
1110            value: Arc::new(custom_counter),
1111        }
1112    }
1113
1114    #[test]
1115    fn test_custom_metric_with_mismatching_names() {
1116        let mut custom_val = new_custom_counter("Hi", 1);
1117        let other_custom_val = new_custom_counter("Hello", 1);
1118
1119        // Not equal since the name differs.
1120        assert!(other_custom_val != custom_val);
1121
1122        // Should work even though the name differs
1123        custom_val.aggregate(&other_custom_val);
1124
1125        let expected_val = new_custom_counter("Hi", 2);
1126        assert!(expected_val == custom_val);
1127    }
1128
1129    #[test]
1130    fn test_custom_metric() {
1131        let mut custom_val = new_custom_counter("hi", 11);
1132        let other_custom_val = new_custom_counter("hi", 20);
1133
1134        custom_val.aggregate(&other_custom_val);
1135
1136        assert!(custom_val != other_custom_val);
1137
1138        if let MetricValue::Custom { value, .. } = custom_val {
1139            let counter = value
1140                .as_any()
1141                .downcast_ref::<CustomCounter>()
1142                .expect("Expected CustomCounter");
1143            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
1144        } else {
1145            panic!("Unexpected value");
1146        }
1147    }
1148
1149    #[test]
1150    fn test_display_output_rows() {
1151        let count = Count::new();
1152        let values = vec![
1153            MetricValue::OutputRows(count.clone()),
1154            MetricValue::Count {
1155                name: "my_counter".into(),
1156                count: count.clone(),
1157            },
1158        ];
1159
1160        for value in &values {
1161            assert_eq!("0", value.to_string(), "value {value:?}");
1162        }
1163
1164        count.add(42);
1165        for value in &values {
1166            assert_eq!("42", value.to_string(), "value {value:?}");
1167        }
1168    }
1169
1170    #[test]
1171    fn test_display_spilled_bytes() {
1172        let count = Count::new();
1173        let spilled_byte = MetricValue::SpilledBytes(count.clone());
1174
1175        assert_eq!("0.0 B", spilled_byte.to_string());
1176
1177        count.add((100 * MB) as usize);
1178        assert_eq!("100.0 MB", spilled_byte.to_string());
1179
1180        count.add((0.5 * MB as f64) as usize);
1181        assert_eq!("100.5 MB", spilled_byte.to_string());
1182    }
1183
1184    #[test]
1185    fn test_display_time() {
1186        let time = Time::new();
1187        let values = vec![
1188            MetricValue::ElapsedCompute(time.clone()),
1189            MetricValue::Time {
1190                name: "my_time".into(),
1191                time: time.clone(),
1192            },
1193        ];
1194
1195        // if time is not set, it should not be reported as zero
1196        for value in &values {
1197            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
1198        }
1199
1200        time.add_duration(Duration::from_nanos(1042));
1201        for value in &values {
1202            assert_eq!("1.04µs", value.to_string(), "value {value:?}");
1203        }
1204    }
1205
1206    #[test]
1207    fn test_display_ratio() {
1208        let ratio_metrics = RatioMetrics::new();
1209        let ratio = MetricValue::Ratio {
1210            name: Cow::Borrowed("ratio_metric"),
1211            ratio_metrics: ratio_metrics.clone(),
1212        };
1213
1214        assert_eq!("N/A (0/0)", ratio.to_string());
1215
1216        ratio_metrics.add_part(10);
1217        assert_eq!("N/A (10/0)", ratio.to_string());
1218
1219        ratio_metrics.add_total(40);
1220        assert_eq!("25% (10/40)", ratio.to_string());
1221
1222        let tiny_ratio_metrics = RatioMetrics::new();
1223        let tiny_ratio = MetricValue::Ratio {
1224            name: Cow::Borrowed("tiny_ratio_metric"),
1225            ratio_metrics: tiny_ratio_metrics.clone(),
1226        };
1227        tiny_ratio_metrics.add_part(1);
1228        tiny_ratio_metrics.add_total(3000);
1229        assert_eq!("0.033% (1/3.00 K)", tiny_ratio.to_string());
1230    }
1231
1232    #[test]
1233    fn test_ratio_set_methods() {
1234        let ratio_metrics = RatioMetrics::new();
1235
1236        // Ensure set methods don't increment
1237        ratio_metrics.set_part(10);
1238        ratio_metrics.set_part(10);
1239        ratio_metrics.set_total(40);
1240        ratio_metrics.set_total(40);
1241        assert_eq!("25% (10/40)", ratio_metrics.to_string());
1242
1243        let ratio_metrics = RatioMetrics::new();
1244
1245        // Calling set should change the value
1246        ratio_metrics.set_part(10);
1247        ratio_metrics.set_part(30);
1248        ratio_metrics.set_total(40);
1249        ratio_metrics.set_total(50);
1250        assert_eq!("60% (30/50)", ratio_metrics.to_string());
1251    }
1252
1253    #[test]
1254    fn test_ratio_merge_strategy() {
1255        // Test AddPartSetTotal strategy
1256        let ratio_metrics1 =
1257            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
1258
1259        ratio_metrics1.set_part(10);
1260        ratio_metrics1.set_total(40);
1261        assert_eq!("25% (10/40)", ratio_metrics1.to_string());
1262        let ratio_metrics2 =
1263            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
1264        ratio_metrics2.set_part(20);
1265        ratio_metrics2.set_total(40);
1266        assert_eq!("50% (20/40)", ratio_metrics2.to_string());
1267
1268        ratio_metrics1.merge(&ratio_metrics2);
1269        assert_eq!("75% (30/40)", ratio_metrics1.to_string());
1270
1271        // Test SetPartAddTotal strategy
1272        let ratio_metrics1 =
1273            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
1274        ratio_metrics1.set_part(20);
1275        ratio_metrics1.set_total(50);
1276        let ratio_metrics2 = RatioMetrics::new();
1277        ratio_metrics2.set_part(20);
1278        ratio_metrics2.set_total(50);
1279        ratio_metrics1.merge(&ratio_metrics2);
1280        assert_eq!("20% (20/100)", ratio_metrics1.to_string());
1281
1282        // Test AddPartAddTotal strategy (default)
1283        let ratio_metrics1 = RatioMetrics::new();
1284        ratio_metrics1.set_part(20);
1285        ratio_metrics1.set_total(50);
1286        let ratio_metrics2 = RatioMetrics::new();
1287        ratio_metrics2.set_part(20);
1288        ratio_metrics2.set_total(50);
1289        ratio_metrics1.merge(&ratio_metrics2);
1290        assert_eq!("40% (40/100)", ratio_metrics1.to_string());
1291    }
1292
1293    #[test]
1294    fn test_display_timestamp() {
1295        let timestamp = Timestamp::new();
1296        let values = vec![
1297            MetricValue::StartTimestamp(timestamp.clone()),
1298            MetricValue::EndTimestamp(timestamp.clone()),
1299        ];
1300
1301        // if time is not set, it should not be reported as zero
1302        for value in &values {
1303            assert_eq!("NONE", value.to_string(), "value {value:?}");
1304        }
1305
1306        timestamp.set(Utc.timestamp_nanos(1431648000000000));
1307        for value in &values {
1308            assert_eq!(
1309                "1970-01-17 13:40:48 UTC",
1310                value.to_string(),
1311                "value {value:?}"
1312            );
1313        }
1314    }
1315
1316    #[test]
1317    fn test_timer_with_custom_instant() {
1318        let time = Time::new();
1319        let start_time = Instant::now();
1320
1321        // Sleep a bit to ensure some time passes
1322        std::thread::sleep(Duration::from_millis(1));
1323
1324        // Create timer with the earlier start time
1325        let mut timer = time.timer_with(start_time);
1326
1327        // Sleep a bit more
1328        std::thread::sleep(Duration::from_millis(1));
1329
1330        // Stop the timer
1331        timer.stop();
1332
1333        // The recorded time should be at least 20ms (both sleeps)
1334        assert!(
1335            time.value() >= 2_000_000,
1336            "Expected at least 2ms, got {} ns",
1337            time.value()
1338        );
1339    }
1340
1341    #[test]
1342    fn test_stop_with_custom_endpoint() {
1343        let time = Time::new();
1344        let start = Instant::now();
1345        let mut timer = time.timer_with(start);
1346
1347        // Simulate exactly 10ms passing
1348        let end = start + Duration::from_millis(10);
1349
1350        // Stop with custom endpoint
1351        timer.stop_with(end);
1352
1353        // Should record exactly 10ms (10_000_000 nanoseconds)
1354        // Allow for small variations due to timer resolution
1355        let recorded = time.value();
1356        assert!(
1357            (10_000_000..=10_100_000).contains(&recorded),
1358            "Expected ~10ms, got {recorded} ns"
1359        );
1360
1361        // Calling stop_with again should not add more time
1362        timer.stop_with(end);
1363        assert_eq!(
1364            recorded,
1365            time.value(),
1366            "Time should not change after second stop"
1367        );
1368    }
1369
1370    #[test]
1371    fn test_done_with_custom_endpoint() {
1372        let time = Time::new();
1373        let start = Instant::now();
1374
1375        // Create a new scope for the timer
1376        {
1377            let timer = time.timer_with(start);
1378
1379            // Simulate 50ms passing
1380            let end = start + Duration::from_millis(5);
1381
1382            // Call done_with to stop and consume the timer
1383            timer.done_with(end);
1384
1385            // Timer is consumed, can't use it anymore
1386        }
1387
1388        // Should record exactly 5ms
1389        let recorded = time.value();
1390        assert!(
1391            (5_000_000..=5_100_000).contains(&recorded),
1392            "Expected ~5ms, got {recorded} ns",
1393        );
1394
1395        // Test that done_with prevents drop from recording time again
1396        {
1397            let timer2 = time.timer_with(start);
1398            let end2 = start + Duration::from_millis(5);
1399            timer2.done_with(end2);
1400            // drop happens here but should not record additional time
1401        }
1402
1403        // Should have added only 5ms more
1404        let new_recorded = time.value();
1405        assert!(
1406            (10_000_000..=10_100_000).contains(&new_recorded),
1407            "Expected ~10ms total, got {new_recorded} ns",
1408        );
1409    }
1410
1411    #[test]
1412    fn test_human_readable_metric_formatting() {
1413        // Test Count formatting with various sizes
1414        let small_count = Count::new();
1415        small_count.add(42);
1416        assert_eq!(
1417            MetricValue::OutputRows(small_count.clone()).to_string(),
1418            "42"
1419        );
1420
1421        let thousand_count = Count::new();
1422        thousand_count.add(10_100);
1423        assert_eq!(
1424            MetricValue::OutputRows(thousand_count.clone()).to_string(),
1425            "10.10 K"
1426        );
1427
1428        let million_count = Count::new();
1429        million_count.add(1_532_000);
1430        assert_eq!(
1431            MetricValue::SpilledRows(million_count.clone()).to_string(),
1432            "1.53 M"
1433        );
1434
1435        let billion_count = Count::new();
1436        billion_count.add(2_500_000_000);
1437        assert_eq!(
1438            MetricValue::OutputBatches(billion_count.clone()).to_string(),
1439            "2.50 B"
1440        );
1441
1442        // Test Time formatting with various durations
1443        let micros_time = Time::new();
1444        micros_time.add_duration(Duration::from_nanos(1_234));
1445        assert_eq!(
1446            MetricValue::ElapsedCompute(micros_time.clone()).to_string(),
1447            "1.23µs"
1448        );
1449
1450        let millis_time = Time::new();
1451        millis_time.add_duration(Duration::from_nanos(11_295_377));
1452        assert_eq!(
1453            MetricValue::ElapsedCompute(millis_time.clone()).to_string(),
1454            "11.30ms"
1455        );
1456
1457        let seconds_time = Time::new();
1458        seconds_time.add_duration(Duration::from_nanos(1_234_567_890));
1459        assert_eq!(
1460            MetricValue::ElapsedCompute(seconds_time.clone()).to_string(),
1461            "1.23s"
1462        );
1463
1464        // Test CurrentMemoryUsage formatting (should use size, not count)
1465        let mem_gauge = Gauge::new();
1466        mem_gauge.add(100 * MB as usize);
1467        assert_eq!(
1468            MetricValue::CurrentMemoryUsage(mem_gauge.clone()).to_string(),
1469            "100.0 MB"
1470        );
1471
1472        // Test custom Gauge formatting (should use count)
1473        let custom_gauge = Gauge::new();
1474        custom_gauge.add(50_000);
1475        assert_eq!(
1476            MetricValue::Gauge {
1477                name: "custom".into(),
1478                gauge: custom_gauge.clone()
1479            }
1480            .to_string(),
1481            "50.00 K"
1482        );
1483
1484        // Test PruningMetrics formatting
1485        let pruning = PruningMetrics::new();
1486        pruning.add_matched(500_000);
1487        pruning.add_pruned(500_000);
1488        assert_eq!(
1489            MetricValue::PruningMetrics {
1490                name: "test_pruning".into(),
1491                pruning_metrics: pruning.clone()
1492            }
1493            .to_string(),
1494            "1.00 M total → 500.0 K matched"
1495        );
1496
1497        // Test RatioMetrics formatting
1498        let ratio = RatioMetrics::new();
1499        ratio.add_part(250_000);
1500        ratio.add_total(1_000_000);
1501        assert_eq!(
1502            MetricValue::Ratio {
1503                name: "test_ratio".into(),
1504                ratio_metrics: ratio.clone()
1505            }
1506            .to_string(),
1507            "25% (250.0 K/1.00 M)"
1508        );
1509    }
1510}