use std::time::{Duration, Instant};
/// One observation: a measured value together with the time it was taken.
#[derive(Debug, Clone, Copy)]
struct Sample<T>
where
    T: Copy,
{
    /// Time of the observation, in nanoseconds relative to the owning
    /// tracker's base instant.
    timestamp: u64,
    /// The measured value.
    value: T,
}
/// Generates a windowed-maximum tracker type named `$name` over values of type `$ty`.
///
/// The generated type keeps three timestamped candidate samples and reports the
/// first of them as the current maximum. `$init` is the placeholder value the
/// sample slots hold before the first `update` (and again after `reset`); it is
/// never reported to callers.
macro_rules! impl_windowed_max {
    ($name:ident, $ty:ty, $init:expr) => {
        /// Tracks the maximum of a stream of samples over a sliding time window
        /// using three timestamped candidates.
        #[derive(Debug, Clone)]
        pub struct $name {
            /// Window length in nanoseconds; validated to be non-zero.
            window: u64,
            /// Candidate samples; `samples[0]` holds the value reported as the maximum.
            samples: [Sample<$ty>; 3],
            /// Number of `update` calls since construction or the last `reset`.
            count: u64,
            /// Instant that sample timestamps are measured from.
            base: Instant,
        }

        impl $name {
            /// Creates a tracker whose timestamps are measured from `Instant::now()`.
            ///
            /// # Errors
            ///
            /// Returns `ConfigError::Invalid` if `window` is zero or its length in
            /// nanoseconds does not fit in a `u64`.
            #[inline]
            pub fn new(window: Duration) -> Result<Self, crate::ConfigError> {
                Self::with_base(window, Instant::now())
            }

            /// Creates a tracker whose timestamps are measured from `base`.
            ///
            /// # Errors
            ///
            /// Returns `ConfigError::Invalid` if `window` is zero or its length in
            /// nanoseconds does not fit in a `u64`.
            #[inline]
            pub fn with_base(window: Duration, base: Instant) -> Result<Self, crate::ConfigError> {
                let window_ns = u64::try_from(window.as_nanos())
                    .map_err(|_| crate::ConfigError::Invalid("window duration too large"))?;
                if window_ns == 0 {
                    return Err(crate::ConfigError::Invalid("window must be positive"));
                }
                let init = Sample {
                    timestamp: 0,
                    value: $init,
                };
                Ok(Self {
                    window: window_ns,
                    samples: [init; 3],
                    count: 0,
                    base,
                })
            }

            /// Nanoseconds from `base` to `now`: zero when `now` precedes `base`,
            /// clamped to `u64::MAX` when the span does not fit.
            #[inline]
            fn nanos_since_base(&self, now: Instant) -> u64 {
                let nanos = now.saturating_duration_since(self.base).as_nanos();
                u64::try_from(nanos).unwrap_or(u64::MAX)
            }

            /// Records `value` as observed at `now` and returns the tracked maximum.
            ///
            /// All three candidates are replaced by the new sample when it is the
            /// first one since construction/`reset`, when it is at least as large
            /// as the current maximum, or when the newest candidate is older than
            /// the window. Otherwise the second and third candidates are refreshed,
            /// and the candidates shift up once the current maximum is older than
            /// the window.
            #[inline]
            #[must_use]
            pub fn update(&mut self, now: Instant, value: $ty) -> $ty {
                let timestamp = self.nanos_since_base(now);
                let sample = Sample { timestamp, value };

                // Before the first sample the slots only hold the placeholder seed.
                // It must never win a comparison, otherwise a first sample such as
                // negative infinity or NaN would report a value nobody observed.
                let is_first = self.count == 0;
                // Saturate so the counter can neither panic nor wrap back to the
                // "empty" state.
                self.count = self.count.saturating_add(1);

                let win = self.window;
                let s = &mut self.samples;

                if is_first
                    || value >= s[0].value
                    || timestamp.wrapping_sub(s[2].timestamp) > win
                {
                    *s = [sample; 3];
                    return s[0].value;
                }

                // Refresh the lower candidates once they are older than a third of
                // the window.
                if timestamp.wrapping_sub(s[1].timestamp) > win / 3 {
                    s[1] = sample;
                    s[2] = sample;
                } else if timestamp.wrapping_sub(s[2].timestamp) > win / 3 {
                    s[2] = sample;
                }

                // A sample at least as large as a lower candidate replaces it and
                // everything ranked below it.
                if value >= s[1].value {
                    s[1] = sample;
                    s[2] = sample;
                } else if value >= s[2].value {
                    s[2] = sample;
                }

                // The current maximum has aged out: promote the lower candidates.
                if timestamp.wrapping_sub(s[0].timestamp) > win {
                    s[0] = s[1];
                    s[1] = s[2];
                    s[2] = sample;
                }
                s[0].value
            }

            /// Returns the tracked maximum, or `None` if no sample has been recorded
            /// since construction or the last `reset`.
            #[inline]
            #[must_use]
            pub fn max(&self) -> Option<$ty> {
                if self.count == 0 {
                    None
                } else {
                    Some(self.samples[0].value)
                }
            }

            /// Returns the configured window length.
            #[inline]
            #[must_use]
            pub fn window(&self) -> Duration {
                Duration::from_nanos(self.window)
            }

            /// Returns the number of samples recorded since construction or the
            /// last `reset`.
            #[inline]
            #[must_use]
            pub fn count(&self) -> u64 {
                self.count
            }

            /// Discards all samples and makes `now` the new base instant.
            #[inline]
            pub fn reset(&mut self, now: Instant) {
                let init = Sample {
                    timestamp: 0,
                    value: $init,
                };
                self.samples = [init; 3];
                self.count = 0;
                self.base = now;
            }
        }
    };
}
/// Generates a windowed-minimum tracker type named `$name` over values of type `$ty`.
///
/// The generated type keeps three timestamped candidate samples and reports the
/// first of them as the current minimum. `$init` is the placeholder value the
/// sample slots hold before the first `update` (and again after `reset`); it is
/// never reported to callers.
macro_rules! impl_windowed_min {
    ($name:ident, $ty:ty, $init:expr) => {
        /// Tracks the minimum of a stream of samples over a sliding time window
        /// using three timestamped candidates.
        #[derive(Debug, Clone)]
        pub struct $name {
            /// Window length in nanoseconds; validated to be non-zero.
            window: u64,
            /// Candidate samples; `samples[0]` holds the value reported as the minimum.
            samples: [Sample<$ty>; 3],
            /// Number of `update` calls since construction or the last `reset`.
            count: u64,
            /// Instant that sample timestamps are measured from.
            base: Instant,
        }

        impl $name {
            /// Creates a tracker whose timestamps are measured from `Instant::now()`.
            ///
            /// # Errors
            ///
            /// Returns `ConfigError::Invalid` if `window` is zero or its length in
            /// nanoseconds does not fit in a `u64`.
            #[inline]
            pub fn new(window: Duration) -> Result<Self, crate::ConfigError> {
                Self::with_base(window, Instant::now())
            }

            /// Creates a tracker whose timestamps are measured from `base`.
            ///
            /// # Errors
            ///
            /// Returns `ConfigError::Invalid` if `window` is zero or its length in
            /// nanoseconds does not fit in a `u64`.
            #[inline]
            pub fn with_base(window: Duration, base: Instant) -> Result<Self, crate::ConfigError> {
                let window_ns = u64::try_from(window.as_nanos())
                    .map_err(|_| crate::ConfigError::Invalid("window duration too large"))?;
                if window_ns == 0 {
                    return Err(crate::ConfigError::Invalid("window must be positive"));
                }
                let init = Sample {
                    timestamp: 0,
                    value: $init,
                };
                Ok(Self {
                    window: window_ns,
                    samples: [init; 3],
                    count: 0,
                    base,
                })
            }

            /// Nanoseconds from `base` to `now`: zero when `now` precedes `base`,
            /// clamped to `u64::MAX` when the span does not fit.
            #[inline]
            fn nanos_since_base(&self, now: Instant) -> u64 {
                let nanos = now.saturating_duration_since(self.base).as_nanos();
                u64::try_from(nanos).unwrap_or(u64::MAX)
            }

            /// Records `value` as observed at `now` and returns the tracked minimum.
            ///
            /// All three candidates are replaced by the new sample when it is the
            /// first one since construction/`reset`, when it is at least as small
            /// as the current minimum, or when the newest candidate is older than
            /// the window. Otherwise the second and third candidates are refreshed,
            /// and the candidates shift up once the current minimum is older than
            /// the window.
            #[inline]
            #[must_use]
            pub fn update(&mut self, now: Instant, value: $ty) -> $ty {
                let timestamp = self.nanos_since_base(now);
                let sample = Sample { timestamp, value };

                // Before the first sample the slots only hold the placeholder seed.
                // It must never win a comparison, otherwise a first sample such as
                // positive infinity or NaN would report a value nobody observed.
                let is_first = self.count == 0;
                // Saturate so the counter can neither panic nor wrap back to the
                // "empty" state.
                self.count = self.count.saturating_add(1);

                let win = self.window;
                let s = &mut self.samples;

                if is_first
                    || value <= s[0].value
                    || timestamp.wrapping_sub(s[2].timestamp) > win
                {
                    *s = [sample; 3];
                    return s[0].value;
                }

                // Refresh the lower-ranked candidates once they are older than a
                // third of the window.
                if timestamp.wrapping_sub(s[1].timestamp) > win / 3 {
                    s[1] = sample;
                    s[2] = sample;
                } else if timestamp.wrapping_sub(s[2].timestamp) > win / 3 {
                    s[2] = sample;
                }

                // A sample at least as small as a lower-ranked candidate replaces
                // it and everything ranked below it.
                if value <= s[1].value {
                    s[1] = sample;
                    s[2] = sample;
                } else if value <= s[2].value {
                    s[2] = sample;
                }

                // The current minimum has aged out: promote the other candidates.
                if timestamp.wrapping_sub(s[0].timestamp) > win {
                    s[0] = s[1];
                    s[1] = s[2];
                    s[2] = sample;
                }
                s[0].value
            }

            /// Returns the tracked minimum, or `None` if no sample has been recorded
            /// since construction or the last `reset`.
            #[inline]
            #[must_use]
            pub fn min(&self) -> Option<$ty> {
                if self.count == 0 {
                    None
                } else {
                    Some(self.samples[0].value)
                }
            }

            /// Returns the configured window length.
            #[inline]
            #[must_use]
            pub fn window(&self) -> Duration {
                Duration::from_nanos(self.window)
            }

            /// Returns the number of samples recorded since construction or the
            /// last `reset`.
            #[inline]
            #[must_use]
            pub fn count(&self) -> u64 {
                self.count
            }

            /// Discards all samples and makes `now` the new base instant.
            #[inline]
            pub fn reset(&mut self, now: Instant) {
                let init = Sample {
                    timestamp: 0,
                    value: $init,
                };
                self.samples = [init; 3];
                self.count = 0;
                self.base = now;
            }
        }
    };
}
// Concrete tracker types. The third argument is the seed stored in the sample
// slots before any sample has been recorded.
//
// Float trackers are seeded with the infinities rather than `MIN`/`MAX`: those
// constants are only the extreme *finite* values, whereas every non-NaN sample
// compares at least as large as negative infinity (and at least as small as
// positive infinity).
impl_windowed_max!(WindowedMaxF64, f64, f64::NEG_INFINITY);
impl_windowed_max!(WindowedMaxF32, f32, f32::NEG_INFINITY);
impl_windowed_max!(WindowedMaxI64, i64, i64::MIN);
impl_windowed_max!(WindowedMaxI32, i32, i32::MIN);
impl_windowed_max!(WindowedMaxI128, i128, i128::MIN);
impl_windowed_min!(WindowedMinF64, f64, f64::INFINITY);
impl_windowed_min!(WindowedMinF32, f32, f32::INFINITY);
impl_windowed_min!(WindowedMinI64, i64, i64::MAX);
impl_windowed_min!(WindowedMinI32, i32, i32::MAX);
impl_windowed_min!(WindowedMinI128, i128, i128::MAX);
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// Shorthand for a duration of `n` nanoseconds.
    fn ns(n: u64) -> Duration {
        Duration::from_nanos(n)
    }

    /// The instant lying `nanos` nanoseconds after `origin`.
    fn at(origin: Instant, nanos: u64) -> Instant {
        origin + ns(nanos)
    }

    // ---- maximum trackers -------------------------------------------------

    /// A fresh tracker reports no maximum and a zero sample count.
    #[test]
    fn max_empty() {
        let origin = Instant::now();
        let tracker = WindowedMaxF64::with_base(ns(100), origin).unwrap();
        assert!(tracker.max().is_none());
        assert_eq!(tracker.count(), 0);
    }

    /// The first sample becomes the maximum.
    #[test]
    #[allow(clippy::float_cmp)]
    fn max_single_sample() {
        let origin = Instant::now();
        let mut tracker = WindowedMaxF64::with_base(ns(100), origin).unwrap();
        let reported = tracker.update(at(origin, 0), 42.0);
        assert_eq!(reported, 42.0);
        assert_eq!(tracker.max(), Some(42.0));
    }

    /// Each larger sample takes over as the maximum.
    #[test]
    #[allow(clippy::float_cmp)]
    fn max_new_peak_replaces() {
        let origin = Instant::now();
        let mut tracker = WindowedMaxF64::with_base(ns(100), origin).unwrap();
        let _ = tracker.update(at(origin, 0), 10.0);
        let _ = tracker.update(at(origin, 1), 20.0);
        let reported = tracker.update(at(origin, 2), 30.0);
        assert_eq!(reported, 30.0);
    }

    /// A peak older than the window no longer dominates the result.
    #[test]
    fn max_expires_after_window() {
        let origin = Instant::now();
        let mut tracker = WindowedMaxF64::with_base(ns(10), origin).unwrap();
        let _ = tracker.update(at(origin, 0), 100.0);
        let _ = tracker.update(at(origin, 5), 50.0);
        let result = tracker.update(at(origin, 11), 60.0);
        assert!(result <= 60.0, "old peak should have expired, got {result}");
    }

    /// `reset` returns the tracker to its empty state.
    #[test]
    fn max_reset() {
        let origin = Instant::now();
        let mut tracker = WindowedMaxF64::with_base(ns(100), origin).unwrap();
        let _ = tracker.update(at(origin, 0), 42.0);
        tracker.reset(origin);
        assert!(tracker.max().is_none());
        assert_eq!(tracker.count(), 0);
    }

    /// Basic rise-then-dip sequence on the `i64` tracker.
    #[test]
    fn max_i64_basic() {
        let origin = Instant::now();
        let mut tracker = WindowedMaxI64::with_base(ns(10), origin).unwrap();
        for (ts, value, expected) in [(0, 100, 100), (1, 200, 200), (2, 150, 200)] {
            assert_eq!(tracker.update(at(origin, ts), value), expected);
        }
    }

    /// With strictly decreasing input the initial peak eventually ages out.
    #[test]
    fn max_monotonic_decreasing_tracks_recent() {
        let origin = Instant::now();
        let mut tracker = WindowedMaxF64::with_base(ns(10), origin).unwrap();
        for ts in 0..20u64 {
            let v = 100.0 - ts as f64;
            let _ = tracker.update(at(origin, ts), v);
        }
        let m = tracker.max().unwrap();
        assert!(m < 100.0, "old max should have expired, got {m}");
    }

    // ---- minimum trackers -------------------------------------------------

    /// A fresh tracker reports no minimum.
    #[test]
    fn min_empty() {
        let origin = Instant::now();
        let tracker = WindowedMinF64::with_base(ns(100), origin).unwrap();
        assert!(tracker.min().is_none());
    }

    /// The first sample becomes the minimum.
    #[test]
    #[allow(clippy::float_cmp)]
    fn min_single_sample() {
        let origin = Instant::now();
        let mut tracker = WindowedMinF64::with_base(ns(100), origin).unwrap();
        let reported = tracker.update(at(origin, 0), 42.0);
        assert_eq!(reported, 42.0);
        assert_eq!(tracker.min(), Some(42.0));
    }

    /// Each smaller sample takes over as the minimum.
    #[test]
    #[allow(clippy::float_cmp)]
    fn min_new_low_replaces() {
        let origin = Instant::now();
        let mut tracker = WindowedMinF64::with_base(ns(100), origin).unwrap();
        let _ = tracker.update(at(origin, 0), 30.0);
        let _ = tracker.update(at(origin, 1), 20.0);
        let reported = tracker.update(at(origin, 2), 10.0);
        assert_eq!(reported, 10.0);
    }

    /// A low older than the window no longer dominates the result.
    #[test]
    fn min_expires_after_window() {
        let origin = Instant::now();
        let mut tracker = WindowedMinF64::with_base(ns(10), origin).unwrap();
        let _ = tracker.update(at(origin, 0), 10.0);
        let _ = tracker.update(at(origin, 5), 50.0);
        let result = tracker.update(at(origin, 11), 40.0);
        assert!(result >= 40.0, "old min should have expired, got {result}");
    }

    /// RTT-like stream: a late outlier must not raise the reported minimum.
    #[test]
    fn min_rtt_tracking() {
        let origin = Instant::now();
        let mut min_rtt = WindowedMinI64::with_base(ns(100), origin).unwrap();
        for ts in 0..50 {
            let rtt = 50 + (ts % 5) as i64;
            let _ = min_rtt.update(at(origin, ts), rtt);
        }
        assert!(min_rtt.min().unwrap() <= 50);
        let _ = min_rtt.update(at(origin, 50), 200);
        assert!(min_rtt.min().unwrap() <= 54);
    }

    /// `reset` returns the tracker to its empty state.
    #[test]
    fn min_reset() {
        let origin = Instant::now();
        let mut tracker = WindowedMinF64::with_base(ns(100), origin).unwrap();
        let _ = tracker.update(at(origin, 0), 42.0);
        tracker.reset(origin);
        assert!(tracker.min().is_none());
    }

    /// Basic dip-then-rise sequence on the `f32` tracker.
    #[test]
    #[allow(clippy::float_cmp)]
    fn min_f32_basic() {
        let origin = Instant::now();
        let mut tracker = WindowedMinF32::with_base(ns(10), origin).unwrap();
        for (ts, value, expected) in [(0, 50.0, 50.0), (1, 30.0, 30.0), (2, 40.0, 30.0)] {
            assert_eq!(tracker.update(at(origin, ts), value), expected);
        }
    }

    /// Basic dip-then-rise sequence on the `i32` tracker.
    #[test]
    fn min_i32_basic() {
        let origin = Instant::now();
        let mut tracker = WindowedMinI32::with_base(ns(10), origin).unwrap();
        for (ts, value, expected) in [(0, 100, 100), (1, 50, 50), (2, 75, 50)] {
            assert_eq!(tracker.update(at(origin, ts), value), expected);
        }
    }

    // ---- configuration errors and wide integers ---------------------------

    /// A zero-length window is rejected by the maximum tracker.
    #[test]
    fn max_rejects_zero_window() {
        let origin = Instant::now();
        let outcome = WindowedMaxF64::with_base(ns(0), origin);
        assert!(matches!(outcome, Err(crate::ConfigError::Invalid(_))));
    }

    /// A zero-length window is rejected by the minimum tracker.
    #[test]
    fn min_rejects_zero_window() {
        let origin = Instant::now();
        let outcome = WindowedMinF64::with_base(ns(0), origin);
        assert!(matches!(outcome, Err(crate::ConfigError::Invalid(_))));
    }

    /// Basic rise-then-dip sequence on the `i128` maximum tracker.
    #[test]
    fn max_i128_basic() {
        let origin = Instant::now();
        let mut tracker = WindowedMaxI128::with_base(ns(10), origin).unwrap();
        for (ts, value, expected) in [(0, 100, 100), (1, 200, 200), (2, 150, 200)] {
            assert_eq!(tracker.update(at(origin, ts), value), expected);
        }
    }

    /// Basic dip-then-rise sequence on the `i128` minimum tracker.
    #[test]
    fn min_i128_basic() {
        let origin = Instant::now();
        let mut tracker = WindowedMinI128::with_base(ns(10), origin).unwrap();
        for (ts, value, expected) in [(0, 100, 100), (1, 50, 50), (2, 75, 50)] {
            assert_eq!(tracker.update(at(origin, ts), value), expected);
        }
    }

    /// A window whose nanosecond count overflows `u64` is rejected.
    #[test]
    fn window_overflow_returns_error() {
        let result = WindowedMaxF64::new(Duration::from_secs(u64::MAX));
        assert!(matches!(result, Err(crate::ConfigError::Invalid(_))));
    }
}