fast_telemetry/metric/
counter.rs1use crate::thread_id::thread_id;
9use crossbeam_utils::CachePadded;
10use std::fmt;
11use std::sync::atomic::{AtomicIsize, Ordering};
12
13fn make_padded_counter() -> CachePadded<AtomicIsize> {
14 CachePadded::new(AtomicIsize::new(0))
15}
16
17pub struct Counter {
22 cells: Vec<CachePadded<AtomicIsize>>,
23}
24
25impl fmt::Debug for Counter {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 f.debug_struct("Counter")
28 .field("sum", &self.sum())
29 .field("cells", &self.cells.len())
30 .finish()
31 }
32}
33
34impl Counter {
35 #[inline]
39 pub fn new(count: usize) -> Self {
40 let count = count.next_power_of_two();
41 Self {
42 cells: (0..count).map(|_| make_padded_counter()).collect(),
43 }
44 }
45
46 #[inline]
48 pub fn add(&self, value: isize) {
49 self.add_with_ordering(value, Ordering::Relaxed)
50 }
51
52 #[inline]
54 pub fn inc(&self) {
55 self.add(1)
56 }
57
58 #[inline]
60 pub fn add_with_ordering(&self, value: isize, ordering: Ordering) {
61 let idx = thread_id() & (self.cells.len() - 1);
62 let cell = if cfg!(debug_assertions) {
64 self.cells.get(idx).expect("index out of bounds")
65 } else {
66 unsafe { self.cells.get_unchecked(idx) }
67 };
68 cell.fetch_add(value, ordering);
69 }
70
71 #[inline]
79 pub fn sum(&self) -> isize {
80 self.sum_with_ordering(Ordering::Relaxed)
81 }
82
83 #[inline]
85 pub fn sum_with_ordering(&self, ordering: Ordering) -> isize {
86 self.cells.iter().map(|c| c.load(ordering)).sum()
87 }
88
89 #[inline]
102 pub fn swap(&self) -> isize {
103 self.cells
104 .iter()
105 .map(|c| c.swap(0, Ordering::Relaxed))
106 .sum()
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113
114 #[test]
115 fn basic_test() {
116 let counter = Counter::new(1);
117 counter.add(1);
118 assert_eq!(counter.sum(), 1);
119 }
120
121 #[test]
122 fn increment_multiple_times() {
123 let counter = Counter::new(1);
124 counter.add(1);
125 counter.add(1);
126 counter.add(1);
127 assert_eq!(counter.sum(), 3);
128 }
129
130 #[test]
131 fn test_inc() {
132 let counter = Counter::new(4);
133 counter.inc();
134 counter.inc();
135 assert_eq!(counter.sum(), 2);
136 }
137
138 #[test]
139 fn test_swap() {
140 let counter = Counter::new(4);
141 counter.add(100);
142 let val = counter.swap();
143 assert_eq!(val, 100);
144 assert_eq!(counter.sum(), 0);
145 }
146
147 #[test]
148 fn two_threads_incrementing_concurrently() {
149 let counter = Counter::new(2);
150
151 std::thread::scope(|s| {
152 for _ in 0..2 {
153 s.spawn(|| {
154 counter.add(1);
155 });
156 }
157 });
158
159 assert_eq!(counter.sum(), 2);
160 }
161
162 #[test]
163 fn multiple_threads_incrementing_many_times() {
164 const WRITE_COUNT: isize = 1_000_000;
165 const THREAD_COUNT: isize = 8;
166
167 let counter = Counter::new(THREAD_COUNT as usize);
168
169 std::thread::scope(|s| {
170 for _ in 0..THREAD_COUNT {
171 s.spawn(|| {
172 for _ in 0..WRITE_COUNT {
173 counter.add(1);
174 }
175 });
176 }
177 });
178
179 assert_eq!(counter.sum(), THREAD_COUNT * WRITE_COUNT);
180 }
181
182 #[test]
183 fn debug_format() {
184 let counter = Counter::new(8);
185 counter.add(42);
186 let debug = format!("{counter:?}");
187 assert!(debug.contains("sum: 42"));
188 assert!(debug.contains("cells: 8"));
189 }
190}