Skip to main content

datafusion_physical_expr_common/metrics/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Value representation of metrics
19
20use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::{
23    human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
24};
25use parking_lot::Mutex;
26use std::{
27    borrow::{Borrow, Cow},
28    fmt::{Debug, Display},
29    sync::{
30        Arc,
31        atomic::{AtomicUsize, Ordering},
32    },
33    time::Duration,
34};
35
/// A counter recording things such as the number of input or output rows.
///
/// Cloning a `Count` does not copy its value: every clone points at the same
/// shared atomic, so an update made through one clone is seen by all of them.
#[derive(Clone, Debug)]
pub struct Count {
    /// Shared storage for the current counter value
    value: Arc<AtomicUsize>,
}
44
45impl PartialEq for Count {
46    fn eq(&self, other: &Self) -> bool {
47        self.value().eq(&other.value())
48    }
49}
50
51impl Display for Count {
52    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53        write!(f, "{}", human_readable_count(self.value()))
54    }
55}
56
57impl Default for Count {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl Count {
64    /// create a new counter
65    pub fn new() -> Self {
66        Self {
67            value: Arc::new(AtomicUsize::new(0)),
68        }
69    }
70
71    /// Add `n` to the metric's value
72    pub fn add(&self, n: usize) {
73        // relaxed ordering for operations on `value` poses no issues
74        // we're purely using atomic ops with no associated memory ops
75        self.value.fetch_add(n, Ordering::Relaxed);
76    }
77
78    /// Get the current value
79    pub fn value(&self) -> usize {
80        self.value.load(Ordering::Relaxed)
81    }
82}
83
/// The simplest metric type: a single value that can move up or down, such
/// as the current memory consumption.
///
/// Cloning a `Gauge` does not copy its value: all clones share the same
/// underlying atomic.
#[derive(Clone, Debug)]
pub struct Gauge {
    /// Shared storage for the current gauge value
    value: Arc<AtomicUsize>,
}
93
94impl PartialEq for Gauge {
95    fn eq(&self, other: &Self) -> bool {
96        self.value().eq(&other.value())
97    }
98}
99
100impl Display for Gauge {
101    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
102        write!(f, "{}", self.value())
103    }
104}
105
106impl Default for Gauge {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl Gauge {
113    /// create a new gauge
114    pub fn new() -> Self {
115        Self {
116            value: Arc::new(AtomicUsize::new(0)),
117        }
118    }
119
120    /// Add `n` to the metric's value
121    pub fn add(&self, n: usize) {
122        // relaxed ordering for operations on `value` poses no issues
123        // we're purely using atomic ops with no associated memory ops
124        self.value.fetch_add(n, Ordering::Relaxed);
125    }
126
127    /// Sub `n` from the metric's value
128    pub fn sub(&self, n: usize) {
129        // relaxed ordering for operations on `value` poses no issues
130        // we're purely using atomic ops with no associated memory ops
131        self.value.fetch_sub(n, Ordering::Relaxed);
132    }
133
134    /// Set metric's value to maximum of `n` and current value
135    pub fn set_max(&self, n: usize) {
136        self.value.fetch_max(n, Ordering::Relaxed);
137    }
138
139    /// Set the metric's value to `n` and return the previous value
140    pub fn set(&self, n: usize) -> usize {
141        // relaxed ordering for operations on `value` poses no issues
142        // we're purely using atomic ops with no associated memory ops
143        self.value.swap(n, Ordering::Relaxed)
144    }
145
146    /// Get the current value
147    pub fn value(&self) -> usize {
148        self.value.load(Ordering::Relaxed)
149    }
150}
151
/// Accumulated elapsed time, possibly gathered over several non-contiguous
/// intervals, stored in nanoseconds.
///
/// Clones share the same underlying counter.
#[derive(Clone, Debug)]
pub struct Time {
    /// Total elapsed time recorded so far, in nanoseconds
    nanos: Arc<AtomicUsize>,
}
158
159impl Default for Time {
160    fn default() -> Self {
161        Self::new()
162    }
163}
164
165impl PartialEq for Time {
166    fn eq(&self, other: &Self) -> bool {
167        self.value().eq(&other.value())
168    }
169}
170
171impl Display for Time {
172    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
173        write!(f, "{}", human_readable_duration(self.value() as u64))
174    }
175}
176
177impl Time {
178    /// Create a new [`Time`] wrapper suitable for recording elapsed
179    /// times for operations.
180    pub fn new() -> Self {
181        Self {
182            nanos: Arc::new(AtomicUsize::new(0)),
183        }
184    }
185
186    /// Add elapsed nanoseconds since `start`to self
187    pub fn add_elapsed(&self, start: Instant) {
188        self.add_duration(start.elapsed());
189    }
190
191    /// Add duration of time to self
192    ///
193    /// Note: this will always increment the recorded time by at least 1 nanosecond
194    /// to distinguish between the scenario of no values recorded, in which
195    /// case the value will be 0, and no measurable amount of time having passed,
196    /// in which case the value will be small but not 0.
197    ///
198    /// This is based on the assumption that the timing logic in most cases is likely
199    /// to take at least a nanosecond, and so this is reasonable mechanism to avoid
200    /// ambiguity, especially on systems with low-resolution monotonic clocks
201    pub fn add_duration(&self, duration: Duration) {
202        let more_nanos = duration.as_nanos() as usize;
203        self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204    }
205
206    /// Add the number of nanoseconds of other `Time` to self
207    pub fn add(&self, other: &Time) {
208        self.add_duration(Duration::from_nanos(other.value() as u64))
209    }
210
211    /// return a scoped guard that adds the amount of time elapsed
212    /// between its creation and its drop or call to `stop` to the
213    /// underlying metric.
214    pub fn timer(&self) -> ScopedTimerGuard<'_> {
215        ScopedTimerGuard {
216            inner: self,
217            start: Some(Instant::now()),
218        }
219    }
220
221    /// Get the number of nanoseconds record by this Time metric
222    pub fn value(&self) -> usize {
223        self.nanos.load(Ordering::Relaxed)
224    }
225
226    /// Return a scoped guard that adds the amount of time elapsed between the
227    /// given instant and its drop (or the call to `stop`) to the underlying metric
228    pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229        ScopedTimerGuard {
230            inner: self,
231            start: Some(now),
232        }
233    }
234}
235
236/// Stores a single timestamp, stored as the number of nanoseconds
237/// elapsed from Jan 1, 1970 UTC
238#[derive(Debug, Clone)]
239pub struct Timestamp {
240    /// Time thing started
241    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250impl Timestamp {
251    /// Create a new timestamp and sets its value to 0
252    pub fn new() -> Self {
253        Self {
254            timestamp: Arc::new(Mutex::new(None)),
255        }
256    }
257
258    /// Sets the timestamps value to the current time
259    pub fn record(&self) {
260        self.set(Utc::now())
261    }
262
263    /// Sets the timestamps value to a specified time
264    pub fn set(&self, now: DateTime<Utc>) {
265        *self.timestamp.lock() = Some(now);
266    }
267
268    /// return the timestamps value at the last time `record()` was
269    /// called.
270    ///
271    /// Returns `None` if `record()` has not been called
272    pub fn value(&self) -> Option<DateTime<Utc>> {
273        *self.timestamp.lock()
274    }
275
276    /// sets the value of this timestamp to the minimum of this and other
277    pub fn update_to_min(&self, other: &Timestamp) {
278        let min = match (self.value(), other.value()) {
279            (None, None) => None,
280            (Some(v), None) => Some(v),
281            (None, Some(v)) => Some(v),
282            (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283        };
284
285        *self.timestamp.lock() = min;
286    }
287
288    /// sets the value of this timestamp to the maximum of this and other
289    pub fn update_to_max(&self, other: &Timestamp) {
290        let max = match (self.value(), other.value()) {
291            (None, None) => None,
292            (Some(v), None) => Some(v),
293            (None, Some(v)) => Some(v),
294            (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295        };
296
297        *self.timestamp.lock() = max;
298    }
299}
300
301impl PartialEq for Timestamp {
302    fn eq(&self, other: &Self) -> bool {
303        self.value().eq(&other.value())
304    }
305}
306
307impl Display for Timestamp {
308    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309        match self.value() {
310            None => write!(f, "NONE"),
311            Some(v) => {
312                write!(f, "{v}")
313            }
314        }
315    }
316}
317
318/// RAAI structure that adds all time between its construction and
319/// destruction to the CPU time or the first call to `stop` whichever
320/// comes first
321pub struct ScopedTimerGuard<'a> {
322    inner: &'a Time,
323    start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327    /// Stop the timer timing and record the time taken
328    pub fn stop(&mut self) {
329        if let Some(start) = self.start.take() {
330            self.inner.add_elapsed(start)
331        }
332    }
333
334    /// Restarts the timer recording from the current time
335    pub fn restart(&mut self) {
336        self.start = Some(Instant::now())
337    }
338
339    /// Stop the timer, record the time taken and consume self
340    pub fn done(mut self) {
341        self.stop()
342    }
343
344    /// Stop the timer timing and record the time taken since the given endpoint.
345    pub fn stop_with(&mut self, end_time: Instant) {
346        if let Some(start) = self.start.take() {
347            let elapsed = end_time - start;
348            self.inner.add_duration(elapsed)
349        }
350    }
351
352    /// Stop the timer, record the time taken since `end_time` endpoint, and
353    /// consume self.
354    pub fn done_with(mut self, end_time: Instant) {
355        self.stop_with(end_time)
356    }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360    fn drop(&mut self) {
361        self.stop()
362    }
363}
364
/// Counters tracking pruning metrics
///
/// For example, a file scanner that is initially planned to scan 10 files but
/// skips 8 of them using statistics would report: 10 total -> 2 matched.
/// The total is not stored; it is derived as `pruned + matched`.
///
/// Note: clones share (and therefore update) the same underlying counters.
#[derive(Clone, Debug)]
pub struct PruningMetrics {
    /// Number of items skipped
    pruned: Arc<AtomicUsize>,
    /// Number of items that were not pruned
    matched: Arc<AtomicUsize>,
    /// Number of items reported as fully matched
    fully_matched: Arc<AtomicUsize>,
}
377
378impl Display for PruningMetrics {
379    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
380        let matched = self.matched.load(Ordering::Relaxed);
381        let total = self.pruned.load(Ordering::Relaxed) + matched;
382        let fully_matched = self.fully_matched.load(Ordering::Relaxed);
383
384        if fully_matched != 0 {
385            write!(
386                f,
387                "{} total → {} matched -> {} fully matched",
388                human_readable_count(total),
389                human_readable_count(matched),
390                human_readable_count(fully_matched)
391            )
392        } else {
393            write!(
394                f,
395                "{} total → {} matched",
396                human_readable_count(total),
397                human_readable_count(matched)
398            )
399        }
400    }
401}
402
403impl Default for PruningMetrics {
404    fn default() -> Self {
405        Self::new()
406    }
407}
408
409impl PruningMetrics {
410    /// create a new PruningMetrics
411    pub fn new() -> Self {
412        Self {
413            pruned: Arc::new(AtomicUsize::new(0)),
414            matched: Arc::new(AtomicUsize::new(0)),
415            fully_matched: Arc::new(AtomicUsize::new(0)),
416        }
417    }
418
419    /// Add `n` to the metric's pruned value
420    pub fn add_pruned(&self, n: usize) {
421        // relaxed ordering for operations on `value` poses no issues
422        // we're purely using atomic ops with no associated memory ops
423        self.pruned.fetch_add(n, Ordering::Relaxed);
424    }
425
426    /// Add `n` to the metric's matched value
427    pub fn add_matched(&self, n: usize) {
428        // relaxed ordering for operations on `value` poses no issues
429        // we're purely using atomic ops with no associated memory ops
430        self.matched.fetch_add(n, Ordering::Relaxed);
431    }
432
433    /// Add `n` to the metric's fully matched value
434    pub fn add_fully_matched(&self, n: usize) {
435        // relaxed ordering for operations on `value` poses no issues
436        // we're purely using atomic ops with no associated memory ops
437        self.fully_matched.fetch_add(n, Ordering::Relaxed);
438    }
439
440    /// Subtract `n` to the metric's matched value.
441    pub fn subtract_matched(&self, n: usize) {
442        // relaxed ordering for operations on `value` poses no issues
443        // we're purely using atomic ops with no associated memory ops
444        self.matched.fetch_sub(n, Ordering::Relaxed);
445    }
446
447    /// Number of items pruned
448    pub fn pruned(&self) -> usize {
449        self.pruned.load(Ordering::Relaxed)
450    }
451
452    /// Number of items matched (not pruned)
453    pub fn matched(&self) -> usize {
454        self.matched.load(Ordering::Relaxed)
455    }
456
457    /// Number of items fully matched
458    pub fn fully_matched(&self) -> usize {
459        self.fully_matched.load(Ordering::Relaxed)
460    }
461}
462
463/// Counters tracking ratio metrics (e.g. matched vs total)
464///
465/// The counters are thread-safe and shared across clones.
466#[derive(Debug, Clone, Default)]
467pub struct RatioMetrics {
468    part: Arc<AtomicUsize>,
469    total: Arc<AtomicUsize>,
470    merge_strategy: RatioMergeStrategy,
471    /// Ratios are displayed as `1% (1/100)`; this controls the latter part.
472    display_raw_values: bool,
473}
474
/// How [`RatioMetrics::merge`] folds another ratio's counters into this one.
#[derive(Clone, Debug, Default)]
pub enum RatioMergeStrategy {
    /// Add the other `part` and the other `total` (the default)
    #[default]
    AddPartAddTotal,
    /// Add the other `part`, but overwrite `total` with the other `total`
    AddPartSetTotal,
    /// Overwrite `part` with the other `part`, but add the other `total`
    SetPartAddTotal,
}
482
483impl RatioMetrics {
484    /// Create a new [`RatioMetrics`]
485    pub fn new() -> Self {
486        Self {
487            part: Arc::new(AtomicUsize::new(0)),
488            total: Arc::new(AtomicUsize::new(0)),
489            merge_strategy: RatioMergeStrategy::AddPartAddTotal,
490            display_raw_values: true,
491        }
492    }
493
494    pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy) -> Self {
495        self.merge_strategy = merge_strategy;
496        self
497    }
498
499    pub fn with_display_raw_values(mut self, display_raw_values: bool) -> Self {
500        self.display_raw_values = display_raw_values;
501        self
502    }
503
504    /// Add `n` to the numerator (`part`) value
505    pub fn add_part(&self, n: usize) {
506        self.part.fetch_add(n, Ordering::Relaxed);
507    }
508
509    /// Add `n` to the denominator (`total`) value
510    pub fn add_total(&self, n: usize) {
511        self.total.fetch_add(n, Ordering::Relaxed);
512    }
513
514    /// Set the numerator (`part`) value to `n`, overwriting any existing value
515    pub fn set_part(&self, n: usize) {
516        self.part.store(n, Ordering::Relaxed);
517    }
518
519    /// Set the denominator (`total`) value to `n`, overwriting any existing value
520    pub fn set_total(&self, n: usize) {
521        self.total.store(n, Ordering::Relaxed);
522    }
523
524    /// Merge the value from `other` into `self`
525    pub fn merge(&self, other: &Self) {
526        match self.merge_strategy {
527            RatioMergeStrategy::AddPartAddTotal => {
528                self.add_part(other.part());
529                self.add_total(other.total());
530            }
531            RatioMergeStrategy::AddPartSetTotal => {
532                self.add_part(other.part());
533                self.set_total(other.total());
534            }
535            RatioMergeStrategy::SetPartAddTotal => {
536                self.set_part(other.part());
537                self.add_total(other.total());
538            }
539        }
540    }
541
542    /// Return the numerator (`part`) value
543    pub fn part(&self) -> usize {
544        self.part.load(Ordering::Relaxed)
545    }
546
547    /// Return the denominator (`total`) value
548    pub fn total(&self) -> usize {
549        self.total.load(Ordering::Relaxed)
550    }
551
552    /// Return the strategy used to merge two [`RatioMetrics`] values
553    pub fn merge_strategy(&self) -> &RatioMergeStrategy {
554        &self.merge_strategy
555    }
556
557    /// Whether `Display` for this metric appends the raw `(part/total)` numbers
558    /// alongside the percentage
559    pub fn display_raw_values(&self) -> bool {
560        self.display_raw_values
561    }
562}
563
564impl PartialEq for RatioMetrics {
565    fn eq(&self, other: &Self) -> bool {
566        self.part() == other.part()
567            && self.total() == other.total()
568            && self.display_raw_values == other.display_raw_values
569    }
570}
571
572impl Display for RatioMetrics {
573    /// Format the ratio to a format like '18.26% (220/1150)'
574    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
575        let part = self.part();
576        let total = self.total();
577
578        // Format the ratio first (for example, `6667/10000` -> `66.67%`),
579        // then optionally append the raw values as ` (6.67 K/10.00 K)`.
580        if total == 0 {
581            write!(f, "N/A")?;
582        } else {
583            // Use basis points so we can round with integer math:
584            // e.g. 18.26% has basis point 1826
585            let basis_points = (((part as u128 * 10_000) + (total as u128 / 2))
586                / total as u128) as usize;
587            let whole = basis_points / 100;
588            let fractional = basis_points % 100;
589
590            if fractional == 0 {
591                write!(f, "{whole}%")?;
592            } else if fractional.is_multiple_of(10) {
593                write!(f, "{whole}.{}%", fractional / 10)?;
594            } else {
595                write!(f, "{whole}.{fractional:02}%")?;
596            }
597        }
598
599        if !self.display_raw_values {
600            return Ok(());
601        }
602
603        if total == 0 {
604            if part == 0 {
605                write!(f, " (0/0)")
606            } else {
607                write!(f, " ({}/0)", human_readable_count(part))
608            }
609        } else {
610            write!(
611                f,
612                " ({}/{})",
613                human_readable_count(part),
614                human_readable_count(total)
615            )
616        }
617    }
618}
619
/// Possible values for a [super::Metric].
///
/// Among other differences, the metric types have different ways to
/// logically interpret their underlying values and some metrics are
/// so common they are given special treatment.
///
/// The tuple variants have fixed, well-known names (e.g. "output_rows"),
/// while the struct-like variants carry an operator-provided `name`.
#[derive(Debug, Clone)]
pub enum MetricValue {
    /// Number of output rows produced: "output_rows" metric
    OutputRows(Count),
    /// Elapsed Compute Time: the wall clock time spent in "cpu
    /// intensive" work.
    ///
    /// This measurement represents, roughly:
    /// ```
    /// use std::time::Instant;
    /// let start = Instant::now();
    /// // ...CPU intensive work here...
    /// let elapsed_compute = (Instant::now() - start).as_nanos();
    /// ```
    ///
    /// Note 1: Does *not* include time other operators spend
    /// computing input.
    ///
    /// Note 2: *Does* include time when the thread could have made
    /// progress but the OS did not schedule it (e.g. due to CPU
    /// contention), thus making this value different than the
    /// classical definition of "cpu_time", which is the time reported
    /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
    ElapsedCompute(Time),
    /// Number of spills produced: "spill_count" metric
    SpillCount(Count),
    /// Total size of spilled bytes produced: "spilled_bytes" metric
    SpilledBytes(Count),
    /// Total size of output bytes produced: "output_bytes" metric
    OutputBytes(Count),
    /// Total number of output batches produced: "output_batches" metric
    OutputBatches(Count),
    /// Total number of spilled rows produced: "spilled_rows" metric
    SpilledRows(Count),
    /// Current memory used: "mem_used" metric
    CurrentMemoryUsage(Gauge),
    /// Operator defined count.
    Count {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The value of the metric
        count: Count,
    },
    /// Operator defined gauge.
    Gauge {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The value of the metric
        gauge: Gauge,
    },
    /// Operator defined time
    Time {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The value of the metric
        time: Time,
    },
    /// The time at which execution started: "start_timestamp" metric
    StartTimestamp(Timestamp),
    /// The time at which execution ended: "end_timestamp" metric
    EndTimestamp(Timestamp),
    /// Metrics related to scan pruning
    PruningMetrics {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The pruned / matched / fully matched counters
        pruning_metrics: PruningMetrics,
    },
    /// Metrics that should be displayed as ratio like (42%)
    Ratio {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// The part / total counters of the ratio
        ratio_metrics: RatioMetrics,
    },
    /// Operator defined metric with a custom value implementation
    Custom {
        /// The provided name of this metric
        name: Cow<'static, str>,
        /// A custom implementation of the metric value.
        value: Arc<dyn CustomMetricValue>,
    },
}
703
704// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its
705// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq.
706impl PartialEq for MetricValue {
707    fn eq(&self, other: &Self) -> bool {
708        match (self, other) {
709            (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
710                count == other
711            }
712            (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
713                time == other
714            }
715            (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
716                count == other
717            }
718            (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
719                count == other
720            }
721            (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
722                count == other
723            }
724            (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
725                count == other
726            }
727            (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
728                count == other
729            }
730            (
731                MetricValue::CurrentMemoryUsage(gauge),
732                MetricValue::CurrentMemoryUsage(other),
733            ) => gauge == other,
734            (
735                MetricValue::Count { name, count },
736                MetricValue::Count {
737                    name: other_name,
738                    count: other_count,
739                },
740            ) => name == other_name && count == other_count,
741            (
742                MetricValue::Gauge { name, gauge },
743                MetricValue::Gauge {
744                    name: other_name,
745                    gauge: other_gauge,
746                },
747            ) => name == other_name && gauge == other_gauge,
748            (
749                MetricValue::Time { name, time },
750                MetricValue::Time {
751                    name: other_name,
752                    time: other_time,
753                },
754            ) => name == other_name && time == other_time,
755
756            (
757                MetricValue::StartTimestamp(timestamp),
758                MetricValue::StartTimestamp(other),
759            ) => timestamp == other,
760            (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
761                timestamp == other
762            }
763            (
764                MetricValue::PruningMetrics {
765                    name,
766                    pruning_metrics,
767                },
768                MetricValue::PruningMetrics {
769                    name: other_name,
770                    pruning_metrics: other_pruning_metrics,
771                },
772            ) => {
773                name == other_name
774                    && pruning_metrics.pruned() == other_pruning_metrics.pruned()
775                    && pruning_metrics.matched() == other_pruning_metrics.matched()
776            }
777            (
778                MetricValue::Ratio {
779                    name,
780                    ratio_metrics,
781                },
782                MetricValue::Ratio {
783                    name: other_name,
784                    ratio_metrics: other_ratio_metrics,
785                },
786            ) => name == other_name && ratio_metrics == other_ratio_metrics,
787            (
788                MetricValue::Custom { name, value },
789                MetricValue::Custom {
790                    name: other_name,
791                    value: other_value,
792                },
793            ) => name == other_name && value.is_eq(other_value),
794            // Default case when the two sides do not have the same type.
795            _ => false,
796        }
797    }
798}
799
800impl MetricValue {
801    /// Return the name of this SQL metric
802    pub fn name(&self) -> &str {
803        match self {
804            Self::OutputRows(_) => "output_rows",
805            Self::SpillCount(_) => "spill_count",
806            Self::SpilledBytes(_) => "spilled_bytes",
807            Self::OutputBytes(_) => "output_bytes",
808            Self::OutputBatches(_) => "output_batches",
809            Self::SpilledRows(_) => "spilled_rows",
810            Self::CurrentMemoryUsage(_) => "mem_used",
811            Self::ElapsedCompute(_) => "elapsed_compute",
812            Self::Count { name, .. } => name.borrow(),
813            Self::Gauge { name, .. } => name.borrow(),
814            Self::Time { name, .. } => name.borrow(),
815            Self::StartTimestamp(_) => "start_timestamp",
816            Self::EndTimestamp(_) => "end_timestamp",
817            Self::PruningMetrics { name, .. } => name.borrow(),
818            Self::Ratio { name, .. } => name.borrow(),
819            Self::Custom { name, .. } => name.borrow(),
820        }
821    }
822
823    /// Return the value of the metric as a usize value, used to aggregate metric
824    /// value across partitions.
825    pub fn as_usize(&self) -> usize {
826        match self {
827            Self::OutputRows(count) => count.value(),
828            Self::SpillCount(count) => count.value(),
829            Self::SpilledBytes(bytes) => bytes.value(),
830            Self::OutputBytes(bytes) => bytes.value(),
831            Self::OutputBatches(count) => count.value(),
832            Self::SpilledRows(count) => count.value(),
833            Self::CurrentMemoryUsage(used) => used.value(),
834            Self::ElapsedCompute(time) => time.value(),
835            Self::Count { count, .. } => count.value(),
836            Self::Gauge { gauge, .. } => gauge.value(),
837            Self::Time { time, .. } => time.value(),
838            Self::StartTimestamp(timestamp) => timestamp
839                .value()
840                .and_then(|ts| ts.timestamp_nanos_opt())
841                .map(|nanos| nanos as usize)
842                .unwrap_or(0),
843            Self::EndTimestamp(timestamp) => timestamp
844                .value()
845                .and_then(|ts| ts.timestamp_nanos_opt())
846                .map(|nanos| nanos as usize)
847                .unwrap_or(0),
848            // This function is a utility for aggregating metrics, for complex metric
849            // like `PruningMetrics`, this function is not supposed to get called.
850            // Metrics aggregation for them are implemented inside `MetricsSet` directly.
851            Self::PruningMetrics { .. } => 0,
852            // Should not be used. See comments in `PruningMetrics` for details.
853            Self::Ratio { .. } => 0,
854            Self::Custom { value, .. } => value.as_usize(),
855        }
856    }
857
858    /// create a new MetricValue with the same type as `self` suitable
859    /// for accumulating
860    pub fn new_empty(&self) -> Self {
861        match self {
862            Self::OutputRows(_) => Self::OutputRows(Count::new()),
863            Self::SpillCount(_) => Self::SpillCount(Count::new()),
864            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
865            Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
866            Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
867            Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
868            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
869            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
870            Self::Count { name, .. } => Self::Count {
871                name: name.clone(),
872                count: Count::new(),
873            },
874            Self::Gauge { name, .. } => Self::Gauge {
875                name: name.clone(),
876                gauge: Gauge::new(),
877            },
878            Self::Time { name, .. } => Self::Time {
879                name: name.clone(),
880                time: Time::new(),
881            },
882            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
883            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
884            Self::PruningMetrics { name, .. } => Self::PruningMetrics {
885                name: name.clone(),
886                pruning_metrics: PruningMetrics::new(),
887            },
888            Self::Ratio {
889                name,
890                ratio_metrics,
891            } => {
892                let merge_strategy = ratio_metrics.merge_strategy.clone();
893                Self::Ratio {
894                    name: name.clone(),
895                    ratio_metrics: RatioMetrics::new()
896                        .with_merge_strategy(merge_strategy)
897                        .with_display_raw_values(ratio_metrics.display_raw_values),
898                }
899            }
900            Self::Custom { name, value } => Self::Custom {
901                name: name.clone(),
902                value: value.new_empty(),
903            },
904        }
905    }
906
907    /// Aggregates the value of other to `self`. panic's if the types
908    /// are mismatched or aggregating does not make sense for this
909    /// value
910    ///
911    /// Note this is purposely marked `mut` (even though atomics are
912    /// used) so Rust's type system can be used to ensure the
913    /// appropriate API access. `MetricValues` should be modified
914    /// using the original [`Count`] or [`Time`] they were created
915    /// from.
916    pub fn aggregate(&mut self, other: &Self) {
917        match (self, other) {
918            (Self::OutputRows(count), Self::OutputRows(other_count))
919            | (Self::SpillCount(count), Self::SpillCount(other_count))
920            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
921            | (Self::OutputBytes(count), Self::OutputBytes(other_count))
922            | (Self::OutputBatches(count), Self::OutputBatches(other_count))
923            | (Self::SpilledRows(count), Self::SpilledRows(other_count))
924            | (
925                Self::Count { count, .. },
926                Self::Count {
927                    count: other_count, ..
928                },
929            ) => count.add(other_count.value()),
930            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
931            | (
932                Self::Gauge { gauge, .. },
933                Self::Gauge {
934                    gauge: other_gauge, ..
935                },
936            ) => gauge.add(other_gauge.value()),
937            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
938            | (
939                Self::Time { time, .. },
940                Self::Time {
941                    time: other_time, ..
942                },
943            ) => time.add(other_time),
944            // timestamps are aggregated by min/max
945            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
946                timestamp.update_to_min(other_timestamp);
947            }
948            // timestamps are aggregated by min/max
949            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
950                timestamp.update_to_max(other_timestamp);
951            }
952            (
953                Self::PruningMetrics {
954                    pruning_metrics, ..
955                },
956                Self::PruningMetrics {
957                    pruning_metrics: other_pruning_metrics,
958                    ..
959                },
960            ) => {
961                let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
962                let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
963                let fully_matched =
964                    other_pruning_metrics.fully_matched.load(Ordering::Relaxed);
965                pruning_metrics.add_pruned(pruned);
966                pruning_metrics.add_matched(matched);
967                pruning_metrics.add_fully_matched(fully_matched);
968            }
969            (
970                Self::Ratio { ratio_metrics, .. },
971                Self::Ratio {
972                    ratio_metrics: other_ratio_metrics,
973                    ..
974                },
975            ) => {
976                ratio_metrics.merge(other_ratio_metrics);
977            }
978            (
979                Self::Custom { value, .. },
980                Self::Custom {
981                    value: other_value, ..
982                },
983            ) => {
984                value.aggregate(Arc::clone(other_value));
985            }
986            m @ (_, _) => {
987                panic!(
988                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
989                    m.0, m.1
990                )
991            }
992        }
993    }
994
995    /// Returns a number by which to sort metrics by display. Lower
996    /// numbers are "more useful" (and displayed first)
997    pub fn display_sort_key(&self) -> u8 {
998        match self {
999            // `BaselineMetrics` that is common for most operators
1000            Self::OutputRows(_) => 0,
1001            Self::ElapsedCompute(_) => 1,
1002            Self::OutputBytes(_) => 2,
1003            Self::OutputBatches(_) => 3,
1004            // Other metrics
1005            Self::PruningMetrics { name, .. } => match name.as_ref() {
1006                // The following metrics belong to `DataSourceExec` with a Parquet data source.
1007                // They are displayed in a specific order that reflects the actual pruning process,
1008                // from coarse-grained to fine-grained pruning levels.
1009                //
1010                // You may update these metrics as long as their relative order remains unchanged.
1011                //
1012                // Reference PR: <https://github.com/apache/datafusion/pull/18379>
1013                "files_ranges_pruned_statistics" => 4,
1014                "row_groups_pruned_statistics" => 5,
1015                "row_groups_pruned_bloom_filter" => 6,
1016                "page_index_pages_pruned" => 7,
1017                "page_index_rows_pruned" => 8,
1018                _ => 9,
1019            },
1020            Self::SpillCount(_) => 10,
1021            Self::SpilledBytes(_) => 11,
1022            Self::SpilledRows(_) => 12,
1023            Self::CurrentMemoryUsage(_) => 13,
1024            Self::Count { name, .. } => match name.as_ref() {
1025                // This Parquet page-index metric is a plain Count because it
1026                // records pages that skipped page-index evaluation, not a
1027                // pruned/matched pair. Keep it grouped with the other
1028                // page-index pruning metrics in EXPLAIN output.
1029                "page_index_pages_skipped_by_fully_matched" => 8,
1030                _ => 14,
1031            },
1032            Self::Gauge { .. } => 15,
1033            Self::Time { .. } => 16,
1034            Self::Ratio { .. } => 17,
1035            Self::StartTimestamp(_) => 18, // show timestamps last
1036            Self::EndTimestamp(_) => 19,
1037            Self::Custom { .. } => 20,
1038        }
1039    }
1040
1041    /// returns true if this metric has a timestamp value
1042    pub fn is_timestamp(&self) -> bool {
1043        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
1044    }
1045}
1046
1047impl Display for MetricValue {
1048    /// Prints the value of this metric
1049    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1050        match self {
1051            Self::OutputRows(count)
1052            | Self::OutputBatches(count)
1053            | Self::SpillCount(count)
1054            | Self::SpilledRows(count)
1055            | Self::Count { count, .. } => {
1056                write!(f, "{count}")
1057            }
1058            Self::SpilledBytes(count) | Self::OutputBytes(count) => {
1059                let readable_count = human_readable_size(count.value());
1060                write!(f, "{readable_count}")
1061            }
1062            Self::CurrentMemoryUsage(gauge) => {
1063                // CurrentMemoryUsage is in bytes, format like SpilledBytes
1064                let readable_size = human_readable_size(gauge.value());
1065                write!(f, "{readable_size}")
1066            }
1067            Self::Gauge { gauge, .. } => {
1068                // Generic gauge metrics - format with human-readable count
1069                write!(f, "{}", human_readable_count(gauge.value()))
1070            }
1071            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
1072                // distinguish between no time recorded and very small
1073                // amount of time recorded
1074                if time.value() > 0 {
1075                    write!(f, "{time}")
1076                } else {
1077                    write!(f, "NOT RECORDED")
1078                }
1079            }
1080            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
1081                write!(f, "{timestamp}")
1082            }
1083            Self::PruningMetrics {
1084                pruning_metrics, ..
1085            } => {
1086                write!(f, "{pruning_metrics}")
1087            }
1088            Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
1089            Self::Custom { value, .. } => {
1090                write!(f, "{value}")
1091            }
1092        }
1093    }
1094}
1095
#[cfg(test)]
mod tests {
    use std::any::Any;

    use chrono::TimeZone;
    use datafusion_common::units::MB;

    use super::*;

    /// Minimal custom metric used to exercise `MetricValue::Custom`.
    #[derive(Debug, Default)]
    pub struct CustomCounter {
        count: AtomicUsize,
    }

    impl Display for CustomCounter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
        }
    }

    impl CustomMetricValue for CustomCounter {
        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
            Arc::new(CustomCounter::default())
        }

        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
            let other = other.as_any().downcast_ref::<Self>().unwrap();
            self.count
                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
            let Some(other) = other.as_any().downcast_ref::<Self>() else {
                return false;
            };

            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
        }
    }

    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
        let custom_counter = CustomCounter::default();
        custom_counter.count.fetch_add(value, Ordering::Relaxed);

        MetricValue::Custom {
            name: Cow::Borrowed(name),
            value: Arc::new(custom_counter),
        }
    }

    #[test]
    fn test_custom_metric_with_mismatching_names() {
        let mut custom_val = new_custom_counter("Hi", 1);
        let other_custom_val = new_custom_counter("Hello", 1);

        // Not equal since the name differs.
        assert!(other_custom_val != custom_val);

        // Should work even though the name differs
        custom_val.aggregate(&other_custom_val);

        let expected_val = new_custom_counter("Hi", 2);
        assert!(expected_val == custom_val);
    }

    #[test]
    fn test_custom_metric() {
        let mut custom_val = new_custom_counter("hi", 11);
        let other_custom_val = new_custom_counter("hi", 20);

        custom_val.aggregate(&other_custom_val);

        assert!(custom_val != other_custom_val);

        if let MetricValue::Custom { value, .. } = custom_val {
            let counter = value
                .as_any()
                .downcast_ref::<CustomCounter>()
                .expect("Expected CustomCounter");
            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
        } else {
            panic!("Unexpected value");
        }
    }

    #[test]
    fn test_display_custom_metric() {
        let custom_val = new_custom_counter("hi", 11);
        assert_eq!(custom_val.to_string(), "count: 11");
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types")]
    fn test_aggregate_mismatched_types_panics() {
        let mut rows = MetricValue::OutputRows(Count::new());
        let spills = MetricValue::SpillCount(Count::new());
        rows.aggregate(&spills);
    }

    #[test]
    fn test_new_empty_then_aggregate_count() {
        let count = Count::new();
        count.add(5);
        let value = MetricValue::Count {
            name: "my_counter".into(),
            count,
        };

        // A fresh accumulator starts at zero...
        let mut acc = value.new_empty();
        assert_eq!("0", acc.to_string());

        // ...and sums every value aggregated into it.
        acc.aggregate(&value);
        acc.aggregate(&value);
        assert_eq!("10", acc.to_string());

        // The aggregated source is left untouched.
        assert_eq!("5", value.to_string());
    }

    #[test]
    fn test_new_empty_preserves_ratio_display_setting() {
        let percentage_only = RatioMetrics::new().with_display_raw_values(false);
        percentage_only.set_part(1);
        percentage_only.set_total(2);
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("percentage_only"),
            ratio_metrics: percentage_only,
        };

        // The values are reset but raw values are still hidden.
        assert_eq!("N/A", ratio.new_empty().to_string());
    }

    #[test]
    fn test_display_sort_key_groups_page_index_metrics() {
        let skipped = MetricValue::Count {
            name: "page_index_pages_skipped_by_fully_matched".into(),
            count: Count::new(),
        };
        let other_count = MetricValue::Count {
            name: "some_other_counter".into(),
            count: Count::new(),
        };
        let rows_pruned = MetricValue::PruningMetrics {
            name: "page_index_rows_pruned".into(),
            pruning_metrics: PruningMetrics::new(),
        };

        assert_eq!(skipped.display_sort_key(), rows_pruned.display_sort_key());
        assert!(skipped.display_sort_key() < other_count.display_sort_key());
        assert!(
            MetricValue::OutputRows(Count::new()).display_sort_key()
                < skipped.display_sort_key()
        );
    }

    #[test]
    fn test_is_timestamp() {
        assert!(MetricValue::StartTimestamp(Timestamp::new()).is_timestamp());
        assert!(MetricValue::EndTimestamp(Timestamp::new()).is_timestamp());
        assert!(!MetricValue::OutputRows(Count::new()).is_timestamp());
        assert!(!MetricValue::ElapsedCompute(Time::new()).is_timestamp());
    }

    #[test]
    fn test_display_output_rows() {
        let count = Count::new();
        let values = vec![
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];

        for value in &values {
            assert_eq!("0", value.to_string(), "value {value:?}");
        }

        count.add(42);
        for value in &values {
            assert_eq!("42", value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_spilled_bytes() {
        let count = Count::new();
        let spilled_byte = MetricValue::SpilledBytes(count.clone());

        assert_eq!("0.0 B", spilled_byte.to_string());

        count.add((100 * MB) as usize);
        assert_eq!("100.0 MB", spilled_byte.to_string());

        count.add((0.5 * MB as f64) as usize);
        assert_eq!("100.5 MB", spilled_byte.to_string());
    }

    #[test]
    fn test_display_time() {
        let time = Time::new();
        let values = vec![
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];

        // if time is not set, it should not be reported as zero
        for value in &values {
            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
        }

        time.add_duration(Duration::from_nanos(1042));
        for value in &values {
            assert_eq!("1.04µs", value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_ratio() {
        let ratio_metrics = RatioMetrics::new();
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("ratio_metric"),
            ratio_metrics: ratio_metrics.clone(),
        };

        assert_eq!("N/A (0/0)", ratio.to_string());

        ratio_metrics.add_part(10);
        assert_eq!("N/A (10/0)", ratio.to_string());

        ratio_metrics.add_total(40);
        assert_eq!("25% (10/40)", ratio.to_string());

        let tiny_ratio_metrics = RatioMetrics::new();
        let tiny_ratio = MetricValue::Ratio {
            name: Cow::Borrowed("tiny_ratio_metric"),
            ratio_metrics: tiny_ratio_metrics.clone(),
        };
        tiny_ratio_metrics.add_part(1);
        tiny_ratio_metrics.add_total(3000);
        assert_eq!("0.03% (1/3.00 K)", tiny_ratio.to_string());

        ratio_metrics.set_part(6667);
        ratio_metrics.set_total(10_000);
        assert_eq!("66.67% (6.67 K/10.00 K)", ratio.to_string());

        let percentage_only = RatioMetrics::new().with_display_raw_values(false);
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("percentage_only"),
            ratio_metrics: percentage_only.clone(),
        };
        assert_eq!("N/A", ratio.to_string());
        percentage_only.set_part(6667);
        percentage_only.set_total(10_000);
        assert_eq!("66.67%", ratio.to_string());
    }

    #[test]
    fn test_ratio_set_methods() {
        let ratio_metrics = RatioMetrics::new();

        // Ensure set methods don't increment
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(10);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics.to_string());

        let ratio_metrics = RatioMetrics::new();

        // Calling set should change the value
        ratio_metrics.set_part(10);
        ratio_metrics.set_part(30);
        ratio_metrics.set_total(40);
        ratio_metrics.set_total(50);
        assert_eq!("60% (30/50)", ratio_metrics.to_string());
    }

    #[test]
    fn test_ratio_merge_strategy() {
        // Test AddPartSetTotal strategy
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);

        ratio_metrics1.set_part(10);
        ratio_metrics1.set_total(40);
        assert_eq!("25% (10/40)", ratio_metrics1.to_string());
        let ratio_metrics2 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(40);
        assert_eq!("50% (20/40)", ratio_metrics2.to_string());

        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("75% (30/40)", ratio_metrics1.to_string());

        // Test SetPartAddTotal strategy
        let ratio_metrics1 =
            RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("20% (20/100)", ratio_metrics1.to_string());

        // Test AddPartAddTotal strategy (default)
        let ratio_metrics1 = RatioMetrics::new();
        ratio_metrics1.set_part(20);
        ratio_metrics1.set_total(50);
        let ratio_metrics2 = RatioMetrics::new();
        ratio_metrics2.set_part(20);
        ratio_metrics2.set_total(50);
        ratio_metrics1.merge(&ratio_metrics2);
        assert_eq!("40% (40/100)", ratio_metrics1.to_string());
    }

    #[test]
    fn test_display_timestamp() {
        let timestamp = Timestamp::new();
        let values = vec![
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];

        // if the timestamp is not set, it is displayed as NONE rather than
        // as a zero time
        for value in &values {
            assert_eq!("NONE", value.to_string(), "value {value:?}");
        }

        timestamp.set(Utc.timestamp_nanos(1431648000000000));
        for value in &values {
            assert_eq!(
                "1970-01-17 13:40:48 UTC",
                value.to_string(),
                "value {value:?}"
            );
        }
    }

    #[test]
    fn test_timer_with_custom_instant() {
        let time = Time::new();
        let start_time = Instant::now();

        // Sleep a bit to ensure some time passes
        std::thread::sleep(Duration::from_millis(1));

        // Create timer with the earlier start time
        let mut timer = time.timer_with(start_time);

        // Sleep a bit more
        std::thread::sleep(Duration::from_millis(1));

        // Stop the timer
        timer.stop();

        // The recorded time should be at least 2ms (both 1ms sleeps), since
        // the timer was started from the instant taken before the first sleep
        assert!(
            time.value() >= 2_000_000,
            "Expected at least 2ms, got {} ns",
            time.value()
        );
    }

    #[test]
    fn test_stop_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();
        let mut timer = time.timer_with(start);

        // Simulate exactly 10ms passing
        let end = start + Duration::from_millis(10);

        // Stop with custom endpoint
        timer.stop_with(end);

        // Should record exactly 10ms (10_000_000 nanoseconds)
        // Allow for small variations due to timer resolution
        let recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&recorded),
            "Expected ~10ms, got {recorded} ns"
        );

        // Calling stop_with again should not add more time
        timer.stop_with(end);
        assert_eq!(
            recorded,
            time.value(),
            "Time should not change after second stop"
        );
    }

    #[test]
    fn test_done_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();

        // Create a new scope for the timer
        {
            let timer = time.timer_with(start);

            // Simulate 5ms passing
            let end = start + Duration::from_millis(5);

            // Call done_with to stop and consume the timer
            timer.done_with(end);

            // Timer is consumed, can't use it anymore
        }

        // Should record exactly 5ms
        let recorded = time.value();
        assert!(
            (5_000_000..=5_100_000).contains(&recorded),
            "Expected ~5ms, got {recorded} ns",
        );

        // Test that done_with prevents drop from recording time again
        {
            let timer2 = time.timer_with(start);
            let end2 = start + Duration::from_millis(5);
            timer2.done_with(end2);
            // drop happens here but should not record additional time
        }

        // Should have added only 5ms more
        let new_recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&new_recorded),
            "Expected ~10ms total, got {new_recorded} ns",
        );
    }

    #[test]
    fn test_human_readable_metric_formatting() {
        // Test Count formatting with various sizes
        let small_count = Count::new();
        small_count.add(42);
        assert_eq!(
            MetricValue::OutputRows(small_count.clone()).to_string(),
            "42"
        );

        let thousand_count = Count::new();
        thousand_count.add(10_100);
        assert_eq!(
            MetricValue::OutputRows(thousand_count.clone()).to_string(),
            "10.10 K"
        );

        let million_count = Count::new();
        million_count.add(1_532_000);
        assert_eq!(
            MetricValue::SpilledRows(million_count.clone()).to_string(),
            "1.53 M"
        );

        let billion_count = Count::new();
        billion_count.add(2_500_000_000);
        assert_eq!(
            MetricValue::OutputBatches(billion_count.clone()).to_string(),
            "2.50 B"
        );

        // Test Time formatting with various durations
        let micros_time = Time::new();
        micros_time.add_duration(Duration::from_nanos(1_234));
        assert_eq!(
            MetricValue::ElapsedCompute(micros_time.clone()).to_string(),
            "1.23µs"
        );

        let millis_time = Time::new();
        millis_time.add_duration(Duration::from_nanos(11_295_377));
        assert_eq!(
            MetricValue::ElapsedCompute(millis_time.clone()).to_string(),
            "11.30ms"
        );

        let seconds_time = Time::new();
        seconds_time.add_duration(Duration::from_nanos(1_234_567_890));
        assert_eq!(
            MetricValue::ElapsedCompute(seconds_time.clone()).to_string(),
            "1.23s"
        );

        // Test CurrentMemoryUsage formatting (should use size, not count)
        let mem_gauge = Gauge::new();
        mem_gauge.add(100 * MB as usize);
        assert_eq!(
            MetricValue::CurrentMemoryUsage(mem_gauge.clone()).to_string(),
            "100.0 MB"
        );

        // Test custom Gauge formatting (should use count)
        let custom_gauge = Gauge::new();
        custom_gauge.add(50_000);
        assert_eq!(
            MetricValue::Gauge {
                name: "custom".into(),
                gauge: custom_gauge.clone()
            }
            .to_string(),
            "50.00 K"
        );

        // Test PruningMetrics formatting
        let pruning = PruningMetrics::new();
        pruning.add_matched(500_000);
        pruning.add_pruned(500_000);
        assert_eq!(
            MetricValue::PruningMetrics {
                name: "test_pruning".into(),
                pruning_metrics: pruning.clone()
            }
            .to_string(),
            "1.00 M total → 500.0 K matched"
        );

        // Test RatioMetrics formatting
        let ratio = RatioMetrics::new();
        ratio.add_part(250_000);
        ratio.add_total(1_000_000);
        assert_eq!(
            MetricValue::Ratio {
                name: "test_ratio".into(),
                ratio_metrics: ratio.clone()
            }
            .to_string(),
            "25% (250.0 K/1.00 M)"
        );
    }
}