Skip to main content

dbx_core/monitoring/
histogram.rs

//! Latency Histogram — bucket-based latency distribution tracker.
//!
//! Tracks operation latency in microseconds using pre-defined, cumulative
//! (Prometheus-style) buckets: each bucket counts every observation at or
//! below its upper bound. Thread-safe via `Mutex<HistogramInner>`.

use std::sync::{Mutex, MutexGuard, PoisonError};

/// Finite bucket upper bounds in microseconds, in ascending order:
/// 10µs, 50µs, 100µs, 500µs, 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s.
/// An implicit `+Inf` bucket follows the last bound.
const BUCKET_BOUNDS_US: &[u64] = &[
    10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000,
];

/// Total number of buckets: one per finite bound plus the trailing `+Inf` bucket.
///
/// Derived from [`BUCKET_BOUNDS_US`] so adding or removing a bound can never
/// leave the bucket array the wrong size.
const NUM_BUCKETS: usize = BUCKET_BOUNDS_US.len() + 1;

/// Index of the `+Inf` bucket (always the last slot).
const INF_BUCKET: usize = NUM_BUCKETS - 1;

/// Mutable histogram state, guarded by the mutex inside [`Histogram`].
struct HistogramInner {
    /// Cumulative counts per bucket: `buckets[i]` counts observations
    /// `<= BUCKET_BOUNDS_US[i]`; the last slot is the `+Inf` bucket.
    buckets: [u64; NUM_BUCKETS],
    /// Sum of all observed values (µs), saturating at `u64::MAX`.
    sum_us: u64,
    /// Total observations, saturating at `u64::MAX`.
    count: u64,
}

impl HistogramInner {
    /// Creates an empty state with all counters at zero.
    fn new() -> Self {
        Self {
            buckets: [0; NUM_BUCKETS],
            sum_us: 0,
            count: 0,
        }
    }

    /// Records one observation of `value_us` microseconds.
    ///
    /// Every bucket whose upper bound is `>= value_us` is incremented, keeping
    /// the counts cumulative as the Prometheus histogram format expects. All
    /// counters saturate instead of overflowing, so no arithmetic panic can
    /// happen while the caller holds the mutex.
    fn observe(&mut self, value_us: u64) {
        self.sum_us = self.sum_us.saturating_add(value_us);
        self.count = self.count.saturating_add(1);

        // `zip` stops after the finite bounds, leaving the +Inf slot untouched here.
        for (slot, &bound) in self.buckets.iter_mut().zip(BUCKET_BOUNDS_US) {
            if value_us <= bound {
                *slot = slot.saturating_add(1);
            }
        }
        // The +Inf bucket counts every observation.
        self.buckets[INF_BUCKET] = self.buckets[INF_BUCKET].saturating_add(1);
    }

    /// Clears all bucket counts, the sum and the observation count.
    fn reset(&mut self) {
        *self = Self::new();
    }
}

/// Thread-safe latency histogram with Prometheus text export.
pub struct Histogram {
    inner: Mutex<HistogramInner>,
    /// Name used for Prometheus export
    pub name: &'static str,
    /// Help text for Prometheus export
    pub help: &'static str,
}

impl Histogram {
    /// Creates an empty histogram exported under metric `name` with `help` text.
    pub fn new(name: &'static str, help: &'static str) -> Self {
        Self {
            inner: Mutex::new(HistogramInner::new()),
            name,
            help,
        }
    }

    /// Locks the inner state, recovering the guard if the mutex is poisoned.
    ///
    /// A poisoned mutex only means some thread panicked while holding it; the
    /// guarded counters are still plain integers, so continuing to use them is
    /// better than silently dropping observations or reporting zeros forever.
    fn lock_inner(&self) -> MutexGuard<'_, HistogramInner> {
        self.inner.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Record a latency observation in microseconds.
    pub fn observe(&self, value_us: u64) {
        self.lock_inner().observe(value_us);
    }

    /// Reset all histogram data (buckets, sum and count) to zero.
    pub fn reset(&self) {
        self.lock_inner().reset();
    }

    /// Appends this histogram to `out` in the Prometheus text exposition format.
    ///
    /// Bucket `le` labels are the numeric upper bounds in microseconds, the
    /// same unit as `_sum`. Prometheus parses `le` as a float, so unit-suffixed
    /// labels such as `"1ms"` would be unusable by `histogram_quantile`.
    pub fn export_prometheus(&self, out: &mut String) {
        let inner = self.lock_inner();

        out.push_str(&format!(
            "# HELP {} {}\n",
            self.name,
            escape_help(self.help)
        ));
        out.push_str(&format!("# TYPE {} histogram\n", self.name));

        // Cumulative bucket lines, one per finite bound.
        for (&bound, &cumulative) in BUCKET_BOUNDS_US.iter().zip(&inner.buckets) {
            out.push_str(&format!(
                "{}_bucket{{le=\"{}\"}} {}\n",
                self.name, bound, cumulative
            ));
        }
        // +Inf bucket
        out.push_str(&format!(
            "{}_bucket{{le=\"+Inf\"}} {}\n",
            self.name, inner.buckets[INF_BUCKET]
        ));
        out.push_str(&format!("{}_sum {}\n", self.name, inner.sum_us));
        out.push_str(&format!("{}_count {}\n", self.name, inner.count));
    }

    /// Return a `(sum_us, count)` snapshot.
    pub fn snapshot(&self) -> (u64, u64) {
        let inner = self.lock_inner();
        (inner.sum_us, inner.count)
    }

    /// Return all cumulative bucket counts — the last entry is the `+Inf`
    /// bucket — together with the total observation count.
    pub fn bucket_snapshot(&self) -> ([u64; NUM_BUCKETS], u64) {
        let inner = self.lock_inner();
        (inner.buckets, inner.count)
    }
}

/// Escapes HELP text as the Prometheus text format requires: `\` becomes `\\`
/// and a line feed becomes `\n`. Backslashes are replaced first so the
/// backslash introduced for `\n` is not doubled.
fn escape_help(help: &str) -> String {
    help.replace('\\', "\\\\").replace('\n', "\\n")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_observe_buckets() {
        let h = Histogram::new("test", "test histogram");
        h.observe(5); // → every bucket from 10µs upward
        h.observe(100); // → every bucket from 100µs upward
        h.observe(2_000_000); // → +Inf only

        let (buckets, count) = h.bucket_snapshot();
        assert_eq!(count, 3);
        // 10µs bucket should have 1 (only 5µs)
        assert_eq!(buckets[0], 1);
        // Buckets are cumulative: the 100µs and 1s buckets both hold 5µs and 100µs.
        assert_eq!(buckets[2], 2);
        assert_eq!(buckets[NUM_BUCKETS - 2], 2);
        // +Inf bucket has all 3
        assert_eq!(buckets[NUM_BUCKETS - 1], 3);
        assert_eq!(h.snapshot(), (2_000_105, 3));
    }

    #[test]
    fn test_reset() {
        let h = Histogram::new("test", "test histogram");
        h.observe(100);
        h.reset();
        let (sum, count) = h.snapshot();
        assert_eq!(sum, 0);
        assert_eq!(count, 0);
        // Reset must clear the bucket counts too, not just sum/count.
        let (buckets, count) = h.bucket_snapshot();
        assert_eq!(buckets, [0; NUM_BUCKETS]);
        assert_eq!(count, 0);
    }

    #[test]
    fn test_prometheus_export() {
        let h = Histogram::new("dbx_query_latency_us", "Query latency");
        h.observe(80);
        let mut out = String::new();
        h.export_prometheus(&mut out);
        assert!(out.contains("# HELP dbx_query_latency_us Query latency\n"));
        assert!(out.contains("# TYPE dbx_query_latency_us histogram"));
        assert!(out.contains("dbx_query_latency_us_bucket{le=\"+Inf\"} 1\n"));
        assert!(out.contains("dbx_query_latency_us_sum 80\n"));
        assert!(out.contains("dbx_query_latency_us_count 1"));
        // One bucket line per finite bound plus the +Inf bucket.
        assert_eq!(out.matches("_bucket{").count(), NUM_BUCKETS);
    }
}