1use ndarray::Array1;
87use rand::seq::SliceRandom;
88use rand::thread_rng;
89
/// Async front-end for the detector (see `async_anomalyzer::AsyncAnomalyzer`);
/// only compiled with the `async` feature.
#[cfg(feature = "async")]
pub mod async_anomalyzer;

/// Storage layer (WAL + compaction) used by [`PersistentAnomalyzer`]; only
/// compiled with the `persist` feature.
#[cfg(feature = "persist")]
pub mod persistence;

/// Sentinel meaning "no bound". It is the default
/// [`AnomalyzerConf::lower_bound`]; the fence test treats any infinite lower
/// bound as absent and scales by the upper bound alone.
pub const NA: f64 = f64::INFINITY;
100
/// Tuning parameters for [`Anomalyzer`].
///
/// [`Anomalyzer::new`] replaces a zero `perm_count`, a zero `sensitivity` and
/// an empty `methods` list with defaults, and rejects `active_size == 0` or
/// `n_seasons == 0`.
#[derive(Clone, Debug)]
pub struct AnomalyzerConf {
    /// Gate for the `"magnitude"` method: when its probability is below this
    /// value, `eval` returns 0.0 regardless of the other methods.
    pub sensitivity: f64,
    /// Upper fence used by the `"fence"` method.
    pub upper_bound: f64,
    /// Lower fence used by the `"fence"` method. An infinite value (such as
    /// [`NA`]) means "no lower fence"; the fence test then measures the active
    /// mean relative to `upper_bound` only.
    pub lower_bound: f64,
    /// Number of most recent points forming the active window (at least 1).
    pub active_size: usize,
    /// Reference window length in multiples of `active_size`: the reference is
    /// the `n_seasons * active_size` points preceding the active window.
    pub n_seasons: usize,
    /// Number of random permutations drawn by the permutation-based methods
    /// (`"diff"`, `"highrank"`, `"lowrank"`, `"ks"`).
    pub perm_count: usize,
    /// Methods combined by `eval`: `"magnitude"`, `"fence"`, `"cdf"`, `"diff"`,
    /// `"highrank"`, `"lowrank"`, `"ks"`. Unrecognised names are skipped.
    pub methods: Vec<String>,
}
111
112impl Default for AnomalyzerConf {
113 fn default() -> Self {
114 Self {
115 sensitivity: 0.1,
116 upper_bound: 100.0,
117 lower_bound: NA,
118 active_size: 1,
119 n_seasons: 4,
120 perm_count: 500,
121 methods: vec!["magnitude".to_string(), "cdf".to_string()],
122 }
123 }
124}
125
/// Streaming anomaly detector: keeps a history of observations and scores the
/// newest `active_size` points against the reference window preceding them.
pub struct Anomalyzer {
    /// Configuration after [`Anomalyzer::new`] has filled in defaults.
    conf: AnomalyzerConf,
    /// Observation history, oldest first (`push` appends, the active window is
    /// the tail). `pub(crate)` so the persistence wrapper can hand it to its
    /// storage layer.
    pub(crate) data: Vec<f64>,
}
130
131impl Anomalyzer {
132 pub fn new(conf: AnomalyzerConf, initial_data: Option<Vec<f64>>) -> Result<Self, String> {
133 if conf.active_size == 0 {
134 return Err("active_size must be at least 1".to_string());
135 }
136
137 let reference_size = conf.n_seasons * conf.active_size;
138 if reference_size < conf.active_size {
139 return Err("reference window too small".to_string());
140 }
141
142 let methods = if conf.methods.is_empty() {
143 vec!["magnitude".to_string(), "cdf".to_string()]
144 } else {
145 conf.methods.clone()
146 };
147
148 let mut validated_conf = conf;
149 validated_conf.methods = methods;
150 if validated_conf.perm_count == 0 {
151 validated_conf.perm_count = 500;
152 }
153 if validated_conf.sensitivity == 0.0 {
154 validated_conf.sensitivity = 0.1;
155 }
156
157 let data = initial_data.unwrap_or_default();
158
159 Ok(Anomalyzer {
160 conf: validated_conf,
161 data,
162 })
163 }
164
165 fn reference_size(&self) -> usize {
166 self.conf.n_seasons * self.conf.active_size
167 }
168
169 fn extract_windows(&self) -> Option<(Array1<f64>, Array1<f64>)> {
170 let total_needed = self.conf.active_size + self.reference_size();
171 if self.data.len() < total_needed {
172 return None;
173 }
174
175 let len = self.data.len();
176 let ref_start = len - total_needed;
177 let active_start = len - self.conf.active_size;
178
179 let reference = Array1::from_vec(self.data[ref_start..active_start].to_vec());
180 let active = Array1::from_vec(self.data[active_start..].to_vec());
181
182 Some((reference, active))
183 }
184
185 pub fn push(&mut self, value: f64) -> f64 {
186 self.data.push(value);
187
188 let needed = self.conf.active_size + self.reference_size();
189 if self.data.len() > needed * 3 {
190 let keep_start = self.data.len() - (needed * 2);
191 self.data.drain(..keep_start);
192 }
193
194 self.eval()
195 }
196
197 pub fn eval(&self) -> f64 {
198 let (reference, active) = match self.extract_windows() {
199 Some(w) => w,
200 None => return 0.0,
201 };
202
203 let mut prob_map: Vec<(&str, f64)> = Vec::new();
204 let mut rank_prob = 0.0f64;
205
206 for method in &self.conf.methods {
207 let prob = match method.as_str() {
208 "magnitude" => self.magnitude_test(&reference, &active),
209 "fence" => self.fence_test(&active),
210 "cdf" => self.cdf_test(&reference, &active),
211 "diff" => self.diff_test(&reference, &active),
212 "highrank" => self.rank_test(&reference, &active, true),
213 "lowrank" => self.rank_test(&reference, &active, false),
214 "ks" => self.bootstrap_ks_test(&reference, &active),
215 _ => continue,
216 };
217
218 if prob.is_nan() || prob < 0.0 || prob > 1.0 {
219 continue;
220 }
221
222 if method == "highrank" || method == "lowrank" {
223 rank_prob = rank_prob.max(prob);
224 continue;
225 }
226
227 if method == "magnitude" && prob < self.conf.sensitivity {
228 return 0.0;
229 }
230
231 prob_map.push((method, prob));
232 }
233
234 if (self.conf.methods.contains(&"highrank".to_string())
235 || self.conf.methods.contains(&"lowrank".to_string()))
236 && rank_prob > 0.0
237 {
238 prob_map.push(("rank", rank_prob));
239 }
240
241 let mut weighted_sum = 0.0;
242 let mut weight_sum = 0.0;
243
244 for (name, prob) in prob_map {
245 let weight = if (name == "magnitude" || name == "fence") && prob > 0.8 {
246 5.0
247 } else {
248 0.5
249 };
250 weighted_sum += prob * weight;
251 weight_sum += weight;
252 }
253
254 if weight_sum > 0.0 {
255 weighted_sum / weight_sum
256 } else {
257 0.0
258 }
259 }
260
261 fn magnitude_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
264 let ref_mean = reference.mean().unwrap_or(0.0);
265 let act_mean = active.mean().unwrap_or(0.0);
266
267 if ref_mean == 0.0 {
268 return if act_mean == 0.0 { 0.0 } else { 1.0 };
269 }
270
271 let pdiff = (act_mean - ref_mean).abs() / ref_mean;
272 let exp_val = 10.0f64.powf(pdiff.min(5.0));
273 (exp_val - 1.0) / 9.0
274 }
275
276 fn fence_test(&self, active: &Array1<f64>) -> f64 {
277 let x = active.mean().unwrap_or(0.0);
278 let distance = if self.conf.lower_bound.is_infinite() {
279 (x / self.conf.upper_bound).min(2.0)
280 } else {
281 let mid = (self.conf.upper_bound + self.conf.lower_bound) / 2.0;
282 let bound = (self.conf.upper_bound - self.conf.lower_bound) / 2.0;
283 ((x - mid).abs() / bound).min(2.0)
284 };
285 (10.0f64.powf(distance) - 1.0) / 9.0
286 }
287
288 fn cdf_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
289 if reference.len() < 2 || active.len() < 2 {
290 return 0.5;
291 }
292
293 let ref_diffs: Vec<f64> = reference
294 .windows(2)
295 .into_iter()
296 .map(|w| (w[1] - w[0]).abs())
297 .collect();
298
299 let act_diffs: Vec<f64> = active
300 .windows(2)
301 .into_iter()
302 .map(|w| (w[1] - w[0]).abs())
303 .collect();
304
305 if ref_diffs.is_empty() || act_diffs.is_empty() {
306 return 0.5;
307 }
308
309 let active_mean_diff = act_diffs.iter().sum::<f64>() / act_diffs.len() as f64;
310
311 let mut sorted_ref: Vec<f64> = ref_diffs;
312 sorted_ref.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
313
314 let count_le = sorted_ref.iter().filter(|&&v| v <= active_mean_diff).count();
315 let percentile = count_le as f64 / sorted_ref.len() as f64;
316
317 let prob = 2.0 * (0.5 - percentile).abs();
318 prob.min(1.0)
319 }
320
321 fn diff_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
322 if reference.len() < 2 || active.len() < 2 {
323 return 0.5;
324 }
325
326 let ref_diffs: Vec<f64> = reference
327 .windows(2)
328 .into_iter()
329 .map(|w| (w[1] - w[0]).abs())
330 .collect();
331
332 let act_diffs: Vec<f64> = active
333 .windows(2)
334 .into_iter()
335 .map(|w| (w[1] - w[0]).abs())
336 .collect();
337
338 if ref_diffs.is_empty() || act_diffs.is_empty() {
339 return 0.5;
340 }
341
342 let mut combined_diffs = ref_diffs.clone();
343 combined_diffs.extend(act_diffs.iter().cloned());
344
345 let mut indexed: Vec<(f64, usize)> = combined_diffs
346 .iter()
347 .copied()
348 .enumerate()
349 .map(|(i, v)| (v, i))
350 .collect();
351 indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
352
353 let mut ranks = vec![0usize; combined_diffs.len()];
354 for (rank, &(_, idx)) in indexed.iter().enumerate() {
355 ranks[idx] = rank + 1;
356 }
357
358 let n_ref = ref_diffs.len();
359 let active_sum: usize = ranks[n_ref..].iter().sum();
360
361 let mut significant = 0;
362 let mut rng = thread_rng();
363
364 for _ in 0..self.conf.perm_count {
365 let mut perm = combined_diffs.clone();
366 perm.shuffle(&mut rng);
367
368 let mut perm_indexed: Vec<(f64, usize)> = perm
369 .iter()
370 .copied()
371 .enumerate()
372 .map(|(i, v)| (v, i))
373 .collect();
374 perm_indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
375
376 let mut perm_ranks = vec![0usize; perm.len()];
377 for (rank, &(_, idx)) in perm_indexed.iter().enumerate() {
378 perm_ranks[idx] = rank + 1;
379 }
380
381 let perm_active_sum: usize = perm_ranks[n_ref..].iter().sum();
382
383 if perm_active_sum < active_sum {
384 significant += 1;
385 }
386 }
387
388 significant as f64 / self.conf.perm_count as f64
389 }
390
391 fn rank_test(&self, reference: &Array1<f64>, active: &Array1<f64>, high: bool) -> f64 {
392 let mut combined = reference.to_vec();
393 combined.extend(active.iter());
394 let n_active = active.len();
395
396 let mut indexed: Vec<(f64, usize)> = combined
397 .iter()
398 .copied()
399 .enumerate()
400 .map(|(idx, val)| (val, idx))
401 .collect();
402 indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
403
404 let mut ranks = vec![0usize; combined.len()];
405 for (rank, &(_, idx)) in indexed.iter().enumerate() {
406 ranks[idx] = rank + 1;
407 }
408 let active_sum: usize = ranks[combined.len() - n_active..].iter().sum();
409
410 let mut significant = 0;
411 let mut rng = thread_rng();
412
413 for _ in 0..self.conf.perm_count {
414 let mut perm = combined.clone();
415 perm.shuffle(&mut rng);
416
417 let mut perm_indexed: Vec<(f64, usize)> = perm
418 .iter()
419 .copied()
420 .enumerate()
421 .map(|(idx, val)| (val, idx))
422 .collect();
423 perm_indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
424
425 let mut perm_ranks = vec![0usize; perm.len()];
426 for (rank, &(_, idx)) in perm_indexed.iter().enumerate() {
427 perm_ranks[idx] = rank + 1;
428 }
429 let perm_active_sum: usize = perm_ranks[perm.len() - n_active..].iter().sum();
430
431 if high {
432 if perm_active_sum < active_sum {
433 significant += 1;
434 }
435 } else {
436 if perm_active_sum > active_sum {
437 significant += 1;
438 }
439 }
440 }
441
442 significant as f64 / self.conf.perm_count as f64
443 }
444
445 fn bootstrap_ks_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
446 if reference.is_empty() || active.is_empty() {
447 return 0.5;
448 }
449
450 let ks_stat = self.ks_statistic(reference, active);
451
452 if ks_stat.is_nan() || ks_stat == 0.0 {
453 return 0.5;
454 }
455
456 let mut combined = reference.to_vec();
457 combined.extend(active.iter().cloned());
458
459 let mut significant = 0;
460 let mut rng = thread_rng();
461
462 for _ in 0..self.conf.perm_count {
463 let mut perm = combined.clone();
464 perm.shuffle(&mut rng);
465
466 let perm_ref = Array1::from_vec(perm[..reference.len()].to_vec());
467 let perm_act = Array1::from_vec(perm[reference.len()..].to_vec());
468
469 let perm_ks = self.ks_statistic(&perm_ref, &perm_act);
470
471 if perm_ks < ks_stat {
472 significant += 1;
473 }
474 }
475
476 significant as f64 / self.conf.perm_count as f64
477 }
478
479 fn ks_statistic(&self, a: &Array1<f64>, b: &Array1<f64>) -> f64 {
480 if a.is_empty() || b.is_empty() {
481 return f64::NAN;
482 }
483
484 let mut sorted_a = a.to_vec();
485 let mut sorted_b = b.to_vec();
486 sorted_a.sort_by(|x, y| x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal));
487 sorted_b.sort_by(|x, y| x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal));
488
489 let mut i = 0;
490 let mut j = 0;
491 let mut max_diff: f64 = 0.0;
492
493 while i < sorted_a.len() && j < sorted_b.len() {
494 if sorted_a[i] <= sorted_b[j] {
495 let ecdf_a = (i + 1) as f64 / sorted_a.len() as f64;
496 let ecdf_b = j as f64 / sorted_b.len() as f64;
497 max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
498 i += 1;
499 } else {
500 let ecdf_a = i as f64 / sorted_a.len() as f64;
501 let ecdf_b = (j + 1) as f64 / sorted_b.len() as f64;
502 max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
503 j += 1;
504 }
505 }
506
507 while i < sorted_a.len() {
508 let ecdf_a = (i + 1) as f64 / sorted_a.len() as f64;
509 let ecdf_b = 1.0;
510 max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
511 i += 1;
512 }
513
514 while j < sorted_b.len() {
515 let ecdf_a = 1.0;
516 let ecdf_b = (j + 1) as f64 / sorted_b.len() as f64;
517 max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
518 j += 1;
519 }
520
521 max_diff
522 }
523}
524
/// An [`Anomalyzer`] paired with a [`persistence::PersistenceManager`] that is
/// handed every pushed value, so the history can be recovered by
/// [`PersistentAnomalyzer::open`] after a restart.
#[cfg(feature = "persist")]
pub struct PersistentAnomalyzer {
    /// Detector that does the actual scoring.
    inner: Anomalyzer,
    /// Storage backend: fed on every `push`, asked to compact on `flush`.
    pm: persistence::PersistenceManager,
}
565
#[cfg(feature = "persist")]
impl PersistentAnomalyzer {
    /// Opens the persistence store in `dir` and builds the wrapped
    /// [`Anomalyzer`] from `conf`.
    ///
    /// The detector is seeded with whatever history the store recovers; an
    /// empty recovery means an unseeded detector.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from opening or recovering the store. A `conf`
    /// rejected by [`Anomalyzer::new`] is reported as
    /// [`std::io::ErrorKind::InvalidInput`].
    pub fn open(
        dir: impl AsRef<std::path::Path>,
        conf: AnomalyzerConf,
    ) -> std::io::Result<Self> {
        let mut pm = persistence::PersistenceManager::open(&dir)?;
        let history = pm.recover()?;
        let seed = if history.is_empty() { None } else { Some(history) };

        let invalid_conf =
            |msg: String| std::io::Error::new(std::io::ErrorKind::InvalidInput, msg);
        let inner = Anomalyzer::new(conf, seed).map_err(invalid_conf)?;

        Ok(PersistentAnomalyzer { inner, pm })
    }

    /// Feeds `value` to the detector, then hands it (with the current
    /// history) to the persistence layer and returns the detector's
    /// probability.
    ///
    /// NOTE(review): the in-memory detector is updated before `record_push`
    /// runs, so if recording fails the value is still held in memory.
    pub fn push(&mut self, value: f64) -> std::io::Result<f64> {
        let probability = self.inner.push(value);
        self.pm.record_push(value, &self.inner.data)?;
        Ok(probability)
    }

    /// Scores the current history without adding a point (see
    /// [`Anomalyzer::eval`]).
    pub fn eval(&self) -> f64 {
        let detector = &self.inner;
        detector.eval()
    }

    /// Asks the persistence layer to compact using the current in-memory
    /// history.
    ///
    /// Presumably this snapshots the history and truncates the WAL — confirm
    /// in `persistence`.
    pub fn flush(&mut self) -> std::io::Result<()> {
        let history = &self.inner.data;
        self.pm.compact(history)
    }

    /// Current size of the write-ahead log in bytes, as reported by the
    /// persistence layer.
    pub fn wal_size_bytes(&self) -> std::io::Result<u64> {
        let pm = &self.pm;
        pm.wal_size_bytes()
    }

    /// Number of WAL entries the persistence layer reports as not yet
    /// compacted.
    pub fn pending_wal_entries(&self) -> usize {
        let pm = &self.pm;
        pm.pending_wal_entries()
    }

    /// Sets the persistence manager's `snapshot_interval`.
    ///
    /// The `wal_truncated_after_compact` test suggests the WAL is compacted
    /// after that many pushes — confirm in `persistence`.
    pub fn set_snapshot_interval(&mut self, n: usize) {
        let pm = &mut self.pm;
        pm.snapshot_interval = n;
    }
}
622
#[cfg(test)]
mod tests {
    use super::*;

    // Every `AnomalyzerConf` literal below spells out all fields, so none of
    // them uses `..Default::default()` (it would be a no-op and trips
    // `clippy::needless_update`).

    /// A single-point spike must score high while ordinary points stay low.
    #[test]
    fn basic_anomaly_detection() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 1,
            n_seasons: 4,
            perm_count: 1000,
            methods: vec!["magnitude".to_string(), "highrank".to_string()],
        };

        let mut anom = Anomalyzer::new(conf, Some(vec![2.0, 2.1, 2.2, 2.0, 2.3])).unwrap();

        let p_normal = anom.push(2.15);
        assert!(p_normal < 0.7);

        let p_anomalous = anom.push(9.0);
        println!("Anomalous probability: {}", p_anomalous);
        assert!(p_anomalous > 0.75);

        let p_recovery = anom.push(2.4);
        assert!(p_recovery < 0.8);
    }

    /// The CDF method reacts to a jump in point-to-point volatility.
    #[test]
    fn cdf_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 3,
            n_seasons: 2,
            perm_count: 500,
            methods: vec!["cdf".to_string()],
        };

        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![10.0, 10.1, 10.05, 10.2, 10.15, 10.1]),
        )
        .unwrap();

        anom.push(10.25);
        anom.push(10.1);
        let p_normal = anom.push(10.22);
        println!("CDF normal probability: {}", p_normal);
        assert!(p_normal < 0.7);

        anom.push(12.0);
        anom.push(8.5);
        let p_volatile = anom.push(13.0);
        println!("CDF volatile probability: {}", p_volatile);
        assert!(p_volatile > 0.8);
    }

    /// The diff (rank of changes) method reacts to large swings.
    #[test]
    fn diff_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 4,
            n_seasons: 2,
            perm_count: 1000,
            methods: vec!["diff".to_string()],
        };

        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![10.0, 10.2, 10.1, 10.3, 10.2, 10.4, 10.3, 10.5]),
        )
        .unwrap();

        anom.push(10.6);
        anom.push(10.5);
        anom.push(10.7);
        let p_normal = anom.push(10.6);
        println!("Diff normal probability: {}", p_normal);
        assert!(p_normal < 0.6);

        anom.push(13.0);
        anom.push(9.0);
        anom.push(14.0);
        let p_volatile = anom.push(10.0);
        println!("Diff high volatility probability: {}", p_volatile);
        assert!(p_volatile > 0.9);
    }

    /// The KS method separates a level shift from same-distribution noise.
    #[test]
    fn ks_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 5,
            n_seasons: 2,
            perm_count: 1000,
            methods: vec!["ks".to_string()],
        };

        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![9.8, 10.2, 9.9, 10.1, 10.0, 10.3, 9.7, 10.4, 10.1, 9.9]),
        )
        .unwrap();

        anom.push(10.0);
        anom.push(10.2);
        anom.push(9.8);
        anom.push(10.1);
        let p_normal = anom.push(10.0);
        println!("KS normal probability: {}", p_normal);
        assert!(
            p_normal < 0.5,
            "Similar distribution should give low KS prob: got {}",
            p_normal
        );

        anom.push(12.0);
        anom.push(12.5);
        anom.push(11.8);
        anom.push(12.2);
        let p_shift = anom.push(12.1);
        println!("KS shift probability: {}", p_shift);
        assert!(
            p_shift > 0.9,
            "Distribution shift should trigger high KS prob: got {}",
            p_shift
        );
    }

    /// The async front-end scores the same scenario like the sync detector.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn async_matches_sync_behaviour() {
        use crate::async_anomalyzer::AsyncAnomalyzer;

        let conf = AnomalyzerConf {
            active_size: 1,
            n_seasons: 4,
            perm_count: 1000,
            methods: vec!["magnitude".to_string(), "highrank".to_string()],
            ..Default::default()
        };

        let detector = AsyncAnomalyzer::new(
            conf,
            Some(vec![2.0, 2.1, 2.2, 2.0, 2.3]),
        )
        .await
        .unwrap();

        let p_normal = detector.push(2.15).await;
        assert!(p_normal < 0.7, "expected normal, got {p_normal}");

        let p_anomalous = detector.push(9.0).await;
        assert!(p_anomalous > 0.75, "expected anomaly, got {p_anomalous}");
    }
}
797
#[cfg(all(test, feature = "persist"))]
mod persist_tests {
    use super::*;
    use tempfile::tempdir;

    /// Shared configuration for the persistence tests.
    ///
    /// Why 5000 permutations: `survives_restart` scores roughly
    /// `(0.23 + highrank) / 2` and needs that above 0.5. `highrank` is a
    /// Monte-Carlo estimate of 0.8. With only 200 permutations the estimate
    /// drops below the required ~0.77 in about one run in eight. 5000
    /// permutations put the threshold several standard deviations away and
    /// are still cheap for a 5-point window.
    fn conf() -> AnomalyzerConf {
        AnomalyzerConf {
            active_size: 1,
            n_seasons: 4,
            perm_count: 5000,
            methods: vec!["magnitude".into(), "highrank".into()],
            ..Default::default()
        }
    }

    /// History pushed in one session is scored identically after reopening.
    #[test]
    fn survives_restart() {
        let dir = tempdir().unwrap();

        // First session: build up history ending in a spike.
        let prob_before = {
            let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            for v in [10.0f64, 10.1, 10.2, 10.0] {
                d.push(v).unwrap();
            }
            d.push(15.0).unwrap()
        };

        // Second session: only reads, so the binding needs no `mut`.
        let prob_after = {
            let d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            d.eval()
        };

        assert!(prob_before > 0.5, "before restart: {prob_before}");
        assert!(prob_after > 0.5, "after restart: {prob_after}");
    }

    /// Reaching the snapshot interval leaves an empty WAL.
    #[test]
    fn wal_truncated_after_compact() {
        let dir = tempdir().unwrap();
        let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
        d.set_snapshot_interval(5);

        for v in [10.0f64, 10.1, 10.2, 10.0, 10.3] {
            d.push(v).unwrap();
        }
        assert_eq!(d.wal_size_bytes().unwrap(), 0);
    }

    /// Stray bytes at the end of the WAL must not prevent reopening.
    #[test]
    fn partial_wal_tail_tolerated() {
        use std::io::Write;

        let dir = tempdir().unwrap();

        {
            let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            for v in [10.0f64, 10.1, 10.2] {
                d.push(v).unwrap();
            }
        }

        // Append two bytes that do not form a complete record, mimicking a
        // torn write. The handle is closed before the store is reopened.
        let wal = dir.path().join("anomalyzer.wal");
        {
            let mut f = std::fs::OpenOptions::new().append(true).open(&wal).unwrap();
            f.write_all(&[0xAE, 0xFF]).unwrap();
        }

        let d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
        assert!(d.pending_wal_entries() == 0 || d.wal_size_bytes().unwrap() < 100);
    }
}
873}