Skip to main content

fast_telemetry/metric/
max_gauge.rs

1//! Thread-sharded maximum gauge.
2//!
3//! Records the maximum observed value without introducing a single contended
4//! atomic on the hot path. Each thread updates its own shard with `fetch_max`,
5//! and reads aggregate by taking the maximum across shards.
6
7use crate::thread_id::thread_id;
8use crossbeam_utils::CachePadded;
9use std::fmt;
10use std::sync::atomic::{AtomicI64, Ordering};
11
12fn make_padded_cell(initial: i64) -> CachePadded<AtomicI64> {
13    CachePadded::new(AtomicI64::new(initial))
14}
15
16/// A thread-sharded maximum tracker exported as a gauge.
17///
18/// This is useful for recording peaks such as maximum queue depth or the
19/// highest number of in-flight requests seen during an export interval.
20///
21/// Unlike [`crate::Gauge`], this is not a point-in-time `set()` value.
22/// Writers call [`observe`](Self::observe), and readers aggregate by taking
23/// the maximum across shards.
24pub struct MaxGauge {
25    cells: Vec<CachePadded<AtomicI64>>,
26    reset_value: i64,
27}
28
29impl MaxGauge {
30    /// Create a new max gauge with all shards initialized to zero.
31    ///
32    /// This is appropriate for metrics that are naturally non-negative.
33    pub fn new(shard_count: usize) -> Self {
34        Self::with_value(shard_count, 0)
35    }
36
37    /// Create a new max gauge with all shards initialized to `initial`.
38    pub fn with_value(shard_count: usize, initial: i64) -> Self {
39        let shard_count = shard_count.next_power_of_two();
40        Self {
41            cells: (0..shard_count)
42                .map(|_| make_padded_cell(initial))
43                .collect(),
44            reset_value: initial,
45        }
46    }
47
48    /// Record a candidate value for the maximum.
49    #[inline]
50    pub fn observe(&self, value: i64) {
51        let idx = thread_id() & (self.cells.len() - 1);
52        let cell = if cfg!(debug_assertions) {
53            self.cells.get(idx).expect("index out of bounds")
54        } else {
55            unsafe { self.cells.get_unchecked(idx) }
56        };
57        cell.fetch_max(value, Ordering::Relaxed);
58    }
59
60    /// Return the current maximum across all shards.
61    #[inline]
62    pub fn get(&self) -> i64 {
63        self.cells
64            .iter()
65            .map(|cell| cell.load(Ordering::Relaxed))
66            .max()
67            .unwrap_or(self.reset_value)
68    }
69
70    /// Reset all shards to the original value configured at construction.
71    ///
72    /// This is intended for export/sampling code, not the hot path.
73    pub fn reset(&self) {
74        for cell in &self.cells {
75            cell.store(self.reset_value, Ordering::Relaxed);
76        }
77    }
78
79    /// Reset all shards and return the previous maximum.
80    ///
81    /// Concurrent observations that land on already-reset shards may be
82    /// attributed to the next window rather than the current one. No maxima
83    /// are lost, but timing near the reset boundary is eventually consistent
84    /// in the same way as [`crate::Counter::swap`].
85    pub fn swap_reset(&self) -> i64 {
86        self.cells
87            .iter()
88            .map(|cell| cell.swap(self.reset_value, Ordering::Relaxed))
89            .max()
90            .unwrap_or(self.reset_value)
91    }
92}
93
94impl Default for MaxGauge {
95    fn default() -> Self {
96        Self::new(4)
97    }
98}
99
100impl fmt::Debug for MaxGauge {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        f.debug_struct("MaxGauge")
103            .field("max", &self.get())
104            .field("cells", &self.cells.len())
105            .finish()
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// The reported value is the largest observation, regardless of order.
    #[test]
    fn basic_observe() {
        let peak = MaxGauge::new(4);
        for sample in [3, 7, 5] {
            peak.observe(sample);
        }
        assert_eq!(peak.get(), 7);
    }

    /// A negative starting value is reported until a larger value arrives.
    #[test]
    fn initial_value_is_respected() {
        let peak = MaxGauge::with_value(4, -10);
        assert_eq!(peak.get(), -10);

        peak.observe(-3);
        assert_eq!(peak.get(), -3);
    }

    /// `swap_reset` hands back the old maximum; both resets restore the
    /// starting value.
    #[test]
    fn reset_and_swap_reset() {
        let peak = MaxGauge::with_value(4, 0);
        for sample in [9, 4] {
            peak.observe(sample);
        }
        let drained = peak.swap_reset();
        assert_eq!(drained, 9);
        assert_eq!(peak.get(), 0);

        peak.observe(2);
        peak.reset();
        assert_eq!(peak.get(), 0);
    }

    /// Observations made from several threads are all visible to `get`
    /// once the threads have been joined.
    #[test]
    fn concurrent_observe() {
        let peak = MaxGauge::with_value(8, 0);
        let samples = [10, 30, 20, 40, 15, 25, 35, 5];
        std::thread::scope(|scope| {
            for sample in samples {
                let shared = &peak;
                scope.spawn(move || {
                    shared.observe(sample);
                });
            }
        });
        assert_eq!(peak.get(), 40);
    }
}