use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
const DEFAULT_WINDOW_SIZE: usize = 50;
const DEFAULT_THRESHOLD: f64 = 4.0;
const BODY_HASH_WINDOW: usize = 32;
const SIGNAL_AGREEMENT: usize = 2;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegimeChange {
LooserNow,
StricterNow,
Unclear,
}
#[derive(Debug, Clone)]
pub struct ProbeObservation {
pub response_time_ms: f64,
pub was_blocked: bool,
pub body_hash: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CusumDetector {
window: VecDeque<f64>,
window_size: usize,
s_high: f64,
s_low: f64,
threshold: f64,
last_direction: i8,
}
impl CusumDetector {
fn new(window_size: usize, threshold: f64) -> Self {
Self {
window: VecDeque::with_capacity(window_size),
window_size,
s_high: 0.0,
s_low: 0.0,
threshold,
last_direction: 0,
}
}
fn push(&mut self, value: f64) -> Option<i8> {
if self.window.len() < 4 {
if self.window.len() >= self.window_size {
self.window.pop_front();
}
self.window.push_back(value);
return None;
}
let (mean, std) = self.mean_std();
let floor = if mean.abs() < 1.0 {
0.01
} else {
mean.abs() * 0.05
};
let effective_std = std.max(floor);
let k = self.threshold * effective_std;
self.s_high = (self.s_high + (value - mean - k / 2.0)).max(0.0);
self.s_low = (self.s_low + (mean - value - k / 2.0)).max(0.0);
if self.window.len() >= self.window_size {
self.window.pop_front();
}
self.window.push_back(value);
if self.s_high > k {
self.s_high = 0.0;
self.s_low = 0.0;
self.last_direction = 1;
return Some(1);
}
if self.s_low > k {
self.s_high = 0.0;
self.s_low = 0.0;
self.last_direction = -1;
return Some(-1);
}
None
}
fn mean_std(&self) -> (f64, f64) {
let n = self.window.len() as f64;
if n == 0.0 {
return (0.0, 0.0);
}
let mean: f64 = self.window.iter().sum::<f64>() / n;
let variance: f64 = self.window.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
(mean, variance.sqrt())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriftDetector {
pub window_size: usize,
pub threshold: f64,
cusum_median_rt: CusumDetector,
cusum_p95_rt: CusumDetector,
cusum_block_rate: CusumDetector,
cusum_body_entropy: CusumDetector,
rt_window: VecDeque<f64>,
block_window: VecDeque<bool>,
body_hash_window: VecDeque<u64>,
pub probe_count: u64,
}
impl Default for DriftDetector {
fn default() -> Self {
Self::new(DEFAULT_WINDOW_SIZE, DEFAULT_THRESHOLD)
}
}
impl DriftDetector {
pub fn new(window_size: usize, threshold: f64) -> Self {
let ws = window_size.max(8); Self {
window_size: ws,
threshold,
cusum_median_rt: CusumDetector::new(ws, threshold),
cusum_p95_rt: CusumDetector::new(ws, threshold),
cusum_block_rate: CusumDetector::new(ws, threshold),
cusum_body_entropy: CusumDetector::new(ws, threshold),
rt_window: VecDeque::with_capacity(ws),
block_window: VecDeque::with_capacity(ws),
body_hash_window: VecDeque::with_capacity(BODY_HASH_WINDOW),
probe_count: 0,
}
}
pub fn observe(&mut self, obs: ProbeObservation) -> Option<RegimeChange> {
self.probe_count = self.probe_count.saturating_add(1);
if self.rt_window.len() >= self.window_size {
self.rt_window.pop_front();
}
self.rt_window.push_back(obs.response_time_ms);
if self.block_window.len() >= self.window_size {
self.block_window.pop_front();
}
self.block_window.push_back(obs.was_blocked);
if let Some(hash) = obs.body_hash {
if self.body_hash_window.len() >= BODY_HASH_WINDOW {
self.body_hash_window.pop_front();
}
self.body_hash_window.push_back(hash);
}
let median_rt = self.compute_median_rt();
let p95_rt = self.compute_p95_rt();
let block_rate = self.compute_block_rate();
let body_entropy = self.compute_body_entropy();
let mut up_votes: i32 = 0;
let mut down_votes: i32 = 0;
let mut witness_events: i32 = 0;
for direction in [
self.cusum_median_rt.push(median_rt),
self.cusum_p95_rt.push(p95_rt),
self.cusum_block_rate.push(block_rate),
]
.iter()
.flatten()
{
if *direction > 0 {
up_votes += 1;
} else {
down_votes += 1;
}
}
if self.cusum_body_entropy.push(body_entropy).is_some() {
witness_events += 1;
}
let directional_votes = up_votes + down_votes;
let total_change_witnesses = directional_votes + witness_events;
if total_change_witnesses < SIGNAL_AGREEMENT as i32 {
return None;
}
if directional_votes == 0 {
return Some(RegimeChange::Unclear);
}
if up_votes >= SIGNAL_AGREEMENT as i32 && down_votes == 0 {
Some(RegimeChange::StricterNow)
} else if down_votes >= SIGNAL_AGREEMENT as i32 && up_votes == 0 {
Some(RegimeChange::LooserNow)
} else if up_votes > 0 && down_votes == 0 {
Some(RegimeChange::StricterNow)
} else if down_votes > 0 && up_votes == 0 {
Some(RegimeChange::LooserNow)
} else {
Some(RegimeChange::Unclear)
}
}
fn compute_median_rt(&self) -> f64 {
if self.rt_window.is_empty() {
return 0.0;
}
let mut sorted: Vec<f64> = self.rt_window.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let mid = sorted.len() / 2;
if sorted.len().is_multiple_of(2) {
(sorted[mid - 1] + sorted[mid]) / 2.0
} else {
sorted[mid]
}
}
fn compute_p95_rt(&self) -> f64 {
if self.rt_window.is_empty() {
return 0.0;
}
let mut sorted: Vec<f64> = self.rt_window.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64 * 0.95).ceil() as usize).saturating_sub(1);
sorted[idx.min(sorted.len() - 1)]
}
fn compute_block_rate(&self) -> f64 {
if self.block_window.is_empty() {
return 0.0;
}
let blocked = self.block_window.iter().filter(|&&b| b).count();
blocked as f64 / self.block_window.len() as f64
}
fn compute_body_entropy(&self) -> f64 {
if self.body_hash_window.len() < 2 {
return 0.0;
}
let mut counts: Vec<(u64, usize)> = Vec::new();
for &h in &self.body_hash_window {
if let Some(entry) = counts.iter_mut().find(|(hh, _)| *hh == h) {
entry.1 += 1;
} else {
counts.push((h, 1));
}
}
let total = self.body_hash_window.len() as f64;
counts
.iter()
.map(|(_, c)| {
let p = *c as f64 / total;
if p > 0.0 { -p * p.log2() } else { 0.0 }
})
.sum()
}
#[must_use]
pub fn has_baseline(&self) -> bool {
self.probe_count >= (self.window_size / 2) as u64
}
#[must_use]
pub fn signal_snapshot(&self) -> [f64; 4] {
[
self.compute_median_rt(),
self.compute_p95_rt(),
self.compute_block_rate(),
self.compute_body_entropy(),
]
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum ChangePointEvent {
NoChange,
AlarmFired {
observed_rate: f64,
baseline_rate: f64,
drop_pp: f64,
},
}
#[derive(Debug, Clone)]
pub struct BypassRateMonitor {
window: VecDeque<bool>,
window_size: usize,
k: f64,
h: f64,
s: f64,
baseline: Option<f64>,
}
impl BypassRateMonitor {
#[must_use]
pub fn new(window_size: usize, k: f64, h: f64) -> Self {
let ws = window_size.max(4);
Self {
window: VecDeque::with_capacity(ws),
window_size: ws,
k: k.max(0.0),
h: h.max(0.0),
s: 0.0,
baseline: None,
}
}
#[must_use]
pub fn new_default() -> Self {
Self::new(50, 0.05, 0.5)
}
pub fn observe(&mut self, bypassed: bool) -> ChangePointEvent {
if self.window.len() >= self.window_size {
self.window.pop_front();
}
self.window.push_back(bypassed);
let p_observed = self.current_rate_inner();
let baseline = match self.baseline {
Some(b) => b,
None => {
if self.window.len() < self.window_size {
return ChangePointEvent::NoChange;
}
self.baseline = Some(p_observed);
return ChangePointEvent::NoChange;
}
};
self.s = (self.s + (baseline - p_observed - self.k)).max(0.0);
if self.s > self.h {
self.s = 0.0;
let old_baseline = baseline;
self.baseline = Some(p_observed);
let drop_pp = (old_baseline - p_observed) * 100.0;
return ChangePointEvent::AlarmFired {
observed_rate: p_observed,
baseline_rate: old_baseline,
drop_pp,
};
}
ChangePointEvent::NoChange
}
#[must_use]
pub fn current_rate(&self) -> Option<f64> {
if self.window.len() < self.window_size {
return None;
}
Some(self.current_rate_inner())
}
#[must_use]
pub fn baseline_rate(&self) -> Option<f64> {
self.baseline
}
fn current_rate_inner(&self) -> f64 {
if self.window.is_empty() {
return 0.0;
}
let bypassed = self.window.iter().filter(|&&b| b).count();
bypassed as f64 / self.window.len() as f64
}
}
#[cfg(test)]
mod tests {
use super::*;
fn blocked_obs(rt_ms: f64) -> ProbeObservation {
ProbeObservation {
response_time_ms: rt_ms,
was_blocked: true,
body_hash: Some(0xaaaa_aaaa_aaaa_aaaa),
}
}
fn pass_obs(rt_ms: f64) -> ProbeObservation {
ProbeObservation {
response_time_ms: rt_ms,
was_blocked: false,
body_hash: Some(0xbbbb_bbbb_bbbb_bbbb),
}
}
fn pass_obs_varied(rt_ms: f64, hash: u64) -> ProbeObservation {
ProbeObservation {
response_time_ms: rt_ms,
was_blocked: false,
body_hash: Some(hash),
}
}
fn feed_stationary(det: &mut DriftDetector, n: usize, rt: f64, blocked: bool, hash: u64) {
for _ in 0..n {
det.observe(ProbeObservation {
response_time_ms: rt,
was_blocked: blocked,
body_hash: Some(hash),
});
}
}
#[test]
fn latency_step_change_detected() {
let mut det = DriftDetector::new(20, 3.0);
feed_stationary(&mut det, 30, 20.0, false, 0x1111);
let mut fired = false;
for _ in 0..30 {
if det.observe(blocked_obs(200.0)).is_some() {
fired = true;
break;
}
}
assert!(fired, "latency step change must be detected");
}
#[test]
fn block_rate_step_change_detected() {
let mut det = DriftDetector::new(20, 3.0);
feed_stationary(&mut det, 30, 50.0, false, 0x2222);
let mut fired = false;
for _ in 0..30 {
if det.observe(blocked_obs(52.0)).is_some() {
fired = true;
break;
}
}
assert!(fired, "block-rate step change must be detected");
}
#[test]
fn no_false_positives_stationary_noise() {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut det = DriftDetector::new(50, 4.5);
let mut seed: u64 = 0xdead_beef_cafe_babe;
let mut false_positives = 0u32;
for i in 0u64..500 {
seed = seed
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1_442_695_040_888_963_407);
let noise = ((seed >> 33) % 21) as f64; let rt = 40.0 + noise;
let mut h = DefaultHasher::new();
i.hash(&mut h);
let hash = h.finish() % 4;
let obs = ProbeObservation {
response_time_ms: rt,
was_blocked: (seed >> 60) == 0, body_hash: Some(hash),
};
if det.observe(obs).is_some() {
false_positives += 1;
}
}
assert!(
false_positives <= 1,
"too many false positives on stationary noise: {false_positives}"
);
}
#[test]
fn looser_now_fires_on_block_rate_drop() {
let mut det = DriftDetector::new(8, 2.0);
feed_stationary(&mut det, 20, 150.0, true, 0xaaaa);
let mut regime = None;
for _ in 0..80 {
regime = det.observe(pass_obs(30.0));
if regime.is_some() {
break;
}
}
assert_eq!(
regime,
Some(RegimeChange::LooserNow),
"must detect LooserNow when block rate drops"
);
}
#[test]
fn stricter_now_fires_on_block_rate_rise() {
let mut det = DriftDetector::new(8, 2.0);
feed_stationary(&mut det, 20, 30.0, false, 0x1111);
let mut regime = None;
for _ in 0..80 {
regime = det.observe(blocked_obs(200.0));
if regime.is_some() {
break;
}
}
assert_eq!(
regime,
Some(RegimeChange::StricterNow),
"must detect StricterNow when block rate rises"
);
}
#[test]
fn single_signal_alone_does_not_fire() {
let mut det = DriftDetector::new(50, 10.0);
feed_stationary(&mut det, 60, 50.0, false, 0xcccc);
let mut fired = false;
for _ in 0..10 {
if det.observe(pass_obs(55.0)).is_some() {
fired = true;
break;
}
}
assert!(
!fired,
"tiny single-signal nudge must not fire with high threshold"
);
}
#[test]
fn minimum_window_size_respected() {
let mut det = DriftDetector::new(0, 2.0);
assert_eq!(
det.window_size, 8,
"window_size must be clamped to minimum 8"
);
feed_stationary(&mut det, 20, 20.0, false, 0x1234);
let mut fired = false;
for _ in 0..30 {
if det.observe(blocked_obs(500.0)).is_some() {
fired = true;
break;
}
}
assert!(
fired,
"detector with minimum window must still detect step changes"
);
}
#[test]
fn lower_threshold_detects_faster() {
let mut fast = DriftDetector::new(20, 1.5);
let mut slow = DriftDetector::new(20, 5.0);
feed_stationary(&mut fast, 25, 30.0, false, 0x9999);
feed_stationary(&mut slow, 25, 30.0, false, 0x9999);
let mut fast_detection = None;
let mut slow_detection = None;
for i in 0..50u64 {
let obs = blocked_obs(200.0);
if fast_detection.is_none() && fast.observe(obs.clone()).is_some() {
fast_detection = Some(i);
}
if slow_detection.is_none() && slow.observe(obs).is_some() {
slow_detection = Some(i);
}
}
assert!(fast_detection.is_some(), "low-threshold detector must fire");
assert!(
fast_detection <= slow_detection.or(Some(u64::MAX)),
"low-threshold must detect at least as fast as high-threshold"
);
}
#[test]
fn json_serialization_round_trips() {
let mut det = DriftDetector::new(30, 3.5);
feed_stationary(&mut det, 15, 40.0, false, 0xdead);
det.observe(blocked_obs(300.0));
let json = serde_json::to_string(&det).expect("serialization must succeed");
let restored: DriftDetector =
serde_json::from_str(&json).expect("deserialization must succeed");
assert_eq!(restored.window_size, det.window_size);
assert_eq!(restored.threshold, det.threshold);
assert_eq!(restored.probe_count, det.probe_count);
}
#[test]
fn body_entropy_signal_contributes() {
let mut det = DriftDetector::new(20, 2.0);
feed_stationary(&mut det, 30, 50.0, false, 0xAAAA_AAAA);
let mut body_entropy_fired = false;
for i in 0u64..40 {
let obs = pass_obs_varied(52.0, i * 0xdead_beef + 1);
let snap_before = det.signal_snapshot()[3];
det.observe(obs);
let snap_after = det.signal_snapshot()[3];
if snap_after > snap_before + 0.01 {
body_entropy_fired = true;
break;
}
}
assert!(
body_entropy_fired,
"body entropy signal must increase on hash diversity"
);
}
#[test]
fn has_baseline_gated_on_probe_count() {
let mut det = DriftDetector::new(40, 4.0);
assert!(!det.has_baseline(), "no baseline before any probes");
for _ in 0..19 {
det.observe(pass_obs(50.0));
}
assert!(!det.has_baseline(), "baseline not ready at 19/40 probes");
det.observe(pass_obs(50.0)); assert!(
det.has_baseline(),
"baseline must be ready at window_size/2 probes"
);
}
#[test]
fn probe_count_saturates_not_wraps() {
let mut det = DriftDetector::new(8, 4.0);
det.probe_count = u64::MAX - 1;
det.observe(pass_obs(50.0));
assert_eq!(
det.probe_count,
u64::MAX,
"probe_count must saturate at u64::MAX"
);
det.observe(pass_obs(50.0));
assert_eq!(
det.probe_count,
u64::MAX,
"probe_count must remain at u64::MAX after second saturating add"
);
}
#[test]
fn signal_snapshot_structure() {
let mut det = DriftDetector::default();
let snap = det.signal_snapshot();
assert_eq!(snap.len(), 4);
for v in &snap {
assert!(
v.is_finite(),
"all signal values must be finite at zero state"
);
}
feed_stationary(&mut det, 10, 75.0, true, 0xBEEF);
let snap2 = det.signal_snapshot();
assert!((snap2[0] - 75.0).abs() < 1.0, "median RT must be ~75 ms");
assert!((snap2[2] - 1.0).abs() < 0.01, "block rate must be ~1.0");
}
#[test]
fn bypass_monitor_empty_window_returns_none() {
let monitor = BypassRateMonitor::new(50, 0.05, 0.5);
assert!(
monitor.current_rate().is_none(),
"no rate before window fills"
);
assert!(
monitor.baseline_rate().is_none(),
"no baseline before window fills"
);
}
#[test]
fn bypass_monitor_steady_rate_no_alarm() {
let mut monitor = BypassRateMonitor::new(50, 0.05, 0.5);
let mut fired = false;
for i in 0..200usize {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(i % 3 == 0) {
fired = true;
break;
}
}
assert!(!fired, "steady 33% bypass rate must not fire an alarm");
}
#[test]
fn bypass_monitor_zero_rate_fires_alarm() {
let mut monitor = BypassRateMonitor::new(20, 0.05, 0.5);
for i in 0..20usize {
monitor.observe(i % 2 == 0);
}
assert!(monitor.baseline_rate().is_some());
let mut fired = false;
for _ in 0..30 {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(false) {
fired = true;
break;
}
}
assert!(
fired,
"zero bypass rate after 50% baseline must trigger alarm"
);
}
#[test]
fn bypass_monitor_bimodal_alarm_at_break() {
let mut monitor = BypassRateMonitor::new(30, 0.05, 0.5);
for i in 0..60usize {
monitor.observe(i % 5 < 3); }
let mut alarm_idx: Option<usize> = None;
for i in 0..60usize {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(false) {
alarm_idx = Some(i);
break;
}
}
assert!(
alarm_idx.is_some(),
"bimodal pattern must trigger alarm in phase-2 region"
);
assert!(
alarm_idx.unwrap() < 40,
"alarm should fire within 40 samples of the regime break"
);
}
#[test]
fn bypass_monitor_high_threshold_no_fire() {
let mut monitor = BypassRateMonitor::new(30, 0.05, 10.0);
for i in 0..30usize {
monitor.observe(i % 2 == 0);
}
let mut fired = false;
for i in 0..60usize {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(i % 5 < 2) {
fired = true;
break;
}
}
assert!(!fired, "h=10 must NOT fire on a moderate rate drop");
}
#[test]
fn bypass_monitor_low_threshold_fires_fast() {
let mut monitor = BypassRateMonitor::new(10, 0.05, 0.01);
for _ in 0..10 {
monitor.observe(true);
}
let mut alarm_idx: Option<usize> = None;
for i in 0..10 {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(false) {
alarm_idx = Some(i);
break;
}
}
assert!(
alarm_idx.is_some(),
"h=0.01 must fire almost immediately on any downward deviation"
);
assert!(
alarm_idx.unwrap() <= 5,
"h=0.01 must fire within 5 samples of the change (got {:?})",
alarm_idx
);
}
#[test]
fn bypass_monitor_reset_after_alarm() {
let mut monitor = BypassRateMonitor::new(4, 0.05, 0.5);
for _ in 0..4 {
monitor.observe(true);
}
let mut first_alarm_fired = false;
for _ in 0..20 {
let evt = monitor.observe(false);
if let ChangePointEvent::AlarmFired { observed_rate, .. } = evt {
first_alarm_fired = true;
let new_baseline = monitor.baseline_rate().unwrap();
assert!(
(new_baseline - observed_rate).abs() < 0.05,
"baseline must reset to observed rate after alarm: new_baseline={new_baseline:.3}, observed={observed_rate:.3}"
);
}
}
assert!(first_alarm_fired, "first alarm must have fired");
let mut second_alarm = false;
for _ in 0..100 {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(false) {
second_alarm = true;
break;
}
}
assert!(
!second_alarm,
"no second alarm when staying at 0% after baseline reset and window drain"
);
}
#[test]
fn bypass_monitor_alarm_within_20_samples_of_drop() {
let mut monitor = BypassRateMonitor::new_default();
for i in 0..50usize {
monitor.observe(i % 10 < 3); }
let baseline = monitor.baseline_rate().expect("baseline must be set");
assert!(
(baseline - 0.3).abs() < 0.05,
"baseline must be ~30%: got {baseline:.3}"
);
let mut alarm_idx: Option<usize> = None;
for i in 0..20 {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(false) {
alarm_idx = Some(i);
break;
}
}
assert!(
alarm_idx.is_some(),
"alarm must fire within 20 samples of a 30%→0% bypass rate drop"
);
}
#[test]
fn bypass_monitor_no_alarm_on_steady_30pct_200_samples() {
let mut monitor = BypassRateMonitor::new_default();
let mut fired = false;
for i in 0..200usize {
if let ChangePointEvent::AlarmFired { .. } = monitor.observe(i % 10 < 3) {
fired = true;
break;
}
}
assert!(
!fired,
"must NOT fire on a perfectly steady 30% bypass rate over 200 samples"
);
}
#[test]
fn bypass_monitor_current_rate_accurate() {
let mut monitor = BypassRateMonitor::new(10, 0.05, 0.5);
for i in 0..10usize {
monitor.observe(i < 7);
}
let rate = monitor
.current_rate()
.expect("rate must be available after window fills");
assert!(
(rate - 0.7).abs() < 0.01,
"current_rate must be ~70% but got {rate:.3}"
);
}
}