#![cfg(feature = "std")]
use std::sync::Arc;
use crate::error::{RcfError, RcfResult};
use crate::metrics::{MetricsSink, default_sink, names};
pub const DEFAULT_DELTA: f64 = 0.002;
pub const DEFAULT_WINDOW_CAP: usize = 4096;
pub const MIN_SUBWINDOW_LEN: usize = 16;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AdwinDetector {
range: f64,
delta: f64,
window_cap: usize,
buffer: Vec<f64>,
drift_fires: u64,
#[cfg_attr(
feature = "serde",
serde(skip, default = "crate::metrics::default_sink")
)]
metrics: Arc<dyn MetricsSink>,
}
impl AdwinDetector {
pub fn new(range: f64, delta: f64, window_cap: usize) -> RcfResult<Self> {
if !range.is_finite() || range <= 0.0 {
return Err(RcfError::InvalidConfig(
format!("AdwinDetector: range must be finite and > 0, got {range}").into(),
));
}
if !delta.is_finite() || !(0.0..1.0).contains(&delta) || delta <= 0.0 {
return Err(RcfError::InvalidConfig(
format!("AdwinDetector: delta must be in (0.0, 1.0), got {delta}").into(),
));
}
if window_cap < 2 * MIN_SUBWINDOW_LEN {
return Err(RcfError::InvalidConfig(
format!(
"AdwinDetector: window_cap must be >= {}, got {window_cap}",
2 * MIN_SUBWINDOW_LEN
)
.into(),
));
}
Ok(Self {
range,
delta,
window_cap,
buffer: Vec::with_capacity(window_cap),
drift_fires: 0,
metrics: default_sink(),
})
}
#[must_use]
pub fn with_metrics_sink(mut self, sink: Arc<dyn MetricsSink>) -> Self {
self.metrics = sink;
self
}
#[must_use]
pub fn metrics_sink(&self) -> &Arc<dyn MetricsSink> {
&self.metrics
}
#[must_use]
pub fn default_bounded() -> Self {
Self::new(1.0, DEFAULT_DELTA, DEFAULT_WINDOW_CAP).expect("default params valid")
}
#[must_use]
pub fn len(&self) -> usize {
self.buffer.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
#[must_use]
pub fn drift_fires(&self) -> u64 {
self.drift_fires
}
#[must_use]
pub fn mean(&self) -> f64 {
if self.buffer.is_empty() {
return 0.0;
}
#[allow(clippy::cast_precision_loss)]
let n = self.buffer.len() as f64;
self.buffer.iter().sum::<f64>() / n
}
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn update(&mut self, value: f64) -> bool {
if !value.is_finite() {
return false;
}
self.metrics.inc_counter(names::ADWIN_OBSERVED_TOTAL, 1);
if self.buffer.len() >= self.window_cap {
self.buffer.remove(0);
}
self.buffer.push(value);
let fired = self.detect_and_shrink();
if fired {
self.metrics.inc_counter(names::ADWIN_DRIFT_FIRES_TOTAL, 1);
}
fired
}
pub fn reset_window(&mut self) {
self.buffer.clear();
}
fn detect_and_shrink(&mut self) -> bool {
let n = self.buffer.len();
if n < 2 * MIN_SUBWINDOW_LEN {
return false;
}
let mut prefix = Vec::with_capacity(n + 1);
prefix.push(0.0_f64);
let mut acc = 0.0_f64;
for &v in &self.buffer {
acc += v;
prefix.push(acc);
}
let total = prefix[n];
for (split, &left_sum) in prefix
.iter()
.enumerate()
.take(n - MIN_SUBWINDOW_LEN + 1)
.skip(MIN_SUBWINDOW_LEN)
{
let n_left = split;
let n_right = n - split;
#[allow(clippy::cast_precision_loss)]
let nl = n_left as f64;
#[allow(clippy::cast_precision_loss)]
let nr = n_right as f64;
let mean_l = left_sum / nl;
let mean_r = (total - left_sum) / nr;
let m = 1.0 / (1.0 / nl + 1.0 / nr);
#[allow(clippy::cast_precision_loss)]
let log_term = (4.0 * n as f64 / self.delta).ln();
let eps_cut = self.range * (log_term / (2.0 * m)).sqrt();
if (mean_l - mean_r).abs() > eps_cut {
self.buffer.drain(..split);
self.drift_fires = self.drift_fires.saturating_add(1);
return true;
}
}
false
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::panic,
clippy::float_cmp,
clippy::cast_precision_loss
)]
mod tests {
use super::*;
#[test]
fn new_rejects_invalid_params() {
assert!(AdwinDetector::new(-1.0, DEFAULT_DELTA, DEFAULT_WINDOW_CAP).is_err());
assert!(AdwinDetector::new(1.0, 0.0, DEFAULT_WINDOW_CAP).is_err());
assert!(AdwinDetector::new(1.0, 1.0, DEFAULT_WINDOW_CAP).is_err());
assert!(AdwinDetector::new(1.0, DEFAULT_DELTA, 5).is_err());
}
#[test]
fn stable_stream_does_not_fire() {
let mut d = AdwinDetector::default_bounded();
for _ in 0..512 {
assert!(!d.update(0.5));
}
assert_eq!(d.drift_fires(), 0);
}
#[test]
fn mean_shift_triggers_drift() {
let mut d = AdwinDetector::default_bounded();
for _ in 0..256 {
let _ = d.update(0.1);
}
let mut fired = false;
for _ in 0..128 {
if d.update(0.9) {
fired = true;
break;
}
}
assert!(fired, "ADWIN missed mean shift from 0.1 → 0.9");
assert!(d.drift_fires() >= 1);
}
#[test]
fn non_finite_is_ignored() {
let mut d = AdwinDetector::default_bounded();
assert!(!d.update(f64::NAN));
assert!(!d.update(f64::INFINITY));
assert_eq!(d.len(), 0);
}
#[test]
fn window_respects_cap() {
let mut d = AdwinDetector::new(1.0, DEFAULT_DELTA, 64).unwrap();
for i in 0..200 {
let _ = d.update(f64::from(i % 2));
}
assert!(d.len() <= 64);
}
#[test]
fn reset_window_clears_buffer() {
let mut d = AdwinDetector::default_bounded();
for _ in 0..100 {
let _ = d.update(0.5);
}
d.reset_window();
assert!(d.is_empty());
}
}