Skip to main content

fast_telemetry/metric/
min_gauge.rs

1//! Thread-sharded minimum gauge.
2
3use crate::thread_id::thread_id;
4use crossbeam_utils::CachePadded;
5use std::fmt;
6use std::sync::atomic::{AtomicI64, Ordering};
7
8fn make_padded_cell(initial: i64) -> CachePadded<AtomicI64> {
9    CachePadded::new(AtomicI64::new(initial))
10}
11
/// A thread-sharded minimum tracker exported as a gauge.
///
/// Each observation is folded into one shard selected from the calling
/// thread's id; reading the gauge takes the minimum over every shard.
pub struct MinGauge {
    /// Per-shard running minimums. The shard count is rounded up to a power
    /// of two at construction so a shard can be selected with a bit mask.
    cells: Vec<CachePadded<AtomicI64>>,
    /// Value every shard starts at, and is restored to when the gauge is reset.
    reset_value: i64,
}
17
18impl MinGauge {
19    /// Create a new min gauge with all shards initialized to [`i64::MAX`],
20    /// so any observation displaces the initial value.
21    ///
22    /// `get()` on a gauge that has never been observed returns [`i64::MAX`];
23    /// callers that need a different sentinel should use [`Self::with_value`].
24    pub fn new(shard_count: usize) -> Self {
25        Self::with_value(shard_count, i64::MAX)
26    }
27
28    /// Create a new min gauge with all shards initialized to `initial`.
29    pub fn with_value(shard_count: usize, initial: i64) -> Self {
30        let shard_count = shard_count.next_power_of_two();
31        Self {
32            cells: (0..shard_count)
33                .map(|_| make_padded_cell(initial))
34                .collect(),
35            reset_value: initial,
36        }
37    }
38
39    /// Record a candidate value for the minimum.
40    #[inline]
41    pub fn observe(&self, value: i64) {
42        let idx = thread_id() & (self.cells.len() - 1);
43        let cell = if cfg!(debug_assertions) {
44            self.cells.get(idx).expect("index out of bounds")
45        } else {
46            unsafe { self.cells.get_unchecked(idx) }
47        };
48        cell.fetch_min(value, Ordering::Relaxed);
49    }
50
51    /// Return the current minimum across all shards.
52    #[inline]
53    pub fn get(&self) -> i64 {
54        self.cells
55            .iter()
56            .map(|cell| cell.load(Ordering::Relaxed))
57            .min()
58            .unwrap_or(self.reset_value)
59    }
60
61    /// Reset all shards to the original value configured at construction.
62    pub fn reset(&self) {
63        for cell in &self.cells {
64            cell.store(self.reset_value, Ordering::Relaxed);
65        }
66    }
67
68    /// Reset all shards and return the previous minimum.
69    pub fn swap_reset(&self) -> i64 {
70        self.cells
71            .iter()
72            .map(|cell| cell.swap(self.reset_value, Ordering::Relaxed))
73            .min()
74            .unwrap_or(self.reset_value)
75    }
76}
77
78impl Default for MinGauge {
79    fn default() -> Self {
80        Self::new(4)
81    }
82}
83
84impl fmt::Debug for MinGauge {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("MinGauge")
87            .field("min", &self.get())
88            .field("cells", &self.cells.len())
89            .finish()
90    }
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// The smallest observation wins, including negative values.
    #[test]
    fn basic_observe() {
        let tracker = MinGauge::new(4);
        for sample in [3, -7, 5] {
            tracker.observe(sample);
        }
        assert_eq!(tracker.get(), -7);
    }

    /// A custom starting value is reported until a smaller value is observed.
    #[test]
    fn initial_value_is_respected() {
        let tracker = MinGauge::with_value(4, 10);
        assert_eq!(tracker.get(), 10);

        tracker.observe(3);
        assert_eq!(tracker.get(), 3);
    }

    /// A fresh gauge reports `i64::MAX`, then the smallest positive sample.
    #[test]
    fn new_tracks_minimum_of_positive_observations() {
        let tracker = MinGauge::new(4);
        assert_eq!(tracker.get(), i64::MAX);

        for sample in [8, 3, 5] {
            tracker.observe(sample);
        }
        assert_eq!(tracker.get(), 3);
    }
}
123}