datafusion_physical_expr_common/metrics/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Value representation of metrics
19
20use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::{
23    human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
24};
25use parking_lot::Mutex;
26use std::{
27    borrow::{Borrow, Cow},
28    fmt::{Debug, Display},
29    sync::{
30        Arc,
31        atomic::{AtomicUsize, Ordering},
32    },
33    time::Duration,
34};
35
/// A shared counter for things such as the number of input or output rows.
///
/// The value lives behind an [`Arc`], so every clone of a `Count` reads and
/// updates the same underlying metric.
#[derive(Clone, Debug)]
pub struct Count {
    /// Current value of the counter, shared between clones
    value: Arc<AtomicUsize>,
}
44
45impl PartialEq for Count {
46    fn eq(&self, other: &Self) -> bool {
47        self.value().eq(&other.value())
48    }
49}
50
51impl Display for Count {
52    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53        write!(f, "{}", human_readable_count(self.value()))
54    }
55}
56
57impl Default for Count {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl Count {
64    /// create a new counter
65    pub fn new() -> Self {
66        Self {
67            value: Arc::new(AtomicUsize::new(0)),
68        }
69    }
70
71    /// Add `n` to the metric's value
72    pub fn add(&self, n: usize) {
73        // relaxed ordering for operations on `value` poses no issues
74        // we're purely using atomic ops with no associated memory ops
75        self.value.fetch_add(n, Ordering::Relaxed);
76    }
77
78    /// Get the current value
79    pub fn value(&self) -> usize {
80        self.value.load(Ordering::Relaxed)
81    }
82}
83
/// The simplest metric type: a gauge just reports a value, for example the
/// current memory consumption.
///
/// The value lives behind an [`Arc`], so every clone of a `Gauge` reads and
/// updates the same underlying metric.
#[derive(Clone, Debug)]
pub struct Gauge {
    /// Current value of the gauge, shared between clones
    value: Arc<AtomicUsize>,
}
93
94impl PartialEq for Gauge {
95    fn eq(&self, other: &Self) -> bool {
96        self.value().eq(&other.value())
97    }
98}
99
100impl Display for Gauge {
101    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
102        write!(f, "{}", self.value())
103    }
104}
105
106impl Default for Gauge {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl Gauge {
113    /// create a new gauge
114    pub fn new() -> Self {
115        Self {
116            value: Arc::new(AtomicUsize::new(0)),
117        }
118    }
119
120    /// Add `n` to the metric's value
121    pub fn add(&self, n: usize) {
122        // relaxed ordering for operations on `value` poses no issues
123        // we're purely using atomic ops with no associated memory ops
124        self.value.fetch_add(n, Ordering::Relaxed);
125    }
126
127    /// Sub `n` from the metric's value
128    pub fn sub(&self, n: usize) {
129        // relaxed ordering for operations on `value` poses no issues
130        // we're purely using atomic ops with no associated memory ops
131        self.value.fetch_sub(n, Ordering::Relaxed);
132    }
133
134    /// Set metric's value to maximum of `n` and current value
135    pub fn set_max(&self, n: usize) {
136        self.value.fetch_max(n, Ordering::Relaxed);
137    }
138
139    /// Set the metric's value to `n` and return the previous value
140    pub fn set(&self, n: usize) -> usize {
141        // relaxed ordering for operations on `value` poses no issues
142        // we're purely using atomic ops with no associated memory ops
143        self.value.swap(n, Ordering::Relaxed)
144    }
145
146    /// Get the current value
147    pub fn value(&self) -> usize {
148        self.value.load(Ordering::Relaxed)
149    }
150}
151
/// Accumulates a duration of time that does not have to be contiguous.
///
/// Clones share the same underlying value.
#[derive(Clone, Debug)]
pub struct Time {
    /// Total elapsed time, in nanoseconds
    nanos: Arc<AtomicUsize>,
}
158
159impl Default for Time {
160    fn default() -> Self {
161        Self::new()
162    }
163}
164
165impl PartialEq for Time {
166    fn eq(&self, other: &Self) -> bool {
167        self.value().eq(&other.value())
168    }
169}
170
171impl Display for Time {
172    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173        write!(f, "{}", human_readable_duration(self.value() as u64))
174    }
175}
176
177impl Time {
178    /// Create a new [`Time`] wrapper suitable for recording elapsed
179    /// times for operations.
180    pub fn new() -> Self {
181        Self {
182            nanos: Arc::new(AtomicUsize::new(0)),
183        }
184    }
185
186    /// Add elapsed nanoseconds since `start`to self
187    pub fn add_elapsed(&self, start: Instant) {
188        self.add_duration(start.elapsed());
189    }
190
191    /// Add duration of time to self
192    ///
193    /// Note: this will always increment the recorded time by at least 1 nanosecond
194    /// to distinguish between the scenario of no values recorded, in which
195    /// case the value will be 0, and no measurable amount of time having passed,
196    /// in which case the value will be small but not 0.
197    ///
198    /// This is based on the assumption that the timing logic in most cases is likely
199    /// to take at least a nanosecond, and so this is reasonable mechanism to avoid
200    /// ambiguity, especially on systems with low-resolution monotonic clocks
201    pub fn add_duration(&self, duration: Duration) {
202        let more_nanos = duration.as_nanos() as usize;
203        self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204    }
205
206    /// Add the number of nanoseconds of other `Time` to self
207    pub fn add(&self, other: &Time) {
208        self.add_duration(Duration::from_nanos(other.value() as u64))
209    }
210
211    /// return a scoped guard that adds the amount of time elapsed
212    /// between its creation and its drop or call to `stop` to the
213    /// underlying metric.
214    pub fn timer(&self) -> ScopedTimerGuard<'_> {
215        ScopedTimerGuard {
216            inner: self,
217            start: Some(Instant::now()),
218        }
219    }
220
221    /// Get the number of nanoseconds record by this Time metric
222    pub fn value(&self) -> usize {
223        self.nanos.load(Ordering::Relaxed)
224    }
225
226    /// Return a scoped guard that adds the amount of time elapsed between the
227    /// given instant and its drop (or the call to `stop`) to the underlying metric
228    pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229        ScopedTimerGuard {
230            inner: self,
231            start: Some(now),
232        }
233    }
234}
235
236/// Stores a single timestamp, stored as the number of nanoseconds
237/// elapsed from Jan 1, 1970 UTC
238#[derive(Debug, Clone)]
239pub struct Timestamp {
240    /// Time thing started
241    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250impl Timestamp {
251    /// Create a new timestamp and sets its value to 0
252    pub fn new() -> Self {
253        Self {
254            timestamp: Arc::new(Mutex::new(None)),
255        }
256    }
257
258    /// Sets the timestamps value to the current time
259    pub fn record(&self) {
260        self.set(Utc::now())
261    }
262
263    /// Sets the timestamps value to a specified time
264    pub fn set(&self, now: DateTime<Utc>) {
265        *self.timestamp.lock() = Some(now);
266    }
267
268    /// return the timestamps value at the last time `record()` was
269    /// called.
270    ///
271    /// Returns `None` if `record()` has not been called
272    pub fn value(&self) -> Option<DateTime<Utc>> {
273        *self.timestamp.lock()
274    }
275
276    /// sets the value of this timestamp to the minimum of this and other
277    pub fn update_to_min(&self, other: &Timestamp) {
278        let min = match (self.value(), other.value()) {
279            (None, None) => None,
280            (Some(v), None) => Some(v),
281            (None, Some(v)) => Some(v),
282            (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283        };
284
285        *self.timestamp.lock() = min;
286    }
287
288    /// sets the value of this timestamp to the maximum of this and other
289    pub fn update_to_max(&self, other: &Timestamp) {
290        let max = match (self.value(), other.value()) {
291            (None, None) => None,
292            (Some(v), None) => Some(v),
293            (None, Some(v)) => Some(v),
294            (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295        };
296
297        *self.timestamp.lock() = max;
298    }
299}
300
301impl PartialEq for Timestamp {
302    fn eq(&self, other: &Self) -> bool {
303        self.value().eq(&other.value())
304    }
305}
306
307impl Display for Timestamp {
308    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309        match self.value() {
310            None => write!(f, "NONE"),
311            Some(v) => {
312                write!(f, "{v}")
313            }
314        }
315    }
316}
317
318/// RAAI structure that adds all time between its construction and
319/// destruction to the CPU time or the first call to `stop` whichever
320/// comes first
321pub struct ScopedTimerGuard<'a> {
322    inner: &'a Time,
323    start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327    /// Stop the timer timing and record the time taken
328    pub fn stop(&mut self) {
329        if let Some(start) = self.start.take() {
330            self.inner.add_elapsed(start)
331        }
332    }
333
334    /// Restarts the timer recording from the current time
335    pub fn restart(&mut self) {
336        self.start = Some(Instant::now())
337    }
338
339    /// Stop the timer, record the time taken and consume self
340    pub fn done(mut self) {
341        self.stop()
342    }
343
344    /// Stop the timer timing and record the time taken since the given endpoint.
345    pub fn stop_with(&mut self, end_time: Instant) {
346        if let Some(start) = self.start.take() {
347            let elapsed = end_time - start;
348            self.inner.add_duration(elapsed)
349        }
350    }
351
352    /// Stop the timer, record the time taken since `end_time` endpoint, and
353    /// consume self.
354    pub fn done_with(mut self, end_time: Instant) {
355        self.stop_with(end_time)
356    }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360    fn drop(&mut self) {
361        self.stop()
362    }
363}
364
/// Counters tracking pruning metrics
///
/// For example, if a file scanner was initially planned to scan 10 files but
/// skipped 8 of them using statistics, the pruning metrics would look like:
/// 10 total -> 2 matched
///
/// Clones share the same underlying counters.
#[derive(Clone, Debug)]
pub struct PruningMetrics {
    /// Number of items pruned
    pruned: Arc<AtomicUsize>,
    /// Number of items matched (not pruned)
    matched: Arc<AtomicUsize>,
}
376
377impl Display for PruningMetrics {
378    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
379        let matched = self.matched.load(Ordering::Relaxed);
380        let total = self.pruned.load(Ordering::Relaxed) + matched;
381
382        write!(
383            f,
384            "{} total → {} matched",
385            human_readable_count(total),
386            human_readable_count(matched)
387        )
388    }
389}
390
391impl Default for PruningMetrics {
392    fn default() -> Self {
393        Self::new()
394    }
395}
396
397impl PruningMetrics {
398    /// create a new PruningMetrics
399    pub fn new() -> Self {
400        Self {
401            pruned: Arc::new(AtomicUsize::new(0)),
402            matched: Arc::new(AtomicUsize::new(0)),
403        }
404    }
405
406    /// Add `n` to the metric's pruned value
407    pub fn add_pruned(&self, n: usize) {
408        // relaxed ordering for operations on `value` poses no issues
409        // we're purely using atomic ops with no associated memory ops
410        self.pruned.fetch_add(n, Ordering::Relaxed);
411    }
412
413    /// Add `n` to the metric's matched value
414    pub fn add_matched(&self, n: usize) {
415        // relaxed ordering for operations on `value` poses no issues
416        // we're purely using atomic ops with no associated memory ops
417        self.matched.fetch_add(n, Ordering::Relaxed);
418    }
419
420    /// Subtract `n` to the metric's matched value.
421    pub fn subtract_matched(&self, n: usize) {
422        // relaxed ordering for operations on `value` poses no issues
423        // we're purely using atomic ops with no associated memory ops
424        self.matched.fetch_sub(n, Ordering::Relaxed);
425    }
426
427    /// Number of items pruned
428    pub fn pruned(&self) -> usize {
429        self.pruned.load(Ordering::Relaxed)
430    }
431
432    /// Number of items matched (not pruned)
433    pub fn matched(&self) -> usize {
434        self.matched.load(Ordering::Relaxed)
435    }
436}
437
438/// Counters tracking ratio metrics (e.g. matched vs total)
439///
440/// The counters are thread-safe and shared across clones.
441#[derive(Debug, Clone, Default)]
442pub struct RatioMetrics {
443    part: Arc<AtomicUsize>,
444    total: Arc<AtomicUsize>,
445    merge_strategy: RatioMergeStrategy,
446}
447
/// Selects how [`RatioMetrics::merge`] folds another instance's `part` and
/// `total` into the receiver.
#[derive(Clone, Debug, Default)]
pub enum RatioMergeStrategy {
    /// Add the other `part` and add the other `total` (the default)
    #[default]
    AddPartAddTotal,
    /// Add the other `part`, but overwrite `total` with the other `total`
    AddPartSetTotal,
    /// Overwrite `part` with the other `part`, but add the other `total`
    SetPartAddTotal,
}
455
456impl RatioMetrics {
457    /// Create a new [`RatioMetrics`]
458    pub fn new() -> Self {
459        Self {
460            part: Arc::new(AtomicUsize::new(0)),
461            total: Arc::new(AtomicUsize::new(0)),
462            merge_strategy: RatioMergeStrategy::AddPartAddTotal,
463        }
464    }
465
466    pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy) -> Self {
467        self.merge_strategy = merge_strategy;
468        self
469    }
470
471    /// Add `n` to the numerator (`part`) value
472    pub fn add_part(&self, n: usize) {
473        self.part.fetch_add(n, Ordering::Relaxed);
474    }
475
476    /// Add `n` to the denominator (`total`) value
477    pub fn add_total(&self, n: usize) {
478        self.total.fetch_add(n, Ordering::Relaxed);
479    }
480
481    /// Set the numerator (`part`) value to `n`, overwriting any existing value
482    pub fn set_part(&self, n: usize) {
483        self.part.store(n, Ordering::Relaxed);
484    }
485
486    /// Set the denominator (`total`) value to `n`, overwriting any existing value
487    pub fn set_total(&self, n: usize) {
488        self.total.store(n, Ordering::Relaxed);
489    }
490
491    /// Merge the value from `other` into `self`
492    pub fn merge(&self, other: &Self) {
493        match self.merge_strategy {
494            RatioMergeStrategy::AddPartAddTotal => {
495                self.add_part(other.part());
496                self.add_total(other.total());
497            }
498            RatioMergeStrategy::AddPartSetTotal => {
499                self.add_part(other.part());
500                self.set_total(other.total());
501            }
502            RatioMergeStrategy::SetPartAddTotal => {
503                self.set_part(other.part());
504                self.add_total(other.total());
505            }
506        }
507    }
508
509    /// Return the numerator (`part`) value
510    pub fn part(&self) -> usize {
511        self.part.load(Ordering::Relaxed)
512    }
513
514    /// Return the denominator (`total`) value
515    pub fn total(&self) -> usize {
516        self.total.load(Ordering::Relaxed)
517    }
518}
519
520impl PartialEq for RatioMetrics {
521    fn eq(&self, other: &Self) -> bool {
522        self.part() == other.part() && self.total() == other.total()
523    }
524}
525
/// Format a float number rounded to its `digits` most significant digits.
///
/// Ties are rounded away from zero (see [`f64::round`]). Zero is rendered as
/// `"0"`, and non-finite input (NaN, ±infinity) is formatted unchanged.
///
/// fmt_significant(12.5, 2) -> "13"
/// fmt_significant(12.4, 2) -> "12"
/// fmt_significant(0.0543, 2) -> "0.054"
/// fmt_significant(0.000123, 2) -> "0.00012"
fn fmt_significant(mut x: f64, digits: usize) -> String {
    if x == 0.0 {
        return "0".to_string();
    }

    // NaN and the infinities have no decimal exponent; the scaling below
    // would turn an infinity into NaN, so format them as they are.
    if !x.is_finite() {
        return format!("{x}");
    }

    let exp = x.abs().log10().floor(); // exponent of first significant digit
    let scale = 10f64.powf(-(exp - (digits as f64 - 1.0)));
    x = (x * scale).round() / scale; // round to N significant digits
    format!("{x}")
}
541
542impl Display for RatioMetrics {
543    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544        let part = self.part();
545        let total = self.total();
546
547        if total == 0 {
548            if part == 0 {
549                write!(f, "N/A (0/0)")
550            } else {
551                write!(f, "N/A ({}/0)", human_readable_count(part))
552            }
553        } else {
554            let percentage = (part as f64 / total as f64) * 100.0;
555
556            write!(
557                f,
558                "{}% ({}/{})",
559                fmt_significant(percentage, 2),
560                human_readable_count(part),
561                human_readable_count(total)
562            )
563        }
564    }
565}
566
567/// Possible values for a [super::Metric].
568///
569/// Among other differences, the metric types have different ways to
570/// logically interpret their underlying values and some metrics are
571/// so common they are given special treatment.
572#[derive(Debug, Clone)]
573pub enum MetricValue {
574    /// Number of output rows produced: "output_rows" metric
575    OutputRows(Count),
576    /// Elapsed Compute Time: the wall clock time spent in "cpu
577    /// intensive" work.
578    ///
579    /// This measurement represents, roughly:
580    /// ```
581    /// use std::time::Instant;
582    /// let start = Instant::now();
583    /// // ...CPU intensive work here...
584    /// let elapsed_compute = (Instant::now() - start).as_nanos();
585    /// ```
586    ///
587    /// Note 1: Does *not* include time other operators spend
588    /// computing input.
589    ///
590    /// Note 2: *Does* includes time when the thread could have made
591    /// progress but the OS did not schedule it (e.g. due to CPU
592    /// contention), thus making this value different than the
593    /// classical definition of "cpu_time", which is the time reported
594    /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
595    ElapsedCompute(Time),
596    /// Number of spills produced: "spill_count" metric
597    SpillCount(Count),
598    /// Total size of spilled bytes produced: "spilled_bytes" metric
599    SpilledBytes(Count),
600    /// Total size of output bytes produced: "output_bytes" metric
601    OutputBytes(Count),
602    /// Total number of output batches produced: "output_batches" metric
603    OutputBatches(Count),
604    /// Total size of spilled rows produced: "spilled_rows" metric
605    SpilledRows(Count),
606    /// Current memory used
607    CurrentMemoryUsage(Gauge),
608    /// Operator defined count.
609    Count {
610        /// The provided name of this metric
611        name: Cow<'static, str>,
612        /// The value of the metric
613        count: Count,
614    },
615    /// Operator defined gauge.
616    Gauge {
617        /// The provided name of this metric
618        name: Cow<'static, str>,
619        /// The value of the metric
620        gauge: Gauge,
621    },
622    /// Operator defined time
623    Time {
624        /// The provided name of this metric
625        name: Cow<'static, str>,
626        /// The value of the metric
627        time: Time,
628    },
629    /// The time at which execution started
630    StartTimestamp(Timestamp),
631    /// The time at which execution ended
632    EndTimestamp(Timestamp),
633    /// Metrics related to scan pruning
634    PruningMetrics {
635        name: Cow<'static, str>,
636        pruning_metrics: PruningMetrics,
637    },
638    /// Metrics that should be displayed as ratio like (42%)
639    Ratio {
640        name: Cow<'static, str>,
641        ratio_metrics: RatioMetrics,
642    },
643    Custom {
644        /// The provided name of this metric
645        name: Cow<'static, str>,
646        /// A custom implementation of the metric value.
647        value: Arc<dyn CustomMetricValue>,
648    },
649}
650
651// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its
652// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq.
653impl PartialEq for MetricValue {
654    fn eq(&self, other: &Self) -> bool {
655        match (self, other) {
656            (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
657                count == other
658            }
659            (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
660                time == other
661            }
662            (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
663                count == other
664            }
665            (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
666                count == other
667            }
668            (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
669                count == other
670            }
671            (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
672                count == other
673            }
674            (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
675                count == other
676            }
677            (
678                MetricValue::CurrentMemoryUsage(gauge),
679                MetricValue::CurrentMemoryUsage(other),
680            ) => gauge == other,
681            (
682                MetricValue::Count { name, count },
683                MetricValue::Count {
684                    name: other_name,
685                    count: other_count,
686                },
687            ) => name == other_name && count == other_count,
688            (
689                MetricValue::Gauge { name, gauge },
690                MetricValue::Gauge {
691                    name: other_name,
692                    gauge: other_gauge,
693                },
694            ) => name == other_name && gauge == other_gauge,
695            (
696                MetricValue::Time { name, time },
697                MetricValue::Time {
698                    name: other_name,
699                    time: other_time,
700                },
701            ) => name == other_name && time == other_time,
702
703            (
704                MetricValue::StartTimestamp(timestamp),
705                MetricValue::StartTimestamp(other),
706            ) => timestamp == other,
707            (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
708                timestamp == other
709            }
710            (
711                MetricValue::PruningMetrics {
712                    name,
713                    pruning_metrics,
714                },
715                MetricValue::PruningMetrics {
716                    name: other_name,
717                    pruning_metrics: other_pruning_metrics,
718                },
719            ) => {
720                name == other_name
721                    && pruning_metrics.pruned() == other_pruning_metrics.pruned()
722                    && pruning_metrics.matched() == other_pruning_metrics.matched()
723            }
724            (
725                MetricValue::Ratio {
726                    name,
727                    ratio_metrics,
728                },
729                MetricValue::Ratio {
730                    name: other_name,
731                    ratio_metrics: other_ratio_metrics,
732                },
733            ) => name == other_name && ratio_metrics == other_ratio_metrics,
734            (
735                MetricValue::Custom { name, value },
736                MetricValue::Custom {
737                    name: other_name,
738                    value: other_value,
739                },
740            ) => name == other_name && value.is_eq(other_value),
741            // Default case when the two sides do not have the same type.
742            _ => false,
743        }
744    }
745}
746
747impl MetricValue {
748    /// Return the name of this SQL metric
749    pub fn name(&self) -> &str {
750        match self {
751            Self::OutputRows(_) => "output_rows",
752            Self::SpillCount(_) => "spill_count",
753            Self::SpilledBytes(_) => "spilled_bytes",
754            Self::OutputBytes(_) => "output_bytes",
755            Self::OutputBatches(_) => "output_batches",
756            Self::SpilledRows(_) => "spilled_rows",
757            Self::CurrentMemoryUsage(_) => "mem_used",
758            Self::ElapsedCompute(_) => "elapsed_compute",
759            Self::Count { name, .. } => name.borrow(),
760            Self::Gauge { name, .. } => name.borrow(),
761            Self::Time { name, .. } => name.borrow(),
762            Self::StartTimestamp(_) => "start_timestamp",
763            Self::EndTimestamp(_) => "end_timestamp",
764            Self::PruningMetrics { name, .. } => name.borrow(),
765            Self::Ratio { name, .. } => name.borrow(),
766            Self::Custom { name, .. } => name.borrow(),
767        }
768    }
769
770    /// Return the value of the metric as a usize value, used to aggregate metric
771    /// value across partitions.
772    pub fn as_usize(&self) -> usize {
773        match self {
774            Self::OutputRows(count) => count.value(),
775            Self::SpillCount(count) => count.value(),
776            Self::SpilledBytes(bytes) => bytes.value(),
777            Self::OutputBytes(bytes) => bytes.value(),
778            Self::OutputBatches(count) => count.value(),
779            Self::SpilledRows(count) => count.value(),
780            Self::CurrentMemoryUsage(used) => used.value(),
781            Self::ElapsedCompute(time) => time.value(),
782            Self::Count { count, .. } => count.value(),
783            Self::Gauge { gauge, .. } => gauge.value(),
784            Self::Time { time, .. } => time.value(),
785            Self::StartTimestamp(timestamp) => timestamp
786                .value()
787                .and_then(|ts| ts.timestamp_nanos_opt())
788                .map(|nanos| nanos as usize)
789                .unwrap_or(0),
790            Self::EndTimestamp(timestamp) => timestamp
791                .value()
792                .and_then(|ts| ts.timestamp_nanos_opt())
793                .map(|nanos| nanos as usize)
794                .unwrap_or(0),
795            // This function is a utility for aggregating metrics, for complex metric
796            // like `PruningMetrics`, this function is not supposed to get called.
797            // Metrics aggregation for them are implemented inside `MetricsSet` directly.
798            Self::PruningMetrics { .. } => 0,
799            // Should not be used. See comments in `PruningMetrics` for details.
800            Self::Ratio { .. } => 0,
801            Self::Custom { value, .. } => value.as_usize(),
802        }
803    }
804
805    /// create a new MetricValue with the same type as `self` suitable
806    /// for accumulating
807    pub fn new_empty(&self) -> Self {
808        match self {
809            Self::OutputRows(_) => Self::OutputRows(Count::new()),
810            Self::SpillCount(_) => Self::SpillCount(Count::new()),
811            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
812            Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
813            Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
814            Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
815            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
816            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
817            Self::Count { name, .. } => Self::Count {
818                name: name.clone(),
819                count: Count::new(),
820            },
821            Self::Gauge { name, .. } => Self::Gauge {
822                name: name.clone(),
823                gauge: Gauge::new(),
824            },
825            Self::Time { name, .. } => Self::Time {
826                name: name.clone(),
827                time: Time::new(),
828            },
829            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
830            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
831            Self::PruningMetrics { name, .. } => Self::PruningMetrics {
832                name: name.clone(),
833                pruning_metrics: PruningMetrics::new(),
834            },
835            Self::Ratio {
836                name,
837                ratio_metrics,
838            } => {
839                let merge_strategy = ratio_metrics.merge_strategy.clone();
840                Self::Ratio {
841                    name: name.clone(),
842                    ratio_metrics: RatioMetrics::new()
843                        .with_merge_strategy(merge_strategy),
844                }
845            }
846            Self::Custom { name, value } => Self::Custom {
847                name: name.clone(),
848                value: value.new_empty(),
849            },
850        }
851    }
852
853    /// Aggregates the value of other to `self`. panic's if the types
854    /// are mismatched or aggregating does not make sense for this
855    /// value
856    ///
857    /// Note this is purposely marked `mut` (even though atomics are
858    /// used) so Rust's type system can be used to ensure the
859    /// appropriate API access. `MetricValues` should be modified
860    /// using the original [`Count`] or [`Time`] they were created
861    /// from.
862    pub fn aggregate(&mut self, other: &Self) {
863        match (self, other) {
864            (Self::OutputRows(count), Self::OutputRows(other_count))
865            | (Self::SpillCount(count), Self::SpillCount(other_count))
866            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
867            | (Self::OutputBytes(count), Self::OutputBytes(other_count))
868            | (Self::OutputBatches(count), Self::OutputBatches(other_count))
869            | (Self::SpilledRows(count), Self::SpilledRows(other_count))
870            | (
871                Self::Count { count, .. },
872                Self::Count {
873                    count: other_count, ..
874                },
875            ) => count.add(other_count.value()),
876            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
877            | (
878                Self::Gauge { gauge, .. },
879                Self::Gauge {
880                    gauge: other_gauge, ..
881                },
882            ) => gauge.add(other_gauge.value()),
883            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
884            | (
885                Self::Time { time, .. },
886                Self::Time {
887                    time: other_time, ..
888                },
889            ) => time.add(other_time),
890            // timestamps are aggregated by min/max
891            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
892                timestamp.update_to_min(other_timestamp);
893            }
894            // timestamps are aggregated by min/max
895            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
896                timestamp.update_to_max(other_timestamp);
897            }
898            (
899                Self::PruningMetrics {
900                    pruning_metrics, ..
901                },
902                Self::PruningMetrics {
903                    pruning_metrics: other_pruning_metrics,
904                    ..
905                },
906            ) => {
907                let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
908                let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
909                pruning_metrics.add_pruned(pruned);
910                pruning_metrics.add_matched(matched);
911            }
912            (
913                Self::Ratio { ratio_metrics, .. },
914                Self::Ratio {
915                    ratio_metrics: other_ratio_metrics,
916                    ..
917                },
918            ) => {
919                ratio_metrics.merge(other_ratio_metrics);
920            }
921            (
922                Self::Custom { value, .. },
923                Self::Custom {
924                    value: other_value, ..
925                },
926            ) => {
927                value.aggregate(Arc::clone(other_value));
928            }
929            m @ (_, _) => {
930                panic!(
931                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
932                    m.0, m.1
933                )
934            }
935        }
936    }
937
938    /// Returns a number by which to sort metrics by display. Lower
939    /// numbers are "more useful" (and displayed first)
940    pub fn display_sort_key(&self) -> u8 {
941        match self {
942            // `BaselineMetrics` that is common for most operators
943            Self::OutputRows(_) => 0,
944            Self::ElapsedCompute(_) => 1,
945            Self::OutputBytes(_) => 2,
946            Self::OutputBatches(_) => 3,
947            // Other metrics
948            Self::PruningMetrics { name, .. } => match name.as_ref() {
949                // The following metrics belong to `DataSourceExec` with a Parquet data source.
950                // They are displayed in a specific order that reflects the actual pruning process,
951                // from coarse-grained to fine-grained pruning levels.
952                //
953                // You may update these metrics as long as their relative order remains unchanged.
954                //
955                // Reference PR: <https://github.com/apache/datafusion/pull/18379>
956                "files_ranges_pruned_statistics" => 4,
957                "row_groups_pruned_statistics" => 5,
958                "row_groups_pruned_bloom_filter" => 6,
959                "page_index_rows_pruned" => 7,
960                _ => 8,
961            },
962            Self::SpillCount(_) => 9,
963            Self::SpilledBytes(_) => 10,
964            Self::SpilledRows(_) => 11,
965            Self::CurrentMemoryUsage(_) => 12,
966            Self::Count { .. } => 13,
967            Self::Gauge { .. } => 14,
968            Self::Time { .. } => 15,
969            Self::Ratio { .. } => 16,
970            Self::StartTimestamp(_) => 17, // show timestamps last
971            Self::EndTimestamp(_) => 18,
972            Self::Custom { .. } => 19,
973        }
974    }
975
976    /// returns true if this metric has a timestamp value
977    pub fn is_timestamp(&self) -> bool {
978        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
979    }
980}
981
982impl Display for MetricValue {
983    /// Prints the value of this metric
984    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
985        match self {
986            Self::OutputRows(count)
987            | Self::OutputBatches(count)
988            | Self::SpillCount(count)
989            | Self::SpilledRows(count)
990            | Self::Count { count, .. } => {
991                write!(f, "{count}")
992            }
993            Self::SpilledBytes(count) | Self::OutputBytes(count) => {
994                let readable_count = human_readable_size(count.value());
995                write!(f, "{readable_count}")
996            }
997            Self::CurrentMemoryUsage(gauge) => {
998                // CurrentMemoryUsage is in bytes, format like SpilledBytes
999                let readable_size = human_readable_size(gauge.value());
1000                write!(f, "{readable_size}")
1001            }
1002            Self::Gauge { gauge, .. } => {
1003                // Generic gauge metrics - format with human-readable count
1004                write!(f, "{}", human_readable_count(gauge.value()))
1005            }
1006            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
1007                // distinguish between no time recorded and very small
1008                // amount of time recorded
1009                if time.value() > 0 {
1010                    write!(f, "{time}")
1011                } else {
1012                    write!(f, "NOT RECORDED")
1013                }
1014            }
1015            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
1016                write!(f, "{timestamp}")
1017            }
1018            Self::PruningMetrics {
1019                pruning_metrics, ..
1020            } => {
1021                write!(f, "{pruning_metrics}")
1022            }
1023            Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
1024            Self::Custom { name, value } => {
1025                write!(f, "name:{name} {value}")
1026            }
1027        }
1028    }
1029}
1030
1031#[cfg(test)]
1032mod tests {
1033    use std::any::Any;
1034
1035    use chrono::TimeZone;
1036    use datafusion_common::units::MB;
1037
1038    use super::*;
1039
1040    #[derive(Debug, Default)]
1041    pub struct CustomCounter {
1042        count: AtomicUsize,
1043    }
1044
1045    impl Display for CustomCounter {
1046        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1047            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
1048        }
1049    }
1050
1051    impl CustomMetricValue for CustomCounter {
1052        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
1053            Arc::new(CustomCounter::default())
1054        }
1055
1056        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
1057            let other = other.as_any().downcast_ref::<Self>().unwrap();
1058            self.count
1059                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
1060        }
1061
1062        fn as_any(&self) -> &dyn Any {
1063            self
1064        }
1065
1066        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
1067            let Some(other) = other.as_any().downcast_ref::<Self>() else {
1068                return false;
1069            };
1070
1071            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
1072        }
1073    }
1074
1075    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
1076        let custom_counter = CustomCounter::default();
1077        custom_counter.count.fetch_add(value, Ordering::Relaxed);
1078
1079        MetricValue::Custom {
1080            name: Cow::Borrowed(name),
1081            value: Arc::new(custom_counter),
1082        }
1083    }
1084
1085    #[test]
1086    fn test_custom_metric_with_mismatching_names() {
1087        let mut custom_val = new_custom_counter("Hi", 1);
1088        let other_custom_val = new_custom_counter("Hello", 1);
1089
1090        // Not equal since the name differs.
1091        assert!(other_custom_val != custom_val);
1092
1093        // Should work even though the name differs
1094        custom_val.aggregate(&other_custom_val);
1095
1096        let expected_val = new_custom_counter("Hi", 2);
1097        assert!(expected_val == custom_val);
1098    }
1099
1100    #[test]
1101    fn test_custom_metric() {
1102        let mut custom_val = new_custom_counter("hi", 11);
1103        let other_custom_val = new_custom_counter("hi", 20);
1104
1105        custom_val.aggregate(&other_custom_val);
1106
1107        assert!(custom_val != other_custom_val);
1108
1109        if let MetricValue::Custom { value, .. } = custom_val {
1110            let counter = value
1111                .as_any()
1112                .downcast_ref::<CustomCounter>()
1113                .expect("Expected CustomCounter");
1114            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
1115        } else {
1116            panic!("Unexpected value");
1117        }
1118    }
1119
1120    #[test]
1121    fn test_display_output_rows() {
1122        let count = Count::new();
1123        let values = vec![
1124            MetricValue::OutputRows(count.clone()),
1125            MetricValue::Count {
1126                name: "my_counter".into(),
1127                count: count.clone(),
1128            },
1129        ];
1130
1131        for value in &values {
1132            assert_eq!("0", value.to_string(), "value {value:?}");
1133        }
1134
1135        count.add(42);
1136        for value in &values {
1137            assert_eq!("42", value.to_string(), "value {value:?}");
1138        }
1139    }
1140
1141    #[test]
1142    fn test_display_spilled_bytes() {
1143        let count = Count::new();
1144        let spilled_byte = MetricValue::SpilledBytes(count.clone());
1145
1146        assert_eq!("0.0 B", spilled_byte.to_string());
1147
1148        count.add((100 * MB) as usize);
1149        assert_eq!("100.0 MB", spilled_byte.to_string());
1150
1151        count.add((0.5 * MB as f64) as usize);
1152        assert_eq!("100.5 MB", spilled_byte.to_string());
1153    }
1154
1155    #[test]
1156    fn test_display_time() {
1157        let time = Time::new();
1158        let values = vec![
1159            MetricValue::ElapsedCompute(time.clone()),
1160            MetricValue::Time {
1161                name: "my_time".into(),
1162                time: time.clone(),
1163            },
1164        ];
1165
1166        // if time is not set, it should not be reported as zero
1167        for value in &values {
1168            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
1169        }
1170
1171        time.add_duration(Duration::from_nanos(1042));
1172        for value in &values {
1173            assert_eq!("1.04µs", value.to_string(), "value {value:?}");
1174        }
1175    }
1176
1177    #[test]
1178    fn test_display_ratio() {
1179        let ratio_metrics = RatioMetrics::new();
1180        let ratio = MetricValue::Ratio {
1181            name: Cow::Borrowed("ratio_metric"),
1182            ratio_metrics: ratio_metrics.clone(),
1183        };
1184
1185        assert_eq!("N/A (0/0)", ratio.to_string());
1186
1187        ratio_metrics.add_part(10);
1188        assert_eq!("N/A (10/0)", ratio.to_string());
1189
1190        ratio_metrics.add_total(40);
1191        assert_eq!("25% (10/40)", ratio.to_string());
1192
1193        let tiny_ratio_metrics = RatioMetrics::new();
1194        let tiny_ratio = MetricValue::Ratio {
1195            name: Cow::Borrowed("tiny_ratio_metric"),
1196            ratio_metrics: tiny_ratio_metrics.clone(),
1197        };
1198        tiny_ratio_metrics.add_part(1);
1199        tiny_ratio_metrics.add_total(3000);
1200        assert_eq!("0.033% (1/3.00 K)", tiny_ratio.to_string());
1201    }
1202
1203    #[test]
1204    fn test_ratio_set_methods() {
1205        let ratio_metrics = RatioMetrics::new();
1206
1207        // Ensure set methods don't increment
1208        ratio_metrics.set_part(10);
1209        ratio_metrics.set_part(10);
1210        ratio_metrics.set_total(40);
1211        ratio_metrics.set_total(40);
1212        assert_eq!("25% (10/40)", ratio_metrics.to_string());
1213
1214        let ratio_metrics = RatioMetrics::new();
1215
1216        // Calling set should change the value
1217        ratio_metrics.set_part(10);
1218        ratio_metrics.set_part(30);
1219        ratio_metrics.set_total(40);
1220        ratio_metrics.set_total(50);
1221        assert_eq!("60% (30/50)", ratio_metrics.to_string());
1222    }
1223
1224    #[test]
1225    fn test_ratio_merge_strategy() {
1226        // Test AddPartSetTotal strategy
1227        let ratio_metrics1 =
1228            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
1229
1230        ratio_metrics1.set_part(10);
1231        ratio_metrics1.set_total(40);
1232        assert_eq!("25% (10/40)", ratio_metrics1.to_string());
1233        let ratio_metrics2 =
1234            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
1235        ratio_metrics2.set_part(20);
1236        ratio_metrics2.set_total(40);
1237        assert_eq!("50% (20/40)", ratio_metrics2.to_string());
1238
1239        ratio_metrics1.merge(&ratio_metrics2);
1240        assert_eq!("75% (30/40)", ratio_metrics1.to_string());
1241
1242        // Test SetPartAddTotal strategy
1243        let ratio_metrics1 =
1244            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
1245        ratio_metrics1.set_part(20);
1246        ratio_metrics1.set_total(50);
1247        let ratio_metrics2 = RatioMetrics::new();
1248        ratio_metrics2.set_part(20);
1249        ratio_metrics2.set_total(50);
1250        ratio_metrics1.merge(&ratio_metrics2);
1251        assert_eq!("20% (20/100)", ratio_metrics1.to_string());
1252
1253        // Test AddPartAddTotal strategy (default)
1254        let ratio_metrics1 = RatioMetrics::new();
1255        ratio_metrics1.set_part(20);
1256        ratio_metrics1.set_total(50);
1257        let ratio_metrics2 = RatioMetrics::new();
1258        ratio_metrics2.set_part(20);
1259        ratio_metrics2.set_total(50);
1260        ratio_metrics1.merge(&ratio_metrics2);
1261        assert_eq!("40% (40/100)", ratio_metrics1.to_string());
1262    }
1263
1264    #[test]
1265    fn test_display_timestamp() {
1266        let timestamp = Timestamp::new();
1267        let values = vec![
1268            MetricValue::StartTimestamp(timestamp.clone()),
1269            MetricValue::EndTimestamp(timestamp.clone()),
1270        ];
1271
1272        // if time is not set, it should not be reported as zero
1273        for value in &values {
1274            assert_eq!("NONE", value.to_string(), "value {value:?}");
1275        }
1276
1277        timestamp.set(Utc.timestamp_nanos(1431648000000000));
1278        for value in &values {
1279            assert_eq!(
1280                "1970-01-17 13:40:48 UTC",
1281                value.to_string(),
1282                "value {value:?}"
1283            );
1284        }
1285    }
1286
1287    #[test]
1288    fn test_timer_with_custom_instant() {
1289        let time = Time::new();
1290        let start_time = Instant::now();
1291
1292        // Sleep a bit to ensure some time passes
1293        std::thread::sleep(Duration::from_millis(1));
1294
1295        // Create timer with the earlier start time
1296        let mut timer = time.timer_with(start_time);
1297
1298        // Sleep a bit more
1299        std::thread::sleep(Duration::from_millis(1));
1300
1301        // Stop the timer
1302        timer.stop();
1303
1304        // The recorded time should be at least 20ms (both sleeps)
1305        assert!(
1306            time.value() >= 2_000_000,
1307            "Expected at least 2ms, got {} ns",
1308            time.value()
1309        );
1310    }
1311
1312    #[test]
1313    fn test_stop_with_custom_endpoint() {
1314        let time = Time::new();
1315        let start = Instant::now();
1316        let mut timer = time.timer_with(start);
1317
1318        // Simulate exactly 10ms passing
1319        let end = start + Duration::from_millis(10);
1320
1321        // Stop with custom endpoint
1322        timer.stop_with(end);
1323
1324        // Should record exactly 10ms (10_000_000 nanoseconds)
1325        // Allow for small variations due to timer resolution
1326        let recorded = time.value();
1327        assert!(
1328            (10_000_000..=10_100_000).contains(&recorded),
1329            "Expected ~10ms, got {recorded} ns"
1330        );
1331
1332        // Calling stop_with again should not add more time
1333        timer.stop_with(end);
1334        assert_eq!(
1335            recorded,
1336            time.value(),
1337            "Time should not change after second stop"
1338        );
1339    }
1340
1341    #[test]
1342    fn test_done_with_custom_endpoint() {
1343        let time = Time::new();
1344        let start = Instant::now();
1345
1346        // Create a new scope for the timer
1347        {
1348            let timer = time.timer_with(start);
1349
1350            // Simulate 50ms passing
1351            let end = start + Duration::from_millis(5);
1352
1353            // Call done_with to stop and consume the timer
1354            timer.done_with(end);
1355
1356            // Timer is consumed, can't use it anymore
1357        }
1358
1359        // Should record exactly 5ms
1360        let recorded = time.value();
1361        assert!(
1362            (5_000_000..=5_100_000).contains(&recorded),
1363            "Expected ~5ms, got {recorded} ns",
1364        );
1365
1366        // Test that done_with prevents drop from recording time again
1367        {
1368            let timer2 = time.timer_with(start);
1369            let end2 = start + Duration::from_millis(5);
1370            timer2.done_with(end2);
1371            // drop happens here but should not record additional time
1372        }
1373
1374        // Should have added only 5ms more
1375        let new_recorded = time.value();
1376        assert!(
1377            (10_000_000..=10_100_000).contains(&new_recorded),
1378            "Expected ~10ms total, got {new_recorded} ns",
1379        );
1380    }
1381
1382    #[test]
1383    fn test_human_readable_metric_formatting() {
1384        // Test Count formatting with various sizes
1385        let small_count = Count::new();
1386        small_count.add(42);
1387        assert_eq!(
1388            MetricValue::OutputRows(small_count.clone()).to_string(),
1389            "42"
1390        );
1391
1392        let thousand_count = Count::new();
1393        thousand_count.add(10_100);
1394        assert_eq!(
1395            MetricValue::OutputRows(thousand_count.clone()).to_string(),
1396            "10.10 K"
1397        );
1398
1399        let million_count = Count::new();
1400        million_count.add(1_532_000);
1401        assert_eq!(
1402            MetricValue::SpilledRows(million_count.clone()).to_string(),
1403            "1.53 M"
1404        );
1405
1406        let billion_count = Count::new();
1407        billion_count.add(2_500_000_000);
1408        assert_eq!(
1409            MetricValue::OutputBatches(billion_count.clone()).to_string(),
1410            "2.50 B"
1411        );
1412
1413        // Test Time formatting with various durations
1414        let micros_time = Time::new();
1415        micros_time.add_duration(Duration::from_nanos(1_234));
1416        assert_eq!(
1417            MetricValue::ElapsedCompute(micros_time.clone()).to_string(),
1418            "1.23µs"
1419        );
1420
1421        let millis_time = Time::new();
1422        millis_time.add_duration(Duration::from_nanos(11_295_377));
1423        assert_eq!(
1424            MetricValue::ElapsedCompute(millis_time.clone()).to_string(),
1425            "11.30ms"
1426        );
1427
1428        let seconds_time = Time::new();
1429        seconds_time.add_duration(Duration::from_nanos(1_234_567_890));
1430        assert_eq!(
1431            MetricValue::ElapsedCompute(seconds_time.clone()).to_string(),
1432            "1.23s"
1433        );
1434
1435        // Test CurrentMemoryUsage formatting (should use size, not count)
1436        let mem_gauge = Gauge::new();
1437        mem_gauge.add(100 * MB as usize);
1438        assert_eq!(
1439            MetricValue::CurrentMemoryUsage(mem_gauge.clone()).to_string(),
1440            "100.0 MB"
1441        );
1442
1443        // Test custom Gauge formatting (should use count)
1444        let custom_gauge = Gauge::new();
1445        custom_gauge.add(50_000);
1446        assert_eq!(
1447            MetricValue::Gauge {
1448                name: "custom".into(),
1449                gauge: custom_gauge.clone()
1450            }
1451            .to_string(),
1452            "50.00 K"
1453        );
1454
1455        // Test PruningMetrics formatting
1456        let pruning = PruningMetrics::new();
1457        pruning.add_matched(500_000);
1458        pruning.add_pruned(500_000);
1459        assert_eq!(
1460            MetricValue::PruningMetrics {
1461                name: "test_pruning".into(),
1462                pruning_metrics: pruning.clone()
1463            }
1464            .to_string(),
1465            "1.00 M total → 500.0 K matched"
1466        );
1467
1468        // Test RatioMetrics formatting
1469        let ratio = RatioMetrics::new();
1470        ratio.add_part(250_000);
1471        ratio.add_total(1_000_000);
1472        assert_eq!(
1473            MetricValue::Ratio {
1474                name: "test_ratio".into(),
1475                ratio_metrics: ratio.clone()
1476            }
1477            .to_string(),
1478            "25% (250.0 K/1.00 M)"
1479        );
1480    }
1481}