use std::collections::VecDeque;
/// Running z-score detector over an unbounded stream.
///
/// Mean and variance are maintained in a single pass with Welford's
/// recurrence, so memory use is constant regardless of stream length.
#[derive(Debug, Clone)]
pub struct OnlineZScore {
    /// Running mean of the accumulated samples.
    pub mean: f64,
    /// Running sum of squared deviations from the mean.
    pub m2: f64,
    /// Number of samples accumulated so far.
    pub n: u64,
    /// A sample is flagged when its z-score strictly exceeds this value.
    pub threshold: f64,
}

impl OnlineZScore {
    /// Added to the variance before the square root so a constant stream
    /// never divides by zero.
    const VARIANCE_EPS: f64 = 1e-12;

    /// Creates an empty detector that flags samples whose z-score is
    /// strictly greater than `threshold`.
    #[must_use]
    pub fn new(threshold: f64) -> Self {
        Self {
            mean: 0.0,
            m2: 0.0,
            n: 0,
            threshold,
        }
    }

    /// Returns the absolute z-score of `x` against the current statistics
    /// without modifying them.
    ///
    /// Yields `0.0` while fewer than two samples have been accumulated,
    /// because the sample variance is undefined until then.
    #[must_use]
    pub fn score(&self, x: f64) -> f64 {
        if self.n < 2 {
            return 0.0;
        }
        let variance = self.m2 / (self.n - 1) as f64;
        let std = (variance + Self::VARIANCE_EPS).sqrt();
        (x - self.mean).abs() / std
    }

    /// Scores `x` against the statistics gathered so far, then folds it in.
    ///
    /// Returns `None` during warm-up (fewer than two accumulated samples),
    /// otherwise `Some(true)` when the z-score exceeds `threshold`.
    ///
    /// A sample whose absorption would leave `mean` or `m2` non-finite
    /// (NaN, ±∞, or a magnitude that overflows) is still scored but is not
    /// accumulated: one bad reading must not disable detection for the rest
    /// of the stream.
    pub fn update(&mut self, x: f64) -> Option<bool> {
        // The verdict is always based on the state *before* `x` is absorbed.
        let decision = (self.n >= 2).then(|| self.score(x) > self.threshold);

        // Welford step computed on candidates so it can be discarded.
        let count = self.n + 1;
        let delta = x - self.mean;
        let mean = self.mean + delta / count as f64;
        let m2 = self.m2 + delta * (x - mean);

        if mean.is_finite() && m2.is_finite() {
            self.n = count;
            self.mean = mean;
            self.m2 = m2;
        }
        decision
    }

    /// Returns the pre-update z-score of `x` and then absorbs `x`.
    ///
    /// The result is `0.0` while fewer than two samples have been
    /// accumulated.
    pub fn window_z_score_update(&mut self, x: f64) -> f64 {
        let z = self.score(x);
        let _ = self.update(x);
        z
    }

    /// Discards all accumulated statistics; `threshold` is kept.
    pub fn reset(&mut self) {
        self.mean = 0.0;
        self.m2 = 0.0;
        self.n = 0;
    }
}

impl Default for OnlineZScore {
    /// Detector with a threshold of `3.0`.
    fn default() -> Self {
        Self::new(3.0)
    }
}
/// Z-score detector whose mean and variance are exponentially weighted
/// moving averages, so recent samples dominate the baseline.
#[derive(Debug, Clone)]
pub struct ExponentialZ {
    /// Exponentially weighted mean; seeded with the first accepted sample.
    pub ema_mean: f64,
    /// Exponentially weighted variance; starts at `1.0`.
    pub ema_var: f64,
    /// Weight given to the newest sample in both averages.
    /// Presumably intended to lie in `(0, 1]`; it is not validated here.
    pub alpha: f64,
    /// A sample is flagged when its z-score strictly exceeds this value.
    pub threshold: f64,
    /// Number of samples absorbed so far.
    pub n: u64,
}

impl ExponentialZ {
    /// Added to the variance before the square root to avoid division by zero.
    const VARIANCE_EPS: f64 = 1e-12;

    /// Creates an empty detector with smoothing factor `alpha` that flags
    /// samples whose z-score is strictly greater than `threshold`.
    #[must_use]
    pub fn new(alpha: f64, threshold: f64) -> Self {
        Self {
            ema_mean: 0.0,
            ema_var: 1.0,
            alpha,
            threshold,
            n: 0,
        }
    }

    /// Returns the absolute z-score of `x` against the current averages
    /// without modifying them; `0.0` before any sample has been absorbed.
    #[must_use]
    pub fn score(&self, x: f64) -> f64 {
        if self.n == 0 {
            return 0.0;
        }
        let std = (self.ema_var + Self::VARIANCE_EPS).sqrt();
        (x - self.ema_mean).abs() / std
    }

    /// Scores `x` against the current averages, then blends it in.
    ///
    /// The first accepted sample only seeds the mean and yields `None`.
    /// Later calls return `Some(true)` when the pre-update z-score exceeds
    /// `threshold`.
    ///
    /// A sample that would leave the mean or variance non-finite (NaN, ±∞,
    /// or a magnitude that overflows when squared) is scored but not
    /// absorbed, so a single bad reading cannot permanently turn every
    /// later score into NaN.
    pub fn update(&mut self, x: f64) -> Option<bool> {
        if self.n == 0 {
            // Never seed the baseline from a non-finite reading.
            if x.is_finite() {
                self.ema_mean = x;
                self.n = 1;
            }
            return None;
        }

        let z = self.score(x);

        // Candidate state; the variance term uses the freshly updated mean.
        let keep = 1.0 - self.alpha;
        let mean = keep * self.ema_mean + self.alpha * x;
        let var = keep * self.ema_var + self.alpha * (x - mean).powi(2);

        if mean.is_finite() && var.is_finite() {
            self.ema_mean = mean;
            self.ema_var = var;
            self.n += 1;
        }
        Some(z > self.threshold)
    }

    /// Restores the freshly constructed state; `alpha` and `threshold`
    /// are kept.
    pub fn reset(&mut self) {
        self.ema_mean = 0.0;
        self.ema_var = 1.0;
        self.n = 0;
    }
}

impl Default for ExponentialZ {
    /// Detector with `alpha = 0.05` and a threshold of `3.0`.
    fn default() -> Self {
        Self::new(0.05, 3.0)
    }
}
/// Sliding-window outlier detector based on the median and the median
/// absolute deviation (MAD) of the most recent samples.
#[derive(Debug, Clone)]
pub struct SlidingMad {
    /// Most recent samples, oldest at the front.
    window: VecDeque<f64>,
    /// Multiplier applied to the MAD to form the score denominator.
    pub mad_scale: f64,
    /// A sample is flagged when its score strictly exceeds this value.
    pub threshold: f64,
    /// Maximum number of samples retained; `new` raises `0` to `1`.
    pub window_size: usize,
}

impl SlidingMad {
    /// Upper bound on the slots reserved eagerly by `new`; larger windows
    /// grow on demand instead of allocating everything up front.
    const MAX_PREALLOC: usize = 1024;

    /// Added to the scaled MAD so a constant window never divides by zero.
    const DENOM_EPS: f64 = 1e-12;

    /// Creates an empty detector retaining at most `window_size` samples
    /// (a `window_size` of `0` is treated as `1`).
    ///
    /// Any `window_size` is accepted, including `usize::MAX` as an
    /// effectively unbounded window: only a capped amount of storage is
    /// reserved eagerly.
    #[must_use]
    pub fn new(window_size: usize, mad_scale: f64, threshold: f64) -> Self {
        let window_size = window_size.max(1);
        Self {
            window: VecDeque::with_capacity(window_size.min(Self::MAX_PREALLOC)),
            mad_scale,
            threshold,
            window_size,
        }
    }

    /// Median of an already sorted slice; `0.0` for an empty slice.
    fn sorted_median(sorted: &[f64]) -> f64 {
        let n = sorted.len();
        if n == 0 {
            return 0.0;
        }
        let mid = n / 2;
        if n % 2 == 1 {
            sorted[mid]
        } else {
            (sorted[mid - 1] + sorted[mid]) * 0.5
        }
    }

    /// Returns `|x - median| / (MAD * mad_scale + 1e-12)` for the current
    /// window without modifying it; `0.0` when the window is empty.
    ///
    /// Values are ordered with `f64::total_cmp`, which is a total order
    /// even when the window holds NaN (NaN with a clear sign bit sorts
    /// after every number), so sorting is well defined for any contents.
    #[must_use]
    pub fn score(&self, x: f64) -> f64 {
        if self.window.is_empty() {
            return 0.0;
        }

        let mut buf: Vec<f64> = self.window.iter().copied().collect();
        buf.sort_unstable_by(f64::total_cmp);
        let med = Self::sorted_median(&buf);

        // Reuse the same buffer for the absolute deviations.
        for v in &mut buf {
            *v = (*v - med).abs();
        }
        buf.sort_unstable_by(f64::total_cmp);
        let mad = Self::sorted_median(&buf);

        (x - med).abs() / (mad * self.mad_scale + Self::DENOM_EPS)
    }

    /// Pushes `x` into the window (evicting the oldest sample when full)
    /// and scores it against the updated window.
    ///
    /// Returns `None` while the window holds fewer than three samples,
    /// otherwise `Some(true)` when the score exceeds `threshold`.
    pub fn update(&mut self, x: f64) -> Option<bool> {
        if self.window.len() >= self.window_size {
            self.window.pop_front();
        }
        self.window.push_back(x);
        if self.window.len() < 3 {
            return None;
        }
        Some(self.score(x) > self.threshold)
    }

    /// Empties the window; configuration is kept.
    pub fn reset(&mut self) {
        self.window.clear();
    }

    /// Number of samples currently held in the window.
    #[must_use]
    pub fn len(&self) -> usize {
        self.window.len()
    }

    /// Returns `true` when the window holds no samples.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.window.is_empty()
    }
}
/// Selects which streaming statistic a [`StreamingThresholdDetector`] uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMethod {
    /// Running z-score over the whole stream ([`OnlineZScore`]).
    Zscore,
    /// Exponentially weighted z-score ([`ExponentialZ`]).
    EwmaZ,
    /// Median / MAD score over a sliding window ([`SlidingMad`]).
    SlidingMad,
}

/// Outcome of feeding one sample to a [`StreamingThresholdDetector`].
#[derive(Debug, Clone, Copy)]
pub struct StreamingResult {
    /// The score the anomaly decision was based on.
    pub score: f64,
    /// Whether the underlying detector flagged the sample.
    pub is_anomaly: bool,
    /// Number of samples fed to the detector so far, including this one.
    pub n_processed: u64,
}

/// Concrete detector state behind a [`StreamingThresholdDetector`].
#[derive(Debug, Clone)]
enum StreamState {
    Zscore(OnlineZScore),
    EwmaZ(ExponentialZ),
    Mad(SlidingMad),
}

/// Uniform front-end over the three streaming detectors in this module.
#[derive(Debug, Clone)]
pub struct StreamingThresholdDetector {
    /// The method chosen at construction time.
    pub method: StreamMethod,
    state: StreamState,
    n_processed: u64,
}

impl StreamingThresholdDetector {
    /// Shared constructor body for the `with_*` variants.
    fn from_parts(method: StreamMethod, state: StreamState) -> Self {
        Self {
            method,
            state,
            n_processed: 0,
        }
    }

    /// Builds a detector backed by [`OnlineZScore`].
    #[must_use]
    pub fn with_zscore(threshold: f64) -> Self {
        Self::from_parts(
            StreamMethod::Zscore,
            StreamState::Zscore(OnlineZScore::new(threshold)),
        )
    }

    /// Builds a detector backed by [`ExponentialZ`].
    #[must_use]
    pub fn with_ewma_z(alpha: f64, threshold: f64) -> Self {
        Self::from_parts(
            StreamMethod::EwmaZ,
            StreamState::EwmaZ(ExponentialZ::new(alpha, threshold)),
        )
    }

    /// Builds a detector backed by [`SlidingMad`].
    #[must_use]
    pub fn with_sliding_mad(window_size: usize, mad_scale: f64, threshold: f64) -> Self {
        Self::from_parts(
            StreamMethod::SlidingMad,
            StreamState::Mad(SlidingMad::new(window_size, mad_scale, threshold)),
        )
    }

    /// Feeds one sample to the underlying detector.
    ///
    /// Returns `None` while that detector is still warming up (the sample
    /// is counted in `n_processed` regardless). Otherwise the result
    /// carries the verdict together with the score that verdict was based
    /// on, so `is_anomaly` always agrees with comparing `score` against the
    /// detector's threshold.
    pub fn update(&mut self, x: f64) -> Option<StreamingResult> {
        self.n_processed += 1;

        let (score, is_anomaly) = match &mut self.state {
            // Both z-score detectors decide on the state *before* absorbing
            // `x`, so the matching score has to be read first.
            StreamState::Zscore(oz) => {
                let score = oz.score(x);
                (score, oz.update(x)?)
            }
            StreamState::EwmaZ(ez) => {
                let score = ez.score(x);
                (score, ez.update(x)?)
            }
            // The sliding MAD detector pushes `x` into its window and then
            // decides, so the matching score is the one taken afterwards.
            StreamState::Mad(sm) => {
                let is_anomaly = sm.update(x)?;
                (sm.score(x), is_anomaly)
            }
        };

        Some(StreamingResult {
            score,
            is_anomaly,
            n_processed: self.n_processed,
        })
    }

    /// Clears the sample counter and the underlying detector's state.
    pub fn reset(&mut self) {
        self.n_processed = 0;
        match &mut self.state {
            StreamState::Zscore(oz) => oz.reset(),
            StreamState::EwmaZ(ez) => ez.reset(),
            StreamState::Mad(sm) => sm.reset(),
        }
    }

    /// Number of samples fed since construction or the last reset.
    #[must_use]
    pub fn n_processed(&self) -> u64 {
        self.n_processed
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- OnlineZScore -----------------------------------------------------

    /// The first two samples only build the baseline; the third gets a verdict.
    #[test]
    fn online_zscore_warmup_returns_none() {
        let mut detector = OnlineZScore::new(3.0);
        let verdicts = [
            detector.update(1.0),
            detector.update(2.0),
            detector.update(1.5),
        ];
        assert!(verdicts[0].is_none());
        assert!(verdicts[1].is_none());
        assert!(verdicts[2].is_some());
    }

    /// A value far outside a tight baseline is flagged.
    #[test]
    fn online_zscore_extreme_outlier_detected() {
        let baseline = [1.0_f64, 1.1, 0.9, 1.05, 0.95, 1.02, 0.98];
        let mut detector = OnlineZScore::new(3.0);
        baseline.iter().for_each(|&sample| {
            detector.update(sample);
        });
        let verdict = detector.update(1000.0);
        assert_eq!(verdict, Some(true), "1000.0 should be an anomaly");
    }

    /// A value in the middle of the baseline is not flagged.
    #[test]
    fn online_zscore_inlier_not_anomaly() {
        let baseline = [10.0_f64, 10.1, 9.9, 10.05, 9.95, 10.02, 9.98];
        let mut detector = OnlineZScore::new(5.0);
        baseline.iter().for_each(|&sample| {
            detector.update(sample);
        });
        let verdict = detector.update(10.0);
        assert_eq!(verdict, Some(false), "inlier should not be anomaly");
    }

    /// `score` is usable on its own once two samples are in.
    #[test]
    fn online_zscore_score_before_update() {
        let mut detector = OnlineZScore::new(3.0);
        detector.update(5.0);
        detector.update(10.0);
        let z = detector.score(5.0);
        assert!(z.is_finite(), "z={z}");
    }

    /// The combined score-then-update helper reports 0.0 during warm-up.
    #[test]
    fn online_zscore_window_z_score_update() {
        let mut detector = OnlineZScore::new(3.0);
        let z = detector.window_z_score_update(1.0);
        assert_eq!(z, 0.0, "first call should return 0.0 (n<2)");
        detector.window_z_score_update(2.0);
        let z2 = detector.window_z_score_update(1.5);
        assert!(z2.is_finite(), "z2={z2}");
    }

    /// `reset` zeroes every accumulator.
    #[test]
    fn online_zscore_reset() {
        let mut detector = OnlineZScore::new(3.0);
        (0_i32..10).map(f64::from).for_each(|sample| {
            detector.update(sample);
        });
        detector.reset();
        assert_eq!(detector.n, 0);
        assert_eq!(detector.mean, 0.0);
        assert_eq!(detector.m2, 0.0);
    }

    // ---- ExponentialZ -----------------------------------------------------

    /// The very first sample only seeds the mean.
    #[test]
    fn ewma_z_first_observation_returns_none() {
        let mut detector = ExponentialZ::new(0.05, 3.0);
        let verdict = detector.update(1.0);
        assert!(verdict.is_none());
    }

    /// A value far outside a tight baseline is flagged.
    #[test]
    fn ewma_z_outlier_detected() {
        let baseline = [1.0_f64, 1.1, 0.9, 1.05, 0.95, 1.02, 0.98, 1.01];
        let mut detector = ExponentialZ::new(0.1, 3.0);
        baseline.iter().for_each(|&sample| {
            detector.update(sample);
        });
        let verdict = detector.update(500.0);
        assert_eq!(verdict, Some(true), "500.0 should be anomaly for EWMA Z");
    }

    /// Scores stay finite after a couple of ordinary samples.
    #[test]
    fn ewma_z_score_finite() {
        let mut detector = ExponentialZ::new(0.05, 3.0);
        detector.update(1.0);
        detector.update(2.0);
        let s = detector.score(1.5);
        assert!(s.is_finite(), "score={s}");
    }

    /// After `reset` the detector behaves like a fresh one.
    #[test]
    fn ewma_z_reset_clears_state() {
        let mut detector = ExponentialZ::new(0.1, 3.0);
        (0_i32..20).map(f64::from).for_each(|sample| {
            detector.update(sample);
        });
        detector.reset();
        assert_eq!(detector.n, 0);
        assert!(detector.update(1.0).is_none());
    }

    // ---- SlidingMad -------------------------------------------------------

    /// Verdicts start once the window holds three samples.
    #[test]
    fn sliding_mad_warmup_returns_none() {
        let mut detector = SlidingMad::new(10, 1.4826, 3.0);
        let verdicts = [
            detector.update(1.0),
            detector.update(2.0),
            detector.update(1.5),
        ];
        assert!(verdicts[0].is_none());
        assert!(verdicts[1].is_none());
        assert!(verdicts[2].is_some());
    }

    /// A value far outside the window's spread is flagged.
    #[test]
    fn sliding_mad_outlier_detected() {
        let mut detector = SlidingMad::new(20, 1.4826, 3.0);
        (0_i32..15)
            .map(|step| f64::from(step) * 0.1)
            .for_each(|sample| {
                detector.update(sample);
            });
        let verdict = detector.update(1000.0);
        assert_eq!(verdict, Some(true), "1000.0 should be anomaly");
    }

    /// The window never grows past its configured size.
    #[test]
    fn sliding_mad_window_evicts_old_values() {
        let mut detector = SlidingMad::new(5, 1.4826, 3.0);
        (0_i32..10).map(f64::from).for_each(|sample| {
            detector.update(sample);
        });
        assert_eq!(
            detector.len(),
            5,
            "window must have exactly window_size elements"
        );
    }

    /// Scores stay finite for an ordinary window.
    #[test]
    fn sliding_mad_score_finite() {
        let mut detector = SlidingMad::new(10, 1.4826, 3.0);
        (0_i32..5).map(f64::from).for_each(|sample| {
            detector.update(sample);
        });
        let s = detector.score(2.5);
        assert!(s.is_finite(), "score={s}");
    }

    /// `reset` empties the window.
    #[test]
    fn sliding_mad_is_empty_after_reset() {
        let mut detector = SlidingMad::new(10, 1.4826, 3.0);
        detector.update(1.0);
        detector.reset();
        assert!(detector.is_empty());
    }

    // ---- StreamingThresholdDetector ---------------------------------------

    /// The z-score front-end flags the final spike and counts every sample.
    #[test]
    fn streaming_zscore_method() {
        let values = [1.0_f64, 1.1, 0.9, 1.05, 0.95, 500.0];
        let mut det = StreamingThresholdDetector::with_zscore(3.0);
        let results: Vec<StreamingResult> =
            values.iter().filter_map(|&v| det.update(v)).collect();
        assert!(
            results.last().is_some_and(|r| r.is_anomaly),
            "500.0 should be anomaly"
        );
        assert_eq!(det.n_processed(), values.len() as u64);
    }

    /// The EWMA front-end warms up on one sample and then flags a spike.
    #[test]
    fn streaming_ewma_method() {
        let mut det = StreamingThresholdDetector::with_ewma_z(0.1, 3.0);
        assert!(det.update(1.0).is_none());
        (1_i32..10)
            .map(|step| f64::from(step) * 0.1)
            .for_each(|sample| {
                det.update(sample);
            });
        let outcome = det.update(1000.0);
        assert!(
            outcome.is_some_and(|r| r.is_anomaly),
            "1000.0 should be anomaly for EWMA"
        );
    }

    /// The sliding-MAD front-end flags a spike after a short ramp.
    #[test]
    fn streaming_sliding_mad_method() {
        let mut det = StreamingThresholdDetector::with_sliding_mad(15, 1.4826, 3.0);
        (0_i32..10)
            .map(|step| f64::from(step) * 0.1)
            .for_each(|sample| {
                det.update(sample);
            });
        let outcome = det.update(9999.0);
        assert!(
            outcome.is_some_and(|r| r.is_anomaly),
            "9999.0 should be anomaly"
        );
    }

    /// `reset` clears the counter and puts the detector back into warm-up.
    #[test]
    fn streaming_detector_reset() {
        let mut det = StreamingThresholdDetector::with_zscore(3.0);
        (0_i32..20).map(f64::from).for_each(|sample| {
            det.update(sample);
        });
        det.reset();
        assert_eq!(det.n_processed(), 0);
        assert!(det.update(1.0).is_none());
    }

    /// The counter advances by exactly one per call, warm-up included.
    #[test]
    fn streaming_result_n_processed_increments() {
        let mut det = StreamingThresholdDetector::with_zscore(3.0);
        for expected in 1_u64..=10 {
            det.update(expected as f64);
            assert_eq!(det.n_processed(), expected);
        }
    }
}