fast_telemetry/metric/
distribution.rs1use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
11use crate::thread_id::thread_id;
12use crossbeam_utils::CachePadded;
13
14pub struct Distribution {
19 cells: Vec<CachePadded<ExpBuckets>>,
20 shard_mask: usize,
21}
22
23impl Distribution {
24 pub fn new(shard_count: usize) -> Self {
28 let shard_count = shard_count.next_power_of_two();
29 Self {
30 cells: (0..shard_count)
31 .map(|_| CachePadded::new(ExpBuckets::new()))
32 .collect(),
33 shard_mask: shard_count - 1,
34 }
35 }
36
37 #[inline]
39 pub fn record(&self, value: u64) {
40 let idx = thread_id() & self.shard_mask;
41 let cell = if cfg!(debug_assertions) {
43 self.cells.get(idx).expect("index out of bounds")
44 } else {
45 unsafe { self.cells.get_unchecked(idx) }
46 };
47 cell.record(value);
48 }
49
50 pub fn count(&self) -> u64 {
52 self.cells.iter().map(|c| c.get_count()).sum()
53 }
54
55 pub fn sum(&self) -> u64 {
57 self.cells.iter().map(|c| c.get_sum()).sum()
58 }
59
60 pub fn min(&self) -> Option<u64> {
62 self.buckets_snapshot().min()
63 }
64
65 pub fn max(&self) -> Option<u64> {
67 self.buckets_snapshot().max()
68 }
69
70 pub fn mean(&self) -> Option<f64> {
72 let count = self.count();
73 if count == 0 {
74 return None;
75 }
76 Some(self.sum() as f64 / count as f64)
77 }
78
79 pub fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
81 let mut positive = [0u64; 64];
82 let mut zero_count = 0u64;
83 let mut sum = 0u64;
84 let mut count = 0u64;
85
86 for cell in &self.cells {
87 let shard_buckets = cell.get_positive_buckets();
88 for (i, &c) in shard_buckets.iter().enumerate() {
89 positive[i] += c;
90 }
91 zero_count += cell.get_zero_count();
92 sum += cell.get_sum();
93 count += cell.get_count();
94 }
95
96 ExpBucketsSnapshot {
97 positive,
98 zero_count,
99 sum,
100 count,
101 }
102 }
103}
104
105impl Default for Distribution {
106 fn default() -> Self {
107 Self::new(4)
108 }
109}
110
111impl std::fmt::Debug for Distribution {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("Distribution")
114 .field("count", &self.count())
115 .field("sum", &self.sum())
116 .field("min", &self.min())
117 .field("max", &self.max())
118 .finish()
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use std::sync::Arc;
126
127 #[test]
128 fn basic_recording() {
129 let dist = Distribution::new(4);
130 dist.record(100);
131 dist.record(200);
132 dist.record(300);
133
134 assert_eq!(dist.count(), 3);
135 assert_eq!(dist.sum(), 600);
136 assert!(dist.min().is_some());
137 assert!(dist.max().is_some());
138 }
139
140 #[test]
141 fn empty() {
142 let dist = Distribution::new(4);
143 assert_eq!(dist.count(), 0);
144 assert_eq!(dist.min(), None);
145 assert_eq!(dist.max(), None);
146 assert_eq!(dist.mean(), None);
147 }
148
149 #[test]
150 fn mean() {
151 let dist = Distribution::new(4);
152 dist.record(100);
153 dist.record(200);
154 dist.record(300);
155
156 let mean = dist.mean().expect("should have mean");
157 assert!((mean - 200.0).abs() < 0.01);
158 }
159
160 #[test]
161 fn concurrent_recording() {
162 let dist = Arc::new(Distribution::new(8));
163 let threads: Vec<_> = (0..8)
164 .map(|_| {
165 let d = Arc::clone(&dist);
166 std::thread::spawn(move || {
167 for i in 1..=1000u64 {
168 d.record(i);
169 }
170 })
171 })
172 .collect();
173
174 for t in threads {
175 t.join().expect("thread panicked");
176 }
177 assert_eq!(dist.count(), 8000);
178 assert_eq!(dist.sum(), 8 * (1000 * 1001 / 2));
179 }
180
181 #[test]
182 fn multiple_distributions_independent() {
183 let dist1 = Distribution::new(4);
184 let dist2 = Distribution::new(4);
185
186 dist1.record(100);
187 dist2.record(200);
188
189 assert_eq!(dist1.count(), 1);
190 assert_eq!(dist1.sum(), 100);
191 assert_eq!(dist2.count(), 1);
192 assert_eq!(dist2.sum(), 200);
193 }
194
195 #[test]
196 fn buckets_snapshot_merges_threads() {
197 let dist = Arc::new(Distribution::new(4));
198
199 let threads: Vec<_> = (0..4)
200 .map(|_| {
201 let d = Arc::clone(&dist);
202 std::thread::spawn(move || {
203 d.record(100); })
205 })
206 .collect();
207
208 for t in threads {
209 t.join().expect("thread panicked");
210 }
211
212 let snap = dist.buckets_snapshot();
213 assert_eq!(snap.count, 4);
214 assert_eq!(snap.sum, 400);
215 assert_eq!(snap.positive[6], 4); }
217
218 #[test]
219 fn records_zero() {
220 let dist = Distribution::new(4);
221 dist.record(0);
222 dist.record(0);
223 dist.record(42);
224
225 assert_eq!(dist.count(), 3);
226 assert_eq!(dist.sum(), 42);
227
228 let snap = dist.buckets_snapshot();
229 assert_eq!(snap.zero_count, 2);
230 assert_eq!(snap.min(), Some(0));
231 }
232
233 #[test]
234 fn min_max_approximate() {
235 let dist = Distribution::new(4);
236 dist.record(100); assert_eq!(dist.min(), Some(64));
240 assert_eq!(dist.max(), Some(127));
242 }
243}