Skip to main content

anomalyzer_ts/
lib.rs

1//! # anomalyzer-ts
2//!
3//! A fast, lightweight, probabilistic anomaly detection library for streaming time-series data.
4//!
5//! Inspired by Etsy's Skyline.
6//!
//! It supports multiple statistical tests (magnitude, fence, CDF, diff, rank, KS) whose scores are combined into a single anomaly probability (0.0 to 1.0).
8//!
9//! ## Features
10//!
11//! - Streaming: push values one by one
12//! - Configurable active/reference window sizes
13//! - Multiple detection methods
14//! - Dynamic weighting of strong signals
15//! - No heavy dependencies
16//! - Optional async support via `--features async`
17//! - Optional WAL + snapshot persistence via `--features persist`
18//!
19//! ## Quick Example
20//!
21//! ```rust
22//! use anomalyzer_ts::{Anomalyzer, AnomalyzerConf, NA};
23//!
24//! let conf = AnomalyzerConf {
25//!     active_size: 1,
26//!     n_seasons: 4,
27//!     methods: vec!["magnitude".to_string(), "highrank".to_string()],
28//!     ..Default::default()
29//! };
30//!
31//! let mut detector = Anomalyzer::new(conf, Some(vec![2.0, 2.1, 2.2, 2.0, 2.3])).unwrap();
32//!
33//! assert!(detector.push(2.15) < 0.7);  // normal
34//! assert!(detector.push(9.0) > 0.75);   // anomalous!
35//! ```
36//!
37//! ## Async Example
38//!
39//! ```rust,ignore
40//! # #[cfg(feature = "async")]
41//! # #[tokio::main]
42//! # async fn main() {
43//! use anomalyzer_ts::{AnomalyzerConf, async_anomalyzer::AsyncAnomalyzer};
44//!
45//! let conf = AnomalyzerConf {
46//!     active_size: 1,
47//!     n_seasons: 4,
48//!     methods: vec!["magnitude".into(), "highrank".into()],
49//!     ..Default::default()
50//! };
51//!
52//! let detector = AsyncAnomalyzer::new(conf, Some(vec![2.0, 2.1, 2.2, 2.0, 2.3]))
53//!     .await
54//!     .unwrap();
55//!
56//! let prob = detector.push(9.0).await;
57//! println!("Anomaly probability: {prob:.3}");
58//! # }
59//! ```
60//!
61//! ## Persistent Example
62//!
63//! ```rust,ignore
64//! # #[cfg(feature = "persist")]
65//! # fn main() -> std::io::Result<()> {
66//! use anomalyzer_ts::{AnomalyzerConf, PersistentAnomalyzer};
67//!
68//! let conf = AnomalyzerConf {
69//!     active_size: 1,
70//!     n_seasons: 4,
71//!     methods: vec!["magnitude".into(), "highrank".into()],
72//!     ..Default::default()
73//! };
74//!
75//! // History is restored automatically on subsequent runs.
76//! let mut detector = PersistentAnomalyzer::open("/var/lib/myapp/anomaly", conf)?;
77//!
78//! let prob = detector.push(15.0)?;
79//! println!("prob = {prob:.3}");
80//!
81//! detector.flush()?; // compact on clean shutdown
82//! # Ok(())
83//! # }
84//! ```
85
86use ndarray::Array1;
87use rand::seq::SliceRandom;
88use rand::thread_rng;
89
90/// Async wrapper around [`Anomalyzer`]. Only available with `--features async`.
91#[cfg(feature = "async")]
92pub mod async_anomalyzer;
93
94/// WAL + snapshot persistence. Only available with `--features persist`.
95#[cfg(feature = "persist")]
96pub mod persistence;
97
/// Sentinel for [`AnomalyzerConf::lower_bound`] meaning "no lower bound"
/// (like Go's `anomalyzer.NA`).
///
/// The `fence` test only checks whether `lower_bound` is infinite, so with
/// `NA` (positive infinity) the fence is measured against `upper_bound` alone.
pub const NA: f64 = f64::INFINITY;

/// Tuning parameters for an [`Anomalyzer`].
///
/// Build one with struct-update syntax, e.g.
/// `AnomalyzerConf { active_size: 2, ..Default::default() }`.
#[derive(Clone, Debug, PartialEq)]
pub struct AnomalyzerConf {
    /// Minimum `magnitude` score required to report anything. When the
    /// magnitude test scores below this, the overall probability is `0.0`.
    /// `Anomalyzer::new` replaces `0.0` with `0.1`.
    pub sensitivity: f64,
    /// Upper bound used by the `fence` test.
    pub upper_bound: f64,
    /// Lower bound used by the `fence` test. An infinite value such as
    /// [`NA`] means only `upper_bound` is used.
    pub lower_bound: f64,
    /// Number of most recent values forming the active window (must be >= 1).
    pub active_size: usize,
    /// Length of the reference window as a multiple of `active_size`
    /// (must be >= 1).
    pub n_seasons: usize,
    /// Number of random permutations used by the `diff`, `highrank`,
    /// `lowrank` and `ks` tests. `Anomalyzer::new` replaces `0` with `500`.
    pub perm_count: usize,
    /// Tests to run. Any of `magnitude`, `fence`, `cdf`, `diff`, `highrank`,
    /// `lowrank`, `ks`; unknown names are ignored. `Anomalyzer::new`
    /// replaces an empty list with `["magnitude", "cdf"]`.
    pub methods: Vec<String>,
}

impl Default for AnomalyzerConf {
    /// Sensitivity `0.1`, upper bound `100.0`, no lower bound, a one-value
    /// active window against four seasons of reference, 500 permutations, and
    /// the `magnitude` + `cdf` tests.
    fn default() -> Self {
        Self {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 1,
            n_seasons: 4,
            perm_count: 500,
            methods: vec!["magnitude".to_string(), "cdf".to_string()],
        }
    }
}
125
126pub struct Anomalyzer {
127    conf: AnomalyzerConf,
128    pub(crate) data: Vec<f64>,
129}
130
131impl Anomalyzer {
132    pub fn new(conf: AnomalyzerConf, initial_data: Option<Vec<f64>>) -> Result<Self, String> {
133        if conf.active_size == 0 {
134            return Err("active_size must be at least 1".to_string());
135        }
136
137        let reference_size = conf.n_seasons * conf.active_size;
138        if reference_size < conf.active_size {
139            return Err("reference window too small".to_string());
140        }
141
142        let methods = if conf.methods.is_empty() {
143            vec!["magnitude".to_string(), "cdf".to_string()]
144        } else {
145            conf.methods.clone()
146        };
147
148        let mut validated_conf = conf;
149        validated_conf.methods = methods;
150        if validated_conf.perm_count == 0 {
151            validated_conf.perm_count = 500;
152        }
153        if validated_conf.sensitivity == 0.0 {
154            validated_conf.sensitivity = 0.1;
155        }
156
157        let data = initial_data.unwrap_or_default();
158
159        Ok(Anomalyzer {
160            conf: validated_conf,
161            data,
162        })
163    }
164
165    fn reference_size(&self) -> usize {
166        self.conf.n_seasons * self.conf.active_size
167    }
168
169    fn extract_windows(&self) -> Option<(Array1<f64>, Array1<f64>)> {
170        let total_needed = self.conf.active_size + self.reference_size();
171        if self.data.len() < total_needed {
172            return None;
173        }
174
175        let len = self.data.len();
176        let ref_start = len - total_needed;
177        let active_start = len - self.conf.active_size;
178
179        let reference = Array1::from_vec(self.data[ref_start..active_start].to_vec());
180        let active = Array1::from_vec(self.data[active_start..].to_vec());
181
182        Some((reference, active))
183    }
184
185    pub fn push(&mut self, value: f64) -> f64 {
186        self.data.push(value);
187
188        let needed = self.conf.active_size + self.reference_size();
189        if self.data.len() > needed * 3 {
190            let keep_start = self.data.len() - (needed * 2);
191            self.data.drain(..keep_start);
192        }
193
194        self.eval()
195    }
196
197    pub fn eval(&self) -> f64 {
198        let (reference, active) = match self.extract_windows() {
199            Some(w) => w,
200            None => return 0.0,
201        };
202
203        let mut prob_map: Vec<(&str, f64)> = Vec::new();
204        let mut rank_prob = 0.0f64;
205
206        for method in &self.conf.methods {
207            let prob = match method.as_str() {
208                "magnitude" => self.magnitude_test(&reference, &active),
209                "fence"     => self.fence_test(&active),
210                "cdf"       => self.cdf_test(&reference, &active),
211                "diff"      => self.diff_test(&reference, &active),
212                "highrank"  => self.rank_test(&reference, &active, true),
213                "lowrank"   => self.rank_test(&reference, &active, false),
214                "ks"        => self.bootstrap_ks_test(&reference, &active),
215                _           => continue,
216            };
217
218            if prob.is_nan() || prob < 0.0 || prob > 1.0 {
219                continue;
220            }
221
222            if method == "highrank" || method == "lowrank" {
223                rank_prob = rank_prob.max(prob);
224                continue;
225            }
226
227            if method == "magnitude" && prob < self.conf.sensitivity {
228                return 0.0;
229            }
230
231            prob_map.push((method, prob));
232        }
233
234        if (self.conf.methods.contains(&"highrank".to_string())
235            || self.conf.methods.contains(&"lowrank".to_string()))
236            && rank_prob > 0.0
237        {
238            prob_map.push(("rank", rank_prob));
239        }
240
241        let mut weighted_sum = 0.0;
242        let mut weight_sum = 0.0;
243
244        for (name, prob) in prob_map {
245            let weight = if (name == "magnitude" || name == "fence") && prob > 0.8 {
246                5.0
247            } else {
248                0.5
249            };
250            weighted_sum += prob * weight;
251            weight_sum += weight;
252        }
253
254        if weight_sum > 0.0 {
255            weighted_sum / weight_sum
256        } else {
257            0.0
258        }
259    }
260
261    // === ALGORITHMS ===
262
263    fn magnitude_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
264        let ref_mean = reference.mean().unwrap_or(0.0);
265        let act_mean = active.mean().unwrap_or(0.0);
266
267        if ref_mean == 0.0 {
268            return if act_mean == 0.0 { 0.0 } else { 1.0 };
269        }
270
271        let pdiff = (act_mean - ref_mean).abs() / ref_mean;
272        let exp_val = 10.0f64.powf(pdiff.min(5.0));
273        (exp_val - 1.0) / 9.0
274    }
275
276    fn fence_test(&self, active: &Array1<f64>) -> f64 {
277        let x = active.mean().unwrap_or(0.0);
278        let distance = if self.conf.lower_bound.is_infinite() {
279            (x / self.conf.upper_bound).min(2.0)
280        } else {
281            let mid = (self.conf.upper_bound + self.conf.lower_bound) / 2.0;
282            let bound = (self.conf.upper_bound - self.conf.lower_bound) / 2.0;
283            ((x - mid).abs() / bound).min(2.0)
284        };
285        (10.0f64.powf(distance) - 1.0) / 9.0
286    }
287
288    fn cdf_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
289        if reference.len() < 2 || active.len() < 2 {
290            return 0.5;
291        }
292
293        let ref_diffs: Vec<f64> = reference
294            .windows(2)
295            .into_iter()
296            .map(|w| (w[1] - w[0]).abs())
297            .collect();
298
299        let act_diffs: Vec<f64> = active
300            .windows(2)
301            .into_iter()
302            .map(|w| (w[1] - w[0]).abs())
303            .collect();
304
305        if ref_diffs.is_empty() || act_diffs.is_empty() {
306            return 0.5;
307        }
308
309        let active_mean_diff = act_diffs.iter().sum::<f64>() / act_diffs.len() as f64;
310
311        let mut sorted_ref: Vec<f64> = ref_diffs;
312        sorted_ref.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
313
314        let count_le = sorted_ref.iter().filter(|&&v| v <= active_mean_diff).count();
315        let percentile = count_le as f64 / sorted_ref.len() as f64;
316
317        let prob = 2.0 * (0.5 - percentile).abs();
318        prob.min(1.0)
319    }
320
321    fn diff_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
322        if reference.len() < 2 || active.len() < 2 {
323            return 0.5;
324        }
325
326        let ref_diffs: Vec<f64> = reference
327            .windows(2)
328            .into_iter()
329            .map(|w| (w[1] - w[0]).abs())
330            .collect();
331
332        let act_diffs: Vec<f64> = active
333            .windows(2)
334            .into_iter()
335            .map(|w| (w[1] - w[0]).abs())
336            .collect();
337
338        if ref_diffs.is_empty() || act_diffs.is_empty() {
339            return 0.5;
340        }
341
342        let mut combined_diffs = ref_diffs.clone();
343        combined_diffs.extend(act_diffs.iter().cloned());
344
345        let mut indexed: Vec<(f64, usize)> = combined_diffs
346            .iter()
347            .copied()
348            .enumerate()
349            .map(|(i, v)| (v, i))
350            .collect();
351        indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
352
353        let mut ranks = vec![0usize; combined_diffs.len()];
354        for (rank, &(_, idx)) in indexed.iter().enumerate() {
355            ranks[idx] = rank + 1;
356        }
357
358        let n_ref = ref_diffs.len();
359        let active_sum: usize = ranks[n_ref..].iter().sum();
360
361        let mut significant = 0;
362        let mut rng = thread_rng();
363
364        for _ in 0..self.conf.perm_count {
365            let mut perm = combined_diffs.clone();
366            perm.shuffle(&mut rng);
367
368            let mut perm_indexed: Vec<(f64, usize)> = perm
369                .iter()
370                .copied()
371                .enumerate()
372                .map(|(i, v)| (v, i))
373                .collect();
374            perm_indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
375
376            let mut perm_ranks = vec![0usize; perm.len()];
377            for (rank, &(_, idx)) in perm_indexed.iter().enumerate() {
378                perm_ranks[idx] = rank + 1;
379            }
380
381            let perm_active_sum: usize = perm_ranks[n_ref..].iter().sum();
382
383            if perm_active_sum < active_sum {
384                significant += 1;
385            }
386        }
387
388        significant as f64 / self.conf.perm_count as f64
389    }
390
391    fn rank_test(&self, reference: &Array1<f64>, active: &Array1<f64>, high: bool) -> f64 {
392        let mut combined = reference.to_vec();
393        combined.extend(active.iter());
394        let n_active = active.len();
395
396        let mut indexed: Vec<(f64, usize)> = combined
397            .iter()
398            .copied()
399            .enumerate()
400            .map(|(idx, val)| (val, idx))
401            .collect();
402        indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
403
404        let mut ranks = vec![0usize; combined.len()];
405        for (rank, &(_, idx)) in indexed.iter().enumerate() {
406            ranks[idx] = rank + 1;
407        }
408        let active_sum: usize = ranks[combined.len() - n_active..].iter().sum();
409
410        let mut significant = 0;
411        let mut rng = thread_rng();
412
413        for _ in 0..self.conf.perm_count {
414            let mut perm = combined.clone();
415            perm.shuffle(&mut rng);
416
417            let mut perm_indexed: Vec<(f64, usize)> = perm
418                .iter()
419                .copied()
420                .enumerate()
421                .map(|(idx, val)| (val, idx))
422                .collect();
423            perm_indexed.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
424
425            let mut perm_ranks = vec![0usize; perm.len()];
426            for (rank, &(_, idx)) in perm_indexed.iter().enumerate() {
427                perm_ranks[idx] = rank + 1;
428            }
429            let perm_active_sum: usize = perm_ranks[perm.len() - n_active..].iter().sum();
430
431            if high {
432                if perm_active_sum < active_sum {
433                    significant += 1;
434                }
435            } else {
436                if perm_active_sum > active_sum {
437                    significant += 1;
438                }
439            }
440        }
441
442        significant as f64 / self.conf.perm_count as f64
443    }
444
445    fn bootstrap_ks_test(&self, reference: &Array1<f64>, active: &Array1<f64>) -> f64 {
446        if reference.is_empty() || active.is_empty() {
447            return 0.5;
448        }
449
450        let ks_stat = self.ks_statistic(reference, active);
451
452        if ks_stat.is_nan() || ks_stat == 0.0 {
453            return 0.5;
454        }
455
456        let mut combined = reference.to_vec();
457        combined.extend(active.iter().cloned());
458
459        let mut significant = 0;
460        let mut rng = thread_rng();
461
462        for _ in 0..self.conf.perm_count {
463            let mut perm = combined.clone();
464            perm.shuffle(&mut rng);
465
466            let perm_ref = Array1::from_vec(perm[..reference.len()].to_vec());
467            let perm_act = Array1::from_vec(perm[reference.len()..].to_vec());
468
469            let perm_ks = self.ks_statistic(&perm_ref, &perm_act);
470
471            if perm_ks < ks_stat {
472                significant += 1;
473            }
474        }
475
476        significant as f64 / self.conf.perm_count as f64
477    }
478
479    fn ks_statistic(&self, a: &Array1<f64>, b: &Array1<f64>) -> f64 {
480        if a.is_empty() || b.is_empty() {
481            return f64::NAN;
482        }
483
484        let mut sorted_a = a.to_vec();
485        let mut sorted_b = b.to_vec();
486        sorted_a.sort_by(|x, y| x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal));
487        sorted_b.sort_by(|x, y| x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal));
488
489        let mut i = 0;
490        let mut j = 0;
491        let mut max_diff: f64 = 0.0;
492
493        while i < sorted_a.len() && j < sorted_b.len() {
494            if sorted_a[i] <= sorted_b[j] {
495                let ecdf_a = (i + 1) as f64 / sorted_a.len() as f64;
496                let ecdf_b = j as f64 / sorted_b.len() as f64;
497                max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
498                i += 1;
499            } else {
500                let ecdf_a = i as f64 / sorted_a.len() as f64;
501                let ecdf_b = (j + 1) as f64 / sorted_b.len() as f64;
502                max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
503                j += 1;
504            }
505        }
506
507        while i < sorted_a.len() {
508            let ecdf_a = (i + 1) as f64 / sorted_a.len() as f64;
509            let ecdf_b = 1.0;
510            max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
511            i += 1;
512        }
513
514        while j < sorted_b.len() {
515            let ecdf_a = 1.0;
516            let ecdf_b = (j + 1) as f64 / sorted_b.len() as f64;
517            max_diff = max_diff.max((ecdf_a - ecdf_b).abs());
518            j += 1;
519        }
520
521        max_diff
522    }
523}
524
525// =============================================================================
526// PersistentAnomalyzer
527// =============================================================================
528
/// An [`Anomalyzer`] whose history is backed by a
/// [`PersistenceManager`](persistence::PersistenceManager), so it survives
/// process restarts.
///
/// Each `push` hands the new value to the persistence manager's WAL
/// (`record_push`) before the probability is returned. A snapshot is
/// compacted automatically every `snapshot_interval` pushes (default 1 000,
/// adjustable with `set_snapshot_interval`).
///
/// # Example
/// ```rust,no_run
/// # #[cfg(feature = "persist")]
/// # fn main() -> std::io::Result<()> {
/// use anomalyzer_ts::{AnomalyzerConf, PersistentAnomalyzer};
///
/// let conf = AnomalyzerConf {
///     active_size: 1,
///     n_seasons: 4,
///     methods: vec!["magnitude".into(), "highrank".into()],
///     ..Default::default()
/// };
///
/// // First run — no prior state, starts fresh.
/// // Subsequent runs — history is restored from disk automatically.
/// let mut detector = PersistentAnomalyzer::open("/var/lib/myapp/anomaly", conf)?;
///
/// let prob = detector.push(15.0)?;
/// println!("prob = {prob:.3}");
///
/// detector.flush()?; // compact on clean shutdown
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "persist")]
pub struct PersistentAnomalyzer {
    /// The in-memory detector that does the scoring.
    inner: Anomalyzer,
    /// WAL + snapshot storage mirroring `inner`'s history.
    pm: persistence::PersistenceManager,
}
565
#[cfg(feature = "persist")]
impl PersistentAnomalyzer {
    /// Opens the persistence directory `dir` (created if missing), replays
    /// the stored history, and returns a detector seeded with it.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from opening or recovering the store. An invalid
    /// `conf` (as rejected by `Anomalyzer::new`) is reported as
    /// `ErrorKind::InvalidInput`.
    pub fn open(
        dir: impl AsRef<std::path::Path>,
        conf: AnomalyzerConf,
    ) -> std::io::Result<Self> {
        let mut pm = persistence::PersistenceManager::open(&dir)?;
        let history = pm.recover()?;

        // Nothing recovered means a fresh start: seed with `None`.
        let seed = Some(history).filter(|values| !values.is_empty());

        let inner = Anomalyzer::new(conf, seed).map_err(|message| {
            std::io::Error::new(std::io::ErrorKind::InvalidInput, message)
        })?;

        Ok(PersistentAnomalyzer { inner, pm })
    }

    /// Scores `value`, records it with the persistence manager, and returns
    /// the anomaly probability.
    ///
    /// NOTE(review): the value is added to the in-memory history before
    /// `record_push` runs. If that call fails, memory and disk may diverge
    /// until the next successful write or restart.
    pub fn push(&mut self, value: f64) -> std::io::Result<f64> {
        let probability = self.inner.push(value);
        self.pm.record_push(value, &self.inner.data)?;
        Ok(probability)
    }

    /// Scores the current windows again without adding a value.
    pub fn eval(&self) -> f64 {
        self.inner.eval()
    }

    /// Snapshots the current history and truncates the WAL right away.
    /// Calling this on clean shutdown keeps replay short on the next start.
    pub fn flush(&mut self) -> std::io::Result<()> {
        self.pm.compact(&self.inner.data)
    }

    /// Current size of the WAL, in bytes.
    pub fn wal_size_bytes(&self) -> std::io::Result<u64> {
        self.pm.wal_size_bytes()
    }

    /// Number of WAL entries written since the last snapshot.
    pub fn pending_wal_entries(&self) -> usize {
        self.pm.pending_wal_entries()
    }

    /// Sets how many pushes happen between automatic snapshots
    /// (default: 1 000).
    pub fn set_snapshot_interval(&mut self, n: usize) {
        self.pm.snapshot_interval = n;
    }
}
622
623// =============================================================================
624// Tests
625// =============================================================================
626
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn basic_anomaly_detection() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 1,
            n_seasons: 4,
            perm_count: 1000,
            methods: vec!["magnitude".to_string(), "highrank".to_string()],
        };

        let mut anom = Anomalyzer::new(conf, Some(vec![2.0, 2.1, 2.2, 2.0, 2.3])).unwrap();

        let p_normal = anom.push(2.15);
        assert!(p_normal < 0.7);

        let p_anomalous = anom.push(9.0);
        println!("Anomalous probability: {}", p_anomalous);
        assert!(p_anomalous > 0.75);

        let p_recovery = anom.push(2.4);
        assert!(p_recovery < 0.8);
    }

    #[test]
    fn cdf_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 3,
            n_seasons: 2,
            perm_count: 500,
            methods: vec!["cdf".to_string()],
        };

        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![10.0, 10.1, 10.05, 10.2, 10.15, 10.1]),
        )
        .unwrap();

        anom.push(10.25);
        anom.push(10.1);
        let p_normal = anom.push(10.22);
        println!("CDF normal probability: {}", p_normal);
        assert!(p_normal < 0.7);

        anom.push(12.0);
        anom.push(8.5);
        let p_volatile = anom.push(13.0);
        println!("CDF volatile probability: {}", p_volatile);
        assert!(p_volatile > 0.8);
    }

    #[test]
    fn diff_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 4,
            n_seasons: 2,
            perm_count: 1000,
            methods: vec!["diff".to_string()],
        };

        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![
                10.0, 10.2, 10.1, 10.3, 10.2, 10.4, 10.3, 10.5,
            ]),
        )
        .unwrap();

        anom.push(10.6);
        anom.push(10.5);
        anom.push(10.7);
        let p_normal = anom.push(10.6);
        println!("Diff normal probability: {}", p_normal);
        assert!(p_normal < 0.6);

        anom.push(13.0);
        anom.push(9.0);
        anom.push(14.0);
        let p_volatile = anom.push(10.0);
        println!("Diff high volatility probability: {}", p_volatile);
        assert!(p_volatile > 0.9);
    }

    #[test]
    fn ks_sensitivity_test() {
        let conf = AnomalyzerConf {
            sensitivity: 0.1,
            upper_bound: 100.0,
            lower_bound: NA,
            active_size: 5,
            n_seasons: 2,
            perm_count: 1000,
            methods: vec!["ks".to_string()],
        };

        let mut anom = Anomalyzer::new(
            conf,
            Some(vec![9.8, 10.2, 9.9, 10.1, 10.0, 10.3, 9.7, 10.4, 10.1, 9.9]),
        )
        .unwrap();

        anom.push(10.0);
        anom.push(10.2);
        anom.push(9.8);
        anom.push(10.1);
        let p_normal = anom.push(10.0);
        println!("KS normal probability: {}", p_normal);
        assert!(
            p_normal < 0.5,
            "Similar distribution should give low KS prob: got {}",
            p_normal
        );

        anom.push(12.0);
        anom.push(12.5);
        anom.push(11.8);
        anom.push(12.2);
        let p_shift = anom.push(12.1);
        println!("KS shift probability: {}", p_shift);
        assert!(
            p_shift > 0.9,
            "Distribution shift should trigger high KS prob: got {}",
            p_shift
        );
    }

    /// `new` must reject windows that cannot hold any data.
    #[test]
    fn new_rejects_degenerate_windows() {
        let zero_active = AnomalyzerConf {
            active_size: 0,
            ..Default::default()
        };
        assert!(Anomalyzer::new(zero_active, None).is_err());

        let zero_seasons = AnomalyzerConf {
            n_seasons: 0,
            ..Default::default()
        };
        assert!(Anomalyzer::new(zero_seasons, None).is_err());
    }

    /// Zero/empty settings are replaced with the documented defaults.
    #[test]
    fn new_fills_in_defaults() {
        let conf = AnomalyzerConf {
            sensitivity: 0.0,
            perm_count: 0,
            methods: Vec::new(),
            ..Default::default()
        };
        let anom = Anomalyzer::new(conf, None).unwrap();

        assert_eq!(
            anom.conf.methods,
            vec!["magnitude".to_string(), "cdf".to_string()]
        );
        assert_eq!(anom.conf.perm_count, 500);
        assert_eq!(anom.conf.sensitivity, 0.1);
    }

    /// Nothing is scored until both windows are full.
    #[test]
    fn returns_zero_until_windows_are_full() {
        let conf = AnomalyzerConf {
            active_size: 2,
            n_seasons: 2,
            ..Default::default()
        };
        let mut anom = Anomalyzer::new(conf, None).unwrap();

        // active (2) + reference (2 * 2) = 6 values are needed before scoring.
        for v in [1.0, 2.0, 3.0, 4.0, 5.0] {
            assert_eq!(anom.push(v), 0.0);
        }
    }

    /// Verify AsyncAnomalyzer produces consistent results with sync Anomalyzer.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn async_matches_sync_behaviour() {
        use crate::async_anomalyzer::AsyncAnomalyzer;

        let conf = AnomalyzerConf {
            active_size: 1,
            n_seasons: 4,
            perm_count: 1000,
            methods: vec!["magnitude".to_string(), "highrank".to_string()],
            ..Default::default()
        };

        let detector = AsyncAnomalyzer::new(
            conf,
            Some(vec![2.0, 2.1, 2.2, 2.0, 2.3]),
        )
        .await
        .unwrap();

        let p_normal = detector.push(2.15).await;
        assert!(p_normal < 0.7, "expected normal, got {p_normal}");

        let p_anomalous = detector.push(9.0).await;
        assert!(p_anomalous > 0.75, "expected anomaly, got {p_anomalous}");
    }
}
797
#[cfg(all(test, feature = "persist"))]
mod persist_tests {
    use super::*;
    use tempfile::tempdir;

    /// Small, fast configuration shared by the persistence tests.
    fn conf() -> AnomalyzerConf {
        AnomalyzerConf {
            active_size: 1,
            n_seasons: 4,
            perm_count: 200,
            methods: vec!["magnitude".into(), "highrank".into()],
            ..Default::default()
        }
    }

    #[test]
    fn survives_restart() {
        let dir = tempdir().unwrap();

        // First "process" — push some values.
        let prob_before = {
            let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            for v in [10.0f64, 10.1, 10.2, 10.0] {
                d.push(v).unwrap();
            }
            d.push(15.0).unwrap()
        }; // dropped — simulates process exit

        // Second "process" — history recovered from disk; read-only access.
        let prob_after = PersistentAnomalyzer::open(dir.path(), conf())
            .unwrap()
            .eval();

        // Both runs should see the same spike as anomalous.
        assert!(prob_before > 0.5, "before restart: {prob_before}");
        assert!(prob_after > 0.5, "after restart: {prob_after}");
    }

    #[test]
    fn wal_truncated_after_compact() {
        let dir = tempdir().unwrap();
        let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
        d.set_snapshot_interval(5);

        for v in [10.0f64, 10.1, 10.2, 10.0, 10.3] {
            d.push(v).unwrap();
        }
        // 5th push triggers compaction → WAL should be empty.
        assert_eq!(d.wal_size_bytes().unwrap(), 0);
    }

    #[test]
    fn partial_wal_tail_tolerated() {
        use std::io::Write;

        let dir = tempdir().unwrap();

        // Write good data.
        {
            let mut d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
            for v in [10.0f64, 10.1, 10.2] {
                d.push(v).unwrap();
            }
        }

        // Corrupt the last few bytes (simulate crash mid-write). The handle
        // is closed before recovery reopens the directory.
        {
            let wal = dir.path().join("anomalyzer.wal");
            let mut f = std::fs::OpenOptions::new().append(true).open(&wal).unwrap();
            f.write_all(&[0xAE, 0xFF]).unwrap(); // partial record
        }

        // Should recover cleanly, ignoring the torn tail.
        let d = PersistentAnomalyzer::open(dir.path(), conf()).unwrap();
        assert!(d.pending_wal_entries() == 0 || d.wal_size_bytes().unwrap() < 100);
    }
}