use std::time::Duration;
use hdrhistogram::Histogram;
use crate::clock;
/// Histogram of `u64` samples over a sliding time window split into `N` sub-windows.
///
/// Samples are recorded into the current sub-window and into a running `combined`
/// histogram; as time advances, aged-out sub-windows are subtracted from `combined`
/// and reset, so queries only ever read `combined`.
pub(crate) struct SlidingWindowHistogram<I: clock::Instant, const N: usize> {
/// One histogram per sub-window; the slot at `current_index` receives new samples.
sub_windows: [Histogram<u64>; N],
/// Running aggregate of the sub-windows still inside the window; quantiles are read from it.
combined: Histogram<u64>,
/// Length of one sub-window in milliseconds (window millis divided by `N`, rounded down).
sub_window_ms: u64,
/// Index into `sub_windows` of the slot currently being written (`new` asserts `N` fits in a `u8`).
current_index: u8,
/// Instant at which the current sub-window began; advanced in whole sub-window multiples.
current_window_start: I,
/// Upper bound for recorded values; larger inputs are clamped to it before recording.
max_value: u64,
}
impl<I: clock::Instant, const N: usize> SlidingWindowHistogram<I, N> {
    /// Builds an empty histogram whose `window` is divided into `N` equal sub-windows.
    ///
    /// `sigfig` and `max_value` are forwarded to [`Histogram::new_with_max`] for every
    /// underlying histogram; `now` marks the start of the first sub-window.
    ///
    /// The sub-window length is `window` in whole milliseconds divided by `N`, rounded
    /// down, so the effective window is `N * sub_window_ms` and may be slightly shorter
    /// than `window`.
    ///
    /// # Panics
    ///
    /// Panics if `N` is zero or larger than `u8::MAX`, if `window` is zero or shorter
    /// than `N` milliseconds, or if the histogram parameters are rejected by
    /// [`Histogram::new_with_max`].
    pub fn new(window: Duration, sigfig: u8, max_value: u64, now: I) -> Self {
        assert!(N > 0, "N (number of sub-windows) must be > 0");
        assert!(N <= u8::MAX as usize, "N must fit in a u8");
        assert!(!window.is_zero(), "window must be > 0");
        let sub_window_ms = Self::saturating_millis(window) / N as u64;
        assert!(sub_window_ms > 0, "sub-window duration must be >= 1ms");
        let sub_windows = std::array::from_fn(|_| {
            Histogram::new_with_max(max_value, sigfig).expect("valid histogram params")
        });
        let combined = Histogram::new_with_max(max_value, sigfig).expect("valid histogram params");
        Self {
            sub_windows,
            combined,
            sub_window_ms,
            current_index: 0,
            current_window_start: now,
            max_value,
        }
    }

    /// Records one sample at time `now`.
    ///
    /// Sub-windows that have aged out by `now` are expired first. Values above the
    /// configured `max_value` are clamped to it rather than rejected.
    #[inline]
    pub fn record(&mut self, value_ms: u64, now: I) {
        self.rotate(now);
        let clamped = value_ms.min(self.max_value);
        self.sub_windows[self.current_index as usize]
            .record(clamped)
            .expect("value within max after clamping");
        self.combined
            .record(clamped)
            .expect("value within max after clamping");
    }

    /// Returns the value at `quantile` over the samples still inside the window at `now`.
    ///
    /// Returns `None` when the window holds fewer than `min_samples` samples, or no
    /// samples at all (so `min_samples == 0` still yields `None` on an empty window).
    /// Takes `&mut self` because aged-out sub-windows are expired before querying.
    /// `quantile` is passed unchanged to [`Histogram::value_at_quantile`].
    #[inline]
    pub fn quantile(&mut self, quantile: f64, min_samples: u64, now: I) -> Option<u64> {
        self.rotate(now);
        if self.combined.is_empty() || self.combined.len() < min_samples {
            return None;
        }
        Some(self.combined.value_at_quantile(quantile))
    }

    /// Number of samples currently held in the window (test helper; does not rotate).
    #[cfg(test)]
    pub fn count(&self) -> u64 {
        self.combined.len()
    }

    /// Drops every sample and restarts the window at `now` (test helper).
    #[cfg(test)]
    pub fn clear(&mut self, now: I) {
        for sub in &mut self.sub_windows {
            sub.reset();
        }
        self.combined.reset();
        self.current_index = 0;
        self.current_window_start = now;
    }

    /// Whole milliseconds in `duration`, saturating at `u64::MAX` instead of truncating
    /// the `u128` returned by [`Duration::as_millis`].
    fn saturating_millis(duration: Duration) -> u64 {
        u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
    }

    /// Advances the window to `now`, expiring every sub-window that has aged out.
    ///
    /// Does nothing while `now` is still inside the current sub-window. If at least
    /// `N` sub-windows have elapsed, everything is reset; otherwise each newly entered
    /// slot is subtracted from `combined` and cleared before it is reused.
    fn rotate(&mut self, now: I) {
        let elapsed_ms = Self::saturating_millis(now.duration_since(self.current_window_start));
        // Kept as u64: narrowing to usize would truncate on 32-bit targets and could
        // make a long idle gap look like a short one, leaving stale samples in place.
        let steps = elapsed_ms / self.sub_window_ms;
        if steps == 0 {
            return;
        }
        let sub_window_count = N as u64;
        if steps >= sub_window_count {
            // The whole window has elapsed: nothing recorded so far is still relevant.
            for sub in &mut self.sub_windows {
                sub.reset();
            }
            self.combined.reset();
            // Reduce modulo N before adding so the sum cannot overflow for huge gaps.
            let offset = (steps % sub_window_count) as usize;
            self.current_index = ((usize::from(self.current_index) + offset) % N) as u8;
        } else {
            for _ in 0..steps {
                let idx = (usize::from(self.current_index) + 1) % N;
                // The slot being re-entered holds the oldest samples; remove them from
                // the aggregate before clearing the slot for reuse.
                self.combined
                    .subtract(&self.sub_windows[idx])
                    .expect("subtract from combined");
                self.sub_windows[idx].reset();
                // idx < N <= u8::MAX (asserted in `new`), so the cast is lossless.
                self.current_index = idx as u8;
            }
        }
        // Advance by whole sub-windows only, so the boundaries stay aligned.
        // `sub_window_ms * steps <= elapsed_ms`, hence the product cannot overflow.
        self.current_window_start = self
            .current_window_start
            .add_duration(Duration::from_millis(self.sub_window_ms * steps));
    }
}
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use super::*;

    /// 10-second window split into ten 1-second sub-windows, values capped at 60_000.
    fn make_histogram(now: Instant) -> SlidingWindowHistogram<Instant, 10> {
        SlidingWindowHistogram::new(Duration::from_secs(10), 2, 60_000, now)
    }

    #[test]
    fn empty_histogram_returns_none() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        assert_eq!(h.quantile(0.5, 1, now), None);
        assert_eq!(h.count(), 0);
    }

    #[test]
    fn empty_histogram_returns_none_even_with_zero_min_samples() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        assert_eq!(h.quantile(0.5, 0, now), None);
    }

    #[test]
    fn single_sample_below_min_returns_none() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        h.record(100, now);
        assert_eq!(h.count(), 1);
        assert_eq!(h.quantile(0.5, 2, now), None);
        assert!(h.quantile(0.5, 1, now).is_some());
    }

    #[test]
    fn records_and_queries_quantile() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        for _ in 0..100 {
            h.record(50, now);
        }
        assert_eq!(h.count(), 100);
        let p50 = h.quantile(0.5, 1, now).unwrap();
        assert_eq!(p50, 50);
    }

    #[test]
    fn rotation_expires_old_data() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        for _ in 0..50 {
            h.record(100, now);
        }
        assert_eq!(h.count(), 50);
        // More than a full window later: every earlier sample must be gone.
        let later = now + Duration::from_secs(11);
        h.record(200, later);
        assert_eq!(h.count(), 1);
        let p = h.quantile(0.5, 1, later).unwrap();
        assert_eq!(p, 200);
    }

    #[test]
    fn partial_rotation_keeps_recent_data() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        for _ in 0..50 {
            h.record(100, now);
        }
        let t3 = now + Duration::from_secs(3);
        for _ in 0..50 {
            h.record(200, t3);
        }
        assert_eq!(h.count(), 100);
        // At t=11s the samples from t=0 have aged out, while those from t=3s are
        // still inside the 10-second window.
        let t11 = now + Duration::from_secs(11);
        assert_eq!(h.quantile(0.5, 1, t11), Some(200));
        assert_eq!(h.count(), 50);
    }

    #[test]
    fn clear_resets_everything() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        for _ in 0..100 {
            h.record(50, now);
        }
        assert_eq!(h.count(), 100);
        h.clear(now);
        assert_eq!(h.count(), 0);
        assert_eq!(h.quantile(0.5, 1, now), None);
    }

    #[test]
    fn values_are_clamped_to_max() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        h.record(100_000, now);
        let p = h.quantile(0.5, 1, now).unwrap();
        assert!((59_000..=61_000).contains(&p), "clamped value was {p}");
    }

    #[test]
    fn quantile_distribution() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        for v in 1..=1000 {
            h.record(v, now);
        }
        let p50 = h.quantile(0.5, 1, now).unwrap();
        let p99 = h.quantile(0.99, 1, now).unwrap();
        assert!((480..=520).contains(&p50), "p50 was {p50}");
        assert!((970..=1010).contains(&p99), "p99 was {p99}");
    }

    #[test]
    fn combined_stays_in_sync_across_rotations() {
        let now = Instant::now();
        let mut h = make_histogram(now);
        for _ in 0..50 {
            h.record(100, now);
        }
        let t2 = now + Duration::from_secs(2);
        for _ in 0..50 {
            h.record(200, t2);
        }
        assert_eq!(h.count(), 100);
        // Rotating to t=10s re-enters the slot that held the t=0 samples, so only
        // the 50 samples from t=2s plus the new one remain.
        let t10 = now + Duration::from_secs(10);
        h.record(300, t10);
        assert_eq!(h.count(), 51);
        let p50 = h.quantile(0.5, 1, t10).unwrap();
        assert_eq!(p50, 200);
    }
}