heliosdb_proxy/analytics/
histogram.rs1use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8const DEFAULT_BUCKETS_US: &[u64] = &[
10 100, 500, 1_000, 5_000, 10_000, 25_000, 50_000, 100_000, 250_000, 500_000, 1_000_000, 2_500_000, 5_000_000, 10_000_000, ];
25
26#[derive(Debug)]
28pub struct HistogramBucket {
29 pub upper_bound_us: u64,
31 count: AtomicU64,
33}
34
35impl HistogramBucket {
36 pub fn new(upper_bound_us: u64) -> Self {
38 Self {
39 upper_bound_us,
40 count: AtomicU64::new(0),
41 }
42 }
43
44 pub fn increment(&self) {
46 self.count.fetch_add(1, Ordering::Relaxed);
47 }
48
49 pub fn count(&self) -> u64 {
51 self.count.load(Ordering::Relaxed)
52 }
53}
54
55pub struct LatencyHistogram {
57 buckets: Vec<HistogramBucket>,
59 overflow: AtomicU64,
61 total_count: AtomicU64,
63 total_sum_us: AtomicU64,
65}
66
67impl LatencyHistogram {
68 pub fn new() -> Self {
70 Self::with_buckets(DEFAULT_BUCKETS_US)
71 }
72
73 pub fn with_buckets(boundaries_us: &[u64]) -> Self {
75 let buckets = boundaries_us
76 .iter()
77 .map(|&bound| HistogramBucket::new(bound))
78 .collect();
79
80 Self {
81 buckets,
82 overflow: AtomicU64::new(0),
83 total_count: AtomicU64::new(0),
84 total_sum_us: AtomicU64::new(0),
85 }
86 }
87
88 pub fn record(&self, duration: Duration) {
90 let value_us = duration.as_micros() as u64;
91
92 self.total_count.fetch_add(1, Ordering::Relaxed);
93 self.total_sum_us.fetch_add(value_us, Ordering::Relaxed);
94
95 let mut recorded = false;
97 for bucket in &self.buckets {
98 if value_us < bucket.upper_bound_us {
99 bucket.increment();
100 recorded = true;
101 break;
102 }
103 }
104
105 if !recorded {
106 self.overflow.fetch_add(1, Ordering::Relaxed);
107 }
108 }
109
110 pub fn record_us(&self, value_us: u64) {
112 self.record(Duration::from_micros(value_us));
113 }
114
115 pub fn count(&self) -> u64 {
117 self.total_count.load(Ordering::Relaxed)
118 }
119
120 pub fn mean(&self) -> Duration {
122 let count = self.total_count.load(Ordering::Relaxed);
123 if count == 0 {
124 return Duration::ZERO;
125 }
126 let sum = self.total_sum_us.load(Ordering::Relaxed);
127 Duration::from_micros(sum / count)
128 }
129
130 pub fn percentile(&self, p: f64) -> Duration {
132 let p = p.clamp(0.0, 1.0);
133 let total = self.total_count.load(Ordering::Relaxed);
134
135 if total == 0 {
136 return Duration::ZERO;
137 }
138
139 let target = (total as f64 * p).ceil() as u64;
140 let mut cumulative = 0u64;
141
142 for bucket in &self.buckets {
143 cumulative += bucket.count();
144 if cumulative >= target {
145 return Duration::from_micros(bucket.upper_bound_us);
146 }
147 }
148
149 if let Some(last) = self.buckets.last() {
151 Duration::from_micros(last.upper_bound_us)
152 } else {
153 Duration::ZERO
154 }
155 }
156
157 pub fn p50(&self) -> Duration {
159 self.percentile(0.50)
160 }
161
162 pub fn p90(&self) -> Duration {
164 self.percentile(0.90)
165 }
166
167 pub fn p95(&self) -> Duration {
169 self.percentile(0.95)
170 }
171
172 pub fn p99(&self) -> Duration {
174 self.percentile(0.99)
175 }
176
177 pub fn snapshot(&self) -> HistogramSnapshot {
179 let buckets: Vec<_> = self
180 .buckets
181 .iter()
182 .map(|b| BucketSnapshot {
183 upper_bound_us: b.upper_bound_us,
184 count: b.count(),
185 })
186 .collect();
187
188 HistogramSnapshot {
189 buckets,
190 overflow: self.overflow.load(Ordering::Relaxed),
191 total_count: self.total_count.load(Ordering::Relaxed),
192 total_sum_us: self.total_sum_us.load(Ordering::Relaxed),
193 }
194 }
195
196 pub fn reset(&self) {
198 for bucket in &self.buckets {
199 bucket.count.store(0, Ordering::Relaxed);
200 }
201 self.overflow.store(0, Ordering::Relaxed);
202 self.total_count.store(0, Ordering::Relaxed);
203 self.total_sum_us.store(0, Ordering::Relaxed);
204 }
205}
206
207impl Default for LatencyHistogram {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213#[derive(Debug, Clone)]
215pub struct BucketSnapshot {
216 pub upper_bound_us: u64,
218 pub count: u64,
220}
221
222#[derive(Debug, Clone)]
224pub struct HistogramSnapshot {
225 pub buckets: Vec<BucketSnapshot>,
227 pub overflow: u64,
229 pub total_count: u64,
231 pub total_sum_us: u64,
233}
234
235impl HistogramSnapshot {
236 pub fn mean(&self) -> Duration {
238 if self.total_count == 0 {
239 return Duration::ZERO;
240 }
241 Duration::from_micros(self.total_sum_us / self.total_count)
242 }
243
244 pub fn percentile(&self, p: f64) -> Duration {
246 let p = p.clamp(0.0, 1.0);
247
248 if self.total_count == 0 {
249 return Duration::ZERO;
250 }
251
252 let target = (self.total_count as f64 * p).ceil() as u64;
253 let mut cumulative = 0u64;
254
255 for bucket in &self.buckets {
256 cumulative += bucket.count;
257 if cumulative >= target {
258 return Duration::from_micros(bucket.upper_bound_us);
259 }
260 }
261
262 if let Some(last) = self.buckets.last() {
263 Duration::from_micros(last.upper_bound_us)
264 } else {
265 Duration::ZERO
266 }
267 }
268
269 pub fn format_ascii(&self, width: usize) -> String {
271 let max_count = self.buckets.iter().map(|b| b.count).max().unwrap_or(1);
272 let mut output = String::new();
273
274 for bucket in &self.buckets {
275 let label = format_duration(bucket.upper_bound_us);
276 let bar_len = if max_count > 0 {
277 (bucket.count as f64 / max_count as f64 * width as f64) as usize
278 } else {
279 0
280 };
281 let bar: String = std::iter::repeat('#').take(bar_len).collect();
282 output.push_str(&format!("{:>8} | {:6} | {}\n", label, bucket.count, bar));
283 }
284
285 if self.overflow > 0 {
286 output.push_str(&format!("{:>8} | {:6} | (overflow)\n", ">max", self.overflow));
287 }
288
289 output
290 }
291}
292
293fn format_duration(us: u64) -> String {
295 if us < 1_000 {
296 format!("{}µs", us)
297 } else if us < 1_000_000 {
298 format!("{}ms", us / 1_000)
299 } else {
300 format!("{:.1}s", us as f64 / 1_000_000.0)
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use super::*;
307
308 #[test]
309 fn test_histogram_new() {
310 let hist = LatencyHistogram::new();
311 assert_eq!(hist.count(), 0);
312 assert_eq!(hist.mean(), Duration::ZERO);
313 }
314
315 #[test]
316 fn test_histogram_record() {
317 let hist = LatencyHistogram::new();
318
319 hist.record(Duration::from_micros(500));
320 hist.record(Duration::from_millis(5));
321 hist.record(Duration::from_millis(50));
322
323 assert_eq!(hist.count(), 3);
324 }
325
326 #[test]
327 fn test_histogram_mean() {
328 let hist = LatencyHistogram::new();
329
330 hist.record(Duration::from_millis(10));
331 hist.record(Duration::from_millis(20));
332 hist.record(Duration::from_millis(30));
333
334 let mean = hist.mean();
335 assert_eq!(mean, Duration::from_millis(20));
336 }
337
338 #[test]
339 fn test_histogram_percentiles() {
340 let hist = LatencyHistogram::new();
341
342 for i in 1..=100 {
344 hist.record(Duration::from_millis(i));
345 }
346
347 let p50 = hist.p50();
349 assert!(p50 >= Duration::from_millis(50));
350
351 let p99 = hist.p99();
353 assert!(p99 >= Duration::from_millis(100));
354 }
355
356 #[test]
357 fn test_histogram_snapshot() {
358 let hist = LatencyHistogram::new();
359
360 hist.record(Duration::from_millis(1));
361 hist.record(Duration::from_millis(10));
362
363 let snapshot = hist.snapshot();
364 assert_eq!(snapshot.total_count, 2);
365 }
366
367 #[test]
368 fn test_histogram_reset() {
369 let hist = LatencyHistogram::new();
370
371 hist.record(Duration::from_millis(10));
372 assert_eq!(hist.count(), 1);
373
374 hist.reset();
375 assert_eq!(hist.count(), 0);
376 }
377
378 #[test]
379 fn test_custom_buckets() {
380 let hist = LatencyHistogram::with_buckets(&[100, 1000, 10000]);
381
382 hist.record(Duration::from_micros(50)); hist.record(Duration::from_micros(500)); hist.record(Duration::from_micros(5000)); hist.record(Duration::from_micros(50000)); let snapshot = hist.snapshot();
388 assert_eq!(snapshot.buckets[0].count, 1);
389 assert_eq!(snapshot.buckets[1].count, 1);
390 assert_eq!(snapshot.buckets[2].count, 1);
391 assert_eq!(snapshot.overflow, 1);
392 }
393
394 #[test]
395 fn test_format_duration() {
396 assert_eq!(format_duration(500), "500µs");
397 assert_eq!(format_duration(5_000), "5ms");
398 assert_eq!(format_duration(5_000_000), "5.0s");
399 }
400
401 #[test]
402 fn test_snapshot_format_ascii() {
403 let hist = LatencyHistogram::with_buckets(&[1000, 10000, 100000]);
404
405 hist.record(Duration::from_micros(500));
406 hist.record(Duration::from_micros(500));
407 hist.record(Duration::from_micros(5000));
408
409 let snapshot = hist.snapshot();
410 let ascii = snapshot.format_ascii(20);
411
412 assert!(ascii.contains("1ms"));
413 assert!(ascii.contains("10ms"));
414 }
415}