/// Identifies which signal a residual observation belongs to.
///
/// The tag is carried on [`ResidualSample`] and [`ResidualSign`] and is fixed
/// per [`ResidualEstimator`] at construction. The code shown here only stores,
/// copies and compares it; no variant receives special treatment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResidualSource {
// NOTE(review): the unit variants below are defined by name only; their exact
// meaning (units, what is measured) is decided by whoever produces the samples
// -- confirm against the callers before relying on any interpretation.
Latency,
Throughput,
ErrorRate,
QueueDepth,
HeartbeatRtt,
PollDuration,
MemoryUsage,
SerdeLatency,
FlowControlWindow,
DnsLatency,
/// Source identified by a caller-provided static string (presumably for
/// signals that have no dedicated variant).
Custom(&'static str),
}
/// One observation of a signal together with the baseline it is compared to.
///
/// The quantity of interest is the residual, `value - baseline`
/// (see [`ResidualSample::residual`]).
#[derive(Debug, Clone, Copy)]
pub struct ResidualSample {
/// Observed value of the signal.
pub value: f64,
/// Reference value that `value` is compared against.
pub baseline: f64,
/// Time of the observation; the `_ns` suffix suggests nanoseconds, but the
/// clock/epoch is not specified in this file.
pub timestamp_ns: u64,
/// Signal this observation was taken from.
pub source: ResidualSource,
}
impl ResidualSample {
    /// Returns the signed deviation of the observation from its baseline,
    /// i.e. `value - baseline`.
    ///
    /// The result is positive when `value` is above `baseline` and negative
    /// when it is below.
    #[inline]
    pub fn residual(&self) -> f64 {
        let Self {
            value, baseline, ..
        } = *self;
        value - baseline
    }
}
/// Result of feeding one sample to a [`ResidualEstimator`]: the instantaneous
/// residual plus trend estimates computed over the estimator's recent history.
#[derive(Debug, Clone, Copy)]
pub struct ResidualSign {
/// Residual of the sample that produced this sign (`value - baseline`).
pub residual: f64,
/// Least-squares slope of the recent residuals against their sample index,
/// i.e. residual change per observation (not per unit of time). `0.0` while
/// fewer than two samples are available.
pub drift: f64,
/// Slope over the newer half of the recent residuals minus the slope over
/// the older half. `0.0` while fewer than four samples are available.
pub slew: f64,
/// Timestamp copied from the sample that produced this sign.
pub timestamp_ns: u64,
/// Source the estimator was constructed with (not the sample's own tag).
pub source: ResidualSource,
}
/// Streaming estimator of residual drift and slew for a single source.
///
/// Keeps the most recent residuals in a fixed 128-slot ring buffer and derives
/// trend estimates from the newest `persistence_window` of them.
#[derive(Debug, Clone)]
pub struct ResidualEstimator {
    /// Ring buffer of residuals; slot `head` is the next one to be written.
    window: [f64; 128],
    /// Timestamps stored alongside `window`. They are recorded on every
    /// observation but are not consulted by the drift/slew computations.
    timestamps: [u64; 128],
    /// Index of the slot the next observation will overwrite.
    head: usize,
    /// Number of valid entries in the ring buffer, saturating at 128.
    count: usize,
    /// How many of the most recent residuals the estimates look at (1..=128).
    persistence_window: usize,
    /// Source tag stamped on every `ResidualSign` this estimator produces.
    source: ResidualSource,
}
impl ResidualEstimator {
    /// Number of slots in the residual ring buffer; also the largest
    /// persistence window that can be requested.
    const CAPACITY: usize = 128;

    /// Creates an empty estimator for `source` whose trend estimates look at
    /// the most recent `persistence_window` residuals.
    ///
    /// # Panics
    ///
    /// Panics if `persistence_window` is `0` or greater than `128`.
    pub fn new(source: ResidualSource, persistence_window: usize) -> Self {
        assert!(
            persistence_window > 0 && persistence_window <= Self::CAPACITY,
            "Persistence window must be in [1, 128], got {}",
            persistence_window
        );
        Self {
            window: [0.0; Self::CAPACITY],
            timestamps: [0; Self::CAPACITY],
            head: 0,
            count: 0,
            persistence_window,
            source,
        }
    }

    /// Records `sample` and returns the updated residual, drift and slew.
    ///
    /// The returned sign carries the sample's timestamp but the *estimator's*
    /// source; `sample.source` is not inspected.
    pub fn observe(&mut self, sample: &ResidualSample) -> ResidualSign {
        let residual = sample.residual();

        // Overwrite the oldest slot, then advance the write cursor.
        self.window[self.head] = residual;
        self.timestamps[self.head] = sample.timestamp_ns;
        self.head = (self.head + 1) % Self::CAPACITY;
        self.count = (self.count + 1).min(Self::CAPACITY);

        ResidualSign {
            residual,
            drift: self.estimate_drift(),
            slew: self.estimate_slew(),
            timestamp_ns: sample.timestamp_ns,
            source: self.source,
        }
    }

    /// Number of most-recent residuals the estimates currently cover: the
    /// persistence window, or fewer while the buffer is still filling.
    fn active_len(&self) -> usize {
        self.count.min(self.persistence_window)
    }

    /// Maps `offset` (0 = oldest) within the newest `n` residuals to its slot
    /// in the ring buffer. Requires `n <= CAPACITY`.
    fn ring_index(&self, n: usize, offset: usize) -> usize {
        // Adding CAPACITY before subtracting keeps the arithmetic unsigned-safe
        // when the active window wraps around the end of the buffer.
        (self.head + Self::CAPACITY - n + offset) % Self::CAPACITY
    }

    /// Least-squares slope of the active residuals against their sample index
    /// (residual change per observation). Returns `0.0` with fewer than two
    /// samples.
    fn estimate_drift(&self) -> f64 {
        self.half_window_drift(0, self.active_len())
    }

    /// Slope over the newer half of the active residuals minus the slope over
    /// the older half. Returns `0.0` with fewer than four samples.
    fn estimate_slew(&self) -> f64 {
        let n = self.active_len();
        if n < 4 {
            return 0.0;
        }
        // For odd `n` the newer half holds the extra sample.
        let half = n / 2;
        let older = self.half_window_drift(0, half);
        let newer = self.half_window_drift(half, n);
        newer - older
    }

    /// Least-squares slope of the residuals at offsets
    /// `start_offset..end_offset` of the active window (offset 0 is the oldest
    /// active residual), regressed against `0, 1, 2, ...`.
    ///
    /// Returns `0.0` when the range holds fewer than two samples.
    fn half_window_drift(&self, start_offset: usize, end_offset: usize) -> f64 {
        let n = self.active_len();
        // An inverted range is treated as empty instead of underflowing.
        let sub_n = end_offset.saturating_sub(start_offset);
        if sub_n < 2 {
            return 0.0;
        }

        let mut sum_x = 0.0_f64;
        let mut sum_y = 0.0_f64;
        let mut sum_xy = 0.0_f64;
        let mut sum_x2 = 0.0_f64;
        for i in 0..sub_n {
            let x = i as f64;
            let y = self.window[self.ring_index(n, start_offset + i)];
            sum_x += x;
            sum_y += y;
            sum_xy += x * y;
            sum_x2 += x * x;
        }

        let nf = sub_n as f64;
        let denom = nf * sum_x2 - sum_x * sum_x;
        // Guard against a degenerate regression (no spread in x).
        if denom.abs() < 1e-15 {
            return 0.0;
        }
        (nf * sum_xy - sum_x * sum_y) / denom
    }

    /// Discards all recorded history. The source and persistence window given
    /// at construction are kept.
    pub fn reset(&mut self) {
        self.window = [0.0; Self::CAPACITY];
        self.timestamps = [0; Self::CAPACITY];
        self.head = 0;
        self.count = 0;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Latency` sample for step `step` (one second apart) whose
    /// residual against a baseline of 100.0 is `residual`.
    fn latency_sample(step: u64, residual: f64) -> ResidualSample {
        ResidualSample {
            value: 100.0 + residual,
            baseline: 100.0,
            timestamp_ns: step * 1_000_000_000,
            source: ResidualSource::Latency,
        }
    }

    #[test]
    fn test_zero_residual_produces_zero_drift_slew() {
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 20);
        for i in 0..30 {
            let sign = est.observe(&latency_sample(i, 0.0));
            assert!((sign.residual).abs() < 1e-10);
        }
        let sign = est.observe(&latency_sample(30, 0.0));
        assert!(sign.drift.abs() < 1e-10);
        assert!(sign.slew.abs() < 1e-10);
    }

    #[test]
    fn test_linear_drift_detected() {
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 20);
        for i in 0..30u64 {
            est.observe(&latency_sample(i, 0.5 * i as f64));
        }
        // The final sample continues the same line (step 30 -> residual 15.0),
        // so the fit is exact and tight tolerances are appropriate.
        let sign = est.observe(&latency_sample(30, 0.5 * 30.0));
        assert!(
            (sign.drift - 0.5).abs() < 1e-9,
            "Expected drift ~0.5, got {}",
            sign.drift
        );
        assert!(sign.slew.abs() < 1e-9, "Expected slew ~0, got {}", sign.slew);
    }

    #[test]
    fn test_accelerating_drift_produces_positive_slew() {
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 40);
        for i in 0..50u64 {
            est.observe(&latency_sample(i, 0.01 * (i as f64) * (i as f64)));
        }
        let sign = est.observe(&latency_sample(50, 0.01 * 50.0 * 50.0));
        assert!(
            sign.slew > 0.0,
            "Expected positive slew for quadratic growth, got {}",
            sign.slew
        );
    }

    #[test]
    fn test_source_preserved() {
        let mut est = ResidualEstimator::new(ResidualSource::HeartbeatRtt, 10);
        let sample = ResidualSample {
            value: 50.0,
            baseline: 40.0,
            timestamp_ns: 0,
            source: ResidualSource::HeartbeatRtt,
        };
        let sign = est.observe(&sample);
        assert_eq!(sign.source, ResidualSource::HeartbeatRtt);
    }

    #[test]
    fn test_drift_correct_after_ring_wraparound() {
        // 133 samples leave the write cursor at slot 5, so the 20-sample
        // window spans the end and the start of the 128-slot buffer.
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 20);
        let mut sign = est.observe(&latency_sample(0, 0.0));
        for i in 1..133u64 {
            sign = est.observe(&latency_sample(i, 0.5 * i as f64));
        }
        assert!(
            (sign.drift - 0.5).abs() < 1e-9,
            "Expected drift ~0.5 across wraparound, got {}",
            sign.drift
        );
        assert!(sign.slew.abs() < 1e-9, "Expected slew ~0, got {}", sign.slew);
    }

    #[test]
    fn test_reset_discards_history() {
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 10);
        for i in 0..15u64 {
            est.observe(&latency_sample(i, 2.0 * i as f64));
        }
        est.reset();
        // A single sample after reset cannot define a trend.
        let sign = est.observe(&latency_sample(15, 4.0));
        assert_eq!(sign.drift, 0.0);
        assert_eq!(sign.slew, 0.0);
    }

    #[test]
    #[should_panic(expected = "Persistence window must be in [1, 128]")]
    fn test_zero_persistence_window_panics() {
        let _ = ResidualEstimator::new(ResidualSource::Latency, 0);
    }
}