anomalyzer-ts 0.1.0

Probabilistic anomaly detection for time-series data

anomalyzer-ts

A fast, lightweight, probabilistic anomaly detection library for streaming time-series data.

Inspired by Etsy's Skyline.

Features

  • Streaming — push values one at a time; no batch required
  • Multiple statistical tests — magnitude, CDF, diff, rank, KS
  • Configurable windows and methods — tune sensitivity, window sizes, and which tests to run
  • Dynamic weighting — strong signals are amplified automatically
  • Optional async support — non-blocking via AsyncAnomalyzer (requires --features async)
  • Optional persistence — WAL + snapshot durability via PersistentAnomalyzer (requires --features persist)
  • No heavy dependencies — just ndarray and rand in the default build

Installation

# Default (sync only)
anomalyzer-ts = "0.1"

# With async support
anomalyzer-ts = { version = "0.1", features = ["async"] }

# With persistence
anomalyzer-ts = { version = "0.1", features = ["persist"] }

# All features
anomalyzer-ts = { version = "0.1", features = ["async", "persist"] }

Or via cargo:

cargo add anomalyzer-ts
cargo add anomalyzer-ts --features async
cargo add anomalyzer-ts --features persist

Quick Start

Sync

use anomalyzer_ts::{Anomalyzer, AnomalyzerConf};

let conf = AnomalyzerConf {
    active_size: 1,
    n_seasons: 4,
    methods: vec!["magnitude".to_string(), "highrank".to_string()],
    ..Default::default()
};

let mut detector = Anomalyzer::new(conf, Some(vec![10.0, 10.1, 10.2, 10.0])).unwrap();

let prob = detector.push(15.0);
println!("Anomaly probability: {prob:.3}");

Async

Requires --features async. AsyncAnomalyzer wraps Anomalyzer behind a tokio::sync::Mutex and offloads CPU-bound work to spawn_blocking so it never blocks the async executor.

use anomalyzer_ts::{AnomalyzerConf, async_anomalyzer::AsyncAnomalyzer};

#[tokio::main]
async fn main() {
    let conf = AnomalyzerConf {
        active_size: 1,
        n_seasons: 4,
        methods: vec!["magnitude".to_string(), "highrank".to_string()],
        ..Default::default()
    };

    let detector = AsyncAnomalyzer::new(conf, Some(vec![10.0, 10.1, 10.2, 10.0]))
        .await
        .unwrap();

    let prob = detector.push(15.0).await;
    println!("Anomaly probability: {prob:.3}");
}

Persistent

Requires --features persist. PersistentAnomalyzer pairs the detector with a WAL + snapshot persistence manager. History is recovered automatically on restart — no manual save/load needed.

use anomalyzer_ts::{AnomalyzerConf, PersistentAnomalyzer};

fn main() -> std::io::Result<()> {
    let conf = AnomalyzerConf {
        active_size: 1,
        n_seasons: 4,
        methods: vec!["magnitude".to_string(), "highrank".to_string()],
        ..Default::default()
    };

    // Opens the directory, recovers prior history, and returns a ready detector.
    // Creates the directory if it does not exist.
    let mut detector = PersistentAnomalyzer::open("/var/lib/myapp/anomaly", conf)?;

    let prob = detector.push(15.0)?;
    println!("Anomaly probability: {prob:.3}");

    // Optional: compact WAL on clean shutdown to speed up next startup.
    detector.flush()?;

    Ok(())
}

Persistence

How It Works

PersistentAnomalyzer uses a WAL (write-ahead log) + snapshot strategy, similar to Redis AOF+RDB or RocksDB:

<dir>/
  anomalyzer.snap       — latest compacted snapshot  (bincode)
  anomalyzer.snap.tmp   — atomic write staging file
  anomalyzer.wal        — append-only log since last snapshot

On every push:

  1. The new value is appended to the WAL and flushed with fsync before returning.
  2. After snapshot_interval pushes (default: 1 000), a new snapshot is written atomically and the WAL is truncated.
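
The push path above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's implementation: the WalSketch type, its fields, and the 8-byte little-endian record layout are hypothetical, and the snapshot write and WAL truncation that follow a "due" result are left out.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical append-only log; not the crate's actual type.
struct WalSketch {
    file: File,
    pending: usize,
    snapshot_interval: usize,
}

impl WalSketch {
    /// Opens (or creates) the log in append mode.
    fn open(path: &Path, snapshot_interval: usize) -> io::Result<Self> {
        let file = OpenOptions::new().create(true).append(true).open(path)?;
        Ok(Self { file, pending: 0, snapshot_interval })
    }

    /// Appends one value and fsyncs before returning.
    /// Returns true once `snapshot_interval` values have accumulated,
    /// which is the caller's cue to write a snapshot and truncate the log.
    fn append(&mut self, value: f64) -> io::Result<bool> {
        // Assumed record layout: one 8-byte little-endian f64 per push.
        self.file.write_all(&value.to_le_bytes())?;
        // sync_all maps to fsync on Unix: data and metadata reach the disk.
        self.file.sync_all()?;
        self.pending += 1;
        Ok(self.pending >= self.snapshot_interval)
    }
}
```

Calling sync_all on every append is what makes a value durable before the call returns, at the cost of one disk flush per push.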

On startup (open):

  1. The latest snapshot is loaded.
  2. WAL entries written after the snapshot are replayed on top.
  3. Torn/corrupt WAL tails (e.g. from a crash mid-write) are silently skipped.
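
The recovery steps above can be illustrated with a small sketch. It assumes a hypothetical record layout of one 8-byte little-endian f64 per push, which the crate may not use, and it only handles a truncated tail; detecting corrupted bytes inside a complete record would need a per-record checksum, which is not shown. The function names are illustrative.

```rust
/// Decodes fixed-size records from raw WAL bytes.
/// A trailing fragment shorter than 8 bytes (a torn write) is ignored.
fn replay_wal(bytes: &[u8]) -> Vec<f64> {
    bytes
        .chunks_exact(8) // the incomplete remainder is never yielded
        .map(|chunk| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            f64::from_le_bytes(buf)
        })
        .collect()
}

/// Rebuilds history: snapshot contents first, then WAL entries on top.
fn recover(snapshot: Vec<f64>, wal_bytes: &[u8]) -> Vec<f64> {
    let mut history = snapshot;
    history.extend(replay_wal(wal_bytes));
    history
}
```

Because only whole records are decoded, a crash in the middle of a write costs at most the value that was being written when the crash happened.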

Durability Guarantees

  • Each push is durable before it returns: a crash after push has returned does not lose that value.
  • Snapshots are written to .snap.tmp then atomically renamed, so a crash mid-snapshot never corrupts the previous good snapshot.
  • A corrupt or truncated WAL tail is tolerated; only complete records are replayed.
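
The write-then-rename pattern behind the snapshot guarantee looks roughly like the following. This is a sketch, not the crate's code: the function name is hypothetical, the payload is assumed to be already serialized, and only the two file names are taken from the directory layout above.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Writes `payload` to the staging file, syncs it, then renames it over
/// the live snapshot. Readers see either the old snapshot or the new one.
fn write_snapshot_atomically(dir: &Path, payload: &[u8]) -> io::Result<()> {
    let tmp = dir.join("anomalyzer.snap.tmp");
    let dst = dir.join("anomalyzer.snap");

    let mut staging = File::create(&tmp)?;
    staging.write_all(payload)?;
    // Flush the staging file to disk before it becomes visible.
    staging.sync_all()?;

    // On POSIX filesystems, rename within one directory replaces the
    // destination atomically.
    fs::rename(&tmp, &dst)?;
    Ok(())
}
```

A crash before the rename leaves a stale .snap.tmp next to an intact .snap. A stricter implementation would also fsync the directory after the rename, which this sketch omits.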

Compaction / Snapshot Interval

// Compact every 500 pushes instead of the default 1 000.
detector.set_snapshot_interval(500);

// Force an immediate snapshot + WAL truncation (e.g. on clean shutdown).
detector.flush()?;

Diagnostics

// Bytes currently accumulated in the WAL.
let wal_bytes = detector.wal_size_bytes()?;

// WAL entries pending since the last snapshot.
let pending = detector.pending_wal_entries();

Examples

Basic usage

cargo run --example basic

Pushes a handful of values into a sync detector and prints the anomaly probability for each.

Simulated CPU monitoring with spike detection

cargo run --example cpu_monitoring

Simulates 100 samples of normal CPU load followed by a spike sequence. Demonstrates fence and highrank methods alongside magnitude and cdf for threshold-aware detection.

Async basic

cargo run --example async_basic --features async

The async equivalent of basic. Pushes normal values, a high spike, and a dip, and flags any value whose anomaly probability exceeds 0.7.

Async multi-sensor

cargo run --example async_multi_sensor --features async

Runs three independent AsyncAnomalyzer detectors concurrently via tokio::join!, each with different configuration:

  • temperature — magnitude + highrank, subtle baseline then a sharp spike
  • pressure — magnitude + cdf, gradual climb over time
  • vibration — ks + diff, noisy-but-bounded baseline then a burst

Async streaming pipeline

cargo run --example async_streaming_pipeline --features async

A realistic producer/consumer pipeline over a tokio::sync::mpsc channel. A producer task emits timestamped metrics; a detector loop feeds them into AsyncAnomalyzer and prints ✅ ok / ⚠️ warn / 🚨 ALERT status. Maps directly to production patterns such as Kafka consumers, WebSocket streams, or metrics scrape loops.
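
The shape of that pipeline can be shown without tokio. The sketch below swaps in std::sync::mpsc and a thread for the async channel and task, and takes a closure in place of the detector's push. The Status enum, the classify thresholds (0.4 and 0.7), and run_pipeline are all hypothetical and are not taken from the example's source.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, PartialEq)]
enum Status {
    Ok,
    Warn,
    Alert,
}

/// Maps a probability to a status. Both thresholds are assumptions.
fn classify(prob: f64) -> Status {
    if prob > 0.7 {
        Status::Alert
    } else if prob > 0.4 {
        Status::Warn
    } else {
        Status::Ok
    }
}

/// Runs a producer thread and a consumer loop over a channel.
/// `score` stands in for the detector: it maps a value to a probability.
fn run_pipeline<F>(samples: Vec<f64>, mut score: F) -> Vec<Status>
where
    F: FnMut(f64) -> f64,
{
    let (tx, rx) = mpsc::channel::<f64>();

    let producer = thread::spawn(move || {
        for sample in samples {
            if tx.send(sample).is_err() {
                break; // the consumer went away
            }
        }
        // tx is dropped here, which closes the channel.
    });

    // The loop ends once the channel is closed and drained.
    let mut statuses = Vec::new();
    for value in rx {
        statuses.push(classify(score(value)));
    }

    producer.join().expect("producer thread panicked");
    statuses
}
```

Keeping the detector on the consumer side means only one task ever mutates it, so the values are scored in arrival order.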

Persistent basic

cargo run --example persistent_basic --features persist

Demonstrates WAL + snapshot persistence across two simulated "process runs": the first run seeds a baseline and pushes a spike, the second restores history from disk and confirms the spike is still detected.


Configuration

Field        Type         Default              Description
active_size  usize        1                    Number of most-recent values treated as the active window
n_seasons    usize        4                    Number of seasons (reference window = n_seasons × active_size)
sensitivity  f64          0.1                  Minimum magnitude change to flag; returns 0.0 if below threshold
upper_bound  f64          100.0                Upper fence boundary
lower_bound  f64          NA                   Lower fence boundary (NA = disabled)
perm_count   usize        500                  Permutations for rank/diff/KS tests
methods      Vec<String>  ["magnitude","cdf"]  Detection methods to use (see below)
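
To make the window arithmetic concrete: with active_size = 1 and n_seasons = 4, the reference window holds 4 × 1 = 4 values, which matches the four seed values in the Quick Start. The sketch below assumes the reference window sits immediately before the active window at the end of the history; that layout and the split_windows helper are illustrative, not confirmed by the crate.

```rust
/// Splits the tail of `history` into (reference, active) windows.
/// Returns None when there are not enough values yet.
fn split_windows(
    history: &[f64],
    active_size: usize,
    n_seasons: usize,
) -> Option<(&[f64], &[f64])> {
    // reference window = n_seasons × active_size
    let reference_size = n_seasons * active_size;
    let needed = reference_size + active_size;
    if active_size == 0 || history.len() < needed {
        return None;
    }
    // Keep only the most recent `needed` values, then cut them in two.
    let recent = &history[history.len() - needed..];
    Some(recent.split_at(reference_size))
}
```

With the Quick Start data, pushing 15.0 after the seeds [10.0, 10.1, 10.2, 10.0] gives those four seeds as the reference window and [15.0] as the active window.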

Detection Methods

Method     Description
magnitude  Relative mean shift between reference and active windows
fence      How far the active mean sits outside configured bounds
cdf        CDF-based test on first-difference volatility
diff       Permutation rank test on absolute differences
highrank   Permutation test — active values ranked higher than reference
lowrank    Permutation test — active values ranked lower than reference
ks         Bootstrap Kolmogorov-Smirnov distribution shift test
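
As an illustration of the simplest method, a relative mean shift can be computed as below. The exact formula the crate uses is not given here, so this is one plausible reading: the shift is |mean(active) − mean(reference)| / |mean(reference)|, and sensitivity is treated as a threshold on that relative shift. The function names are hypothetical.

```rust
/// Arithmetic mean; callers must pass a non-empty slice.
fn mean(xs: &[f64]) -> f64 {
    xs.iter().sum::<f64>() / xs.len() as f64
}

/// Relative mean shift between the windows, gated by `sensitivity`.
/// Returns 0.0 for empty windows, a zero reference mean, or a shift
/// smaller than `sensitivity` (assumed semantics).
fn magnitude_shift(reference: &[f64], active: &[f64], sensitivity: f64) -> f64 {
    if reference.is_empty() || active.is_empty() {
        return 0.0;
    }
    let ref_mean = mean(reference);
    if ref_mean == 0.0 {
        return 0.0; // relative shift is undefined
    }
    let shift = ((mean(active) - ref_mean) / ref_mean).abs();
    if shift < sensitivity {
        0.0
    } else {
        shift
    }
}
```

For the Quick Start data, the reference mean is 10.075 and the active value is 15.0, so the shift is 4.925 / 10.075, roughly 0.49. A step from 10.0 to 10.5 is a 5% shift and falls below the default sensitivity of 0.1.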

Feature Flags

Feature  Enables               Extra dependencies
(none)   Anomalyzer (sync)     none
async    AsyncAnomalyzer       tokio
persist  PersistentAnomalyzer  serde, bincode

License

Apache-2.0