fast_telemetry/metric/
distribution.rs1use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
11use crate::thread_id::thread_id;
12use crossbeam_utils::CachePadded;
13
14pub struct Distribution {
19 cells: Vec<CachePadded<ExpBuckets>>,
20 shard_mask: usize,
21}
22
23impl Distribution {
24 pub fn new(shard_count: usize) -> Self {
28 let shard_count = shard_count.next_power_of_two();
29 Self {
30 cells: (0..shard_count)
31 .map(|_| CachePadded::new(ExpBuckets::new()))
32 .collect(),
33 shard_mask: shard_count - 1,
34 }
35 }
36
37 #[inline]
39 pub fn record(&self, value: u64) {
40 let idx = thread_id() & self.shard_mask;
41 let cell = if cfg!(debug_assertions) {
43 self.cells.get(idx).expect("index out of bounds")
44 } else {
45 unsafe { self.cells.get_unchecked(idx) }
46 };
47 cell.record(value);
48 }
49
50 pub fn count(&self) -> u64 {
52 self.cells.iter().map(|c| c.get_count()).sum()
53 }
54
55 pub fn sum(&self) -> u64 {
57 self.cells.iter().map(|c| c.get_sum()).sum()
58 }
59
60 pub fn sum_and_count(&self) -> (u64, u64) {
67 let mut sum = 0u64;
68 let mut count = 0u64;
69 for cell in &self.cells {
70 sum += cell.get_sum();
71 count += cell.get_count();
72 }
73 (sum, count)
74 }
75
76 pub fn min(&self) -> Option<u64> {
78 self.buckets_snapshot().min()
79 }
80
81 pub fn max(&self) -> Option<u64> {
83 self.buckets_snapshot().max()
84 }
85
86 pub fn mean(&self) -> Option<f64> {
88 let count = self.count();
89 if count == 0 {
90 return None;
91 }
92 Some(self.sum() as f64 / count as f64)
93 }
94
95 pub fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
97 let mut positive = [0u64; 64];
98 let mut zero_count = 0u64;
99 let mut sum = 0u64;
100 let mut count = 0u64;
101
102 for cell in &self.cells {
103 let shard_buckets = cell.get_positive_buckets();
104 for (i, &c) in shard_buckets.iter().enumerate() {
105 positive[i] += c;
106 }
107 zero_count += cell.get_zero_count();
108 sum += cell.get_sum();
109 count += cell.get_count();
110 }
111
112 ExpBucketsSnapshot {
113 positive,
114 zero_count,
115 sum,
116 count,
117 }
118 }
119}
120
121impl Default for Distribution {
122 fn default() -> Self {
123 Self::new(4)
124 }
125}
126
127impl std::fmt::Debug for Distribution {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 f.debug_struct("Distribution")
130 .field("count", &self.count())
131 .field("sum", &self.sum())
132 .field("min", &self.min())
133 .field("max", &self.max())
134 .finish()
135 }
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141 use std::sync::Arc;
142
143 #[test]
144 fn basic_recording() {
145 let dist = Distribution::new(4);
146 dist.record(100);
147 dist.record(200);
148 dist.record(300);
149
150 assert_eq!(dist.count(), 3);
151 assert_eq!(dist.sum(), 600);
152 assert!(dist.min().is_some());
153 assert!(dist.max().is_some());
154 }
155
156 #[test]
157 fn empty() {
158 let dist = Distribution::new(4);
159 assert_eq!(dist.count(), 0);
160 assert_eq!(dist.min(), None);
161 assert_eq!(dist.max(), None);
162 assert_eq!(dist.mean(), None);
163 }
164
165 #[test]
166 fn mean() {
167 let dist = Distribution::new(4);
168 dist.record(100);
169 dist.record(200);
170 dist.record(300);
171
172 let mean = dist.mean().expect("should have mean");
173 assert!((mean - 200.0).abs() < 0.01);
174 }
175
176 #[test]
177 fn concurrent_recording() {
178 let dist = Arc::new(Distribution::new(8));
179 let threads: Vec<_> = (0..8)
180 .map(|_| {
181 let d = Arc::clone(&dist);
182 std::thread::spawn(move || {
183 for i in 1..=1000u64 {
184 d.record(i);
185 }
186 })
187 })
188 .collect();
189
190 for t in threads {
191 t.join().expect("thread panicked");
192 }
193 assert_eq!(dist.count(), 8000);
194 assert_eq!(dist.sum(), 8 * (1000 * 1001 / 2));
195 }
196
197 #[test]
198 fn multiple_distributions_independent() {
199 let dist1 = Distribution::new(4);
200 let dist2 = Distribution::new(4);
201
202 dist1.record(100);
203 dist2.record(200);
204
205 assert_eq!(dist1.count(), 1);
206 assert_eq!(dist1.sum(), 100);
207 assert_eq!(dist2.count(), 1);
208 assert_eq!(dist2.sum(), 200);
209 }
210
211 #[test]
212 fn buckets_snapshot_merges_threads() {
213 let dist = Arc::new(Distribution::new(4));
214
215 let threads: Vec<_> = (0..4)
216 .map(|_| {
217 let d = Arc::clone(&dist);
218 std::thread::spawn(move || {
219 d.record(100); })
221 })
222 .collect();
223
224 for t in threads {
225 t.join().expect("thread panicked");
226 }
227
228 let snap = dist.buckets_snapshot();
229 assert_eq!(snap.count, 4);
230 assert_eq!(snap.sum, 400);
231 assert_eq!(snap.positive[6], 4); }
233
234 #[test]
235 fn records_zero() {
236 let dist = Distribution::new(4);
237 dist.record(0);
238 dist.record(0);
239 dist.record(42);
240
241 assert_eq!(dist.count(), 3);
242 assert_eq!(dist.sum(), 42);
243
244 let snap = dist.buckets_snapshot();
245 assert_eq!(snap.zero_count, 2);
246 assert_eq!(snap.min(), Some(0));
247 }
248
249 #[test]
250 fn min_max_approximate() {
251 let dist = Distribution::new(4);
252 dist.record(100); assert_eq!(dist.min(), Some(64));
256 assert_eq!(dist.max(), Some(127));
258 }
259}