use ndarray::Array1;
use rand::seq::SliceRandom;
use rand::thread_rng;
#[cfg(feature = "async")]
pub mod async_anomalyzer;
#[cfg(feature = "persist")]
pub mod persistence;
/// Sentinel for "no bound": an infinite `lower_bound` makes the `fence` test use only
/// `upper_bound`.
pub const NA: f64 = f64::INFINITY;

/// Tuning knobs for [`Anomalyzer`].
#[derive(Clone, Debug)]
pub struct AnomalyzerConf {
    /// Minimum `magnitude` score for a window to be scored at all; below it the detector
    /// reports 0.0. `Anomalyzer::new` replaces 0.0 with 0.1.
    pub sensitivity: f64,
    /// Upper fence used by the `fence` test.
    pub upper_bound: f64,
    /// Lower fence used by the `fence` test; an infinite value (e.g. [`NA`]) means only
    /// `upper_bound` is considered.
    pub lower_bound: f64,
    /// Number of most recent observations forming the active window (must be >= 1).
    pub active_size: usize,
    /// Length of the reference window, in multiples of `active_size` (must be >= 1).
    pub n_seasons: usize,
    /// Number of random permutations drawn by the permutation-based tests; 0 is replaced
    /// with 500 by `Anomalyzer::new`.
    pub perm_count: usize,
    /// Names of the tests to run: `magnitude`, `fence`, `cdf`, `diff`, `highrank`,
    /// `lowrank`, `ks`. Unknown names are ignored; an empty list means `magnitude` + `cdf`.
    pub methods: Vec<String>,
}

impl Default for AnomalyzerConf {
    /// Magnitude + CDF tests, one-point active window, four-season reference window,
    /// 500 permutations, sensitivity 0.1 and an upper-only fence at 100.
    fn default() -> Self {
        let methods: Vec<String> = ["magnitude", "cdf"]
            .iter()
            .map(|name| String::from(*name))
            .collect();
        AnomalyzerConf {
            methods,
            perm_count: 500,
            n_seasons: 4,
            active_size: 1,
            lower_bound: NA,
            upper_bound: 100.0,
            sensitivity: 0.1,
        }
    }
}
/// Streaming anomaly detector: scores the newest `active_size` observations against the
/// `n_seasons * active_size` observations immediately preceding them.
pub struct Anomalyzer {
    // Configuration after the defaults filled in by `Anomalyzer::new`.
    conf: AnomalyzerConf,
    // Observation history, oldest first. `push` trims it back to two full windows once it
    // exceeds three; crate-visible because `PersistentAnomalyzer` hands it to the
    // persistence manager.
    pub(crate) data: Vec<f64>,
}
impl Anomalyzer {
pub fn new(conf: AnomalyzerConf, initial_data: Option<Vec<f64>>) -> Result<Self, String> {
if conf.active_size == 0 {
return Err("active_size must be at least 1".to_string());
}
let reference_size = conf.n_seasons * conf.active_size;
if reference_size < conf.active_size {
return Err("reference window too small".to_string());
}
let methods = if conf.methods.is_empty() {
vec!["magnitude".to_string(), "cdf".to_string()]
} else {
conf.methods.clone()
};
let mut validated_conf = conf;
validated_conf.methods = methods;
if validated_conf.perm_count == 0 {
validated_conf.perm_count = 500;
}
if validated_conf.sensitivity == 0.0 {
validated_conf.sensitivity = 0.1;
}
let data = initial_data.unwrap_or_default();
Ok(Anomalyzer {
conf: validated_conf,
data,
})
}
fn reference_size(&self) -> usize {
self.conf.n_seasons * self.conf.active_size
}
fn extract_windows(&self) -> Option<(Array1<f64>, Array1<f64>)> {
let total_needed = self.conf.active_size + self.reference_size();
if self.data.len() < total_needed {
return None;
}
let len = self.data.len();
let ref_start = len - total_needed;
let active_start = len - self.conf.active_size;
let reference = Array1::from_vec(self.data[ref_start..active_start].to_vec());
let active = Array1::from_vec(self.data[active_start..].to_vec());
Some((reference, active))
}
pub fn push(&mut self, value: f64) -> f64 {
self.data.push(value);
let needed = self.conf.active_size + self.reference_size();
if self.data.len() > needed * 3 {
let keep_start = self.data.len() - (needed * 2);
self.data.drain(..keep_start);
}
self.eval()
}
pub fn eval(&self) -> f64 {
let (reference, active) = match self.extract_windows() {
Some(w) => w,
None => return 0.0,
};
let mut prob_map: Vec<(&str, f64)> = Vec::new();
let mut rank_prob = 0.0f64;
for method in &self.conf.methods {
let prob = match method.as_str() {
"magnitude" => self.magnitude_test(&reference, &active),
"fence" => self.fence_test(&active),
"cdf" => self.cdf_test(&reference, &active),
"diff" => self.diff_test(&reference, &active),
"highrank" => self.rank_test(&reference, &active, true),
"lowrank" => self.rank_test(&reference, &active, false),
"ks" => self.bootstrap_ks_test(&reference, &active),
_ => continue,
};
if prob.is_nan() || prob < 0.0 || prob > 1.0 {
continue;
}
if method == "highrank" || method == "lowrank" {
rank_prob = rank_prob.max(prob);
continue;
}
if method == "magnitude" && prob < self.conf.sensitivity {
return 0.0;
}
prob_map.push((method, prob));
}
if (self.conf.methods.contains(&"highrank".to_string())
|| self.conf.methods.contains(&"lowrank".to_string()))
&& rank_prob > 0.0
{
prob_map.push(("rank", rank_prob));
}
let mut weighted_sum = 0.0;
let mut weight_sum = 0.0;
for (name, prob) in prob_map {
let weight = if (name == "magnitude" || name == "fence") && prob > 0.8 {
5.0
} else {
0.5
};
weighted_sum += prob * weight;
weight_sum += weight;
}
if weight_sum > 0.0 {
weighted_sum / weight_sum
} else {
0.0
}
}
fn magnitude_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
let ref_mean = reference.mean().unwrap_or(0.0);
let act_mean = active.mean().unwrap_or(0.0);
if ref_mean == 0.0 {
return if act_mean == 0.0 { 0.0 } else { 1.0 };
}
let pdiff = (act_mean - ref_mean).abs() / ref_mean;
let exp_val = 10.0f64.powf(pdiff.min(5.0));
(exp_val - 1.0) / 9.0
}
fn fence_test(&self, active: &Array1<f64>) -> f64 {
let x = active.mean().unwrap_or(0.0);
let distance = if self.conf.lower_bound.is_infinite() {
(x / self.conf.upper_bound).min(2.0)
} else {
let mid = (self.conf.upper_bound + self.conf.lower_bound) / 2.0;
let bound = (self.conf.upper_bound - self.conf.lower_bound) / 2.0;
((x - mid).abs() / bound).min(2.0)
};
(10.0f64.powf(distance) - 1.0) / 9.0
}
fn cdf_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
if reference.len() < 2 || active.len() < 2 {
return 0.5;
}
let ref_diffs: Vec<f64> = reference
.windows(2)
.into_iter()
.map(|w| (w[1] - w[0]).abs())
.collect();
let act_diffs: Vec<f64> = active
.windows(2)
.into_iter()
.map(|w| (w[1] - w[0]).abs())
.collect();
if ref_diffs.is_empty() || act_diffs.is_empty() {
return 0.5;
}
let active_mean_diff = act_diffs.iter().sum::<f64>() / act_diffs.len() as f64;
let mut sorted_ref: Vec<f64> = ref_diffs;
sorted_ref.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let count_le = sorted_ref.iter().filter(|&&v| v <= active_mean_diff).count();
let percentile = count_le as f64 / sorted_ref.len() as f64;
let prob = 2.0 * (0.5 - percentile).abs();
prob.min(1.0)
}
fn diff_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
if reference.len() < 2 || active.len() < 2 {
return 0.5;
}
let ref_diffs: Vec<f64> = reference
.windows(2)
.into_iter()
.map(|w| (w[1] - w[0]).abs())
.collect();
let act_diffs: Vec<f64> = active
.windows(2)
.into_iter()
.map(|w| (w[1] - w[0]).abs())
.collect();
if ref_diffs.is_empty() || act_diffs.is_empty() {
return 0.5;
}
let mut combined_diffs = ref_diffs.clone();
combined_diffs.extend(act_diffs.iter().cloned());
let mut indexed: Vec<(f64, usize)> = combined_diffs
.iter()
.copied()
.enumerate()
.map(|(i, v)| (v, i))
.collect();
indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
let mut ranks = vec![0usize; combined_diffs.len()];
for (rank, &(_, idx)) in indexed.iter().enumerate() {
ranks[idx] = rank + 1;
}
let n_ref = ref_diffs.len();
let active_sum: usize = ranks[n_ref..].iter().sum();
let mut significant = 0;
let mut rng = thread_rng();
for _ in 0..self.conf.perm_count {
let mut perm = combined_diffs.clone();
perm.shuffle(&mut rng);
let mut perm_indexed: Vec<(f64, usize)> = perm
.iter()
.copied()
.enumerate()
.map(|(i, v)| (v, i))
.collect();
perm_indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
let mut perm_ranks = vec![0usize; perm.len()];
for (rank, &(_, idx)) in perm_indexed.iter().enumerate() {
perm_ranks[idx] = rank + 1;
}
let perm_active_sum: usize = perm_ranks[n_ref..].iter().sum();
if perm_active_sum < active_sum {
significant += 1;
}
}
significant as f64 / self.conf.perm_count as f64
}
fn rank_test(&self, reference: &Array1<f64>, active: &Array1<f64>, high: bool) -> f64 {
let mut combined = reference.to_vec();
combined.extend(active.iter());
let n_active = active.len();
let mut indexed: Vec<(f64, usize)> = combined
.iter()
.copied()
.enumerate()
.map(|(idx, val)| (val, idx))
.collect();
indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
let mut ranks = vec![0usize; combined.len()];
for (rank, &(_, idx)) in indexed.iter().enumerate() {
ranks[idx] = rank + 1;
}
let active_sum: usize = ranks[combined.len() - n_active..].iter().sum();
let mut significant = 0;
let mut rng = thread_rng();
for _ in 0..self.conf.perm_count {
let mut perm = combined.clone();
perm.shuffle(&mut rng);
let mut perm_indexed: Vec<(f64, usize)> = perm
.iter()
.copied()
.enumerate()
.map(|(idx, val)| (val, idx))
.collect();
perm_indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
let mut perm_ranks = vec![0usize; perm.len()];
for (rank, &(_, idx)) in perm_indexed.iter().enumerate() {
perm_ranks[idx] = rank + 1;
}
let perm_active_sum: usize = perm_ranks[perm.len() - n_active..].iter().sum();
if high {
if perm_active_sum < active_sum {
significant += 1;
}
} else {
if perm_active_sum > active_sum {
significant += 1;
}
}
}
significant as f64 / self.conf.perm_count as f64
}
fn bootstrap_ks_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
if reference.is_empty() || active.is_empty() {
return 0.5;
}
let ks_stat = self.ks_statistic(reference, active);
if ks_stat.is_nan() || ks_stat == 0.0 {
return 0.5;
}
let mut combined = reference.to_vec();
combined.extend(active.iter().cloned());
let mut significant = 0;
let mut rng = thread_rng();
for _ in 0..self.conf.perm_count {
let mut perm = combined.clone();
perm.shuffle(&mut rng);
let perm_ref = Array1::from_vec(perm[..reference.len()].to_vec());
let perm_act = Array1::from_vec(perm[reference.len()..].to_vec());
let perm_ks = self.ks_statistic(&perm_ref, &perm_act);
if perm_ks < ks_stat {
significant += 1;
}
}
significant as f64 / self.conf.perm_count as f64
}
fn ks_statistic(&self, a: &Array1<f64>, b: &Array1<f64>) -> f64 {
if a.is_empty() || b.is_empty() {
return f64::NAN;
}
let mut sorted_a = a.to_vec();
let mut sorted_b = b.to_vec();
sorted_a.sort_by(|x, y| x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal));
sorted_b.sort_by(|x, y| x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal));
let mut i = 0;
let mut j = 0;
let mut max_diff: f64 = 0.0;
while i < sorted_a.len() && j < sorted_b.len() {
if sorted_a[i] <= sorted_b[j] {
let ecdf_a = (i + 1) as f64 / sorted_a.len() as f64;
let ecdf_b = j as f64 / sorted_b.len() as f64;
max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
i += 1;
} else {
let ecdf_a = i as f64 / sorted_a.len() as f64;
let ecdf_b = (j + 1) as f64 / sorted_b.len() as f64;
max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
j += 1;
}
}
while i < sorted_a.len() {
let ecdf_a = (i + 1) as f64 / sorted_a.len() as f64;
let ecdf_b = 1.0;
max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
i += 1;
}
while j < sorted_b.len() {
let ecdf_a = 1.0;
let ecdf_b = (j + 1) as f64 / sorted_b.len() as f64;
max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
j += 1;
}
max_diff
}
}
/// [`Anomalyzer`] wrapper that records every pushed value through a
/// `persistence::PersistenceManager` and recovers its history when reopened.
#[cfg(feature = "persist")]
pub struct PersistentAnomalyzer {
    // In-memory detector holding the live observation history.
    inner: Anomalyzer,
    // Persistence backend; presumably a write-ahead log plus compaction, judging by
    // `wal_size_bytes`/`compact` — details live in the `persistence` module.
    pm: persistence::PersistenceManager,
}
#[cfg(feature = "persist")]
impl PersistentAnomalyzer {
    /// Opens the persistence store in `dir`, recovers any previously recorded observations
    /// and builds an [`Anomalyzer`] from `conf` seeded with them.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from opening or recovering the store. A configuration
    /// rejected by [`Anomalyzer::new`] is reported as
    /// [`std::io::ErrorKind::InvalidInput`].
    pub fn open(
        dir: impl AsRef<std::path::Path>,
        conf: AnomalyzerConf,
    ) -> std::io::Result<Self> {
        let mut pm = persistence::PersistenceManager::open(&dir)?;
        // An empty recovery means "start fresh", which `Anomalyzer::new` expresses as None.
        let history = Some(pm.recover()?).filter(|values| !values.is_empty());
        match Anomalyzer::new(conf, history) {
            Ok(inner) => Ok(Self { inner, pm }),
            Err(reason) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                reason,
            )),
        }
    }

    /// Feeds `value` to the detector, records it with the persistence manager and returns
    /// the detector's score for the updated series.
    ///
    /// # Errors
    ///
    /// Returns the error from recording the value. NOTE(review): the value is already
    /// applied in memory when recording fails, so in-memory and persisted state can
    /// diverge — confirm callers treat an `Err` here as fatal or reopen the store.
    pub fn push(&mut self, value: f64) -> std::io::Result<f64> {
        let score = self.inner.push(value);
        self.pm.record_push(value, &self.inner.data)?;
        Ok(score)
    }

    /// Re-scores the current history without adding a value.
    pub fn eval(&self) -> f64 {
        self.inner.eval()
    }

    /// Asks the persistence manager to compact using the current in-memory history.
    pub fn flush(&mut self) -> std::io::Result<()> {
        let Self { inner, pm } = self;
        pm.compact(&inner.data)
    }

    /// Size of the persistence manager's WAL in bytes.
    pub fn wal_size_bytes(&self) -> std::io::Result<u64> {
        self.pm.wal_size_bytes()
    }

    /// Number of WAL entries the persistence manager reports as pending.
    pub fn pending_wal_entries(&self) -> usize {
        self.pm.pending_wal_entries()
    }

    /// Sets the persistence manager's `snapshot_interval` to `n`.
    pub fn set_snapshot_interval(&mut self, n: usize) {
        self.pm.snapshot_interval = n;
    }
}
// Unit tests for the synchronous detector plus, behind the `async` feature, a parity check
// for `AsyncAnomalyzer`. Methods driven by random permutations make these assertions
// statistical: they compare Monte-Carlo estimates against fixed thresholds.
#[cfg(test)]
mod tests {
    use super::*;

    // Magnitude + high-rank on a one-value active window: a value near the reference
    // mean should score low, a large spike high, and the value after the spike lower.
    #[test]
    fn basic_anomaly_detection() {
        // NOTE(review): every field is set explicitly, so `..Default::default()` is
        // redundant (clippy::needless_update).
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 1,
            n_seasons: 4,
            perm_count: 1000,
            methods: vec!["magnitude".to_string(), "highrank".to_string()],
            ..Default::default()
        };
        // Active 1 + reference 4 = 5 values needed; the seed already has 5, so every
        // push below is scored.
        let mut anom = Anomalyzer::new(conf, Some(vec![2.0, 2.1, 2.2, 2.0, 2.3])).unwrap();
        let p_normal = anom.push(2.15);
        assert!(p_normal < 0.7);
        let p_anomalous = anom.push(9.0);
        println!("Anomalous probability: {}", p_anomalous);
        assert!(p_anomalous > 0.75);
        let p_recovery = anom.push(2.4);
        assert!(p_recovery < 0.8);
    }

    // CDF test only: small steps should score low, a burst of large steps high.
    #[test]
    fn cdf_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 3,
            n_seasons: 2,
            perm_count: 500,
            methods: vec!["cdf".to_string()],
            ..Default::default()
        };
        // Active 3 + reference 6 = 9 values needed; with a 6-value seed the first two
        // pushes return 0.0 (not enough data) and their results are ignored.
        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![10.0, 10.1, 10.05, 10.2, 10.15, 10.1]),
        )
        .unwrap();
        anom.push(10.25);
        anom.push(10.1);
        let p_normal = anom.push(10.22);
        println!("CDF normal probability: {}", p_normal);
        assert!(p_normal < 0.7);
        anom.push(12.0);
        anom.push(8.5);
        let p_volatile = anom.push(13.0);
        println!("CDF volatile probability: {}", p_volatile);
        assert!(p_volatile > 0.8);
    }

    // Diff (rank-sum on absolute steps) only: similar step sizes should score low,
    // a window of large swings high.
    #[test]
    fn diff_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 4,
            n_seasons: 2,
            perm_count: 1000,
            methods: vec!["diff".to_string()],
            ..Default::default()
        };
        // Active 4 + reference 8 = 12 values needed; the first three pushes return 0.0.
        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![
                10.0, 10.2, 10.1, 10.3, 10.2, 10.4, 10.3, 10.5,
            ]),
        )
        .unwrap();
        anom.push(10.6);
        anom.push(10.5);
        anom.push(10.7);
        let p_normal = anom.push(10.6);
        println!("Diff normal probability: {}", p_normal);
        assert!(p_normal < 0.6);
        anom.push(13.0);
        anom.push(9.0);
        anom.push(14.0);
        let p_volatile = anom.push(10.0);
        println!("Diff high volatility probability: {}", p_volatile);
        assert!(p_volatile > 0.9);
    }

    // KS permutation test only: a window drawn from the same range should score low,
    // a level shift to ~12 high.
    #[test]
    fn ks_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 5,
            n_seasons: 2,
            perm_count: 1000,
            methods: vec!["ks".to_string()],
            ..Default::default()
        };
        // Active 5 + reference 10 = 15 values needed; the first four pushes return 0.0.
        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![9.8, 10.2, 9.9, 10.1, 10.0, 10.3, 9.7, 10.4, 10.1, 9.9]),
        )
        .unwrap();
        anom.push(10.0);
        anom.push(10.2);
        anom.push(9.8);
        anom.push(10.1);
        let p_normal = anom.push(10.0);
        println!("KS normal probability: {}", p_normal);
        assert!(
            p_normal < 0.5,
            "Similar distribution should give low KS prob: got {}",
            p_normal
        );
        anom.push(12.0);
        anom.push(12.5);
        anom.push(11.8);
        anom.push(12.2);
        let p_shift = anom.push(12.1);
        println!("KS shift probability: {}", p_shift);
        assert!(
            p_shift > 0.9,
            "Distribution shift should trigger high KS prob: got {}",
            p_shift
        );
    }

    // Mirrors the first two pushes of `basic_anomaly_detection` through the async API.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn async_matches_sync_behaviour() {
        use crate::async_anomalyzer::AsyncAnomalyzer;
        let conf = AnomalyzerConf {
            active_size: 1,
            n_seasons: 4,
            perm_count: 1000,
            methods: vec!["magnitude".to_string(), "highrank".to_string()],
            ..Default::default()
        };
        let detector = AsyncAnomalyzer::new(
            conf,
            Some(vec![2.0, 2.1, 2.2, 2.0, 2.3]),
        )
        .await
        .unwrap();
        let p_normal = detector.push(2.15).await;
        assert!(p_normal < 0.7, "expected normal, got {p_normal}");
        let p_anomalous = detector.push(9.0).await;
        assert!(p_anomalous > 0.75, "expected anomaly, got {p_anomalous}");
    }
}
// Tests for `PersistentAnomalyzer`: recovery across reopen, WAL truncation, and tolerance
// of a corrupted WAL tail. Each test uses its own temporary directory.
#[cfg(all(test, feature = "persist"))]
mod persist_tests {
    use super::*;
    use tempfile::tempdir;

    // Magnitude + high-rank, one-value active window, four-season reference window.
    fn conf() -> AnomalyzerConf {
        AnomalyzerConf {
            active_size: 1,
            n_seasons: 4,
            perm_count: 200,
            methods: vec!["magnitude".into(), "highrank".into()],
            ..Default::default()
        }
    }

    // Scores a spike, reopens the same directory and checks the recovered history still
    // scores the spike as anomalous.
    // NOTE(review): by my estimate magnitude contributes only ~0.23 here, so the 0.5
    // threshold rests on the 200-permutation highrank estimate landing above ~0.77 —
    // this may fail occasionally; consider a larger perm_count or a lower threshold.
    #[test]
    fn survives_restart() {
        let dir = tempdir().unwrap();
        let prob_before = {
            let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            for v in [10.0f64, 10.1, 10.2, 10.0] {
                d.push(v).unwrap();
            }
            d.push(15.0).unwrap()
        };
        let prob_after = {
            // NOTE(review): `mut` is unused here (only `eval(&self)` is called).
            let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            d.eval()
        };
        assert!(prob_before > 0.5, "before restart: {prob_before}");
        assert!(prob_after > 0.5, "after restart: {prob_after}");
    }

    // Assumes `snapshot_interval` counts pushes, so the fifth push triggers compaction
    // and leaves the WAL empty — TODO confirm against the persistence module.
    #[test]
    fn wal_truncated_after_compact() {
        let dir = tempdir().unwrap();
        let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
        d.set_snapshot_interval(5);
        for v in [10.0f64, 10.1, 10.2, 10.0, 10.3] {
            d.push(v).unwrap();
        }
        assert_eq!(d.wal_size_bytes().unwrap(), 0);
    }

    // Appends two garbage bytes to the WAL (simulating a torn write) and checks that
    // reopening succeeds. The final assertion is deliberately loose.
    #[test]
    fn partial_wal_tail_tolerated() {
        use std::io::Write;
        let dir = tempdir().unwrap();
        {
            let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            for v in [10.0f64, 10.1, 10.2] {
                d.push(v).unwrap();
            }
        }
        let wal = dir.path().join("anomalyzer.wal");
        let mut f = std::fs::OpenOptions::new().append(true).open(&wal).unwrap();
        f.write_all(&[0xAE, 0xFF]).unwrap();
        let d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
        assert!(d.pending_wal_entries() == 0 || d.wal_size_bytes().unwrap() < 100);
    }
}