datafusion_physical_plan/metrics/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Value representation of metrics
19
20use super::CustomMetricValue;
21use chrono::{DateTime, Utc};
22use datafusion_common::instant::Instant;
23use datafusion_execution::memory_pool::human_readable_size;
24use parking_lot::Mutex;
25use std::{
26    borrow::{Borrow, Cow},
27    fmt::{Debug, Display},
28    sync::{
29        atomic::{AtomicUsize, Ordering},
30        Arc,
31    },
32    time::Duration,
33};
34
35/// A counter to record things such as number of input or output rows
36///
37/// Note `clone`ing counters update the same underlying metrics
38#[derive(Debug, Clone)]
39pub struct Count {
40    /// value of the metric counter
41    value: Arc<AtomicUsize>,
42}
43
44impl PartialEq for Count {
45    fn eq(&self, other: &Self) -> bool {
46        self.value().eq(&other.value())
47    }
48}
49
50impl Display for Count {
51    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
52        write!(f, "{}", self.value())
53    }
54}
55
56impl Default for Count {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62impl Count {
63    /// create a new counter
64    pub fn new() -> Self {
65        Self {
66            value: Arc::new(AtomicUsize::new(0)),
67        }
68    }
69
70    /// Add `n` to the metric's value
71    pub fn add(&self, n: usize) {
72        // relaxed ordering for operations on `value` poses no issues
73        // we're purely using atomic ops with no associated memory ops
74        self.value.fetch_add(n, Ordering::Relaxed);
75    }
76
77    /// Get the current value
78    pub fn value(&self) -> usize {
79        self.value.load(Ordering::Relaxed)
80    }
81}
82
83/// A gauge is the simplest metrics type. It just returns a value.
84/// For example, you can easily expose current memory consumption with a gauge.
85///
86/// Note `clone`ing gauge update the same underlying metrics
87#[derive(Debug, Clone)]
88pub struct Gauge {
89    /// value of the metric gauge
90    value: Arc<AtomicUsize>,
91}
92
93impl PartialEq for Gauge {
94    fn eq(&self, other: &Self) -> bool {
95        self.value().eq(&other.value())
96    }
97}
98
99impl Display for Gauge {
100    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
101        write!(f, "{}", self.value())
102    }
103}
104
105impl Default for Gauge {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl Gauge {
112    /// create a new gauge
113    pub fn new() -> Self {
114        Self {
115            value: Arc::new(AtomicUsize::new(0)),
116        }
117    }
118
119    /// Add `n` to the metric's value
120    pub fn add(&self, n: usize) {
121        // relaxed ordering for operations on `value` poses no issues
122        // we're purely using atomic ops with no associated memory ops
123        self.value.fetch_add(n, Ordering::Relaxed);
124    }
125
126    /// Sub `n` from the metric's value
127    pub fn sub(&self, n: usize) {
128        // relaxed ordering for operations on `value` poses no issues
129        // we're purely using atomic ops with no associated memory ops
130        self.value.fetch_sub(n, Ordering::Relaxed);
131    }
132
133    /// Set metric's value to maximum of `n` and current value
134    pub fn set_max(&self, n: usize) {
135        self.value.fetch_max(n, Ordering::Relaxed);
136    }
137
138    /// Set the metric's value to `n` and return the previous value
139    pub fn set(&self, n: usize) -> usize {
140        // relaxed ordering for operations on `value` poses no issues
141        // we're purely using atomic ops with no associated memory ops
142        self.value.swap(n, Ordering::Relaxed)
143    }
144
145    /// Get the current value
146    pub fn value(&self) -> usize {
147        self.value.load(Ordering::Relaxed)
148    }
149}
150
151/// Measure a potentially non contiguous duration of time
152#[derive(Debug, Clone)]
153pub struct Time {
154    /// elapsed time, in nanoseconds
155    nanos: Arc<AtomicUsize>,
156}
157
158impl Default for Time {
159    fn default() -> Self {
160        Self::new()
161    }
162}
163
164impl PartialEq for Time {
165    fn eq(&self, other: &Self) -> bool {
166        self.value().eq(&other.value())
167    }
168}
169
170impl Display for Time {
171    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
172        let duration = Duration::from_nanos(self.value() as u64);
173        write!(f, "{duration:?}")
174    }
175}
176
177impl Time {
178    /// Create a new [`Time`] wrapper suitable for recording elapsed
179    /// times for operations.
180    pub fn new() -> Self {
181        Self {
182            nanos: Arc::new(AtomicUsize::new(0)),
183        }
184    }
185
186    /// Add elapsed nanoseconds since `start`to self
187    pub fn add_elapsed(&self, start: Instant) {
188        self.add_duration(start.elapsed());
189    }
190
191    /// Add duration of time to self
192    ///
193    /// Note: this will always increment the recorded time by at least 1 nanosecond
194    /// to distinguish between the scenario of no values recorded, in which
195    /// case the value will be 0, and no measurable amount of time having passed,
196    /// in which case the value will be small but not 0.
197    ///
198    /// This is based on the assumption that the timing logic in most cases is likely
199    /// to take at least a nanosecond, and so this is reasonable mechanism to avoid
200    /// ambiguity, especially on systems with low-resolution monotonic clocks
201    pub fn add_duration(&self, duration: Duration) {
202        let more_nanos = duration.as_nanos() as usize;
203        self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
204    }
205
206    /// Add the number of nanoseconds of other `Time` to self
207    pub fn add(&self, other: &Time) {
208        self.add_duration(Duration::from_nanos(other.value() as u64))
209    }
210
211    /// return a scoped guard that adds the amount of time elapsed
212    /// between its creation and its drop or call to `stop` to the
213    /// underlying metric.
214    pub fn timer(&self) -> ScopedTimerGuard<'_> {
215        ScopedTimerGuard {
216            inner: self,
217            start: Some(Instant::now()),
218        }
219    }
220
221    /// Get the number of nanoseconds record by this Time metric
222    pub fn value(&self) -> usize {
223        self.nanos.load(Ordering::Relaxed)
224    }
225
226    /// Return a scoped guard that adds the amount of time elapsed between the
227    /// given instant and its drop (or the call to `stop`) to the underlying metric
228    pub fn timer_with(&self, now: Instant) -> ScopedTimerGuard<'_> {
229        ScopedTimerGuard {
230            inner: self,
231            start: Some(now),
232        }
233    }
234}
235
236/// Stores a single timestamp, stored as the number of nanoseconds
237/// elapsed from Jan 1, 1970 UTC
238#[derive(Debug, Clone)]
239pub struct Timestamp {
240    /// Time thing started
241    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
242}
243
244impl Default for Timestamp {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250impl Timestamp {
251    /// Create a new timestamp and sets its value to 0
252    pub fn new() -> Self {
253        Self {
254            timestamp: Arc::new(Mutex::new(None)),
255        }
256    }
257
258    /// Sets the timestamps value to the current time
259    pub fn record(&self) {
260        self.set(Utc::now())
261    }
262
263    /// Sets the timestamps value to a specified time
264    pub fn set(&self, now: DateTime<Utc>) {
265        *self.timestamp.lock() = Some(now);
266    }
267
268    /// return the timestamps value at the last time `record()` was
269    /// called.
270    ///
271    /// Returns `None` if `record()` has not been called
272    pub fn value(&self) -> Option<DateTime<Utc>> {
273        *self.timestamp.lock()
274    }
275
276    /// sets the value of this timestamp to the minimum of this and other
277    pub fn update_to_min(&self, other: &Timestamp) {
278        let min = match (self.value(), other.value()) {
279            (None, None) => None,
280            (Some(v), None) => Some(v),
281            (None, Some(v)) => Some(v),
282            (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
283        };
284
285        *self.timestamp.lock() = min;
286    }
287
288    /// sets the value of this timestamp to the maximum of this and other
289    pub fn update_to_max(&self, other: &Timestamp) {
290        let max = match (self.value(), other.value()) {
291            (None, None) => None,
292            (Some(v), None) => Some(v),
293            (None, Some(v)) => Some(v),
294            (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
295        };
296
297        *self.timestamp.lock() = max;
298    }
299}
300
301impl PartialEq for Timestamp {
302    fn eq(&self, other: &Self) -> bool {
303        self.value().eq(&other.value())
304    }
305}
306
307impl Display for Timestamp {
308    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
309        match self.value() {
310            None => write!(f, "NONE"),
311            Some(v) => {
312                write!(f, "{v}")
313            }
314        }
315    }
316}
317
318/// RAAI structure that adds all time between its construction and
319/// destruction to the CPU time or the first call to `stop` whichever
320/// comes first
321pub struct ScopedTimerGuard<'a> {
322    inner: &'a Time,
323    start: Option<Instant>,
324}
325
326impl ScopedTimerGuard<'_> {
327    /// Stop the timer timing and record the time taken
328    pub fn stop(&mut self) {
329        if let Some(start) = self.start.take() {
330            self.inner.add_elapsed(start)
331        }
332    }
333
334    /// Restarts the timer recording from the current time
335    pub fn restart(&mut self) {
336        self.start = Some(Instant::now())
337    }
338
339    /// Stop the timer, record the time taken and consume self
340    pub fn done(mut self) {
341        self.stop()
342    }
343
344    /// Stop the timer timing and record the time taken since the given endpoint.
345    pub fn stop_with(&mut self, end_time: Instant) {
346        if let Some(start) = self.start.take() {
347            let elapsed = end_time - start;
348            self.inner.add_duration(elapsed)
349        }
350    }
351
352    /// Stop the timer, record the time taken since `end_time` endpoint, and
353    /// consume self.
354    pub fn done_with(mut self, end_time: Instant) {
355        self.stop_with(end_time)
356    }
357}
358
359impl Drop for ScopedTimerGuard<'_> {
360    fn drop(&mut self) {
361        self.stop()
362    }
363}
364
365/// Possible values for a [super::Metric].
366///
367/// Among other differences, the metric types have different ways to
368/// logically interpret their underlying values and some metrics are
369/// so common they are given special treatment.
370#[derive(Debug, Clone)]
371pub enum MetricValue {
372    /// Number of output rows produced: "output_rows" metric
373    OutputRows(Count),
374    /// Elapsed Compute Time: the wall clock time spent in "cpu
375    /// intensive" work.
376    ///
377    /// This measurement represents, roughly:
378    /// ```
379    /// use std::time::Instant;
380    /// let start = Instant::now();
381    /// // ...CPU intensive work here...
382    /// let elapsed_compute = (Instant::now() - start).as_nanos();
383    /// ```
384    ///
385    /// Note 1: Does *not* include time other operators spend
386    /// computing input.
387    ///
388    /// Note 2: *Does* includes time when the thread could have made
389    /// progress but the OS did not schedule it (e.g. due to CPU
390    /// contention), thus making this value different than the
391    /// classical definition of "cpu_time", which is the time reported
392    /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
393    ElapsedCompute(Time),
394    /// Number of spills produced: "spill_count" metric
395    SpillCount(Count),
396    /// Total size of spilled bytes produced: "spilled_bytes" metric
397    SpilledBytes(Count),
398    /// Total size of spilled rows produced: "spilled_rows" metric
399    SpilledRows(Count),
400    /// Current memory used
401    CurrentMemoryUsage(Gauge),
402    /// Operator defined count.
403    Count {
404        /// The provided name of this metric
405        name: Cow<'static, str>,
406        /// The value of the metric
407        count: Count,
408    },
409    /// Operator defined gauge.
410    Gauge {
411        /// The provided name of this metric
412        name: Cow<'static, str>,
413        /// The value of the metric
414        gauge: Gauge,
415    },
416    /// Operator defined time
417    Time {
418        /// The provided name of this metric
419        name: Cow<'static, str>,
420        /// The value of the metric
421        time: Time,
422    },
423    /// The time at which execution started
424    StartTimestamp(Timestamp),
425    /// The time at which execution ended
426    EndTimestamp(Timestamp),
427    Custom {
428        /// The provided name of this metric
429        name: Cow<'static, str>,
430        /// A custom implementation of the metric value.
431        value: Arc<dyn CustomMetricValue>,
432    },
433}
434
435// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its
436// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq.
437impl PartialEq for MetricValue {
438    fn eq(&self, other: &Self) -> bool {
439        match (self, other) {
440            (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
441                count == other
442            }
443            (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
444                time == other
445            }
446            (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => {
447                count == other
448            }
449            (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
450                count == other
451            }
452            (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
453                count == other
454            }
455            (
456                MetricValue::CurrentMemoryUsage(gauge),
457                MetricValue::CurrentMemoryUsage(other),
458            ) => gauge == other,
459            (
460                MetricValue::Count { name, count },
461                MetricValue::Count {
462                    name: other_name,
463                    count: other_count,
464                },
465            ) => name == other_name && count == other_count,
466            (
467                MetricValue::Gauge { name, gauge },
468                MetricValue::Gauge {
469                    name: other_name,
470                    gauge: other_gauge,
471                },
472            ) => name == other_name && gauge == other_gauge,
473            (
474                MetricValue::Time { name, time },
475                MetricValue::Time {
476                    name: other_name,
477                    time: other_time,
478                },
479            ) => name == other_name && time == other_time,
480
481            (
482                MetricValue::StartTimestamp(timestamp),
483                MetricValue::StartTimestamp(other),
484            ) => timestamp == other,
485            (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
486                timestamp == other
487            }
488            (
489                MetricValue::Custom { name, value },
490                MetricValue::Custom {
491                    name: other_name,
492                    value: other_value,
493                },
494            ) => name == other_name && value.is_eq(other_value),
495            // Default case when the two sides do not have the same type.
496            _ => false,
497        }
498    }
499}
500
501impl MetricValue {
502    /// Return the name of this SQL metric
503    pub fn name(&self) -> &str {
504        match self {
505            Self::OutputRows(_) => "output_rows",
506            Self::SpillCount(_) => "spill_count",
507            Self::SpilledBytes(_) => "spilled_bytes",
508            Self::SpilledRows(_) => "spilled_rows",
509            Self::CurrentMemoryUsage(_) => "mem_used",
510            Self::ElapsedCompute(_) => "elapsed_compute",
511            Self::Count { name, .. } => name.borrow(),
512            Self::Gauge { name, .. } => name.borrow(),
513            Self::Time { name, .. } => name.borrow(),
514            Self::StartTimestamp(_) => "start_timestamp",
515            Self::EndTimestamp(_) => "end_timestamp",
516            Self::Custom { name, .. } => name.borrow(),
517        }
518    }
519
520    /// Return the value of the metric as a usize value
521    pub fn as_usize(&self) -> usize {
522        match self {
523            Self::OutputRows(count) => count.value(),
524            Self::SpillCount(count) => count.value(),
525            Self::SpilledBytes(bytes) => bytes.value(),
526            Self::SpilledRows(count) => count.value(),
527            Self::CurrentMemoryUsage(used) => used.value(),
528            Self::ElapsedCompute(time) => time.value(),
529            Self::Count { count, .. } => count.value(),
530            Self::Gauge { gauge, .. } => gauge.value(),
531            Self::Time { time, .. } => time.value(),
532            Self::StartTimestamp(timestamp) => timestamp
533                .value()
534                .and_then(|ts| ts.timestamp_nanos_opt())
535                .map(|nanos| nanos as usize)
536                .unwrap_or(0),
537            Self::EndTimestamp(timestamp) => timestamp
538                .value()
539                .and_then(|ts| ts.timestamp_nanos_opt())
540                .map(|nanos| nanos as usize)
541                .unwrap_or(0),
542            Self::Custom { value, .. } => value.as_usize(),
543        }
544    }
545
546    /// create a new MetricValue with the same type as `self` suitable
547    /// for accumulating
548    pub fn new_empty(&self) -> Self {
549        match self {
550            Self::OutputRows(_) => Self::OutputRows(Count::new()),
551            Self::SpillCount(_) => Self::SpillCount(Count::new()),
552            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
553            Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
554            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
555            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
556            Self::Count { name, .. } => Self::Count {
557                name: name.clone(),
558                count: Count::new(),
559            },
560            Self::Gauge { name, .. } => Self::Gauge {
561                name: name.clone(),
562                gauge: Gauge::new(),
563            },
564            Self::Time { name, .. } => Self::Time {
565                name: name.clone(),
566                time: Time::new(),
567            },
568            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
569            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
570            Self::Custom { name, value } => Self::Custom {
571                name: name.clone(),
572                value: value.new_empty(),
573            },
574        }
575    }
576
577    /// Aggregates the value of other to `self`. panic's if the types
578    /// are mismatched or aggregating does not make sense for this
579    /// value
580    ///
581    /// Note this is purposely marked `mut` (even though atomics are
582    /// used) so Rust's type system can be used to ensure the
583    /// appropriate API access. `MetricValues` should be modified
584    /// using the original [`Count`] or [`Time`] they were created
585    /// from.
586    pub fn aggregate(&mut self, other: &Self) {
587        match (self, other) {
588            (Self::OutputRows(count), Self::OutputRows(other_count))
589            | (Self::SpillCount(count), Self::SpillCount(other_count))
590            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
591            | (Self::SpilledRows(count), Self::SpilledRows(other_count))
592            | (
593                Self::Count { count, .. },
594                Self::Count {
595                    count: other_count, ..
596                },
597            ) => count.add(other_count.value()),
598            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
599            | (
600                Self::Gauge { gauge, .. },
601                Self::Gauge {
602                    gauge: other_gauge, ..
603                },
604            ) => gauge.add(other_gauge.value()),
605            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
606            | (
607                Self::Time { time, .. },
608                Self::Time {
609                    time: other_time, ..
610                },
611            ) => time.add(other_time),
612            // timestamps are aggregated by min/max
613            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
614                timestamp.update_to_min(other_timestamp);
615            }
616            // timestamps are aggregated by min/max
617            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
618                timestamp.update_to_max(other_timestamp);
619            }
620            (
621                Self::Custom { value, .. },
622                Self::Custom {
623                    value: other_value, ..
624                },
625            ) => {
626                value.aggregate(Arc::clone(other_value));
627            }
628            m @ (_, _) => {
629                panic!(
630                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
631                    m.0, m.1
632                )
633            }
634        }
635    }
636
637    /// Returns a number by which to sort metrics by display. Lower
638    /// numbers are "more useful" (and displayed first)
639    pub fn display_sort_key(&self) -> u8 {
640        match self {
641            Self::OutputRows(_) => 0,     // show first
642            Self::ElapsedCompute(_) => 1, // show second
643            Self::SpillCount(_) => 2,
644            Self::SpilledBytes(_) => 3,
645            Self::SpilledRows(_) => 4,
646            Self::CurrentMemoryUsage(_) => 5,
647            Self::Count { .. } => 6,
648            Self::Gauge { .. } => 7,
649            Self::Time { .. } => 8,
650            Self::StartTimestamp(_) => 9, // show timestamps last
651            Self::EndTimestamp(_) => 10,
652            Self::Custom { .. } => 11,
653        }
654    }
655
656    /// returns true if this metric has a timestamp value
657    pub fn is_timestamp(&self) -> bool {
658        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
659    }
660}
661
662impl Display for MetricValue {
663    /// Prints the value of this metric
664    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
665        match self {
666            Self::OutputRows(count)
667            | Self::SpillCount(count)
668            | Self::SpilledRows(count)
669            | Self::Count { count, .. } => {
670                write!(f, "{count}")
671            }
672            Self::SpilledBytes(count) => {
673                let readable_count = human_readable_size(count.value());
674                write!(f, "{readable_count}")
675            }
676            Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
677                write!(f, "{gauge}")
678            }
679            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
680                // distinguish between no time recorded and very small
681                // amount of time recorded
682                if time.value() > 0 {
683                    write!(f, "{time}")
684                } else {
685                    write!(f, "NOT RECORDED")
686                }
687            }
688            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
689                write!(f, "{timestamp}")
690            }
691            Self::Custom { name, value } => {
692                write!(f, "name:{name} {value}")
693            }
694        }
695    }
696}
697
698#[cfg(test)]
699mod tests {
700    use std::any::Any;
701
702    use chrono::TimeZone;
703    use datafusion_execution::memory_pool::units::MB;
704
705    use super::*;
706
707    #[derive(Debug, Default)]
708    pub struct CustomCounter {
709        count: AtomicUsize,
710    }
711
712    impl Display for CustomCounter {
713        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
714            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
715        }
716    }
717
718    impl CustomMetricValue for CustomCounter {
719        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
720            Arc::new(CustomCounter::default())
721        }
722
723        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
724            let other = other.as_any().downcast_ref::<Self>().unwrap();
725            self.count
726                .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
727        }
728
729        fn as_any(&self) -> &dyn Any {
730            self
731        }
732
733        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
734            let Some(other) = other.as_any().downcast_ref::<Self>() else {
735                return false;
736            };
737
738            self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
739        }
740    }
741
742    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
743        let custom_counter = CustomCounter::default();
744        custom_counter.count.fetch_add(value, Ordering::Relaxed);
745        let custom_val = MetricValue::Custom {
746            name: Cow::Borrowed(name),
747            value: Arc::new(custom_counter),
748        };
749
750        custom_val
751    }
752
753    #[test]
754    fn test_custom_metric_with_mismatching_names() {
755        let mut custom_val = new_custom_counter("Hi", 1);
756        let other_custom_val = new_custom_counter("Hello", 1);
757
758        // Not equal since the name differs.
759        assert!(other_custom_val != custom_val);
760
761        // Should work even though the name differs
762        custom_val.aggregate(&other_custom_val);
763
764        let expected_val = new_custom_counter("Hi", 2);
765        assert!(expected_val == custom_val);
766    }
767
768    #[test]
769    fn test_custom_metric() {
770        let mut custom_val = new_custom_counter("hi", 11);
771        let other_custom_val = new_custom_counter("hi", 20);
772
773        custom_val.aggregate(&other_custom_val);
774
775        assert!(custom_val != other_custom_val);
776
777        if let MetricValue::Custom { value, .. } = custom_val {
778            let counter = value
779                .as_any()
780                .downcast_ref::<CustomCounter>()
781                .expect("Expected CustomCounter");
782            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
783        } else {
784            panic!("Unexpected value");
785        }
786    }
787
788    #[test]
789    fn test_display_output_rows() {
790        let count = Count::new();
791        let values = vec![
792            MetricValue::OutputRows(count.clone()),
793            MetricValue::Count {
794                name: "my_counter".into(),
795                count: count.clone(),
796            },
797        ];
798
799        for value in &values {
800            assert_eq!("0", value.to_string(), "value {value:?}");
801        }
802
803        count.add(42);
804        for value in &values {
805            assert_eq!("42", value.to_string(), "value {value:?}");
806        }
807    }
808
809    #[test]
810    fn test_display_spilled_bytes() {
811        let count = Count::new();
812        let spilled_byte = MetricValue::SpilledBytes(count.clone());
813
814        assert_eq!("0.0 B", spilled_byte.to_string());
815
816        count.add((100 * MB) as usize);
817        assert_eq!("100.0 MB", spilled_byte.to_string());
818
819        count.add((0.5 * MB as f64) as usize);
820        assert_eq!("100.5 MB", spilled_byte.to_string());
821    }
822
823    #[test]
824    fn test_display_time() {
825        let time = Time::new();
826        let values = vec![
827            MetricValue::ElapsedCompute(time.clone()),
828            MetricValue::Time {
829                name: "my_time".into(),
830                time: time.clone(),
831            },
832        ];
833
834        // if time is not set, it should not be reported as zero
835        for value in &values {
836            assert_eq!("NOT RECORDED", value.to_string(), "value {value:?}");
837        }
838
839        time.add_duration(Duration::from_nanos(1042));
840        for value in &values {
841            assert_eq!("1.042µs", value.to_string(), "value {value:?}");
842        }
843    }
844
845    #[test]
846    fn test_display_timestamp() {
847        let timestamp = Timestamp::new();
848        let values = vec![
849            MetricValue::StartTimestamp(timestamp.clone()),
850            MetricValue::EndTimestamp(timestamp.clone()),
851        ];
852
853        // if time is not set, it should not be reported as zero
854        for value in &values {
855            assert_eq!("NONE", value.to_string(), "value {value:?}");
856        }
857
858        timestamp.set(Utc.timestamp_nanos(1431648000000000));
859        for value in &values {
860            assert_eq!(
861                "1970-01-17 13:40:48 UTC",
862                value.to_string(),
863                "value {value:?}"
864            );
865        }
866    }
867
868    #[test]
869    fn test_timer_with_custom_instant() {
870        let time = Time::new();
871        let start_time = Instant::now();
872
873        // Sleep a bit to ensure some time passes
874        std::thread::sleep(Duration::from_millis(1));
875
876        // Create timer with the earlier start time
877        let mut timer = time.timer_with(start_time);
878
879        // Sleep a bit more
880        std::thread::sleep(Duration::from_millis(1));
881
882        // Stop the timer
883        timer.stop();
884
885        // The recorded time should be at least 20ms (both sleeps)
886        assert!(
887            time.value() >= 2_000_000,
888            "Expected at least 2ms, got {} ns",
889            time.value()
890        );
891    }
892
893    #[test]
894    fn test_stop_with_custom_endpoint() {
895        let time = Time::new();
896        let start = Instant::now();
897        let mut timer = time.timer_with(start);
898
899        // Simulate exactly 10ms passing
900        let end = start + Duration::from_millis(10);
901
902        // Stop with custom endpoint
903        timer.stop_with(end);
904
905        // Should record exactly 10ms (10_000_000 nanoseconds)
906        // Allow for small variations due to timer resolution
907        let recorded = time.value();
908        assert!(
909            (10_000_000..=10_100_000).contains(&recorded),
910            "Expected ~10ms, got {recorded} ns"
911        );
912
913        // Calling stop_with again should not add more time
914        timer.stop_with(end);
915        assert_eq!(
916            recorded,
917            time.value(),
918            "Time should not change after second stop"
919        );
920    }
921
922    #[test]
923    fn test_done_with_custom_endpoint() {
924        let time = Time::new();
925        let start = Instant::now();
926
927        // Create a new scope for the timer
928        {
929            let timer = time.timer_with(start);
930
931            // Simulate 50ms passing
932            let end = start + Duration::from_millis(5);
933
934            // Call done_with to stop and consume the timer
935            timer.done_with(end);
936
937            // Timer is consumed, can't use it anymore
938        }
939
940        // Should record exactly 5ms
941        let recorded = time.value();
942        assert!(
943            (5_000_000..=5_100_000).contains(&recorded),
944            "Expected ~5ms, got {recorded} ns",
945        );
946
947        // Test that done_with prevents drop from recording time again
948        {
949            let timer2 = time.timer_with(start);
950            let end2 = start + Duration::from_millis(5);
951            timer2.done_with(end2);
952            // drop happens here but should not record additional time
953        }
954
955        // Should have added only 5ms more
956        let new_recorded = time.value();
957        assert!(
958            (10_000_000..=10_100_000).contains(&new_recorded),
959            "Expected ~10ms total, got {new_recorded} ns",
960        );
961    }
962}