// fast_telemetry/metric/distribution.rs
//! Distribution: exponential histogram storage for high-performance recording.
//!
//! Uses base-2 exponential buckets with pre-allocated, thread-sharded cells.
//! The hot path is a TLS read + bitmask index + 3 relaxed atomics (bucket
//! increment, sum add, count add) — identical cost to a sharded counter.
//!
//! OTLP export produces a native `ExponentialHistogram`.
//! DogStatsD export produces count + sum counters (same as `Histogram`).

use crossbeam_utils::CachePadded;

use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
use crate::thread_id::thread_id;

/// Exponential histogram distribution for high-performance metric recording.
///
/// Cells are pre-allocated and indexed by `thread_id & mask`, so recording is
/// lock-free with no per-call TLS bookkeeping.
pub struct Distribution {
    // One exponential-bucket accumulator per shard. `CachePadded` keeps each
    // shard on its own cache line so threads writing adjacent shards do not
    // false-share.
    cells: Vec<CachePadded<ExpBuckets>>,
    // Always `cells.len() - 1`; valid as an index mask because `new` rounds
    // the shard count up to a power of two.
    shard_mask: usize,
}

23impl Distribution {
24    /// Create a new Distribution with `shard_count` shards.
25    ///
26    /// The count is rounded up to the next power of two for fast modulo.
27    pub fn new(shard_count: usize) -> Self {
28        let shard_count = shard_count.next_power_of_two();
29        Self {
30            cells: (0..shard_count)
31                .map(|_| CachePadded::new(ExpBuckets::new()))
32                .collect(),
33            shard_mask: shard_count - 1,
34        }
35    }
36
37    /// Record a value.
38    #[inline]
39    pub fn record(&self, value: u64) {
40        let idx = thread_id() & self.shard_mask;
41        // SAFETY: idx is always < cells.len() due to power-of-two masking
42        let cell = if cfg!(debug_assertions) {
43            self.cells.get(idx).expect("index out of bounds")
44        } else {
45            unsafe { self.cells.get_unchecked(idx) }
46        };
47        cell.record(value);
48    }
49
50    /// Cumulative count across all shards.
51    pub fn count(&self) -> u64 {
52        self.cells.iter().map(|c| c.get_count()).sum()
53    }
54
55    /// Cumulative sum across all shards.
56    pub fn sum(&self) -> u64 {
57        self.cells.iter().map(|c| c.get_sum()).sum()
58    }
59
60    /// Sum and count across all shards in a single pass.
61    ///
62    /// Cheaper than calling [`Self::sum`] and [`Self::count`] separately:
63    /// each shard's `ExpBuckets` is visited once, and `sum` + `count` share
64    /// a cache line per shard so the combined pass roughly halves the cache
65    /// traffic of two separate scans.
66    pub fn sum_and_count(&self) -> (u64, u64) {
67        let mut sum = 0u64;
68        let mut count = 0u64;
69        for cell in &self.cells {
70            sum += cell.get_sum();
71            count += cell.get_count();
72        }
73        (sum, count)
74    }
75
76    /// Approximate minimum from bucket boundaries.
77    pub fn min(&self) -> Option<u64> {
78        self.buckets_snapshot().min()
79    }
80
81    /// Approximate maximum from bucket boundaries.
82    pub fn max(&self) -> Option<u64> {
83        self.buckets_snapshot().max()
84    }
85
86    /// Mean value, or `None` if no values have been recorded.
87    pub fn mean(&self) -> Option<f64> {
88        let count = self.count();
89        if count == 0 {
90            return None;
91        }
92        Some(self.sum() as f64 / count as f64)
93    }
94
95    /// Merge bucket counts from all shards into a single snapshot.
96    pub fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
97        let mut positive = [0u64; 64];
98        let mut zero_count = 0u64;
99        let mut sum = 0u64;
100        let mut count = 0u64;
101
102        for cell in &self.cells {
103            let shard_buckets = cell.get_positive_buckets();
104            for (i, &c) in shard_buckets.iter().enumerate() {
105                positive[i] += c;
106            }
107            zero_count += cell.get_zero_count();
108            sum += cell.get_sum();
109            count += cell.get_count();
110        }
111
112        ExpBucketsSnapshot {
113            positive,
114            zero_count,
115            sum,
116            count,
117        }
118    }
119}
120
121impl Default for Distribution {
122    fn default() -> Self {
123        Self::new(4)
124    }
125}
126
127impl std::fmt::Debug for Distribution {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("Distribution")
130            .field("count", &self.count())
131            .field("sum", &self.sum())
132            .field("min", &self.min())
133            .field("max", &self.max())
134            .finish()
135    }
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn basic_recording() {
        let dist = Distribution::new(4);
        for value in [100u64, 200, 300] {
            dist.record(value);
        }

        assert_eq!(dist.count(), 3);
        assert_eq!(dist.sum(), 600);
        assert!(dist.min().is_some());
        assert!(dist.max().is_some());
    }

    #[test]
    fn empty() {
        let dist = Distribution::new(4);

        assert_eq!(dist.count(), 0);
        assert_eq!(dist.min(), None);
        assert_eq!(dist.max(), None);
        assert_eq!(dist.mean(), None);
    }

    #[test]
    fn mean() {
        let dist = Distribution::new(4);
        for value in [100u64, 200, 300] {
            dist.record(value);
        }

        let mean = dist.mean().expect("should have mean");
        assert!((mean - 200.0).abs() < 0.01);
    }

    #[test]
    fn concurrent_recording() {
        let dist = Arc::new(Distribution::new(8));

        let mut handles = Vec::new();
        for _ in 0..8 {
            let shared = Arc::clone(&dist);
            handles.push(std::thread::spawn(move || {
                for i in 1..=1000u64 {
                    shared.record(i);
                }
            }));
        }
        for handle in handles {
            handle.join().expect("thread panicked");
        }

        assert_eq!(dist.count(), 8000);
        // Each thread contributes the triangular sum 1 + 2 + … + 1000.
        assert_eq!(dist.sum(), 8 * (1000 * 1001 / 2));
    }

    #[test]
    fn multiple_distributions_independent() {
        let first = Distribution::new(4);
        let second = Distribution::new(4);

        first.record(100);
        second.record(200);

        assert_eq!(first.count(), 1);
        assert_eq!(first.sum(), 100);
        assert_eq!(second.count(), 1);
        assert_eq!(second.sum(), 200);
    }

    #[test]
    fn buckets_snapshot_merges_threads() {
        let dist = Arc::new(Distribution::new(4));

        let mut handles = Vec::new();
        for _ in 0..4 {
            let shared = Arc::clone(&dist);
            // Value 100 lands in bucket 6 (range 64..128).
            handles.push(std::thread::spawn(move || shared.record(100)));
        }
        for handle in handles {
            handle.join().expect("thread panicked");
        }

        let snap = dist.buckets_snapshot();
        assert_eq!(snap.count, 4);
        assert_eq!(snap.sum, 400);
        assert_eq!(snap.positive[6], 4); // value 100 → bucket 6
    }

    #[test]
    fn records_zero() {
        let dist = Distribution::new(4);
        for value in [0u64, 0, 42] {
            dist.record(value);
        }

        assert_eq!(dist.count(), 3);
        assert_eq!(dist.sum(), 42);

        let snap = dist.buckets_snapshot();
        assert_eq!(snap.zero_count, 2);
        assert_eq!(snap.min(), Some(0));
    }

    #[test]
    fn min_max_approximate() {
        let dist = Distribution::new(4);
        dist.record(100); // bucket 6: [64, 128)

        // min returns lower bound of lowest non-zero bucket
        assert_eq!(dist.min(), Some(64));
        // max returns upper bound of highest non-zero bucket
        assert_eq!(dist.max(), Some(127));
    }
}
259}