use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WindowConfig {
pub buckets: usize,
pub bucket_duration: Duration,
}
impl Default for WindowConfig {
fn default() -> Self {
Self {
buckets: 10,
bucket_duration: Duration::from_secs(1),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowOutcome {
Success,
Failure,
Drop,
}
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct WindowSnapshot {
pub successes: u64,
pub failures: u64,
pub drops: u64,
pub latency_sum: Duration,
pub latency_samples: u64,
}
impl WindowSnapshot {
pub fn total(&self) -> u64 {
self.successes + self.failures + self.drops
}
pub fn failure_ratio(&self) -> f64 {
ratio(self.failures, self.successes + self.failures)
}
pub fn drop_ratio(&self) -> f64 {
ratio(self.drops, self.total())
}
pub fn average_latency(&self) -> Option<Duration> {
if self.latency_samples == 0 {
return None;
}
Some(Duration::from_secs_f64(
self.latency_sum.as_secs_f64() / self.latency_samples as f64,
))
}
}
#[derive(Debug, Clone)]
pub struct RollingWindow {
anchor: Instant,
config: WindowConfig,
buckets: Vec<WindowBucket>,
}
impl RollingWindow {
pub fn new(config: WindowConfig) -> Self {
let config = WindowConfig {
buckets: config.buckets.max(1),
bucket_duration: if config.bucket_duration.is_zero() {
Duration::from_millis(1)
} else {
config.bucket_duration
},
};
Self {
anchor: Instant::now(),
config,
buckets: vec![WindowBucket::default(); config.buckets],
}
}
pub fn record(&mut self, outcome: WindowOutcome) {
self.record_at(outcome, Instant::now());
}
pub fn record_with_latency(&mut self, outcome: WindowOutcome, latency: Duration) {
self.record_at_with_latency(outcome, latency, Instant::now());
}
pub fn snapshot(&self) -> WindowSnapshot {
self.snapshot_at(Instant::now())
}
pub fn max_successes_per_bucket(&self) -> u64 {
self.max_successes_per_bucket_at(Instant::now())
}
pub fn min_average_latency(&self) -> Option<Duration> {
self.min_average_latency_at(Instant::now())
}
pub(crate) fn record_at(&mut self, outcome: WindowOutcome, now: Instant) {
self.record_at_inner(outcome, None, now);
}
pub(crate) fn record_at_with_latency(
&mut self,
outcome: WindowOutcome,
latency: Duration,
now: Instant,
) {
self.record_at_inner(outcome, Some(latency), now);
}
pub(crate) fn snapshot_at(&self, now: Instant) -> WindowSnapshot {
let current_generation = self.generation(now);
self.buckets
.iter()
.filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
.fold(WindowSnapshot::default(), |mut snapshot, bucket| {
snapshot.successes += bucket.successes;
snapshot.failures += bucket.failures;
snapshot.drops += bucket.drops;
snapshot.latency_sum += bucket.latency_sum;
snapshot.latency_samples += bucket.latency_samples;
snapshot
})
}
pub(crate) fn max_successes_per_bucket_at(&self, now: Instant) -> u64 {
let current_generation = self.generation(now);
self.buckets
.iter()
.filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
.map(|bucket| bucket.successes)
.max()
.unwrap_or_default()
}
pub(crate) fn min_average_latency_at(&self, now: Instant) -> Option<Duration> {
let current_generation = self.generation(now);
self.buckets
.iter()
.filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
.filter_map(WindowBucket::average_latency)
.min()
}
fn record_at_inner(&mut self, outcome: WindowOutcome, latency: Option<Duration>, now: Instant) {
let generation = self.generation(now);
let index = generation as usize % self.config.buckets;
let bucket = &mut self.buckets[index];
if bucket.generation != Some(generation) {
*bucket = WindowBucket {
generation: Some(generation),
..WindowBucket::default()
};
}
bucket.record(outcome);
if let Some(latency) = latency {
bucket.latency_sum += latency;
bucket.latency_samples += 1;
}
}
fn generation(&self, now: Instant) -> u64 {
let elapsed = now.saturating_duration_since(self.anchor);
let width = self.config.bucket_duration.as_nanos().max(1);
(elapsed.as_nanos() / width) as u64
}
}
#[derive(Debug, Clone, Default)]
struct WindowBucket {
generation: Option<u64>,
successes: u64,
failures: u64,
drops: u64,
latency_sum: Duration,
latency_samples: u64,
}
impl WindowBucket {
fn record(&mut self, outcome: WindowOutcome) {
match outcome {
WindowOutcome::Success => self.successes += 1,
WindowOutcome::Failure => self.failures += 1,
WindowOutcome::Drop => self.drops += 1,
}
}
fn is_live(&self, current_generation: u64, bucket_count: u64) -> bool {
self.generation
.is_some_and(|generation| current_generation.saturating_sub(generation) < bucket_count)
}
fn average_latency(&self) -> Option<Duration> {
if self.latency_samples == 0 {
return None;
}
Some(Duration::from_secs_f64(
self.latency_sum.as_secs_f64() / self.latency_samples as f64,
))
}
}
fn ratio(part: u64, total: u64) -> f64 {
if total == 0 {
0.0
} else {
part as f64 / total as f64
}
}
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use super::{RollingWindow, WindowConfig, WindowOutcome};
#[test]
fn rolling_window_aggregates_live_buckets() {
let mut window = RollingWindow::new(WindowConfig {
buckets: 2,
bucket_duration: Duration::from_millis(10),
});
let now = Instant::now();
window.record_at(WindowOutcome::Success, now);
window.record_at(WindowOutcome::Failure, now + Duration::from_millis(10));
window.record_at(WindowOutcome::Drop, now + Duration::from_millis(20));
let snapshot = window.snapshot_at(now + Duration::from_millis(20));
assert_eq!(snapshot.successes, 0);
assert_eq!(snapshot.failures, 1);
assert_eq!(snapshot.drops, 1);
assert_eq!(snapshot.total(), 2);
}
#[test]
fn rolling_window_tracks_average_latency() {
let mut window = RollingWindow::new(WindowConfig::default());
let now = Instant::now();
window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(10), now);
window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(30), now);
assert_eq!(
window.snapshot_at(now).average_latency(),
Some(Duration::from_millis(20))
);
}
#[test]
fn rolling_window_reports_max_pass_and_min_latency() {
let mut window = RollingWindow::new(WindowConfig {
buckets: 2,
bucket_duration: Duration::from_millis(10),
});
let now = Instant::now();
window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(40), now);
window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(20), now);
window.record_at_with_latency(
WindowOutcome::Success,
Duration::from_millis(5),
now + Duration::from_millis(10),
);
assert_eq!(window.max_successes_per_bucket_at(now), 2);
assert_eq!(
window.min_average_latency_at(now + Duration::from_millis(10)),
Some(Duration::from_millis(5))
);
}
}