anomalyzer_ts/persistence.rs
1// src/persistence.rs
2//! Production-grade persistence for [`Anomalyzer`] using a WAL + snapshot strategy.
3//!
4//! ## Design (mirrors Redis AOF + RDB, RocksDB WAL)
5//!
6//! ```text
7//! <dir>/
8//! anomalyzer.snap — latest compacted snapshot (bincode)
9//! anomalyzer.snap.tmp — atomic write staging file
10//! anomalyzer.wal — append-only log of every push() since last snap
11//! ```
12//!
13//! **Recovery order**
14//! 1. Load snapshot → deserialise the full `data` window
15//! 2. Replay WAL entries appended after the snapshot was written
//! 3. Keep the WAL on disk — it is emptied by the next compaction, not by recovery
17//!
18//! **Durability guarantee**
19//! Each `push` is flushed to the WAL with `fsync` before returning.
20//! Snapshots are written to a `.tmp` file then atomically renamed, so a
21//! crash mid-snapshot never corrupts the previous good snapshot.
22//!
23//! **Compaction**
24//! After `snapshot_interval` pushes the manager writes a new snapshot and
25//! truncates the WAL, bounding recovery time to at most
26//! `snapshot_interval` WAL entries.
27
28#![cfg(feature = "persist")]
29
30use std::{
31 fs::{self, File, OpenOptions},
32 io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
33 path::{Path, PathBuf},
34};
35
36use serde::{Deserialize, Serialize};
37
38// ── on-disk formats ────────────────────────────────────────────────────────
39
40/// Snapshot: a complete, compacted state dump.
41#[derive(Serialize, Deserialize)]
42pub(crate) struct Snapshot {
43 /// The version tag lets us evolve the format without silent corruption.
44 pub version: u8,
45 /// Full `data` ring-buffer at snapshot time.
46 pub data: Vec<f64>,
47}
48
/// Marker byte that opens every WAL record.
///
/// Each `push` is logged as one fixed-size record laid out as
/// `[ MAGIC(1) | f64-le(8) ]`; the reader checks this byte to detect a
/// corrupt record.
const WAL_MAGIC: u8 = 0xAE;
/// Size of one WAL record in bytes: the magic byte plus an 8-byte `f64`.
const WAL_RECORD_LEN: usize = 1 + std::mem::size_of::<f64>();
55
56// ── public manager ─────────────────────────────────────────────────────────
57
/// Manages WAL + snapshot persistence for an [`crate::Anomalyzer`] data window.
///
/// Obtain one via [`PersistenceManager::open`], then call
/// [`recover`](PersistenceManager::recover) to get the initial `data` vec,
/// and [`record_push`](PersistenceManager::record_push) on every `push`.
#[derive(Debug)]
pub struct PersistenceManager {
    /// Location of the latest committed snapshot.
    snap_path: PathBuf,
    /// Staging file a new snapshot is written to before being renamed over
    /// `snap_path`.
    snap_tmp_path: PathBuf,
    /// Location of the write-ahead log.
    wal_path: PathBuf,
    /// Handle used to append records to the WAL.
    wal_file: File,
    /// Pushes recorded since the last snapshot.
    pushes_since_snap: usize,
    /// Compact after this many pushes (default 1 000).
    pub snapshot_interval: usize,
}
73
74impl PersistenceManager {
75 /// Open (or create) the persistence directory.
76 ///
77 /// # Errors
78 /// Returns `io::Error` if the directory cannot be created or files opened.
79 pub fn open(dir: impl AsRef<Path>) -> io::Result<Self> {
80 let dir = dir.as_ref();
81 fs::create_dir_all(dir)?;
82
83 let snap_path = dir.join("anomalyzer.snap");
84 let snap_tmp_path = dir.join("anomalyzer.snap.tmp");
85 let wal_path = dir.join("anomalyzer.wal");
86
87 // Open WAL in append mode; create if absent.
88 let wal_file = OpenOptions::new()
89 .create(true)
90 .append(true)
91 .open(&wal_path)?;
92
93 Ok(Self {
94 snap_path,
95 snap_tmp_path,
96 wal_path,
97 wal_file,
98 pushes_since_snap: 0,
99 snapshot_interval: 1_000,
100 })
101 }
102
103 // ── recovery ────────────────────────────────────────────────────────────
104
105 /// Reconstruct the `data` window from snapshot + WAL.
106 ///
107 /// Call once at startup; the returned `Vec<f64>` is passed as
108 /// `initial_data` to [`crate::Anomalyzer::new`].
109 pub fn recover(&self) -> io::Result<Vec<f64>> {
110 let mut data = self.load_snapshot()?;
111 self.replay_wal(&mut data)?;
112 Ok(data)
113 }
114
115 fn load_snapshot(&self) -> io::Result<Vec<f64>> {
116 if !self.snap_path.exists() {
117 return Ok(Vec::new());
118 }
119
120 let file = File::open(&self.snap_path)?;
121 let mut reader = BufReader::new(file);
122 let mut bytes = Vec::new();
123 reader.read_to_end(&mut bytes)?;
124
125 let snap: Snapshot = bincode::deserialize(&bytes)
126 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
127
128 if snap.version != 1 {
129 return Err(io::Error::new(
130 io::ErrorKind::InvalidData,
131 format!("unsupported snapshot version {}", snap.version),
132 ));
133 }
134
135 Ok(snap.data)
136 }
137
138 fn replay_wal(&self, data: &mut Vec<f64>) -> io::Result<()> {
139 if !self.wal_path.exists() {
140 return Ok(());
141 }
142
143 let file = File::open(&self.wal_path)?;
144 let file_len = file.metadata()?.len() as usize;
145 let mut reader = BufReader::new(file);
146 let mut buf = [0u8; WAL_RECORD_LEN];
147 let mut offset = 0usize;
148
149 loop {
150 if offset + WAL_RECORD_LEN > file_len {
151 // Partial / truncated tail — safe to stop here.
152 break;
153 }
154
155 match reader.read_exact(&mut buf) {
156 Ok(()) => {}
157 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
158 Err(e) => return Err(e),
159 }
160
161 if buf[0] != WAL_MAGIC {
162 // Corrupt record — stop replaying (conservative).
163 break;
164 }
165
166 let value = f64::from_le_bytes(buf[1..9].try_into().unwrap());
167 data.push(value);
168 offset += WAL_RECORD_LEN;
169 }
170
171 Ok(())
172 }
173
174 // ── hot path ─────────────────────────────────────────────────────────────
175
176 /// Record one `push(value)` to the WAL.
177 ///
178 /// Fsync is called on every write — call this *after* a successful
179 /// `Anomalyzer::push` so the WAL never leads the in-memory state.
180 ///
181 /// Triggers a snapshot + WAL truncation every `snapshot_interval` calls.
182 pub fn record_push(&mut self, value: f64, current_data: &[f64]) -> io::Result<()> {
183 self.append_wal(value)?;
184 self.pushes_since_snap += 1;
185
186 if self.pushes_since_snap >= self.snapshot_interval {
187 self.compact(current_data)?;
188 }
189
190 Ok(())
191 }
192
193 fn append_wal(&mut self, value: f64) -> io::Result<()> {
194 let mut record = [0u8; WAL_RECORD_LEN];
195 record[0] = WAL_MAGIC;
196 record[1..9].copy_from_slice(&value.to_le_bytes());
197 self.wal_file.write_all(&record)?;
198 self.wal_file.sync_data()?; // fdatasync — durable before returning
199 Ok(())
200 }
201
202 // ── compaction ───────────────────────────────────────────────────────────
203
204 /// Force a snapshot + WAL truncation now.
205 ///
206 /// Called automatically every `snapshot_interval` pushes; you can also
207 /// call it on clean shutdown to minimise next startup replay time.
208 pub fn compact(&mut self, current_data: &[f64]) -> io::Result<()> {
209 self.write_snapshot(current_data)?;
210 self.truncate_wal()?;
211 self.pushes_since_snap = 0;
212 Ok(())
213 }
214
215 fn write_snapshot(&self, data: &[f64]) -> io::Result<()> {
216 let snap = Snapshot {
217 version: 1,
218 data: data.to_vec(),
219 };
220
221 let bytes = bincode::serialize(&snap)
222 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
223
224 // Write to .tmp, fsync, then atomic rename.
225 {
226 let tmp = File::create(&self.snap_tmp_path)?;
227 let mut writer = BufWriter::new(tmp);
228 writer.write_all(&bytes)?;
229 writer.flush()?;
230 writer.get_ref().sync_all()?;
231 }
232
233 fs::rename(&self.snap_tmp_path, &self.snap_path)?;
234 Ok(())
235 }
236
237 fn truncate_wal(&mut self) -> io::Result<()> {
238 // Re-open truncated; keep the same fd slot.
239 let new_wal = OpenOptions::new()
240 .create(true)
241 .write(true)
242 .truncate(true)
243 .open(&self.wal_path)?;
244 // Replace the append-mode fd with the fresh one.
245 self.wal_file = OpenOptions::new()
246 .append(true)
247 .open(&self.wal_path)?;
248 drop(new_wal);
249 Ok(())
250 }
251
252 // ── diagnostics ──────────────────────────────────────────────────────────
253
254 /// WAL size in bytes (useful for monitoring / alerting).
255 pub fn wal_size_bytes(&self) -> io::Result<u64> {
256 if self.wal_path.exists() {
257 Ok(fs::metadata(&self.wal_path)?.len())
258 } else {
259 Ok(0)
260 }
261 }
262
263 /// Number of WAL entries pending since the last snapshot.
264 pub fn pending_wal_entries(&self) -> usize {
265 self.pushes_since_snap
266 }
267}