datafusion_physical_plan/metrics/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Value representation of metrics
19
20use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::instant::Instant;
23use datafusion_execution::memory_pool::human_readable_size;
24use parking_lot::Mutex;
25use std::{
26    borrow::{Borrow, Cow},
27    fmt::{Debug, Display},
28    sync::{
29        atomic::{AtomicUsize, Ordering},
30        Arc,
31    },
32    time::Duration,
33};
34
/// An additive counter, used to record things such as the number of input
/// or output rows.
///
/// Cloning a [`Count`] does not copy the value: every clone is a handle to the
/// same shared counter, so an increment through any clone is visible to all.
#[derive(Debug, Clone)]
pub struct Count {
    /// Shared counter cell; every clone points at the same atomic
    value: Arc<AtomicUsize>,
}

impl Count {
    /// Creates a counter starting at 0
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Self { value }
    }

    /// Increases the counter by `n`
    pub fn add(&self, n: usize) {
        // Relaxed is sufficient: the counter guards no other memory, only the
        // atomicity of the increment itself matters
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the counter's current value
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

impl Default for Count {
    fn default() -> Self {
        Self::new()
    }
}

impl PartialEq for Count {
    /// Two counters are equal when they currently hold the same value
    fn eq(&self, other: &Self) -> bool {
        self.value() == other.value()
    }
}

impl Display for Count {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
82
/// A gauge holds a single value that can move up or down, or be overwritten.
///
/// For example, a gauge can expose the current memory consumption of an
/// operator. Cloning a [`Gauge`] yields a handle to the same shared value, so
/// updates through any clone are visible to all of them.
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Shared gauge cell; every clone points at the same atomic
    value: Arc<AtomicUsize>,
}

impl Gauge {
    /// Creates a gauge whose value is 0
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Self { value }
    }

    /// Increases the value by `n`
    pub fn add(&self, n: usize) {
        // Relaxed suffices for every operation in this impl: the gauge
        // publishes no other memory, only the atomicity of each update matters
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Decreases the value by `n`
    ///
    /// Note: atomic subtraction wraps around on underflow instead of
    /// panicking, so callers must not subtract more than the gauge holds.
    pub fn sub(&self, n: usize) {
        self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Raises the value to `n` if `n` is larger than the current value
    pub fn set_max(&self, n: usize) {
        self.value.fetch_max(n, Ordering::Relaxed);
    }

    /// Replaces the value with `n`, returning the value held before
    pub fn set(&self, n: usize) -> usize {
        self.value.swap(n, Ordering::Relaxed)
    }

    /// Returns the gauge's current value
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

impl Default for Gauge {
    fn default() -> Self {
        Self::new()
    }
}

impl PartialEq for Gauge {
    /// Two gauges are equal when they currently hold the same value
    fn eq(&self, other: &Self) -> bool {
        self.value() == other.value()
    }
}

impl Display for Gauge {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
150
151/// Measure a potentially non contiguous duration of time
152#[derive(Debug, Clone)]
153pub struct Time {
154    /// elapsed time, in nanoseconds
155    nanos: Arc<AtomicUsize>,
156}
157
158impl Default for Time {
159    fn default() -> Self {
160        Self::new()
161    }
162}
163
164impl PartialEq for Time {
165    fn eq(&self, other: &Self) -> bool {
166        self.value().eq(&other.value())
167    }
168}
169
170impl Display for Time {
171    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
172        let duration = Duration::from_nanos(self.value() as u64);
173        write!(f, "{duration:?}")
174    }
175}
176
177impl Time {
178    /// Create a new [`Time`] wrapper suitable for recording elapsed
179    /// times for operations.
180    pub fn new() -> Self {
181        Self {
182            nanos: Arc::new(AtomicUsize::new(0)),
183        }
184    }
185
186    /// Add elapsed nanoseconds since `start`to self
187    pub fn add_elapsed(&self, start: Instant) {
188        self.add_duration(start.elapsed());
189    }
190
191    /// Add duration of time to self
192    ///
193    /// Note: this will always increment the recorded time by at least 1 nanosecond
194    /// to distinguish between the scenario of no values recorded, in which
195    /// case the value will be 0, and no measurable amount of time having passed,
196    /// in which case the value will be small but not 0.
197    ///
198    /// This is based on the assumption that the timing logic in most cases is likely
199    /// to take at least a nanosecond, and so this is reasonable mechanism to avoid
200    /// ambiguity, especially on systems with low-resolution monotonic clocks
201    pub fn add_duration(&self, duration: Duration) {
202        let more_nanos = duration.as_nanos() as usize;
203        self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204    }
205
206    /// Add the number of nanoseconds of other `Time` to self
207    pub fn add(&self, other: &Time) {
208        self.add_duration(Duration::from_nanos(other.value() as u64))
209    }
210
211    /// return a scoped guard that adds the amount of time elapsed
212    /// between its creation and its drop or call to `stop` to the
213    /// underlying metric.
214    pub fn timer(&self) -> ScopedTimerGuard<'_> {
215        ScopedTimerGuard {
216            inner: self,
217            start: Some(Instant::now()),
218        }
219    }
220
221    /// Get the number of nanoseconds record by this Time metric
222    pub fn value(&self) -> usize {
223        self.nanos.load(Ordering::Relaxed)
224    }
225
226    /// Return a scoped guard that adds the amount of time elapsed between the
227    /// given instant and its drop (or the call to `stop`) to the underlying metric
228    pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229        ScopedTimerGuard {
230            inner: self,
231            start: Some(now),
232        }
233    }
234}
235
236/// Stores a single timestamp, stored as the number of nanoseconds
237/// elapsed from Jan 1, 1970 UTC
238#[derive(Debug, Clone)]
239pub struct Timestamp {
240    /// Time thing started
241    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250impl Timestamp {
251    /// Create a new timestamp and sets its value to 0
252    pub fn new() -> Self {
253        Self {
254            timestamp: Arc::new(Mutex::new(None)),
255        }
256    }
257
258    /// Sets the timestamps value to the current time
259    pub fn record(&self) {
260        self.set(Utc::now())
261    }
262
263    /// Sets the timestamps value to a specified time
264    pub fn set(&self, now: DateTime<Utc>) {
265        *self.timestamp.lock() = Some(now);
266    }
267
268    /// return the timestamps value at the last time `record()` was
269    /// called.
270    ///
271    /// Returns `None` if `record()` has not been called
272    pub fn value(&self) -> Option<DateTime<Utc>> {
273        *self.timestamp.lock()
274    }
275
276    /// sets the value of this timestamp to the minimum of this and other
277    pub fn update_to_min(&self, other: &Timestamp) {
278        let min = match (self.value(), other.value()) {
279            (None, None) => None,
280            (Some(v), None) => Some(v),
281            (None, Some(v)) => Some(v),
282            (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283        };
284
285        *self.timestamp.lock() = min;
286    }
287
288    /// sets the value of this timestamp to the maximum of this and other
289    pub fn update_to_max(&self, other: &Timestamp) {
290        let max = match (self.value(), other.value()) {
291            (None, None) => None,
292            (Some(v), None) => Some(v),
293            (None, Some(v)) => Some(v),
294            (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295        };
296
297        *self.timestamp.lock() = max;
298    }
299}
300
301impl PartialEq for Timestamp {
302    fn eq(&self, other: &Self) -> bool {
303        self.value().eq(&other.value())
304    }
305}
306
307impl Display for Timestamp {
308    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309        match self.value() {
310            None => write!(f, "NONE"),
311            Some(v) => {
312                write!(f, "{v}")
313            }
314        }
315    }
316}
317
318/// RAAI structure that adds all time between its construction and
319/// destruction to the CPU time or the first call to `stop` whichever
320/// comes first
321pub struct ScopedTimerGuard<'a> {
322    inner: &'a Time,
323    start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327    /// Stop the timer timing and record the time taken
328    pub fn stop(&mut self) {
329        if let Some(start) = self.start.take() {
330            self.inner.add_elapsed(start)
331        }
332    }
333
334    /// Restarts the timer recording from the current time
335    pub fn restart(&mut self) {
336        self.start = Some(Instant::now())
337    }
338
339    /// Stop the timer, record the time taken and consume self
340    pub fn done(mut self) {
341        self.stop()
342    }
343
344    /// Stop the timer timing and record the time taken since the given endpoint.
345    pub fn stop_with(&mut self, end_time: Instant) {
346        if let Some(start) = self.start.take() {
347            let elapsed = end_time - start;
348            self.inner.add_duration(elapsed)
349        }
350    }
351
352    /// Stop the timer, record the time taken since `end_time` endpoint, and
353    /// consume self.
354    pub fn done_with(mut self, end_time: Instant) {
355        self.stop_with(end_time)
356    }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360    fn drop(&mut self) {
361        self.stop()
362    }
363}
364
/// Pair of counters describing how many items a pruning step skipped and kept
///
/// For example, if a file scanner is initially planned to scan 10 files but
/// skips 8 of them using statistics, the metric reads `10 total → 2 matched`.
///
/// Cloning a [`PruningMetrics`] yields a handle to the same shared counters.
#[derive(Debug, Clone)]
pub struct PruningMetrics {
    /// Number of items skipped by pruning
    pruned: Arc<AtomicUsize>,
    /// Number of items that survived pruning
    matched: Arc<AtomicUsize>,
}

impl PruningMetrics {
    /// create a new PruningMetrics with both counters at 0
    pub fn new() -> Self {
        let pruned = Arc::new(AtomicUsize::new(0));
        let matched = Arc::new(AtomicUsize::new(0));
        Self { pruned, matched }
    }

    /// Add `n` to the number of pruned items
    pub fn add_pruned(&self, n: usize) {
        // Relaxed is enough throughout: the counters publish no other memory,
        // only the atomicity of each update matters
        self.pruned.fetch_add(n, Ordering::Relaxed);
    }

    /// Add `n` to the number of matched items
    pub fn add_matched(&self, n: usize) {
        self.matched.fetch_add(n, Ordering::Relaxed);
    }

    /// Subtract `n` from the number of matched items
    ///
    /// Note: atomic subtraction wraps around on underflow instead of panicking.
    pub fn subtract_matched(&self, n: usize) {
        self.matched.fetch_sub(n, Ordering::Relaxed);
    }

    /// Number of items pruned
    pub fn pruned(&self) -> usize {
        self.pruned.load(Ordering::Relaxed)
    }

    /// Number of items matched (not pruned)
    pub fn matched(&self) -> usize {
        self.matched.load(Ordering::Relaxed)
    }
}

impl Default for PruningMetrics {
    fn default() -> Self {
        Self::new()
    }
}

impl Display for PruningMetrics {
    /// Prints `<pruned + matched> total → <matched> matched`
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let matched = self.matched();
        let total = self.pruned() + matched;
        write!(f, "{total} total → {matched} matched")
    }
}
432
/// Numerator / denominator counters for metrics shown as a ratio, e.g.
/// matched items out of total items.
///
/// The counters are atomic, and every clone shares them with the original.
#[derive(Debug, Clone, Default)]
pub struct RatioMetrics {
    /// Numerator
    part: Arc<AtomicUsize>,
    /// Denominator
    total: Arc<AtomicUsize>,
}

impl RatioMetrics {
    /// Create a new [`RatioMetrics`] with both counters at 0
    pub fn new() -> Self {
        // The derived `Default` creates two fresh shared counters set to zero
        Self::default()
    }

    /// Add `n` to the numerator (`part`) value
    pub fn add_part(&self, n: usize) {
        self.part.fetch_add(n, Ordering::Relaxed);
    }

    /// Add `n` to the denominator (`total`) value
    pub fn add_total(&self, n: usize) {
        self.total.fetch_add(n, Ordering::Relaxed);
    }

    /// Add both counters of `other` onto `self`
    pub fn merge(&self, other: &Self) {
        let part = other.part();
        self.add_part(part);
        let total = other.total();
        self.add_total(total);
    }

    /// Return the numerator (`part`) value
    pub fn part(&self) -> usize {
        self.part.load(Ordering::Relaxed)
    }

    /// Return the denominator (`total`) value
    pub fn total(&self) -> usize {
        self.total.load(Ordering::Relaxed)
    }
}

impl PartialEq for RatioMetrics {
    /// Equal when both the numerators and the denominators currently match
    fn eq(&self, other: &Self) -> bool {
        (self.part(), self.total()) == (other.part(), other.total())
    }
}
483
/// Format `x` rounded to `digits` significant digits.
///
/// Rounding uses [`f64::round`], so halfway cases round away from zero:
///
/// - `fmt_significant(12.5, 2)` -> `"13"`
/// - `fmt_significant(0.0543, 2)` -> `"0.054"`
/// - `fmt_significant(0.000123, 2)` -> `"0.00012"`
///
/// Zero formats as `"0"`. Non-finite values (`NaN`, `inf`, `-inf`) are
/// formatted as-is, since they have no significant digits to round.
fn fmt_significant(x: f64, digits: usize) -> String {
    if x == 0.0 {
        return "0".to_string();
    }
    if !x.is_finite() {
        // log10/powf would turn +-inf into NaN below
        return x.to_string();
    }

    // exponent of the first significant digit, e.g. 1 for 12.5, -2 for 0.0543
    let exp = x.abs().log10().floor();
    let scale = 10f64.powf(digits as f64 - 1.0 - exp);
    let rounded = (x * scale).round() / scale; // round to `digits` significant digits
    rounded.to_string()
}
499
500impl Display for RatioMetrics {
501    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
502        let part = self.part();
503        let total = self.total();
504
505        if total == 0 {
506            if part == 0 {
507                write!(f, "N/A (0/0)")
508            } else {
509                write!(f, "N/A ({part}/0)")
510            }
511        } else {
512            let percentage = (part as f64 / total as f64) * 100.0;
513
514            write!(f, "{}% ({part}/{total})", fmt_significant(percentage, 2))
515        }
516    }
517}
518
519/// Possible values for a [super::Metric].
520///
521/// Among other differences, the metric types have different ways to
522/// logically interpret their underlying values and some metrics are
523/// so common they are given special treatment.
524#[derive(Debug, Clone)]
525pub enum MetricValue {
526    /// Number of output rows produced: "output_rows" metric
527    OutputRows(Count),
528    /// Elapsed Compute Time: the wall clock time spent in "cpu
529    /// intensive" work.
530    ///
531    /// This measurement represents, roughly:
532    /// ```
533    /// use std::time::Instant;
534    /// let start = Instant::now();
535    /// // ...CPU intensive work here...
536    /// let elapsed_compute = (Instant::now() - start).as_nanos();
537    /// ```
538    ///
539    /// Note 1: Does *not* include time other operators spend
540    /// computing input.
541    ///
542    /// Note 2: *Does* includes time when the thread could have made
543    /// progress but the OS did not schedule it (e.g. due to CPU
544    /// contention), thus making this value different than the
545    /// classical definition of "cpu_time", which is the time reported
546    /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
547    ElapsedCompute(Time),
548    /// Number of spills produced: "spill_count" metric
549    SpillCount(Count),
550    /// Total size of spilled bytes produced: "spilled_bytes" metric
551    SpilledBytes(Count),
552    /// Total size of output bytes produced: "output_bytes" metric
553    OutputBytes(Count),
554    /// Total size of spilled rows produced: "spilled_rows" metric
555    SpilledRows(Count),
556    /// Current memory used
557    CurrentMemoryUsage(Gauge),
558    /// Operator defined count.
559    Count {
560        /// The provided name of this metric
561        name: Cow<'static, str>,
562        /// The value of the metric
563        count: Count,
564    },
565    /// Operator defined gauge.
566    Gauge {
567        /// The provided name of this metric
568        name: Cow<'static, str>,
569        /// The value of the metric
570        gauge: Gauge,
571    },
572    /// Operator defined time
573    Time {
574        /// The provided name of this metric
575        name: Cow<'static, str>,
576        /// The value of the metric
577        time: Time,
578    },
579    /// The time at which execution started
580    StartTimestamp(Timestamp),
581    /// The time at which execution ended
582    EndTimestamp(Timestamp),
583    /// Metrics related to scan pruning
584    PruningMetrics {
585        name: Cow<'static, str>,
586        pruning_metrics: PruningMetrics,
587    },
588    /// Metrics that should be displayed as ratio like (42%)
589    Ratio {
590        name: Cow<'static, str>,
591        ratio_metrics: RatioMetrics,
592    },
593    Custom {
594        /// The provided name of this metric
595        name: Cow<'static, str>,
596        /// A custom implementation of the metric value.
597        value: Arc<dyn CustomMetricValue>,
598    },
599}
600
601// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its
602// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq.
603impl PartialEq for MetricValue {
604    fn eq(&self, other: &Self) -> bool {
605        match (self, other) {
606            (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
607                count == other
608            }
609            (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
610                time == other
611            }
612            (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
613                count == other
614            }
615            (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
616                count == other
617            }
618            (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
619                count == other
620            }
621            (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
622                count == other
623            }
624            (
625                MetricValue::CurrentMemoryUsage(gauge),
626                MetricValue::CurrentMemoryUsage(other),
627            ) => gauge == other,
628            (
629                MetricValue::Count { name, count },
630                MetricValue::Count {
631                    name: other_name,
632                    count: other_count,
633                },
634            ) => name == other_name && count == other_count,
635            (
636                MetricValue::Gauge { name, gauge },
637                MetricValue::Gauge {
638                    name: other_name,
639                    gauge: other_gauge,
640                },
641            ) => name == other_name && gauge == other_gauge,
642            (
643                MetricValue::Time { name, time },
644                MetricValue::Time {
645                    name: other_name,
646                    time: other_time,
647                },
648            ) => name == other_name && time == other_time,
649
650            (
651                MetricValue::StartTimestamp(timestamp),
652                MetricValue::StartTimestamp(other),
653            ) => timestamp == other,
654            (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
655                timestamp == other
656            }
657            (
658                MetricValue::PruningMetrics {
659                    name,
660                    pruning_metrics,
661                },
662                MetricValue::PruningMetrics {
663                    name: other_name,
664                    pruning_metrics: other_pruning_metrics,
665                },
666            ) => {
667                name == other_name
668                    && pruning_metrics.pruned() == other_pruning_metrics.pruned()
669                    && pruning_metrics.matched() == other_pruning_metrics.matched()
670            }
671            (
672                MetricValue::Ratio {
673                    name,
674                    ratio_metrics,
675                },
676                MetricValue::Ratio {
677                    name: other_name,
678                    ratio_metrics: other_ratio_metrics,
679                },
680            ) => name == other_name && ratio_metrics == other_ratio_metrics,
681            (
682                MetricValue::Custom { name, value },
683                MetricValue::Custom {
684                    name: other_name,
685                    value: other_value,
686                },
687            ) => name == other_name && value.is_eq(other_value),
688            // Default case when the two sides do not have the same type.
689            _ => false,
690        }
691    }
692}
693
694impl MetricValue {
695    /// Return the name of this SQL metric
696    pub fn name(&self) -> &str {
697        match self {
698            Self::OutputRows(_) => "output_rows",
699            Self::SpillCount(_) => "spill_count",
700            Self::SpilledBytes(_) => "spilled_bytes",
701            Self::OutputBytes(_) => "output_bytes",
702            Self::SpilledRows(_) => "spilled_rows",
703            Self::CurrentMemoryUsage(_) => "mem_used",
704            Self::ElapsedCompute(_) => "elapsed_compute",
705            Self::Count { name, .. } => name.borrow(),
706            Self::Gauge { name, .. } => name.borrow(),
707            Self::Time { name, .. } => name.borrow(),
708            Self::StartTimestamp(_) => "start_timestamp",
709            Self::EndTimestamp(_) => "end_timestamp",
710            Self::PruningMetrics { name, .. } => name.borrow(),
711            Self::Ratio { name, .. } => name.borrow(),
712            Self::Custom { name, .. } => name.borrow(),
713        }
714    }
715
716    /// Return the value of the metric as a usize value, used to aggregate metric
717    /// value across partitions.
718    pub fn as_usize(&self) -> usize {
719        match self {
720            Self::OutputRows(count) => count.value(),
721            Self::SpillCount(count) => count.value(),
722            Self::SpilledBytes(bytes) => bytes.value(),
723            Self::OutputBytes(bytes) => bytes.value(),
724            Self::SpilledRows(count) => count.value(),
725            Self::CurrentMemoryUsage(used) => used.value(),
726            Self::ElapsedCompute(time) => time.value(),
727            Self::Count { count, .. } => count.value(),
728            Self::Gauge { gauge, .. } => gauge.value(),
729            Self::Time { time, .. } => time.value(),
730            Self::StartTimestamp(timestamp) => timestamp
731                .value()
732                .and_then(|ts| ts.timestamp_nanos_opt())
733                .map(|nanos| nanos as usize)
734                .unwrap_or(0),
735            Self::EndTimestamp(timestamp) => timestamp
736                .value()
737                .and_then(|ts| ts.timestamp_nanos_opt())
738                .map(|nanos| nanos as usize)
739                .unwrap_or(0),
740            // This function is a utility for aggregating metrics, for complex metric
741            // like `PruningMetrics`, this function is not supposed to get called.
742            // Metrics aggregation for them are implemented inside `MetricsSet` directly.
743            Self::PruningMetrics { .. } => 0,
744            // Should not be used. See comments in `PruningMetrics` for details.
745            Self::Ratio { .. } => 0,
746            Self::Custom { value, .. } => value.as_usize(),
747        }
748    }
749
750    /// create a new MetricValue with the same type as `self` suitable
751    /// for accumulating
752    pub fn new_empty(&self) -> Self {
753        match self {
754            Self::OutputRows(_) => Self::OutputRows(Count::new()),
755            Self::SpillCount(_) => Self::SpillCount(Count::new()),
756            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
757            Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
758            Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
759            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
760            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
761            Self::Count { name, .. } => Self::Count {
762                name: name.clone(),
763                count: Count::new(),
764            },
765            Self::Gauge { name, .. } => Self::Gauge {
766                name: name.clone(),
767                gauge: Gauge::new(),
768            },
769            Self::Time { name, .. } => Self::Time {
770                name: name.clone(),
771                time: Time::new(),
772            },
773            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
774            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
775            Self::PruningMetrics { name, .. } => Self::PruningMetrics {
776                name: name.clone(),
777                pruning_metrics: PruningMetrics::new(),
778            },
779            Self::Ratio { name, .. } => Self::Ratio {
780                name: name.clone(),
781                ratio_metrics: RatioMetrics::new(),
782            },
783            Self::Custom { name, value } => Self::Custom {
784                name: name.clone(),
785                value: value.new_empty(),
786            },
787        }
788    }
789
790    /// Aggregates the value of other to `self`. panic's if the types
791    /// are mismatched or aggregating does not make sense for this
792    /// value
793    ///
794    /// Note this is purposely marked `mut` (even though atomics are
795    /// used) so Rust's type system can be used to ensure the
796    /// appropriate API access. `MetricValues` should be modified
797    /// using the original [`Count`] or [`Time`] they were created
798    /// from.
799    pub fn aggregate(&mut self, other: &Self) {
800        match (self, other) {
801            (Self::OutputRows(count), Self::OutputRows(other_count))
802            | (Self::SpillCount(count), Self::SpillCount(other_count))
803            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
804            | (Self::OutputBytes(count), Self::OutputBytes(other_count))
805            | (Self::SpilledRows(count), Self::SpilledRows(other_count))
806            | (
807                Self::Count { count, .. },
808                Self::Count {
809                    count: other_count, ..
810                },
811            ) => count.add(other_count.value()),
812            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
813            | (
814                Self::Gauge { gauge, .. },
815                Self::Gauge {
816                    gauge: other_gauge, ..
817                },
818            ) => gauge.add(other_gauge.value()),
819            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
820            | (
821                Self::Time { time, .. },
822                Self::Time {
823                    time: other_time, ..
824                },
825            ) => time.add(other_time),
826            // timestamps are aggregated by min/max
827            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
828                timestamp.update_to_min(other_timestamp);
829            }
830            // timestamps are aggregated by min/max
831            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
832                timestamp.update_to_max(other_timestamp);
833            }
834            (
835                Self::PruningMetrics {
836                    pruning_metrics, ..
837                },
838                Self::PruningMetrics {
839                    pruning_metrics: other_pruning_metrics,
840                    ..
841                },
842            ) => {
843                let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
844                let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
845                pruning_metrics.add_pruned(pruned);
846                pruning_metrics.add_matched(matched);
847            }
848            (
849                Self::Ratio { ratio_metrics, .. },
850                Self::Ratio {
851                    ratio_metrics: other_ratio_metrics,
852                    ..
853                },
854            ) => {
855                ratio_metrics.merge(other_ratio_metrics);
856            }
857            (
858                Self::Custom { value, .. },
859                Self::Custom {
860                    value: other_value, ..
861                },
862            ) => {
863                value.aggregate(Arc::clone(other_value));
864            }
865            m @ (_, _) => {
866                panic!(
867                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
868                    m.0, m.1
869                )
870            }
871        }
872    }
873
874    /// Returns a number by which to sort metrics by display. Lower
875    /// numbers are "more useful" (and displayed first)
876    pub fn display_sort_key(&self) -> u8 {
877        match self {
878            // `BaselineMetrics` that is common for most operators
879            Self::OutputRows(_) => 0,
880            Self::ElapsedCompute(_) => 1,
881            Self::OutputBytes(_) => 2,
882            // Other metrics
883            Self::PruningMetrics { name, .. } => match name.as_ref() {
884                // The following metrics belong to `DataSourceExec` with a Parquet data source.
885                // They are displayed in a specific order that reflects the actual pruning process,
886                // from coarse-grained to fine-grained pruning levels.
887                //
888                // You may update these metrics as long as their relative order remains unchanged.
889                //
890                // Reference PR: <https://github.com/apache/datafusion/pull/18379>
891                "files_ranges_pruned_statistics" => 3,
892                "row_groups_pruned_statistics" => 4,
893                "row_groups_pruned_bloom_filter" => 5,
894                "page_index_rows_pruned" => 6,
895                _ => 7,
896            },
897            Self::SpillCount(_) => 8,
898            Self::SpilledBytes(_) => 9,
899            Self::SpilledRows(_) => 10,
900            Self::CurrentMemoryUsage(_) => 11,
901            Self::Count { .. } => 12,
902            Self::Gauge { .. } => 13,
903            Self::Time { .. } => 14,
904            Self::Ratio { .. } => 15,
905            Self::StartTimestamp(_) => 16, // show timestamps last
906            Self::EndTimestamp(_) => 17,
907            Self::Custom { .. } => 18,
908        }
909    }
910
911    /// returns true if this metric has a timestamp value
912    pub fn is_timestamp(&self) -> bool {
913        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
914    }
915}
916
917impl Display for MetricValue {
918    /// Prints the value of this metric
919    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
920        match self {
921            Self::OutputRows(count)
922            | Self::SpillCount(count)
923            | Self::SpilledRows(count)
924            | Self::Count { count, .. } => {
925                write!(f, "{count}")
926            }
927            Self::SpilledBytes(count) | Self::OutputBytes(count) => {
928                let readable_count = human_readable_size(count.value());
929                write!(f, "{readable_count}")
930            }
931            Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
932                write!(f, "{gauge}")
933            }
934            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
935                // distinguish between no time recorded and very small
936                // amount of time recorded
937                if time.value() > 0 {
938                    write!(f, "{time}")
939                } else {
940                    write!(f, "NOT RECORDED")
941                }
942            }
943            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
944                write!(f, "{timestamp}")
945            }
946            Self::PruningMetrics {
947                pruning_metrics, ..
948            } => {
949                write!(f, "{pruning_metrics}")
950            }
951            Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
952            Self::Custom { name, value } => {
953                write!(f, "name:{name} {value}")
954            }
955        }
956    }
957}
958
#[cfg(test)]
mod tests {
    use std::any::Any;

    use chrono::TimeZone;
    use datafusion_execution::memory_pool::units::MB;

    use super::*;

    /// Minimal custom metric used to exercise `MetricValue::Custom`.
    #[derive(Debug, Default)]
    pub struct CustomCounter {
        count: AtomicUsize,
    }

    impl Display for CustomCounter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
        }
    }

    impl CustomMetricValue for CustomCounter {
        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
            Arc::new(CustomCounter::default())
        }

        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
            let other = other.as_any().downcast_ref::<Self>().unwrap();
            self.count
                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
            let Some(other) = other.as_any().downcast_ref::<Self>() else {
                return false;
            };

            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
        }
    }

    /// Builds a `MetricValue::Custom` named `name` whose counter starts at `value`.
    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
        MetricValue::Custom {
            name: Cow::Borrowed(name),
            value: Arc::new(CustomCounter {
                count: AtomicUsize::new(value),
            }),
        }
    }

    #[test]
    fn test_custom_metric_with_mismatching_names() {
        let mut custom_val = new_custom_counter("Hi", 1);
        let other_custom_val = new_custom_counter("Hello", 1);

        // Not equal since the name differs.
        assert!(other_custom_val != custom_val);

        // Should work even though the name differs
        custom_val.aggregate(&other_custom_val);

        let expected_val = new_custom_counter("Hi", 2);
        assert!(expected_val == custom_val);
    }

    #[test]
    fn test_custom_metric() {
        let mut custom_val = new_custom_counter("hi", 11);
        let other_custom_val = new_custom_counter("hi", 20);

        custom_val.aggregate(&other_custom_val);

        assert!(custom_val != other_custom_val);

        if let MetricValue::Custom { value, .. } = custom_val {
            let counter = value
                .as_any()
                .downcast_ref::<CustomCounter>()
                .expect("Expected CustomCounter");
            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
        } else {
            panic!("Unexpected value");
        }
    }

    #[test]
    fn test_display_output_rows() {
        let count = Count::new();
        let values = vec![
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];

        for value in &values {
            assert_eq!("0", value.to_string(), "value {value:?}");
        }

        count.add(42);
        for value in &values {
            assert_eq!("42", value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_spilled_bytes() {
        let count = Count::new();
        let spilled_byte = MetricValue::SpilledBytes(count.clone());

        assert_eq!("0.0 B", spilled_byte.to_string());

        count.add((100 * MB) as usize);
        assert_eq!("100.0 MB", spilled_byte.to_string());

        count.add((0.5 * MB as f64) as usize);
        assert_eq!("100.5 MB", spilled_byte.to_string());
    }

    #[test]
    fn test_display_time() {
        let time = Time::new();
        let values = vec![
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];

        // if time is not set, it should not be reported as zero
        for value in &values {
            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
        }

        time.add_duration(Duration::from_nanos(1042));
        for value in &values {
            assert_eq!("1.042µs", value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_ratio() {
        let ratio_metrics = RatioMetrics::new();
        let ratio = MetricValue::Ratio {
            name: Cow::Borrowed("ratio_metric"),
            ratio_metrics: ratio_metrics.clone(),
        };

        assert_eq!("N/A (0/0)", ratio.to_string());

        ratio_metrics.add_part(10);
        assert_eq!("N/A (10/0)", ratio.to_string());

        ratio_metrics.add_total(40);
        assert_eq!("25% (10/40)", ratio.to_string());

        let tiny_ratio_metrics = RatioMetrics::new();
        let tiny_ratio = MetricValue::Ratio {
            name: Cow::Borrowed("tiny_ratio_metric"),
            ratio_metrics: tiny_ratio_metrics.clone(),
        };
        tiny_ratio_metrics.add_part(1);
        tiny_ratio_metrics.add_total(3000);
        assert_eq!("0.033% (1/3000)", tiny_ratio.to_string());
    }

    #[test]
    fn test_display_timestamp() {
        let timestamp = Timestamp::new();
        let values = vec![
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];

        // if the timestamp is not set, it should be reported as NONE
        for value in &values {
            assert_eq!("NONE", value.to_string(), "value {value:?}");
        }

        timestamp.set(Utc.timestamp_nanos(1431648000000000));
        for value in &values {
            assert_eq!(
                "1970-01-17 13:40:48 UTC",
                value.to_string(),
                "value {value:?}"
            );
        }
    }

    #[test]
    fn test_is_timestamp() {
        let timestamp = Timestamp::new();
        assert!(MetricValue::StartTimestamp(timestamp.clone()).is_timestamp());
        assert!(MetricValue::EndTimestamp(timestamp).is_timestamp());
        assert!(!MetricValue::OutputRows(Count::new()).is_timestamp());
        assert!(!MetricValue::ElapsedCompute(Time::new()).is_timestamp());
    }

    #[test]
    fn test_display_sort_key_order() {
        // Baseline metrics first, then generic counters, then timestamps, and
        // custom metrics last.
        let ordered = [
            MetricValue::OutputRows(Count::new()),
            MetricValue::ElapsedCompute(Time::new()),
            MetricValue::OutputBytes(Count::new()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: Count::new(),
            },
            MetricValue::StartTimestamp(Timestamp::new()),
            MetricValue::EndTimestamp(Timestamp::new()),
            new_custom_counter("custom", 0),
        ];

        let keys: Vec<u8> = ordered.iter().map(|m| m.display_sort_key()).collect();
        assert!(
            keys.windows(2).all(|w| w[0] < w[1]),
            "sort keys are not strictly increasing: {keys:?}"
        );
    }

    #[test]
    fn test_timer_with_custom_instant() {
        let time = Time::new();
        let start_time = Instant::now();

        // Sleep a bit to ensure some time passes
        std::thread::sleep(Duration::from_millis(1));

        // Create timer with the earlier start time
        let mut timer = time.timer_with(start_time);

        // Sleep a bit more
        std::thread::sleep(Duration::from_millis(1));

        // Stop the timer
        timer.stop();

        // The recorded time should be at least 2ms (both sleeps)
        assert!(
            time.value() >= 2_000_000,
            "Expected at least 2ms, got {} ns",
            time.value()
        );
    }

    #[test]
    fn test_stop_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();
        let mut timer = time.timer_with(start);

        // Simulate exactly 10ms passing
        let end = start + Duration::from_millis(10);

        // Stop with custom endpoint
        timer.stop_with(end);

        // Should record exactly 10ms (10_000_000 nanoseconds)
        // Allow for small variations due to timer resolution
        let recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&recorded),
            "Expected ~10ms, got {recorded} ns"
        );

        // Calling stop_with again should not add more time
        timer.stop_with(end);
        assert_eq!(
            recorded,
            time.value(),
            "Time should not change after second stop"
        );
    }

    #[test]
    fn test_done_with_custom_endpoint() {
        let time = Time::new();
        let start = Instant::now();

        // Create a new scope for the timer
        {
            let timer = time.timer_with(start);

            // Simulate 5ms passing
            let end = start + Duration::from_millis(5);

            // Call done_with to stop and consume the timer
            timer.done_with(end);

            // Timer is consumed, can't use it anymore
        }

        // Should record exactly 5ms
        let recorded = time.value();
        assert!(
            (5_000_000..=5_100_000).contains(&recorded),
            "Expected ~5ms, got {recorded} ns",
        );

        // Test that done_with prevents drop from recording time again
        {
            let timer2 = time.timer_with(start);
            let end2 = start + Duration::from_millis(5);
            timer2.done_with(end2);
            // drop happens here but should not record additional time
        }

        // Should have added only 5ms more
        let new_recorded = time.value();
        assert!(
            (10_000_000..=10_100_000).contains(&new_recorded),
            "Expected ~10ms total, got {new_recorded} ns",
        );
    }
}