anomalyzer-ts 0.1.0

Probabilistic anomaly detection for time-series data

# anomalyzer-ts

A fast, lightweight, probabilistic anomaly detection library for streaming time-series data.

Inspired by Etsy's Skyline.

## Features

- **Streaming** — push values one at a time; no batch required
- **Multiple statistical tests** — magnitude, CDF, diff, rank, KS
- **Configurable windows and methods** — tune sensitivity, window sizes, and which tests to run
- **Dynamic weighting** — strong signals are amplified automatically
- **Optional async support** — non-blocking via `AsyncAnomalyzer` (requires `--features async`)
- **Optional persistence** — WAL + snapshot durability via `PersistentAnomalyzer` (requires `--features persist`)
- **No heavy dependencies** — just `ndarray` and `rand` in the default build
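
The "dynamic weighting" bullet can be illustrated with a short sketch. It assumes the final score is a weighted mean of the per-method probabilities, with `magnitude` and `fence` upweighted when they report a strong signal. It also applies the `sensitivity` rule from the Configuration table. The function name `combine`, the base weight 0.5, the boosted weight 5.0, and the 0.8 cutoff are illustrative assumptions, not values documented for this crate.

```rust
/// Hypothetical combiner: weighted mean of per-method probabilities.
/// Weights (0.5 base, 5.0 boosted) and the 0.8 cutoff are assumptions.
fn combine(scores: &[(&str, f64)], sensitivity: f64) -> f64 {
    let mut num = 0.0;
    let mut den = 0.0;
    for &(method, p) in scores {
        // Documented `sensitivity` rule: a weak magnitude shift zeroes the result.
        if method == "magnitude" && p < sensitivity {
            return 0.0;
        }
        // Strong magnitude/fence signals get a much larger weight.
        let w = if (method == "magnitude" || method == "fence") && p > 0.8 {
            5.0
        } else {
            0.5
        };
        num += w * p;
        den += w;
    }
    if den == 0.0 { 0.0 } else { num / den }
}

fn main() {
    let weak = combine(&[("magnitude", 0.5), ("highrank", 0.9)], 0.1);
    let strong = combine(&[("magnitude", 0.9), ("highrank", 0.5)], 0.1);
    println!("weak = {weak:.3}, strong = {strong:.3}");
}
```

With the same two scores, swapping which method reports 0.9 moves the result from 0.7 to about 0.864, because in this sketch a strong `magnitude` signal carries ten times the weight of the other method.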

---

## Installation

```toml
[dependencies]
# Pick ONE of the following lines (TOML does not allow duplicate keys).

# Default (sync only)
anomalyzer-ts = "0.1"

# With async support
# anomalyzer-ts = { version = "0.1", features = ["async"] }

# With persistence
# anomalyzer-ts = { version = "0.1", features = ["persist"] }

# All features
# anomalyzer-ts = { version = "0.1", features = ["async", "persist"] }
```

Or via cargo:

```bash
cargo add anomalyzer-ts
cargo add anomalyzer-ts --features async
cargo add anomalyzer-ts --features persist
```

---

## Quick Start

### Sync

```rust
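use anomalyzer_ts::{Anomalyzer, AnomalyzerConf};

fn main() {
    let conf = AnomalyzerConf {
        active_size: 1,
        n_seasons: 4,
        methods: vec!["magnitude".to_string(), "highrank".to_string()],
        ..Default::default()
    };

    // Seed history with four baseline values, which fill the reference
    // window (n_seasons × active_size = 4).
    let mut detector = Anomalyzer::new(conf, Some(vec![10.0, 10.1, 10.2, 10.0])).unwrap();

    // Push a new observation and get back its anomaly probability.
    let prob = detector.push(15.0);
    println!("Anomaly probability: {prob:.3}");
}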
```

### Async

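Requires `--features async`. `AsyncAnomalyzer` wraps `Anomalyzer` behind a `tokio::sync::Mutex`
and offloads CPU-bound work to `spawn_blocking`, so it never blocks the async executor. Because the
example uses `#[tokio::main]`, your own crate also needs `tokio` with the `macros` and
`rt-multi-thread` features enabled.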

```rust
use anomalyzer_ts::{AnomalyzerConf, async_anomalyzer::AsyncAnomalyzer};

#[tokio::main]
async fn main() {
    let conf = AnomalyzerConf {
        active_size: 1,
        n_seasons: 4,
        methods: vec!["magnitude".to_string(), "highrank".to_string()],
        ..Default::default()
    };

    let detector = AsyncAnomalyzer::new(conf, Some(vec![10.0, 10.1, 10.2, 10.0]))
        .await
        .unwrap();

    let prob = detector.push(15.0).await;
    println!("Anomaly probability: {prob:.3}");
}
```

### Persistent

Requires `--features persist`. `PersistentAnomalyzer` pairs the detector with a WAL + snapshot
persistence manager. History is recovered automatically on restart — no manual save/load needed.

```rust
use anomalyzer_ts::{AnomalyzerConf, PersistentAnomalyzer};

fn main() -> std::io::Result<()> {
    let conf = AnomalyzerConf {
        active_size: 1,
        n_seasons: 4,
        methods: vec!["magnitude".to_string(), "highrank".to_string()],
        ..Default::default()
    };

    // Opens the directory, recovers prior history, and returns a ready detector.
    // Creates the directory if it does not exist.
    let mut detector = PersistentAnomalyzer::open("/var/lib/myapp/anomaly", conf)?;

    let prob = detector.push(15.0)?;
    println!("Anomaly probability: {prob:.3}");

    // Optional: compact WAL on clean shutdown to speed up next startup.
    detector.flush()?;

    Ok(())
}
```

---

## Persistence

### How It Works

`PersistentAnomalyzer` uses a **WAL (write-ahead log) + snapshot** strategy, similar to Redis AOF+RDB or RocksDB:

```
<dir>/
  anomalyzer.snap       — latest compacted snapshot  (bincode)
  anomalyzer.snap.tmp   — atomic write staging file
  anomalyzer.wal        — append-only log since last snapshot
```

**On every `push`:**
1. The new value is appended to the WAL and flushed with `fsync` before returning.
2. After `snapshot_interval` pushes (default: 1 000), a new snapshot is written atomically and the WAL is truncated.
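
A minimal sketch of that push path follows. It assumes a fixed 8-byte little-endian `f64` per WAL record; the crate lists `bincode` for persistence, so its real format is likely different. `WalWriter` and its methods are hypothetical. The sketch shows the two documented steps: append and `fsync` before returning, then signal compaction once `snapshot_interval` records are pending.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical WAL writer; one 8-byte little-endian f64 per record (assumed layout).
struct WalWriter {
    path: PathBuf,
    file: File,
    pending: usize,
    snapshot_interval: usize,
}

impl WalWriter {
    fn open(path: &Path, snapshot_interval: usize) -> io::Result<Self> {
        let file = OpenOptions::new().create(true).append(true).open(path)?;
        Ok(Self { path: path.to_path_buf(), file, pending: 0, snapshot_interval })
    }

    /// Appends one value and fsyncs it before returning. Returns `true` once
    /// `snapshot_interval` records are pending, i.e. time to snapshot + truncate.
    fn append(&mut self, value: f64) -> io::Result<bool> {
        self.file.write_all(&value.to_le_bytes())?;
        self.file.sync_data()?; // durable before the caller sees Ok
        self.pending += 1;
        Ok(self.pending >= self.snapshot_interval)
    }

    /// Empties the WAL; call only after a snapshot has been safely written.
    fn truncate(&mut self) -> io::Result<()> {
        File::create(&self.path)?.sync_all()?; // File::create truncates to 0 bytes
        self.file = OpenOptions::new().append(true).open(&self.path)?;
        self.pending = 0;
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("anomalyzer_wal_sketch.wal");
    let _ = std::fs::remove_file(&path); // start from an empty log
    let mut wal = WalWriter::open(&path, 3)?;
    for &v in &[10.0, 10.1, 10.2, 15.0] {
        if wal.append(v)? {
            // A real implementation writes the snapshot here, then truncates.
            wal.truncate()?;
        }
    }
    // Only the value appended after compaction (15.0) remains: 8 bytes.
    println!("WAL bytes: {}", std::fs::metadata(&path)?.len());
    Ok(())
}
```

The `sync_data` call is what makes each append durable: without it, the value could still sit in the OS page cache when `append` returns.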

**On startup (`open`):**
1. The latest snapshot is loaded.
2. WAL entries written after the snapshot are replayed on top.
3. Torn/corrupt WAL tails (e.g. from a crash mid-write) are silently skipped.
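
Startup recovery can be sketched as a pure function over the snapshot contents and the raw WAL bytes. It assumes the same hypothetical 8-byte record layout, and `recover` is an illustrative name. This sketch only tolerates a *truncated* tail; detecting a corrupt but complete record would need a per-record checksum, which is omitted here.

```rust
/// Hypothetical recovery: start from the snapshot's history, then replay every
/// complete 8-byte WAL record (assumed little-endian f64) on top of it.
fn recover(snapshot: Vec<f64>, wal: &[u8]) -> Vec<f64> {
    let mut history = snapshot;
    // `chunks_exact` yields only full 8-byte chunks; a partial trailing record
    // (e.g. from a crash mid-write) ends up in the remainder and is skipped.
    for chunk in wal.chunks_exact(8) {
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(chunk);
        history.push(f64::from_le_bytes(bytes));
    }
    history
}

fn main() {
    let mut wal = Vec::new();
    wal.extend_from_slice(&10.2f64.to_le_bytes());
    wal.extend_from_slice(&15.0f64.to_le_bytes());
    wal.extend_from_slice(&[0xAB, 0xCD, 0xEF]); // torn tail: 3 of 8 bytes
    let history = recover(vec![10.0, 10.1], &wal);
    println!("{history:?}"); // [10.0, 10.1, 10.2, 15.0]
}
```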

### Durability Guarantees

- Each `push` is durable before it returns: once `push` returns `Ok`, a crash will not lose that value.
- Snapshots are written to `.snap.tmp` then atomically renamed, so a crash mid-snapshot never corrupts the previous good snapshot.
- A corrupt or truncated WAL tail is tolerated; only complete records are replayed.
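
The write-to-temp-then-rename pattern behind the snapshot guarantee looks roughly like this. The file names match the directory layout above. The encoding (raw little-endian `f64`s rather than bincode) and the function name `write_snapshot_atomically` are assumptions for illustration.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical snapshot writer using a staging file plus an atomic rename.
fn write_snapshot_atomically(dir: &Path, history: &[f64]) -> io::Result<()> {
    let tmp = dir.join("anomalyzer.snap.tmp");
    let dst = dir.join("anomalyzer.snap");

    // 1. Write the full snapshot to the staging file and fsync it.
    let mut f = File::create(&tmp)?;
    for v in history {
        f.write_all(&v.to_le_bytes())?;
    }
    f.sync_all()?;

    // 2. Replace the old snapshot in one step. Until this rename, a crash
    //    leaves the previous snapshot untouched.
    fs::rename(&tmp, &dst)?;
    // A stricter version would also fsync `dir` so the rename itself is durable.
    Ok(())
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("anomalyzer_snap_sketch");
    fs::create_dir_all(&dir)?;
    write_snapshot_atomically(&dir, &[10.0, 10.1, 10.2])?;
    // Three f64 values at 8 bytes each: prints 24.
    println!("snapshot bytes: {}", fs::metadata(dir.join("anomalyzer.snap"))?.len());
    Ok(())
}
```

The rename is atomic on POSIX filesystems, which is what lets readers see either the old snapshot or the new one, never half of one; other platforms may not guarantee atomic replacement.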

### Compaction / Snapshot Interval

```rust
// Compact every 500 pushes instead of the default 1 000.
detector.set_snapshot_interval(500);

// Force an immediate snapshot + WAL truncation (e.g. on clean shutdown).
detector.flush()?;
```

### Diagnostics

```rust
// Bytes currently accumulated in the WAL.
let wal_bytes = detector.wal_size_bytes()?;

// WAL entries pending since the last snapshot.
let pending = detector.pending_wal_entries();
```

---

## Examples

### Basic usage
```bash
cargo run --example basic
```
Pushes a handful of values into a sync detector and prints the anomaly probability for each.

### Simulated CPU monitoring with spike detection
```bash
cargo run --example cpu_monitoring
```
Simulates 100 samples of normal CPU load followed by a spike sequence. Demonstrates `fence` and `highrank` methods alongside `magnitude` and `cdf` for threshold-aware detection.

### Async basic
```bash
cargo run --example async_basic --features async
```
The async equivalent of `basic`. Shows normal values, a high spike, and a dip — flags anything above 0.7 probability.

### Async multi-sensor
```bash
cargo run --example async_multi_sensor --features async
```
Runs three independent `AsyncAnomalyzer` detectors concurrently via `tokio::join!`, each with different configuration:
- **temperature** — magnitude + highrank, subtle baseline then a sharp spike
- **pressure** — magnitude + cdf, gradual climb over time
- **vibration** — ks + diff, noisy-but-bounded baseline then a burst

### Async streaming pipeline
```bash
cargo run --example async_streaming_pipeline --features async
```
A realistic producer/consumer pipeline over a `tokio::sync::mpsc` channel. A producer task emits timestamped metrics; a detector loop feeds them into `AsyncAnomalyzer` and prints `✅ ok` / `⚠️ warn` / `🚨 ALERT` status. Maps directly to production patterns such as Kafka consumers, WebSocket streams, or metrics scrape loops.

### Persistent basic
```bash
cargo run --example persistent_basic --features persist
```
Demonstrates WAL + snapshot persistence across two simulated "process runs": the first run seeds a baseline and pushes a spike, the second restores history from disk and confirms the spike is still detected.

---

## Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| `active_size` | `usize` | `1` | Number of most-recent values treated as the active window |
| `n_seasons` | `usize` | `4` | Number of seasons (reference window = `n_seasons × active_size`) |
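| `sensitivity` | `f64` | `0.1` | Minimum magnitude change to flag; if the `magnitude` score falls below it, the returned probability is `0.0` |
| `upper_bound` | `f64` | `100.0` | Upper boundary used by the `fence` method |
| `lower_bound` | `f64` | `NA` | Lower boundary used by the `fence` method (`NA` = disabled) |
| `perm_count` | `usize` | `500` | Number of permutations (bootstrap resamples for `ks`) used by the `diff`, `highrank`, `lowrank`, and `ks` tests |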
| `methods` | `Vec<String>` | `["magnitude","cdf"]` | Detection methods to use (see below) |
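
To make the relationship between `active_size` and `n_seasons` concrete: with the Quick Start settings (`active_size: 1`, `n_seasons: 4`), the reference window holds 4 values and the active window holds 1, so both windows are full once 5 values are available. The sketch assumes the reference window is the block immediately before the active window (the table gives only its size). `split_windows` is a hypothetical helper, not part of the crate's API.

```rust
/// Hypothetical helper: returns (reference, active) windows from `data`,
/// or None if there is not yet enough history to fill both.
fn split_windows(data: &[f64], active_size: usize, n_seasons: usize) -> Option<(&[f64], &[f64])> {
    let reference_size = n_seasons * active_size;
    let needed = reference_size + active_size;
    if active_size == 0 || data.len() < needed {
        return None;
    }
    // Take the most recent `needed` values; the active window is the tail.
    let start = data.len() - needed;
    Some(data[start..].split_at(reference_size))
}

fn main() {
    // Quick Start: 4 seed values plus 1 pushed value.
    let data = [10.0, 10.1, 10.2, 10.0, 15.0];
    let (reference, active) = split_windows(&data, 1, 4).unwrap();
    println!("reference = {reference:?}, active = {active:?}");
}
```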

---

## Detection Methods

| Method | Description |
|---|---|
| `magnitude` | Relative mean shift between reference and active windows |
| `fence` | How far the active mean sits outside configured bounds |
| `cdf` | CDF-based test on first-difference volatility |
| `diff` | Permutation rank test on absolute differences |
| `highrank` | Permutation test — active values ranked higher than reference |
| `lowrank` | Permutation test — active values ranked lower than reference |
| `ks` | Bootstrap Kolmogorov-Smirnov distribution shift test |
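
The table describes each method in one line. The sketch below makes two of them concrete under stated assumptions: `magnitude` as the relative shift of the active mean from the reference mean, clamped to [0, 1], and `highrank` as a rank-sum permutation test. The exact formulas, tie handling, clamping, and the tiny xorshift RNG (used only to avoid the `rand` dependency) are assumptions, not this crate's code.

```rust
/// Hypothetical `magnitude`: |mean(active) - mean(ref)| / |mean(ref)|, clamped to [0, 1].
fn magnitude(reference: &[f64], active: &[f64]) -> f64 {
    let mean = |xs: &[f64]| xs.iter().sum::<f64>() / xs.len() as f64;
    let (r, a) = (mean(reference), mean(active));
    if r == 0.0 {
        // Any shift away from a zero baseline counts as maximal.
        return if a == 0.0 { 0.0 } else { 1.0 };
    }
    ((a - r).abs() / r.abs()).min(1.0)
}

/// Tiny xorshift64 PRNG so the sketch needs no external crates.
struct XorShift(u64);

impl XorShift {
    fn next(&mut self) -> u64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        self.0
    }
}

/// 1-based ranks of `xs` (ties broken by position).
fn ranks(xs: &[f64]) -> Vec<f64> {
    let mut order: Vec<usize> = (0..xs.len()).collect();
    order.sort_by(|&i, &j| xs[i].total_cmp(&xs[j]));
    let mut r = vec![0.0; xs.len()];
    for (pos, &i) in order.iter().enumerate() {
        r[i] = (pos + 1) as f64;
    }
    r
}

/// Hypothetical `highrank`: fraction of random shuffles in which the observed
/// active-window rank sum is strictly larger than the shuffled one.
fn highrank(reference: &[f64], active: &[f64], perm_count: usize, seed: u64) -> f64 {
    if perm_count == 0 || active.is_empty() {
        return 0.0;
    }
    let all: Vec<f64> = reference.iter().chain(active).copied().collect();
    let (n, k) = (all.len(), active.len());
    let mut r = ranks(&all);
    let observed: f64 = r[n - k..].iter().sum();
    let mut rng = XorShift(seed.max(1)); // xorshift state must be non-zero
    let mut hits = 0;
    for _ in 0..perm_count {
        // Fisher-Yates shuffle of the ranks (same as re-ranking shuffled values).
        for i in (1..n).rev() {
            let j = (rng.next() % (i as u64 + 1)) as usize;
            r.swap(i, j);
        }
        let shuffled: f64 = r[n - k..].iter().sum();
        if observed > shuffled {
            hits += 1;
        }
    }
    hits as f64 / perm_count as f64
}

fn main() {
    let reference = [10.0, 10.1, 10.2, 10.0];
    let active = [15.0];
    println!("magnitude = {:.3}", magnitude(&reference, &active));
    println!("highrank  = {:.3}", highrank(&reference, &active, 500, 42));
}
```

For the Quick Start data, `magnitude` is 4.925 / 10.075, about 0.489. Because 15.0 holds the top rank, a shuffle fails to beat it only when the active slot receives that same rank (1 in 5 on average), so `highrank` should come out near 0.8.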

---

## Feature Flags

| Feature | Enables | Extra dependencies |
|---|---|---|
| _(none)_ | `Anomalyzer` (sync) | _(none)_ |
| `async` | `AsyncAnomalyzer` | `tokio` |
| `persist` | `PersistentAnomalyzer` | `serde`, `bincode` |

---

## License

Apache-2.0