Skip to main content

fast_telemetry/metric/
histogram.rs

//! Fixed-bucket histogram using sharded counters.
//!
//! Each bucket is a thread-sharded `Counter`, so recording is contention-free.
//! Bucket boundaries are defined at construction time.

6use crate::Counter;
7
8/// A fixed-bucket histogram with thread-sharded counters.
9///
10/// Buckets are defined by upper bounds (inclusive). Values are placed in the
11/// first bucket whose bound is >= the value. Values exceeding all bounds go
12/// in the final "+Inf" bucket.
/// A fixed-bucket histogram with thread-sharded counters.
///
/// Buckets are defined by upper bounds (inclusive). Values are placed in the
/// first bucket whose bound is >= the value. Values exceeding all bounds go
/// in the final "+Inf" bucket.
pub struct Histogram {
    /// Upper bounds for each bucket, sorted ascending. The implicit final
    /// "+Inf" bucket has no entry here, so `buckets.len() == bounds.len() + 1`.
    bounds: Vec<u64>,
    /// One sharded counter per bound, plus a trailing one for "+Inf".
    buckets: Vec<Counter>,
    /// Running sum of all recorded values (Prometheus `_sum`; also usable
    /// with `count` to compute a mean).
    sum: Counter,
    /// Total number of recorded values (Prometheus `_count`).
    count: Counter,
}
23
24impl Histogram {
25    /// Create a histogram with the given bucket boundaries.
26    ///
27    /// Boundaries should be sorted ascending. Each boundary represents the
28    /// upper bound (inclusive) of a bucket. An implicit +Inf bucket is added.
29    ///
30    /// `shard_count` is passed to each underlying `Counter`.
31    pub fn new(bounds: &[u64], shard_count: usize) -> Self {
32        let buckets = (0..=bounds.len())
33            .map(|_| Counter::new(shard_count))
34            .collect();
35
36        Self {
37            bounds: bounds.to_vec(),
38            buckets,
39            sum: Counter::new(shard_count),
40            count: Counter::new(shard_count),
41        }
42    }
43
44    /// Create a histogram with default latency buckets (in microseconds).
45    ///
46    /// Buckets: 10µs, 50µs, 100µs, 500µs, 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s, 10s
47    pub fn with_latency_buckets(shard_count: usize) -> Self {
48        Self::new(
49            &[
50                10,         // 10µs
51                50,         // 50µs
52                100,        // 100µs
53                500,        // 500µs
54                1_000,      // 1ms
55                5_000,      // 5ms
56                10_000,     // 10ms
57                50_000,     // 50ms
58                100_000,    // 100ms
59                500_000,    // 500ms
60                1_000_000,  // 1s
61                5_000_000,  // 5s
62                10_000_000, // 10s
63            ],
64            shard_count,
65        )
66    }
67
68    /// Record a value in the histogram.
69    #[inline]
70    pub fn record(&self, value: u64) {
71        // Find bucket via linear search (should be fast for small bucket counts)
72        let bucket_idx = self
73            .bounds
74            .iter()
75            .position(|&bound| value <= bound)
76            .unwrap_or(self.bounds.len());
77
78        self.buckets[bucket_idx].inc();
79        self.sum.add(value as isize);
80        self.count.inc();
81    }
82
83    /// Get cumulative bucket counts -- for Prometheus exposition.
84    ///
85    /// Returns pairs of (upper_bound, cumulative_count). The last entry
86    /// has bound `u64::MAX` representing +Inf.
87    ///
88    /// Prefer [`Self::buckets_cumulative_iter`] on the export path; it avoids
89    /// the `Vec` allocation per call.
90    pub fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
91        self.buckets_cumulative_iter().collect()
92    }
93
94    /// Iterator form of [`Self::buckets_cumulative`] that skips the `Vec`
95    /// allocation. Used by the Prometheus and OTLP export paths.
96    pub fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
97        let mut cumulative = 0i64;
98        self.buckets.iter().enumerate().map(move |(i, counter)| {
99            cumulative += counter.sum() as i64;
100            let bound = if i < self.bounds.len() {
101                self.bounds[i]
102            } else {
103                u64::MAX
104            };
105            (bound, cumulative as u64)
106        })
107    }
108
109    pub fn sum(&self) -> u64 {
110        self.sum.sum() as u64
111    }
112
113    pub fn count(&self) -> u64 {
114        self.count.sum() as u64
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let hist = Histogram::new(&[10, 100, 1000], 4);

        // One value per bucket: ≤10, ≤100, ≤1000, and one past every bound.
        for v in [5u64, 50, 500, 5000] {
            hist.record(v);
        }

        // Cumulative counts grow by one per bucket; last entry is +Inf.
        let cumulative = hist.buckets_cumulative();
        assert_eq!(
            cumulative,
            [(10, 1), (100, 2), (1000, 3), (u64::MAX, 4)]
        );

        assert_eq!(hist.count(), 4);
        assert_eq!(hist.sum(), 5555);
    }

    #[test]
    fn test_boundary_values() {
        let hist = Histogram::new(&[10, 100], 4);

        // Bounds are inclusive: a value equal to a bound stays in that bucket.
        hist.record(10);
        hist.record(100);

        let cumulative = hist.buckets_cumulative();
        assert_eq!(cumulative[0], (10, 1));
        assert_eq!(cumulative[1], (100, 2));
    }

    #[test]
    fn test_latency_buckets() {
        let hist = Histogram::with_latency_buckets(4);

        for micros in [5u64, 1_000, 1_000_000] {
            hist.record(micros);
        }

        assert_eq!(hist.count(), 3);
    }
}
164}