// anomalyzer-ts 0.1.0
//
// Probabilistic anomaly detection for time-series data
// src/persistence.rs
//! Production-grade persistence for [`Anomalyzer`] using a WAL + snapshot strategy.
//!
//! ## Design (mirrors Redis AOF + RDB, RocksDB WAL)
//!
//! ```text
//! <dir>/
//!   anomalyzer.snap          — latest compacted snapshot  (bincode)
//!   anomalyzer.snap.tmp      — atomic write staging file
//!   anomalyzer.wal           — append-only log of every push() since last snap
//! ```
//!
//! **Recovery order**
//! 1. Load snapshot → deserialise the full `data` window
//! 2. Replay WAL entries appended after the snapshot was written
//! 3. Continue appending to the existing WAL; it is cleared at the next compaction (see **Compaction** below)
//!
//! **Durability guarantee**  
//! Each `record_push` call writes its WAL record and calls `sync_data` (fdatasync) before returning.
//! Snapshots are written to a `.tmp` file then atomically renamed, so a
//! crash mid-snapshot never corrupts the previous good snapshot.
//!
//! **Compaction**  
//! After `snapshot_interval` pushes the manager writes a new snapshot and
//! truncates the WAL, bounding recovery time to at most
//! `snapshot_interval` WAL entries.

#![cfg(feature = "persist")]

use std::{
    fs::{self, File, OpenOptions},
    io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
    path::{Path, PathBuf},
};

use serde::{Deserialize, Serialize};

// ── on-disk formats ────────────────────────────────────────────────────────

/// Snapshot: a complete, compacted state dump, encoded with `bincode`.
///
/// bincode stores fields positionally (no field names), so the field order
/// below is part of the on-disk format: do not reorder or insert fields
/// without also bumping `version`.
#[derive(Serialize, Deserialize)]
pub(crate) struct Snapshot {
    /// The version tag lets us evolve the format without silent corruption.
    /// The loader in this module only accepts version 1.
    pub version: u8,
    /// Full `data` ring-buffer at snapshot time.
    pub data: Vec<f64>,
}

// Every `push` call becomes one fixed-size WAL record laid out as
// `[ MAGIC(1) | f64-le(8) ]`. The leading magic byte is what lets the
// reader recognise a truncated or corrupt tail.

/// Marker byte that opens every WAL record; any other value ends replay.
const WAL_MAGIC: u8 = 0xAE;
/// Bytes per WAL record: one magic byte followed by a little-endian `f64` (9 total).
const WAL_RECORD_LEN: usize = 1 + std::mem::size_of::<f64>();

// ── public manager ─────────────────────────────────────────────────────────

/// Manages WAL + snapshot persistence for an [`crate::Anomalyzer`] data window.
///
/// Obtain one via [`PersistenceManager::open`], then call
/// [`recover`](PersistenceManager::recover) to get the initial `data` vec,
/// and [`record_push`](PersistenceManager::record_push) on every `push`.
#[derive(Debug)]
pub struct PersistenceManager {
    /// Path of the current snapshot file (`<dir>/anomalyzer.snap`).
    snap_path: PathBuf,
    /// Staging file a new snapshot is written to before being renamed over
    /// `snap_path`.
    snap_tmp_path: PathBuf,
    /// Path of the write-ahead log (`<dir>/anomalyzer.wal`).
    wal_path: PathBuf,
    /// Append-mode handle used for every WAL write.
    wal_file: File,
    /// Pushes recorded since the last snapshot.
    pushes_since_snap: usize,
    /// Compact after this many pushes (default 1 000). Because the check is
    /// `>=`, a value of 0 makes every push trigger a compaction.
    pub snapshot_interval: usize,
}

impl PersistenceManager {
    /// Snapshot format version written by this build; any other version is
    /// rejected by `load_snapshot`.
    const SNAPSHOT_VERSION: u8 = 1;

    /// Open (or create) the persistence directory.
    ///
    /// Creates `dir` (and any missing parents) and opens `anomalyzer.wal` in
    /// append mode, creating it if absent. The pending-entry counter starts at
    /// the number of complete records already present in the WAL. A process
    /// that restarts before reaching `snapshot_interval` therefore still
    /// compacts on schedule, instead of letting the WAL grow across restarts.
    ///
    /// # Errors
    /// Returns `io::Error` if the directory cannot be created or files opened.
    pub fn open(dir: impl AsRef<Path>) -> io::Result<Self> {
        let dir = dir.as_ref();
        fs::create_dir_all(dir)?;

        let snap_path = dir.join("anomalyzer.snap");
        let snap_tmp_path = dir.join("anomalyzer.snap.tmp");
        let wal_path = dir.join("anomalyzer.wal");

        // Open WAL in append mode; create if absent.
        let wal_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&wal_path)?;

        // Records left by a previous run still count toward the next
        // compaction. A torn tail shorter than one record is not counted.
        let leftover_records = wal_file.metadata()?.len() / WAL_RECORD_LEN as u64;
        let pushes_since_snap = leftover_records.min(usize::MAX as u64) as usize;

        Ok(Self {
            snap_path,
            snap_tmp_path,
            wal_path,
            wal_file,
            pushes_since_snap,
            snapshot_interval: 1_000,
        })
    }

    // ── recovery ────────────────────────────────────────────────────────────

    /// Reconstruct the `data` window from snapshot + WAL.
    ///
    /// Call once at startup; the returned `Vec<f64>` is passed as
    /// `initial_data` to [`crate::Anomalyzer::new`].
    ///
    /// WAL replay stops at the first torn or corrupt record, and the WAL file
    /// is then truncated to the intact prefix. Without that cut, records
    /// appended afterwards would sit behind the bad bytes. They would then be
    /// read misaligned (or not at all) on the next recovery.
    ///
    /// # Errors
    /// Returns `io::Error` if a file cannot be read, or the WAL cannot be
    /// truncated. Returns `ErrorKind::InvalidData` if the snapshot cannot be
    /// decoded or has an unsupported version.
    pub fn recover(&self) -> io::Result<Vec<f64>> {
        let mut data = self.load_snapshot()?;
        self.replay_wal(&mut data)?;
        Ok(data)
    }

    /// Loads the snapshot's `data`, or an empty vec when no snapshot exists.
    fn load_snapshot(&self) -> io::Result<Vec<f64>> {
        // Matching on NotFound avoids the race of a separate `exists()` check.
        let bytes = match fs::read(&self.snap_path) {
            Ok(bytes) => bytes,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };

        let snap: Snapshot = bincode::deserialize(&bytes)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

        if snap.version != Self::SNAPSHOT_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("unsupported snapshot version {}", snap.version),
            ));
        }

        Ok(snap.data)
    }

    /// Appends every intact WAL record to `data`. Any torn or corrupt tail is
    /// then cut off, so the file ends on a record boundary again.
    fn replay_wal(&self, data: &mut Vec<f64>) -> io::Result<()> {
        let file = match File::open(&self.wal_path) {
            Ok(file) => file,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e),
        };

        let file_len = file.metadata()?.len();
        let record_len = WAL_RECORD_LEN as u64;
        let mut reader = BufReader::new(file);
        let mut record = [0u8; WAL_RECORD_LEN];
        // Byte length of the prefix made of fully valid records.
        let mut valid_len = 0u64;

        while valid_len + record_len <= file_len {
            match reader.read_exact(&mut record) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }

            if record[0] != WAL_MAGIC {
                // Corrupt record — stop replaying (conservative).
                break;
            }

            let mut payload = [0u8; 8];
            payload.copy_from_slice(&record[1..]);
            data.push(f64::from_le_bytes(payload));
            valid_len += record_len;
        }

        if valid_len < file_len {
            // Bytes past `valid_len` were never replayed. Drop them so that
            // future appends start on a record boundary and stay replayable.
            self.truncate_wal_to(valid_len)?;
        }

        Ok(())
    }

    // ── hot path ─────────────────────────────────────────────────────────────

    /// Record one `push(value)` to the WAL.
    ///
    /// Fsync is called on every write — call this *after* a successful
    /// `Anomalyzer::push` so the WAL never leads the in-memory state.
    /// `current_data` should be the full window *including* `value`, because
    /// it becomes the snapshot when a compaction is triggered.
    ///
    /// Triggers a snapshot + WAL truncation every `snapshot_interval` calls.
    ///
    /// # Errors
    /// Returns `io::Error` if the WAL write/sync or a triggered compaction fails.
    pub fn record_push(&mut self, value: f64, current_data: &[f64]) -> io::Result<()> {
        self.append_wal(value)?;
        self.pushes_since_snap += 1;

        if self.pushes_since_snap >= self.snapshot_interval {
            self.compact(current_data)?;
        }

        Ok(())
    }

    /// Writes one `[MAGIC | f64-le]` record and fdatasyncs it before returning.
    fn append_wal(&mut self, value: f64) -> io::Result<()> {
        let mut record = [0u8; WAL_RECORD_LEN];
        record[0] = WAL_MAGIC;
        record[1..].copy_from_slice(&value.to_le_bytes());
        self.wal_file.write_all(&record)?;
        self.wal_file.sync_data()?; // fdatasync — durable before returning
        Ok(())
    }

    // ── compaction ───────────────────────────────────────────────────────────

    /// Force a snapshot + WAL truncation now.
    ///
    /// Called automatically every `snapshot_interval` pushes; you can also
    /// call it on clean shutdown to minimise next startup replay time.
    ///
    /// NOTE(review): if the process dies after the snapshot rename but before
    /// the WAL truncation, the next `recover` loads a snapshot that already
    /// contains the WAL's values and then replays the WAL again. Those
    /// entries end up duplicated. Closing that window needs a sequence or
    /// generation number shared by the snapshot and the WAL, which is an
    /// on-disk format change.
    ///
    /// # Errors
    /// Returns `io::Error` if writing the snapshot or truncating the WAL fails;
    /// the pending counter is then left untouched so the next push retries.
    pub fn compact(&mut self, current_data: &[f64]) -> io::Result<()> {
        self.write_snapshot(current_data)?;
        self.truncate_wal()?;
        self.pushes_since_snap = 0;
        Ok(())
    }

    /// Durably replaces the snapshot with `data`: write to the `.tmp` file,
    /// fsync it, rename it over the live snapshot, then fsync the directory.
    fn write_snapshot(&self, data: &[f64]) -> io::Result<()> {
        let snap = Snapshot {
            version: Self::SNAPSHOT_VERSION,
            data: data.to_vec(),
        };

        let bytes = bincode::serialize(&snap)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

        // Write to .tmp, fsync, then atomic rename.
        {
            let tmp = File::create(&self.snap_tmp_path)?;
            let mut writer = BufWriter::new(tmp);
            writer.write_all(&bytes)?;
            writer.flush()?;
            writer.get_ref().sync_all()?;
        }

        fs::rename(&self.snap_tmp_path, &self.snap_path)?;
        // A rename is only durable once the directory itself is fsynced.
        // Otherwise a power loss could bring back the old snapshot while the
        // WAL truncation that follows has already been persisted.
        self.sync_snapshot_dir()
    }

    /// Fsyncs the directory holding the snapshot so the rename is persisted.
    #[cfg(unix)]
    fn sync_snapshot_dir(&self) -> io::Result<()> {
        let dir = match self.snap_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent,
            _ => Path::new("."),
        };
        File::open(dir)?.sync_all()
    }

    /// No-op outside Unix: std cannot portably open a directory handle to fsync.
    #[cfg(not(unix))]
    fn sync_snapshot_dir(&self) -> io::Result<()> {
        Ok(())
    }

    /// Empties the WAL durably and refreshes the append handle.
    fn truncate_wal(&mut self) -> io::Result<()> {
        self.truncate_wal_to(0)?;
        // Reopen so the append handle refers to whatever file now lives at
        // `wal_path`, even if it had to be recreated.
        self.wal_file = OpenOptions::new().append(true).open(&self.wal_path)?;
        Ok(())
    }

    /// Shrinks the WAL file to `len` bytes and fsyncs the new length.
    ///
    /// The append-mode `wal_file` handle needs no repositioning afterwards:
    /// append-mode writes always go to the current end of the file.
    fn truncate_wal_to(&self, len: u64) -> io::Result<()> {
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .open(&self.wal_path)?;
        file.set_len(len)?;
        file.sync_all()
    }

    // ── diagnostics ──────────────────────────────────────────────────────────

    /// WAL size in bytes (useful for monitoring / alerting); 0 if the file is
    /// missing.
    ///
    /// # Errors
    /// Returns any metadata error other than `NotFound`.
    pub fn wal_size_bytes(&self) -> io::Result<u64> {
        match fs::metadata(&self.wal_path) {
            Ok(meta) => Ok(meta.len()),
            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(0),
            Err(e) => Err(e),
        }
    }

    /// Number of WAL entries pending since the last snapshot. This includes
    /// complete records already present in the WAL when the manager was opened.
    pub fn pending_wal_entries(&self) -> usize {
        self.pushes_since_snap
    }
}