use std::collections::VecDeque;

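/// Streaming mean/variance accumulator based on Welford's online algorithm:
/// `add` updates the running mean and the sum of squared deviations (`m2`)
/// in O(1) time and constant space, so no samples need to be retained.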
#[derive(Debug, Clone)]
pub struct StreamingStats {
    n: u64,
    mean: f64,
    m2: f64,
}

impl StreamingStats {
    pub fn new() -> Self {
        Self {
            n: 0,
            mean: 0.0,
            m2: 0.0,
        }
    }

    pub fn add(&mut self, value: f64) {
        self.n += 1;
        let delta = value - self.mean;
        self.mean += delta / self.n as f64;
        let delta2 = value - self.mean;
        self.m2 += delta * delta2;
    }

    pub fn count(&self) -> u64 {
        self.n
    }

    pub fn mean(&self) -> f64 {
        self.mean
    }

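    /// Population variance (`m2 / n`); 0.0 until at least two samples are seen.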
    pub fn variance(&self) -> f64 {
        if self.n < 2 {
            0.0
        } else {
            self.m2 / self.n as f64
        }
    }

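    /// Bessel-corrected sample variance (`m2 / (n - 1)`); 0.0 until at least
    /// two samples are seen.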
    pub fn sample_variance(&self) -> f64 {
        if self.n < 2 {
            0.0
        } else {
            self.m2 / (self.n - 1) as f64
        }
    }

    pub fn std_dev(&self) -> f64 {
        self.variance().sqrt()
    }

    pub fn sample_std_dev(&self) -> f64 {
        self.sample_variance().sqrt()
    }

    pub fn reset(&mut self) {
        self.n = 0;
        self.mean = 0.0;
        self.m2 = 0.0;
    }

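    /// Merges another accumulator using the parallel variance formula:
    /// `m2 = m2_a + m2_b + delta^2 * n_a * n_b / (n_a + n_b)`, where `delta`
    /// is the difference of the two means. For example, merging {10, 20}
    /// (mean 15, m2 50) with {30, 40} (mean 35, m2 50) gives n = 4, mean = 25
    /// and m2 = 50 + 50 + 400 * (2 * 2 / 4) = 500, the same as accumulating
    /// all four values directly.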
    pub fn merge(&mut self, other: &StreamingStats) {
        if other.n == 0 {
            return;
        }
        if self.n == 0 {
            *self = other.clone();
            return;
        }

        let total_n = self.n + other.n;
        let delta = other.mean - self.mean;
        let new_mean = (self.n as f64 * self.mean + other.n as f64 * other.mean) / total_n as f64;

        #[allow(clippy::suspicious_operation_groupings)]
        {
            self.m2 = self.m2
                + other.m2
                + delta * delta * (self.n as f64 * other.n as f64) / total_n as f64;
        }
        self.mean = new_mean;
        self.n = total_n;
    }
}

impl Default for StreamingStats {
    fn default() -> Self {
        Self::new()
    }
}

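/// Retry delay generator: the nominal delay grows as `base_ms *
/// multiplier^attempt`, is capped at `max_ms`, and is randomly jittered so
/// that many clients retrying together do not synchronize. With the defaults
/// (100 ms base, 2.0 multiplier) the nominal sequence is 100, 200, 400, ...
/// up to the 30 000 ms cap.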
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    base_ms: u64,
    max_ms: u64,
    multiplier: f64,
    attempt: u32,
    max_attempts: u32,
}

impl ExponentialBackoff {
    pub fn new() -> Self {
        Self {
            base_ms: 100,
            max_ms: 30_000,
            multiplier: 2.0,
            attempt: 0,
            max_attempts: 10,
        }
    }

    pub fn custom(base_ms: u64, max_ms: u64, multiplier: f64, max_attempts: u32) -> Self {
        Self {
            base_ms,
            max_ms,
            multiplier,
            attempt: 0,
            max_attempts,
        }
    }

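    /// Returns the next delay in milliseconds and advances the attempt
    /// counter. The nominal delay is jittered by up to +/-25%; once
    /// `max_attempts` is reached, the capped `max_ms` is returned as-is.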
    pub fn next_delay_ms(&mut self) -> u64 {
        if self.attempt >= self.max_attempts {
            return self.max_ms;
        }

        let delay = (self.base_ms as f64 * self.multiplier.powi(self.attempt as i32)) as u64;
        let delay = delay.min(self.max_ms);

        // Uniform jitter in [-delay/4, +delay/4]; a signed offset is needed so
        // the delay can move below the nominal value as well as above it.
        let jitter_range = delay / 4;
        let jittered = if jitter_range > 0 {
            let mut bytes = [0u8; 8];
            getrandom::fill(&mut bytes).unwrap_or_default();
            let random_u64 = u64::from_le_bytes(bytes);
            let offset = (random_u64 % (jitter_range * 2 + 1)) as i64 - jitter_range as i64;
            (delay as i64 + offset).max(0) as u64
        } else {
            delay
        };

        self.attempt += 1;
        jittered
    }

    pub fn reset(&mut self) {
        self.attempt = 0;
    }

    pub fn is_exhausted(&self) -> bool {
        self.attempt >= self.max_attempts
    }

    pub fn attempt_count(&self) -> u32 {
        self.attempt
    }
}

impl Default for ExponentialBackoff {
    fn default() -> Self {
        Self::new()
    }
}

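/// Fixed-capacity window over the most recent samples: pushing onto a full
/// window evicts the oldest value, and the summary statistics (mean, min,
/// max, standard deviation) cover only the values currently held.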
#[derive(Debug, Clone)]
pub struct SlidingWindow {
    values: VecDeque<f64>,
    capacity: usize,
}

impl SlidingWindow {
    pub fn new(capacity: usize) -> Self {
        Self {
            values: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    pub fn push(&mut self, value: f64) {
        if self.values.len() >= self.capacity {
            self.values.pop_front();
        }
        self.values.push_back(value);
    }

    pub fn len(&self) -> usize {
        self.values.len()
    }

    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }

    pub fn is_full(&self) -> bool {
        self.values.len() >= self.capacity
    }

    pub fn mean(&self) -> f64 {
        if self.values.is_empty() {
            return 0.0;
        }
        self.values.iter().sum::<f64>() / self.values.len() as f64
    }

    pub fn min(&self) -> Option<f64> {
        self.values
            .iter()
            .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
            .copied()
    }

    pub fn max(&self) -> Option<f64> {
        self.values
            .iter()
            .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
            .copied()
    }

    pub fn std_dev(&self) -> f64 {
        if self.values.len() < 2 {
            return 0.0;
        }

        let mean = self.mean();
        let variance = self
            .values
            .iter()
            .map(|v| {
                let diff = v - mean;
                diff * diff
            })
            .sum::<f64>()
            / self.values.len() as f64;

        variance.sqrt()
    }

    pub fn clear(&mut self) {
        self.values.clear()
    }

    pub fn values(&self) -> Vec<f64> {
        self.values.iter().copied().collect()
    }
}

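/// Bucketed histogram with fixed upper bounds. Each recorded value is counted
/// in the first bucket whose bound is greater than or equal to it, so the
/// last bound is typically `f64::INFINITY` to act as a catch-all.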
#[derive(Debug, Clone)]
pub struct Histogram {
    buckets: Vec<(f64, u64)>,
    sum: f64,
    count: u64,
    min: f64,
    max: f64,
}

impl Histogram {
    pub fn new(bucket_bounds: Vec<f64>) -> Self {
        let buckets = bucket_bounds.into_iter().map(|b| (b, 0)).collect();
        Self {
            buckets,
            sum: 0.0,
            count: 0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    pub fn for_latency_ms() -> Self {
        Self::new(vec![
            1.0,
            2.0,
            5.0,
            10.0,
            20.0,
            50.0,
            100.0,
            200.0,
            500.0,
            1000.0,
            2000.0,
            5000.0,
            f64::INFINITY,
        ])
    }

    pub fn for_bandwidth_mbps() -> Self {
        Self::new(vec![
            0.1,
            0.5,
            1.0,
            5.0,
            10.0,
            50.0,
            100.0,
            500.0,
            1000.0,
            f64::INFINITY,
        ])
    }

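    /// Records a value: updates count, sum, min and max, then increments the
    /// first bucket whose upper bound covers the value. Values above the
    /// largest bound still update the summary fields but land in no bucket.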
    pub fn record(&mut self, value: f64) {
        self.count += 1;
        self.sum += value;
        self.min = self.min.min(value);
        self.max = self.max.max(value);

        for (bound, count) in &mut self.buckets {
            if value <= *bound {
                *count += 1;
                return;
            }
        }
    }

    pub fn count(&self) -> u64 {
        self.count
    }

    pub fn sum(&self) -> f64 {
        self.sum
    }

    pub fn mean(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.sum / self.count as f64
        }
    }

    pub fn min(&self) -> f64 {
        if self.count == 0 { 0.0 } else { self.min }
    }

    pub fn max(&self) -> f64 {
        if self.count == 0 { 0.0 } else { self.max }
    }

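    /// Approximates the p-th percentile (`p` in 0.0..=1.0) by nearest rank:
    /// walks the buckets until the cumulative count reaches
    /// `ceil(p * count)` and returns that bucket's upper bound, so the
    /// resolution is limited by the bucket layout.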
    pub fn percentile(&self, p: f64) -> f64 {
        if self.count == 0 {
            return 0.0;
        }

        // Nearest-rank target: the smallest rank whose cumulative count covers p.
        let target = (self.count as f64 * p.clamp(0.0, 1.0)).ceil().max(1.0) as u64;
        let mut cumulative = 0u64;

        for (bound, count) in &self.buckets {
            cumulative += count;
            if cumulative >= target {
                return *bound;
            }
        }

        self.max
    }

    pub fn p50(&self) -> f64 {
        self.percentile(0.50)
    }

    pub fn p95(&self) -> f64 {
        self.percentile(0.95)
    }

    pub fn p99(&self) -> f64 {
        self.percentile(0.99)
    }

    pub fn p999(&self) -> f64 {
        self.percentile(0.999)
    }

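    /// Merges another histogram bucket by bucket. Histograms with a different
    /// number of buckets are ignored; identical bucket bounds are assumed but
    /// not checked.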
    pub fn merge(&mut self, other: &Histogram) {
        if self.buckets.len() != other.buckets.len() {
            return;
        }

        for (i, (_, count)) in self.buckets.iter_mut().enumerate() {
            *count += other.buckets[i].1;
        }

        self.sum += other.sum;
        self.count += other.count;
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
    }

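    /// Returns `(upper_bound, bucket_count, cumulative_count)` for each
    /// bucket, in ascending bound order.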
    pub fn buckets_info(&self) -> Vec<(f64, u64, u64)> {
        let mut cumulative = 0u64;
        self.buckets
            .iter()
            .map(|(bound, count)| {
                cumulative += count;
                (*bound, *count, cumulative)
            })
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_streaming_stats() {
        let mut stats = StreamingStats::new();
        assert_eq!(stats.count(), 0);
        assert_eq!(stats.mean(), 0.0);

        stats.add(10.0);
        stats.add(20.0);
        stats.add(30.0);

        assert_eq!(stats.count(), 3);
        assert_eq!(stats.mean(), 20.0);
        assert!((stats.std_dev() - 8.164_965_809_277_26).abs() < 0.0001);

        stats.reset();
        assert_eq!(stats.count(), 0);
        assert_eq!(stats.mean(), 0.0);
    }

    #[test]
    fn test_streaming_stats_merge() {
        let mut stats1 = StreamingStats::new();
        stats1.add(10.0);
        stats1.add(20.0);

        let mut stats2 = StreamingStats::new();
        stats2.add(30.0);
        stats2.add(40.0);

        stats1.merge(&stats2);
        assert_eq!(stats1.count(), 4);
        assert_eq!(stats1.mean(), 25.0);
    }

    #[test]
    fn test_streaming_stats_sample_variance() {
        let mut stats = StreamingStats::new();
        stats.add(2.0);
        stats.add(4.0);
        stats.add(6.0);
        stats.add(8.0);

        let sample_var = stats.sample_variance();
        let expected_var = 6.666_666_666_666_667;
        assert!((sample_var - expected_var).abs() < 0.0001);
    }

    #[test]
    fn test_exponential_backoff() {
        let mut backoff = ExponentialBackoff::new();
        assert_eq!(backoff.attempt_count(), 0);
        assert!(!backoff.is_exhausted());

        let delay1 = backoff.next_delay_ms();
        assert!((75..=125).contains(&delay1));
        assert_eq!(backoff.attempt_count(), 1);

        let delay2 = backoff.next_delay_ms();
        assert!((150..=250).contains(&delay2));

        backoff.reset();
        assert_eq!(backoff.attempt_count(), 0);
    }

    #[test]
    fn test_exponential_backoff_max() {
        let mut backoff = ExponentialBackoff::custom(100, 1000, 2.0, 5);

        for _ in 0..5 {
            backoff.next_delay_ms();
        }
        assert!(backoff.is_exhausted());

        let delay = backoff.next_delay_ms();
        assert_eq!(delay, 1000);
    }

    #[test]
    fn test_sliding_window() {
        let mut window = SlidingWindow::new(3);
        assert!(window.is_empty());
        assert!(!window.is_full());

        window.push(10.0);
        window.push(20.0);
        window.push(30.0);

        assert!(window.is_full());
        assert_eq!(window.len(), 3);
        assert_eq!(window.mean(), 20.0);
        assert_eq!(window.min(), Some(10.0));
        assert_eq!(window.max(), Some(30.0));

        window.push(40.0);
        assert_eq!(window.len(), 3);
        assert_eq!(window.mean(), 30.0);
        assert_eq!(window.min(), Some(20.0));

        window.clear();
        assert!(window.is_empty());
        assert_eq!(window.len(), 0);
    }

    #[test]
    fn test_sliding_window_std_dev() {
        let mut window = SlidingWindow::new(4);
        window.push(2.0);
        window.push(4.0);
        window.push(6.0);
        window.push(8.0);

        let std_dev = window.std_dev();
        let expected = 2.236_067_977_499_79;
        assert!((std_dev - expected).abs() < 0.0001);
    }

    #[test]
    fn test_histogram() {
        let mut hist = Histogram::for_latency_ms();
        assert_eq!(hist.count(), 0);

        hist.record(5.0);
        hist.record(15.0);
        hist.record(25.0);
        hist.record(100.0);
        hist.record(500.0);

        assert_eq!(hist.count(), 5);
        assert_eq!(hist.sum(), 645.0);
        assert_eq!(hist.mean(), 129.0);
        assert_eq!(hist.min(), 5.0);
        assert_eq!(hist.max(), 500.0);

        assert!(hist.p50() > 0.0);
        assert!(hist.p95() > 0.0);
        assert!(hist.p99() > 0.0);
    }

    #[test]
    fn test_histogram_merge() {
        let mut hist1 = Histogram::for_latency_ms();
        hist1.record(10.0);
        hist1.record(20.0);

        let mut hist2 = Histogram::for_latency_ms();
        hist2.record(30.0);
        hist2.record(40.0);

        hist1.merge(&hist2);
        assert_eq!(hist1.count(), 4);
        assert_eq!(hist1.sum(), 100.0);
        assert_eq!(hist1.mean(), 25.0);
        assert_eq!(hist1.min(), 10.0);
        assert_eq!(hist1.max(), 40.0);
    }

    #[test]
    fn test_histogram_percentiles() {
        let mut hist = Histogram::for_latency_ms();
        for i in 1..=100 {
            hist.record(i as f64);
        }

        assert_eq!(hist.count(), 100);
        let p50 = hist.p50();
        assert!((50.0..=100.0).contains(&p50));

        let p95 = hist.p95();
        assert!(p95 >= 95.0);

        let p99 = hist.p99();
        assert!(p99 >= 99.0);
    }

    #[test]
    fn test_histogram_bandwidth() {
        let mut hist = Histogram::for_bandwidth_mbps();
        hist.record(0.5);
        hist.record(5.0);
        hist.record(50.0);

        assert_eq!(hist.count(), 3);
        assert_eq!(hist.mean(), 18.5);
    }

    #[test]
    fn test_histogram_buckets_info() {
        let mut hist = Histogram::new(vec![10.0, 20.0, 30.0]);
        hist.record(5.0);
        hist.record(15.0);
        hist.record(25.0);

        let buckets = hist.buckets_info();
        assert_eq!(buckets.len(), 3);
        assert_eq!(buckets[0], (10.0, 1, 1));
        assert_eq!(buckets[1], (20.0, 1, 2));
        assert_eq!(buckets[2], (30.0, 1, 3));
    }
}