/// Streaming estimator of a single quantile that needs O(1) memory.
///
/// The tracker keeps one running estimate. Every observation moves it by a
/// fixed step: up by `lr * quantile` when the observation is strictly above
/// the estimate, down by `lr * (1 - quantile)` otherwise. The two step sizes
/// balance when a `quantile` fraction of observations falls at or below the
/// estimate.
#[derive(Debug, Clone)]
pub struct StreamingQuantileTracker {
    /// Target quantile, strictly inside `(0, 1)` (enforced by [`Self::new`]).
    quantile: f64,
    /// Current running estimate; starts at `0.0` and is restored by `reset`.
    estimate: f64,
    /// Step-size multiplier; positive and finite (enforced by [`Self::new`]).
    lr: f64,
    /// Number of observations applied since construction or the last `reset`.
    n_updates: u64,
}

impl StreamingQuantileTracker {
    /// Creates a tracker for `quantile` with step size `lr`.
    ///
    /// The estimate starts at `0.0` and the update counter at `0`.
    ///
    /// # Panics
    ///
    /// Panics if `quantile` is not strictly between 0 and 1, or if `lr` is not
    /// a positive, finite number. NaN arguments fail these checks as well.
    #[must_use]
    pub fn new(quantile: f64, lr: f64) -> Self {
        assert!(
            quantile > 0.0 && quantile < 1.0,
            "quantile must be in (0, 1), got {quantile}"
        );
        assert!(lr > 0.0, "lr must be > 0, got {lr}");
        // `+inf` passes the check above, but the first step would push the
        // estimate to infinity and the next one to NaN (`inf - inf`), after
        // which the tracker can never recover.
        assert!(lr.is_finite(), "lr must be finite, got {lr}");
        Self {
            quantile,
            estimate: 0.0,
            lr,
            n_updates: 0,
        }
    }

    /// Creates a median (quantile `0.5`) tracker with step size `0.01`.
    #[must_use]
    pub fn default_median() -> Self {
        Self::new(0.5, 0.01)
    }

    /// Creates a 90th-percentile (quantile `0.9`) tracker with step size `0.01`.
    #[must_use]
    pub fn default_90th() -> Self {
        Self::new(0.9, 0.01)
    }

    /// Feeds one observation into the tracker.
    ///
    /// A `value` strictly greater than the current estimate raises the
    /// estimate by `lr * quantile`; any other comparable value (including one
    /// equal to the estimate) lowers it by `lr * (1 - quantile)`.
    ///
    /// NaN observations are ignored: they change neither the estimate nor the
    /// update counter.
    pub fn update(&mut self, value: f64) {
        // `NaN > x` is always false, so without this guard a NaN sample would
        // be treated as "below the estimate" and silently bias it downward.
        if value.is_nan() {
            return;
        }
        if value > self.estimate {
            self.estimate += self.lr * self.quantile;
        } else {
            self.estimate -= self.lr * (1.0 - self.quantile);
        }
        self.n_updates += 1;
    }

    /// Returns the current quantile estimate.
    #[must_use]
    pub fn estimate(&self) -> f64 {
        self.estimate
    }

    /// Returns the target quantile this tracker was created with.
    #[must_use]
    pub fn quantile(&self) -> f64 {
        self.quantile
    }

    /// Returns how many observations have been applied since construction or
    /// the last [`reset`](Self::reset).
    #[must_use]
    pub fn n_updates(&self) -> u64 {
        self.n_updates
    }

    /// Restores the initial state: estimate `0.0` and zero updates.
    ///
    /// The configured quantile and step size are kept.
    pub fn reset(&mut self) {
        self.estimate = 0.0;
        self.n_updates = 0;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `passes` sweeps of the interleaved sequence
    /// 0, 999, 1, 998, ..., 499, 500 into `tracker`.
    fn feed_interleaved_ramp(tracker: &mut StreamingQuantileTracker, passes: usize) {
        for _ in 0..passes {
            for step in 0_u32..500 {
                let low = f64::from(step);
                tracker.update(low);
                tracker.update(999.0 - low);
            }
        }
    }

    /// Feeds the same `value` into `tracker` `count` times.
    fn feed_constant(tracker: &mut StreamingQuantileTracker, value: f64, count: usize) {
        std::iter::repeat(value)
            .take(count)
            .for_each(|sample| tracker.update(sample));
    }

    /// The median of the 0..=999 ramp is 499.5; the tracker should land nearby.
    #[test]
    fn median_converges() {
        let mut median = StreamingQuantileTracker::new(0.5, 0.5);
        feed_interleaved_ramp(&mut median, 20);

        let got = median.estimate();
        assert!(
            (got - 499.5).abs() < 50.0,
            "median estimate {} should be near 499.5",
            got
        );
    }

    /// The 90th percentile of the 0..=999 ramp is about 899.
    #[test]
    fn percentile_90th_converges() {
        let mut p90 = StreamingQuantileTracker::new(0.9, 0.5);
        feed_interleaved_ramp(&mut p90, 20);

        let got = p90.estimate();
        assert!(
            (got - 899.0).abs() < 60.0,
            "P90 estimate {} should be near 899",
            got
        );
    }

    /// After the input jumps from 100 to 500 the estimate must follow it.
    #[test]
    fn reacts_to_distribution_shift() {
        let mut tracker = StreamingQuantileTracker::new(0.5, 1.0);

        feed_constant(&mut tracker, 100.0, 2000);
        let before = tracker.estimate();

        feed_constant(&mut tracker, 500.0, 2000);
        let after = tracker.estimate();

        assert!(
            after > before,
            "estimate should increase after shift: before={est_before}, after={est_after}",
            est_before = before,
            est_after = after
        );
        assert!(
            (after - 500.0).abs() < 50.0,
            "estimate {} should track toward 500 after shift",
            after
        );
    }

    /// `reset` must bring a used tracker back to its initial state.
    #[test]
    fn reset_clears_state() {
        let mut tracker = StreamingQuantileTracker::default_median();
        (0_u32..100)
            .map(f64::from)
            .for_each(|sample| tracker.update(sample));

        // Sanity check: the tracker really moved before we reset it.
        assert!(tracker.n_updates() > 0);
        assert!(tracker.estimate().abs() > 0.0);

        tracker.reset();

        assert_eq!(tracker.n_updates(), 0);
        assert!((tracker.estimate()).abs() < 1e-10);
    }

    /// A fresh tracker reports a zero estimate and no updates.
    #[test]
    fn estimate_starts_at_zero() {
        let fresh = StreamingQuantileTracker::default_90th();
        assert!((fresh.estimate()).abs() < 1e-10);
        assert_eq!(fresh.n_updates(), 0);
    }

    /// A larger step size should approach a constant input faster.
    #[test]
    fn lr_controls_adaptation_speed() {
        let mut fast = StreamingQuantileTracker::new(0.5, 1.0);
        let mut slow = StreamingQuantileTracker::new(0.5, 0.01);

        // The two trackers are independent, so feeding them one after the
        // other is equivalent to interleaving the updates.
        for tracker in [&mut fast, &mut slow] {
            feed_constant(tracker, 100.0, 100);
        }

        let fast_gap = (fast.estimate() - 100.0).abs();
        let slow_gap = (slow.estimate() - 100.0).abs();
        assert!(
            fast_gap < slow_gap,
            "fast ({}) should be closer to 100 than slow ({})",
            fast.estimate(),
            slow.estimate()
        );
    }
}