// fast_telemetry/metric/distribution.rs
//! Distribution: exponential histogram storage for high-performance recording.
//!
//! Uses base-2 exponential buckets with pre-allocated, thread-sharded cells.
//! The hot path is a TLS read + bitmask index + 3 relaxed atomics (bucket
//! increment, sum add, count add) — identical cost to a sharded counter.
//!
//! OTLP export produces a native `ExponentialHistogram`.
//! DogStatsD export produces count + sum counters (same as `Histogram`).

use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
use crate::thread_id::thread_id;
use crossbeam_utils::CachePadded;

/// Exponential histogram distribution for high-performance metric recording.
///
/// Cells are pre-allocated and indexed by `thread_id & mask`, so recording is
/// lock-free with no per-call TLS bookkeeping.
pub struct Distribution {
    // One bucket set per shard. `CachePadded` keeps neighbouring cells on
    // separate cache lines so threads hashing to adjacent shards don't
    // false-share.
    cells: Vec<CachePadded<ExpBuckets>>,
    // Always `cells.len() - 1`, with `cells.len()` a power of two, so
    // `thread_id & shard_mask` is equivalent to `thread_id % cells.len()`.
    shard_mask: usize,
}
22
23impl Distribution {
24    /// Create a new Distribution with `shard_count` shards.
25    ///
26    /// The count is rounded up to the next power of two for fast modulo.
27    pub fn new(shard_count: usize) -> Self {
28        let shard_count = shard_count.next_power_of_two();
29        Self {
30            cells: (0..shard_count)
31                .map(|_| CachePadded::new(ExpBuckets::new()))
32                .collect(),
33            shard_mask: shard_count - 1,
34        }
35    }
36
37    /// Record a value.
38    #[inline]
39    pub fn record(&self, value: u64) {
40        let idx = thread_id() & self.shard_mask;
41        // SAFETY: idx is always < cells.len() due to power-of-two masking
42        let cell = if cfg!(debug_assertions) {
43            self.cells.get(idx).expect("index out of bounds")
44        } else {
45            unsafe { self.cells.get_unchecked(idx) }
46        };
47        cell.record(value);
48    }
49
50    /// Cumulative count across all shards.
51    pub fn count(&self) -> u64 {
52        self.cells.iter().map(|c| c.get_count()).sum()
53    }
54
55    /// Cumulative sum across all shards.
56    pub fn sum(&self) -> u64 {
57        self.cells.iter().map(|c| c.get_sum()).sum()
58    }
59
60    /// Approximate minimum from bucket boundaries.
61    pub fn min(&self) -> Option<u64> {
62        self.buckets_snapshot().min()
63    }
64
65    /// Approximate maximum from bucket boundaries.
66    pub fn max(&self) -> Option<u64> {
67        self.buckets_snapshot().max()
68    }
69
70    /// Mean value, or `None` if no values have been recorded.
71    pub fn mean(&self) -> Option<f64> {
72        let count = self.count();
73        if count == 0 {
74            return None;
75        }
76        Some(self.sum() as f64 / count as f64)
77    }
78
79    /// Merge bucket counts from all shards into a single snapshot.
80    pub fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
81        let mut positive = [0u64; 64];
82        let mut zero_count = 0u64;
83        let mut sum = 0u64;
84        let mut count = 0u64;
85
86        for cell in &self.cells {
87            let shard_buckets = cell.get_positive_buckets();
88            for (i, &c) in shard_buckets.iter().enumerate() {
89                positive[i] += c;
90            }
91            zero_count += cell.get_zero_count();
92            sum += cell.get_sum();
93            count += cell.get_count();
94        }
95
96        ExpBucketsSnapshot {
97            positive,
98            zero_count,
99            sum,
100            count,
101        }
102    }
103}
104
105impl Default for Distribution {
106    fn default() -> Self {
107        Self::new(4)
108    }
109}
110
111impl std::fmt::Debug for Distribution {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("Distribution")
114            .field("count", &self.count())
115            .field("sum", &self.sum())
116            .field("min", &self.min())
117            .field("max", &self.max())
118            .finish()
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn basic_recording() {
        let d = Distribution::new(4);
        for v in [100u64, 200, 300] {
            d.record(v);
        }

        assert_eq!(d.count(), 3);
        assert_eq!(d.sum(), 600);
        assert!(d.min().is_some());
        assert!(d.max().is_some());
    }

    #[test]
    fn empty() {
        // A fresh distribution reports no values at all.
        let d = Distribution::new(4);
        assert_eq!(d.count(), 0);
        assert_eq!(d.min(), None);
        assert_eq!(d.max(), None);
        assert_eq!(d.mean(), None);
    }

    #[test]
    fn mean() {
        let d = Distribution::new(4);
        for v in [100u64, 200, 300] {
            d.record(v);
        }

        let m = d.mean().expect("should have mean");
        assert!((m - 200.0).abs() < 0.01);
    }

    #[test]
    fn concurrent_recording() {
        let dist = Arc::new(Distribution::new(8));
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let d = Arc::clone(&dist);
                std::thread::spawn(move || {
                    for i in 1..=1000u64 {
                        d.record(i);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        // 8 threads each record 1..=1000.
        assert_eq!(dist.count(), 8000);
        assert_eq!(dist.sum(), 8 * (1000 * 1001 / 2));
    }

    #[test]
    fn multiple_distributions_independent() {
        // Recording into one distribution must not leak into another.
        let a = Distribution::new(4);
        let b = Distribution::new(4);

        a.record(100);
        b.record(200);

        assert_eq!(a.count(), 1);
        assert_eq!(a.sum(), 100);
        assert_eq!(b.count(), 1);
        assert_eq!(b.sum(), 200);
    }

    #[test]
    fn buckets_snapshot_merges_threads() {
        let dist = Arc::new(Distribution::new(4));

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let d = Arc::clone(&dist);
                std::thread::spawn(move || {
                    d.record(100); // bucket 6 (64..128)
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        let snap = dist.buckets_snapshot();
        assert_eq!(snap.count, 4);
        assert_eq!(snap.sum, 400);
        assert_eq!(snap.positive[6], 4); // value 100 → bucket 6
    }

    #[test]
    fn records_zero() {
        let d = Distribution::new(4);
        for v in [0u64, 0, 42] {
            d.record(v);
        }

        assert_eq!(d.count(), 3);
        assert_eq!(d.sum(), 42);

        let snap = d.buckets_snapshot();
        assert_eq!(snap.zero_count, 2);
        assert_eq!(snap.min(), Some(0));
    }

    #[test]
    fn min_max_approximate() {
        let d = Distribution::new(4);
        d.record(100); // bucket 6: [64, 128)

        // min is the lower bound of the lowest non-zero bucket,
        // max the upper bound of the highest non-zero bucket.
        assert_eq!(d.min(), Some(64));
        assert_eq!(d.max(), Some(127));
    }
}