Skip to main content

fast_telemetry/metric/
counter.rs

//! Thread-sharded atomic counter.
//!
//! Forked from https://crates.io/crates/fast-counter (MIT/Apache licensed).
//! Originally authored by https://crates.io/users/JackThomson2.
//! Modified to use crossbeam's `CachePadded` for more accurate cache-line
//! sizing, and to support swap and export operations.
7
8use crate::thread_id::thread_id;
9use crossbeam_utils::CachePadded;
10use std::fmt;
11use std::sync::atomic::{AtomicIsize, Ordering};
12
13fn make_padded_counter() -> CachePadded<AtomicIsize> {
14    CachePadded::new(AtomicIsize::new(0))
15}
16
17/// A sharded atomic counter.
18///
19/// Shards cache-line aligned AtomicIsize values across a vector for faster
20/// updates in high contention scenarios.
21pub struct Counter {
22    cells: Vec<CachePadded<AtomicIsize>>,
23}
24
25impl fmt::Debug for Counter {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        f.debug_struct("Counter")
28            .field("sum", &self.sum())
29            .field("cells", &self.cells.len())
30            .finish()
31    }
32}
33
34impl Counter {
35    /// Creates a new Counter with at least `count` cells.
36    ///
37    /// The count is rounded up to the next power of two for fast modulo.
38    #[inline]
39    pub fn new(count: usize) -> Self {
40        let count = count.next_power_of_two();
41        Self {
42            cells: (0..count).map(|_| make_padded_counter()).collect(),
43        }
44    }
45
46    /// Adds a value to the counter using relaxed ordering.
47    #[inline]
48    pub fn add(&self, value: isize) {
49        self.add_with_ordering(value, Ordering::Relaxed)
50    }
51
52    /// Increments the counter by 1.
53    #[inline]
54    pub fn inc(&self) {
55        self.add(1)
56    }
57
58    /// Adds a value to the counter with the specified ordering.
59    #[inline]
60    pub fn add_with_ordering(&self, value: isize, ordering: Ordering) {
61        let idx = thread_id() & (self.cells.len() - 1);
62        // SAFETY: idx is always < cells.len() due to power-of-two masking
63        let cell = if cfg!(debug_assertions) {
64            self.cells.get(idx).expect("index out of bounds")
65        } else {
66            unsafe { self.cells.get_unchecked(idx) }
67        };
68        cell.fetch_add(value, ordering);
69    }
70
71    /// Returns the sum of all shards using relaxed ordering.
72    ///
73    /// # Eventual Consistency
74    ///
75    /// Due to sharding, this may be slightly inaccurate under heavy concurrent
76    /// modification - writes to already-summed shards won't be reflected until
77    /// the next call. The total is eventually consistent.
78    #[inline]
79    pub fn sum(&self) -> isize {
80        self.sum_with_ordering(Ordering::Relaxed)
81    }
82
83    /// Returns the sum of all shards with the specified ordering.
84    #[inline]
85    pub fn sum_with_ordering(&self, ordering: Ordering) -> isize {
86        self.cells.iter().map(|c| c.load(ordering)).sum()
87    }
88
89    /// Resets all shards to zero and returns the previous sum.
90    ///
91    /// Useful for delta-style metrics export.
92    ///
93    /// # Eventual Consistency
94    ///
95    /// Writes that occur concurrently with `swap()` may be attributed to the
96    /// next window rather than the current one. This is because shards are
97    /// swapped sequentially - a write landing on an already-swapped shard
98    /// will be picked up by the next `swap()` call. No counts are lost; they
99    /// simply shift to the next export window. For telemetry purposes with
100    /// multi-second export intervals, this timing skew is negligible.
101    #[inline]
102    pub fn swap(&self) -> isize {
103        self.cells
104            .iter()
105            .map(|c| c.swap(0, Ordering::Relaxed))
106            .sum()
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// A single add on a one-cell counter is visible through `sum`.
    #[test]
    fn basic_test() {
        let c = Counter::new(1);
        c.add(1);
        assert_eq!(c.sum(), 1);
    }

    /// Repeated adds on the same thread accumulate.
    #[test]
    fn increment_multiple_times() {
        let c = Counter::new(1);
        for _ in 0..3 {
            c.add(1);
        }
        assert_eq!(c.sum(), 3);
    }

    /// `inc` behaves like `add(1)`.
    #[test]
    fn test_inc() {
        let c = Counter::new(4);
        for _ in 0..2 {
            c.inc();
        }
        assert_eq!(c.sum(), 2);
    }

    /// `swap` hands back the accumulated total and leaves the counter at zero.
    #[test]
    fn test_swap() {
        let c = Counter::new(4);
        c.add(100);

        let drained = c.swap();

        assert_eq!(drained, 100);
        assert_eq!(c.sum(), 0);
    }

    /// Two threads each adding once are both counted.
    #[test]
    fn two_threads_incrementing_concurrently() {
        let shared = Counter::new(2);

        std::thread::scope(|scope| {
            for _ in 0..2 {
                scope.spawn(|| shared.add(1));
            }
        });

        assert_eq!(shared.sum(), 2);
    }

    /// No increments are lost when many threads write many times.
    #[test]
    fn multiple_threads_incrementing_many_times() {
        const WRITE_COUNT: isize = 1_000_000;
        const THREAD_COUNT: isize = 8;

        let shared = Counter::new(THREAD_COUNT as usize);
        let expected = THREAD_COUNT * WRITE_COUNT;

        std::thread::scope(|scope| {
            for _ in 0..THREAD_COUNT {
                scope.spawn(|| (0..WRITE_COUNT).for_each(|_| shared.add(1)));
            }
        });

        assert_eq!(shared.sum(), expected);
    }

    /// The `Debug` output reports the sum and the (rounded) cell count.
    #[test]
    fn debug_format() {
        let c = Counter::new(8);
        c.add(42);

        let rendered = format!("{c:?}");

        assert!(rendered.contains("sum: 42"));
        assert!(rendered.contains("cells: 8"));
    }
}