Skip to main content

anomalyzer_ts/
persistence.rs

// src/persistence.rs
//! Production-grade persistence for [`Anomalyzer`] using a WAL + snapshot strategy.
//!
//! ## Design (mirrors Redis AOF + RDB, RocksDB WAL)
//!
//! ```text
//! <dir>/
//!   anomalyzer.snap          — latest compacted snapshot  (bincode)
//!   anomalyzer.snap.tmp      — atomic write staging file
//!   anomalyzer.wal           — append-only log of every push() since last snap
//! ```
//!
//! **Recovery order**
//! 1. Load the snapshot → deserialise the full `data` window
//! 2. Replay the WAL entries appended after the snapshot was written
//! 3. Keep appending to the existing WAL; it is emptied by the next
//!    compaction, not by recovery
//!
//! **Durability guarantee**  
//! Each `push` is appended to the WAL and synced to disk (`File::sync_data`,
//! i.e. `fdatasync`) before returning.
//! Snapshots are written to a `.tmp` file then atomically renamed, so a
//! crash mid-snapshot never corrupts the previous good snapshot.
//!
//! **Compaction**  
//! After `snapshot_interval` pushes the manager writes a new snapshot and
//! truncates the WAL, bounding recovery time to at most
//! `snapshot_interval` WAL entries.
27
28#![cfg(feature = "persist")]
29
30use std::{
31    fs::{self, File, OpenOptions},
32    io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
33    path::{Path, PathBuf},
34};
35
36use serde::{Deserialize, Serialize};
37
38// ── on-disk formats ────────────────────────────────────────────────────────
39
40/// Snapshot: a complete, compacted state dump.
41#[derive(Serialize, Deserialize)]
42pub(crate) struct Snapshot {
43    /// The version tag lets us evolve the format without silent corruption.
44    pub version: u8,
45    /// Full `data` ring-buffer at snapshot time.
46    pub data: Vec<f64>,
47}
48
/// Marker byte that starts every WAL record.
///
/// A WAL record is one `push` call, encoded as
/// `[ MAGIC(1) | f64-le(8) ]` = 9 bytes. The fixed magic byte lets the
/// reader detect truncated/corrupt tails.
const WAL_MAGIC: u8 = 0xAE;

/// Size in bytes of one WAL record: the magic byte plus one little-endian `f64`.
const WAL_RECORD_LEN: usize = 1 + std::mem::size_of::<f64>();
55
56// ── public manager ─────────────────────────────────────────────────────────
57
/// Manages WAL + snapshot persistence for an [`crate::Anomalyzer`] data window.
///
/// Obtain one via [`PersistenceManager::open`], then call
/// [`recover`](PersistenceManager::recover) to get the initial `data` vec,
/// and [`record_push`](PersistenceManager::record_push) on every `push`.
#[derive(Debug)]
pub struct PersistenceManager {
    /// Path of the latest compacted snapshot.
    snap_path: PathBuf,
    /// Staging path a new snapshot is written to before being renamed over
    /// `snap_path`.
    snap_tmp_path: PathBuf,
    /// Path of the write-ahead log.
    wal_path: PathBuf,
    /// Open handle to the WAL that pushes are appended through.
    wal_file: File,
    /// Pushes recorded since the last snapshot.
    pushes_since_snap: usize,
    /// Compact after this many pushes (default 1 000).
    pub snapshot_interval: usize,
}
73
74impl PersistenceManager {
75    /// Open (or create) the persistence directory.
76    ///
77    /// # Errors
78    /// Returns `io::Error` if the directory cannot be created or files opened.
79    pub fn open(dir: impl AsRef<Path>) -> io::Result<Self> {
80        let dir = dir.as_ref();
81        fs::create_dir_all(dir)?;
82
83        let snap_path = dir.join("anomalyzer.snap");
84        let snap_tmp_path = dir.join("anomalyzer.snap.tmp");
85        let wal_path = dir.join("anomalyzer.wal");
86
87        // Open WAL in append mode; create if absent.
88        let wal_file = OpenOptions::new()
89            .create(true)
90            .append(true)
91            .open(&wal_path)?;
92
93        Ok(Self {
94            snap_path,
95            snap_tmp_path,
96            wal_path,
97            wal_file,
98            pushes_since_snap: 0,
99            snapshot_interval: 1_000,
100        })
101    }
102
103    // ── recovery ────────────────────────────────────────────────────────────
104
105    /// Reconstruct the `data` window from snapshot + WAL.
106    ///
107    /// Call once at startup; the returned `Vec<f64>` is passed as
108    /// `initial_data` to [`crate::Anomalyzer::new`].
109    pub fn recover(&self) -> io::Result<Vec<f64>> {
110        let mut data = self.load_snapshot()?;
111        self.replay_wal(&mut data)?;
112        Ok(data)
113    }
114
115    fn load_snapshot(&self) -> io::Result<Vec<f64>> {
116        if !self.snap_path.exists() {
117            return Ok(Vec::new());
118        }
119
120        let file = File::open(&self.snap_path)?;
121        let mut reader = BufReader::new(file);
122        let mut bytes = Vec::new();
123        reader.read_to_end(&mut bytes)?;
124
125        let snap: Snapshot = bincode::deserialize(&bytes)
126            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
127
128        if snap.version != 1 {
129            return Err(io::Error::new(
130                io::ErrorKind::InvalidData,
131                format!("unsupported snapshot version {}", snap.version),
132            ));
133        }
134
135        Ok(snap.data)
136    }
137
138    fn replay_wal(&self, data: &mut Vec<f64>) -> io::Result<()> {
139        if !self.wal_path.exists() {
140            return Ok(());
141        }
142
143        let file = File::open(&self.wal_path)?;
144        let file_len = file.metadata()?.len() as usize;
145        let mut reader = BufReader::new(file);
146        let mut buf = [0u8; WAL_RECORD_LEN];
147        let mut offset = 0usize;
148
149        loop {
150            if offset + WAL_RECORD_LEN > file_len {
151                // Partial / truncated tail — safe to stop here.
152                break;
153            }
154
155            match reader.read_exact(&mut buf) {
156                Ok(()) => {}
157                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
158                Err(e) => return Err(e),
159            }
160
161            if buf[0] != WAL_MAGIC {
162                // Corrupt record — stop replaying (conservative).
163                break;
164            }
165
166            let value = f64::from_le_bytes(buf[1..9].try_into().unwrap());
167            data.push(value);
168            offset += WAL_RECORD_LEN;
169        }
170
171        Ok(())
172    }
173
174    // ── hot path ─────────────────────────────────────────────────────────────
175
176    /// Record one `push(value)` to the WAL.
177    ///
178    /// Fsync is called on every write — call this *after* a successful
179    /// `Anomalyzer::push` so the WAL never leads the in-memory state.
180    ///
181    /// Triggers a snapshot + WAL truncation every `snapshot_interval` calls.
182    pub fn record_push(&mut self, value: f64, current_data: &[f64]) -> io::Result<()> {
183        self.append_wal(value)?;
184        self.pushes_since_snap += 1;
185
186        if self.pushes_since_snap >= self.snapshot_interval {
187            self.compact(current_data)?;
188        }
189
190        Ok(())
191    }
192
193    fn append_wal(&mut self, value: f64) -> io::Result<()> {
194        let mut record = [0u8; WAL_RECORD_LEN];
195        record[0] = WAL_MAGIC;
196        record[1..9].copy_from_slice(&value.to_le_bytes());
197        self.wal_file.write_all(&record)?;
198        self.wal_file.sync_data()?; // fdatasync — durable before returning
199        Ok(())
200    }
201
202    // ── compaction ───────────────────────────────────────────────────────────
203
204    /// Force a snapshot + WAL truncation now.
205    ///
206    /// Called automatically every `snapshot_interval` pushes; you can also
207    /// call it on clean shutdown to minimise next startup replay time.
208    pub fn compact(&mut self, current_data: &[f64]) -> io::Result<()> {
209        self.write_snapshot(current_data)?;
210        self.truncate_wal()?;
211        self.pushes_since_snap = 0;
212        Ok(())
213    }
214
215    fn write_snapshot(&self, data: &[f64]) -> io::Result<()> {
216        let snap = Snapshot {
217            version: 1,
218            data: data.to_vec(),
219        };
220
221        let bytes = bincode::serialize(&snap)
222            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
223
224        // Write to .tmp, fsync, then atomic rename.
225        {
226            let tmp = File::create(&self.snap_tmp_path)?;
227            let mut writer = BufWriter::new(tmp);
228            writer.write_all(&bytes)?;
229            writer.flush()?;
230            writer.get_ref().sync_all()?;
231        }
232
233        fs::rename(&self.snap_tmp_path, &self.snap_path)?;
234        Ok(())
235    }
236
237    fn truncate_wal(&mut self) -> io::Result<()> {
238        // Re-open truncated; keep the same fd slot.
239        let new_wal = OpenOptions::new()
240            .create(true)
241            .write(true)
242            .truncate(true)
243            .open(&self.wal_path)?;
244        // Replace the append-mode fd with the fresh one.
245        self.wal_file = OpenOptions::new()
246            .append(true)
247            .open(&self.wal_path)?;
248        drop(new_wal);
249        Ok(())
250    }
251
252    // ── diagnostics ──────────────────────────────────────────────────────────
253
254    /// WAL size in bytes (useful for monitoring / alerting).
255    pub fn wal_size_bytes(&self) -> io::Result<u64> {
256        if self.wal_path.exists() {
257            Ok(fs::metadata(&self.wal_path)?.len())
258        } else {
259            Ok(0)
260        }
261    }
262
263    /// Number of WAL entries pending since the last snapshot.
264    pub fn pending_wal_entries(&self) -> usize {
265        self.pushes_since_snap
266    }
267}