Skip to main content

heliosdb_proxy/analytics/
histogram.rs

1//! Latency Histogram
2//!
3//! Track query latency distributions with configurable bucket boundaries.
4
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
/// Bucket boundaries (in microseconds) used when no custom layout is supplied.
///
/// The values are strictly increasing and span 100µs up to 10s.
const DEFAULT_BUCKETS_US: &[u64] = &[
    // Sub-millisecond
    100, // 100µs
    500, // 500µs
    // Milliseconds
    1_000,   // 1ms
    5_000,   // 5ms
    10_000,  // 10ms
    25_000,  // 25ms
    50_000,  // 50ms
    100_000, // 100ms
    250_000, // 250ms
    500_000, // 500ms
    // Seconds
    1_000_000,  // 1s
    2_500_000,  // 2.5s
    5_000_000,  // 5s
    10_000_000, // 10s
];
25
/// One histogram bucket: an upper bound paired with an atomic hit counter.
#[derive(Debug)]
pub struct HistogramBucket {
    /// Exclusive upper bound of the bucket, in microseconds.
    pub upper_bound_us: u64,
    /// Number of samples that have been counted into this bucket.
    count: AtomicU64,
}

impl HistogramBucket {
    /// Builds an empty bucket whose upper bound is `upper_bound_us`.
    pub fn new(upper_bound_us: u64) -> Self {
        let count = AtomicU64::new(0);
        HistogramBucket {
            upper_bound_us,
            count,
        }
    }

    /// Adds one sample to the bucket (relaxed atomic increment).
    pub fn increment(&self) {
        let _previous = self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the number of samples counted so far (relaxed atomic load).
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}
54
55/// Latency histogram for tracking query execution times
56pub struct LatencyHistogram {
57    /// Histogram buckets
58    buckets: Vec<HistogramBucket>,
59    /// Overflow bucket (values exceeding max bucket)
60    overflow: AtomicU64,
61    /// Total count
62    total_count: AtomicU64,
63    /// Sum of all values (for mean calculation)
64    total_sum_us: AtomicU64,
65}
66
67impl LatencyHistogram {
68    /// Create histogram with default buckets
69    pub fn new() -> Self {
70        Self::with_buckets(DEFAULT_BUCKETS_US)
71    }
72
73    /// Create histogram with custom bucket boundaries
74    pub fn with_buckets(boundaries_us: &[u64]) -> Self {
75        let buckets = boundaries_us
76            .iter()
77            .map(|&bound| HistogramBucket::new(bound))
78            .collect();
79
80        Self {
81            buckets,
82            overflow: AtomicU64::new(0),
83            total_count: AtomicU64::new(0),
84            total_sum_us: AtomicU64::new(0),
85        }
86    }
87
88    /// Record a duration
89    pub fn record(&self, duration: Duration) {
90        let value_us = duration.as_micros() as u64;
91
92        self.total_count.fetch_add(1, Ordering::Relaxed);
93        self.total_sum_us.fetch_add(value_us, Ordering::Relaxed);
94
95        // Find the appropriate bucket
96        let mut recorded = false;
97        for bucket in &self.buckets {
98            if value_us < bucket.upper_bound_us {
99                bucket.increment();
100                recorded = true;
101                break;
102            }
103        }
104
105        if !recorded {
106            self.overflow.fetch_add(1, Ordering::Relaxed);
107        }
108    }
109
110    /// Record a value in microseconds
111    pub fn record_us(&self, value_us: u64) {
112        self.record(Duration::from_micros(value_us));
113    }
114
115    /// Get total count
116    pub fn count(&self) -> u64 {
117        self.total_count.load(Ordering::Relaxed)
118    }
119
120    /// Get mean latency
121    pub fn mean(&self) -> Duration {
122        let count = self.total_count.load(Ordering::Relaxed);
123        if count == 0 {
124            return Duration::ZERO;
125        }
126        let sum = self.total_sum_us.load(Ordering::Relaxed);
127        Duration::from_micros(sum / count)
128    }
129
130    /// Get percentile (0.0 - 1.0)
131    pub fn percentile(&self, p: f64) -> Duration {
132        let p = p.clamp(0.0, 1.0);
133        let total = self.total_count.load(Ordering::Relaxed);
134
135        if total == 0 {
136            return Duration::ZERO;
137        }
138
139        let target = (total as f64 * p).ceil() as u64;
140        let mut cumulative = 0u64;
141
142        for bucket in &self.buckets {
143            cumulative += bucket.count();
144            if cumulative >= target {
145                return Duration::from_micros(bucket.upper_bound_us);
146            }
147        }
148
149        // Overflow bucket - return last bucket boundary
150        if let Some(last) = self.buckets.last() {
151            Duration::from_micros(last.upper_bound_us)
152        } else {
153            Duration::ZERO
154        }
155    }
156
157    /// Get P50 (median)
158    pub fn p50(&self) -> Duration {
159        self.percentile(0.50)
160    }
161
162    /// Get P90
163    pub fn p90(&self) -> Duration {
164        self.percentile(0.90)
165    }
166
167    /// Get P95
168    pub fn p95(&self) -> Duration {
169        self.percentile(0.95)
170    }
171
172    /// Get P99
173    pub fn p99(&self) -> Duration {
174        self.percentile(0.99)
175    }
176
177    /// Get snapshot of histogram
178    pub fn snapshot(&self) -> HistogramSnapshot {
179        let buckets: Vec<_> = self
180            .buckets
181            .iter()
182            .map(|b| BucketSnapshot {
183                upper_bound_us: b.upper_bound_us,
184                count: b.count(),
185            })
186            .collect();
187
188        HistogramSnapshot {
189            buckets,
190            overflow: self.overflow.load(Ordering::Relaxed),
191            total_count: self.total_count.load(Ordering::Relaxed),
192            total_sum_us: self.total_sum_us.load(Ordering::Relaxed),
193        }
194    }
195
196    /// Reset histogram
197    pub fn reset(&self) {
198        for bucket in &self.buckets {
199            bucket.count.store(0, Ordering::Relaxed);
200        }
201        self.overflow.store(0, Ordering::Relaxed);
202        self.total_count.store(0, Ordering::Relaxed);
203        self.total_sum_us.store(0, Ordering::Relaxed);
204    }
205}
206
207impl Default for LatencyHistogram {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
/// Plain-data copy of one histogram bucket.
#[derive(Debug, Clone)]
pub struct BucketSnapshot {
    /// The bucket's upper bound, in microseconds.
    pub upper_bound_us: u64,
    /// How many samples the bucket held when the copy was made.
    pub count: u64,
}
221
222/// Snapshot of histogram state
223#[derive(Debug, Clone)]
224pub struct HistogramSnapshot {
225    /// Bucket snapshots
226    pub buckets: Vec<BucketSnapshot>,
227    /// Overflow count
228    pub overflow: u64,
229    /// Total count
230    pub total_count: u64,
231    /// Total sum in microseconds
232    pub total_sum_us: u64,
233}
234
235impl HistogramSnapshot {
236    /// Get mean latency
237    pub fn mean(&self) -> Duration {
238        if self.total_count == 0 {
239            return Duration::ZERO;
240        }
241        Duration::from_micros(self.total_sum_us / self.total_count)
242    }
243
244    /// Get percentile from snapshot
245    pub fn percentile(&self, p: f64) -> Duration {
246        let p = p.clamp(0.0, 1.0);
247
248        if self.total_count == 0 {
249            return Duration::ZERO;
250        }
251
252        let target = (self.total_count as f64 * p).ceil() as u64;
253        let mut cumulative = 0u64;
254
255        for bucket in &self.buckets {
256            cumulative += bucket.count;
257            if cumulative >= target {
258                return Duration::from_micros(bucket.upper_bound_us);
259            }
260        }
261
262        if let Some(last) = self.buckets.last() {
263            Duration::from_micros(last.upper_bound_us)
264        } else {
265            Duration::ZERO
266        }
267    }
268
269    /// Format as ASCII histogram
270    pub fn format_ascii(&self, width: usize) -> String {
271        let max_count = self.buckets.iter().map(|b| b.count).max().unwrap_or(1);
272        let mut output = String::new();
273
274        for bucket in &self.buckets {
275            let label = format_duration(bucket.upper_bound_us);
276            let bar_len = if max_count > 0 {
277                (bucket.count as f64 / max_count as f64 * width as f64) as usize
278            } else {
279                0
280            };
281            let bar: String = std::iter::repeat('#').take(bar_len).collect();
282            output.push_str(&format!("{:>8} | {:6} | {}\n", label, bucket.count, bar));
283        }
284
285        if self.overflow > 0 {
286            output.push_str(&format!("{:>8} | {:6} | (overflow)\n", ">max", self.overflow));
287        }
288
289        output
290    }
291}
292
/// Renders a microsecond count as a short human-readable label.
///
/// Values below 1ms are shown in whole microseconds, values below 1s in
/// whole milliseconds (truncated), and everything else in seconds with one
/// decimal place.
fn format_duration(us: u64) -> String {
    match us {
        0..=999 => format!("{}µs", us),
        1_000..=999_999 => {
            let whole_ms = us / 1_000;
            format!("{}ms", whole_ms)
        }
        _ => {
            let secs = us as f64 / 1_000_000.0;
            format!("{:.1}s", secs)
        }
    }
}
303
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built histogram holds no samples and reports a zero mean.
    #[test]
    fn test_histogram_new() {
        let fresh = LatencyHistogram::new();
        assert_eq!(fresh.count(), 0);
        assert_eq!(fresh.mean(), Duration::ZERO);
    }

    /// Every recorded sample is reflected in the total count.
    #[test]
    fn test_histogram_record() {
        let hist = LatencyHistogram::new();
        let samples = [
            Duration::from_micros(500),
            Duration::from_millis(5),
            Duration::from_millis(50),
        ];

        samples.iter().copied().for_each(|sample| hist.record(sample));

        assert_eq!(hist.count(), 3);
    }

    /// The mean of 10ms, 20ms and 30ms is exactly 20ms.
    #[test]
    fn test_histogram_mean() {
        let hist = LatencyHistogram::new();

        for ms in [10u64, 20, 30].iter().copied() {
            hist.record(Duration::from_millis(ms));
        }

        assert_eq!(hist.mean(), Duration::from_millis(20));
    }

    /// Percentiles are reported as bucket upper bounds.
    #[test]
    fn test_histogram_percentiles() {
        let hist = LatencyHistogram::new();

        // 100 samples: 1ms, 2ms, ..., 100ms.
        (1..=100u64)
            .map(Duration::from_millis)
            .for_each(|sample| hist.record(sample));

        // Bounds are exclusive, so the 50ms sample lands in the 100ms bucket
        // and P50 is at least 50ms.
        let median = hist.p50();
        assert!(median >= Duration::from_millis(50));

        // The 99th sample (99ms) also lies in the 100ms bucket.
        let tail = hist.p99();
        assert!(tail >= Duration::from_millis(100));
    }

    /// A snapshot carries the total count over.
    #[test]
    fn test_histogram_snapshot() {
        let hist = LatencyHistogram::new();
        hist.record(Duration::from_millis(1));
        hist.record(Duration::from_millis(10));

        let frozen = hist.snapshot();
        assert_eq!(frozen.total_count, 2);
    }

    /// Resetting drops the count back to zero.
    #[test]
    fn test_histogram_reset() {
        let hist = LatencyHistogram::new();
        hist.record(Duration::from_millis(10));
        assert_eq!(hist.count(), 1);

        hist.reset();

        assert_eq!(hist.count(), 0);
    }

    /// Custom boundaries route samples to the expected bucket or to overflow.
    #[test]
    fn test_custom_buckets() {
        let hist = LatencyHistogram::with_buckets(&[100, 1000, 10000]);

        // One sample per bucket, plus one beyond the largest bound.
        let micros = [
            50u64, // bucket 0 (< 100)
            500,   // bucket 1 (< 1000)
            5000,  // bucket 2 (< 10000)
            50000, // overflow
        ];
        for us in micros.iter().copied() {
            hist.record(Duration::from_micros(us));
        }

        let frozen = hist.snapshot();
        for idx in 0..3 {
            assert_eq!(frozen.buckets[idx].count, 1);
        }
        assert_eq!(frozen.overflow, 1);
    }

    /// Labels switch units at 1ms and 1s.
    #[test]
    fn test_format_duration() {
        let cases = [(500u64, "500µs"), (5_000, "5ms"), (5_000_000, "5.0s")];
        for (us, expected) in cases.iter().copied() {
            assert_eq!(format_duration(us), expected);
        }
    }

    /// The ASCII rendering contains a label for each bucket.
    #[test]
    fn test_snapshot_format_ascii() {
        let hist = LatencyHistogram::with_buckets(&[1000, 10000, 100000]);
        for us in [500u64, 500, 5000].iter().copied() {
            hist.record(Duration::from_micros(us));
        }

        let rendered = hist.snapshot().format_ascii(20);

        assert!(rendered.contains("1ms"));
        assert!(rendered.contains("10ms"));
    }
}
411
412        assert!(ascii.contains("1ms"));
413        assert!(ascii.contains("10ms"));
414    }
415}