fast_telemetry/metric/
max_gauge.rs1use crate::thread_id::thread_id;
8use crossbeam_utils::CachePadded;
9use std::fmt;
10use std::sync::atomic::{AtomicI64, Ordering};
11
12fn make_padded_cell(initial: i64) -> CachePadded<AtomicI64> {
13 CachePadded::new(AtomicI64::new(initial))
14}
15
16pub struct MaxGauge {
25 cells: Vec<CachePadded<AtomicI64>>,
26 reset_value: i64,
27}
28
29impl MaxGauge {
30 pub fn new(shard_count: usize) -> Self {
34 Self::with_value(shard_count, 0)
35 }
36
37 pub fn with_value(shard_count: usize, initial: i64) -> Self {
39 let shard_count = shard_count.next_power_of_two();
40 Self {
41 cells: (0..shard_count)
42 .map(|_| make_padded_cell(initial))
43 .collect(),
44 reset_value: initial,
45 }
46 }
47
48 #[inline]
50 pub fn observe(&self, value: i64) {
51 let idx = thread_id() & (self.cells.len() - 1);
52 let cell = if cfg!(debug_assertions) {
53 self.cells.get(idx).expect("index out of bounds")
54 } else {
55 unsafe { self.cells.get_unchecked(idx) }
56 };
57 cell.fetch_max(value, Ordering::Relaxed);
58 }
59
60 #[inline]
62 pub fn get(&self) -> i64 {
63 self.cells
64 .iter()
65 .map(|cell| cell.load(Ordering::Relaxed))
66 .max()
67 .unwrap_or(self.reset_value)
68 }
69
70 pub fn reset(&self) {
74 for cell in &self.cells {
75 cell.store(self.reset_value, Ordering::Relaxed);
76 }
77 }
78
79 pub fn swap_reset(&self) -> i64 {
86 self.cells
87 .iter()
88 .map(|cell| cell.swap(self.reset_value, Ordering::Relaxed))
89 .max()
90 .unwrap_or(self.reset_value)
91 }
92}
93
94impl Default for MaxGauge {
95 fn default() -> Self {
96 Self::new(4)
97 }
98}
99
100impl fmt::Debug for MaxGauge {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 f.debug_struct("MaxGauge")
103 .field("max", &self.get())
104 .field("cells", &self.cells.len())
105 .finish()
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// The gauge reports the largest observed value regardless of order.
    #[test]
    fn basic_observe() {
        let g = MaxGauge::new(4);
        for sample in [3, 7, 5] {
            g.observe(sample);
        }
        assert_eq!(g.get(), 7);
    }

    /// A negative starting value is reported until something larger arrives.
    #[test]
    fn initial_value_is_respected() {
        let g = MaxGauge::with_value(4, -10);
        assert_eq!(g.get(), -10);

        g.observe(-3);
        assert_eq!(g.get(), -3);
    }

    /// `swap_reset` hands back the old maximum and clears the gauge;
    /// `reset` clears it without returning anything.
    #[test]
    fn reset_and_swap_reset() {
        let g = MaxGauge::with_value(4, 0);
        for sample in [9, 4] {
            g.observe(sample);
        }

        let drained = g.swap_reset();
        assert_eq!(drained, 9);
        assert_eq!(g.get(), 0);

        g.observe(2);
        g.reset();
        assert_eq!(g.get(), 0);
    }

    /// Observations made from several scoped threads are all visible once the
    /// scope has joined them.
    #[test]
    fn concurrent_observe() {
        let g = MaxGauge::with_value(8, 0);
        let samples = [10, 30, 20, 40, 15, 25, 35, 5];

        std::thread::scope(|scope| {
            for sample in samples {
                let shared = &g;
                scope.spawn(move || {
                    shared.observe(sample);
                });
            }
        });

        assert_eq!(g.get(), 40);
    }
}