Skip to main content

slipstream/
snapshot.rs

1//! Append-only snapshot log for KV state.
2//!
3//! Streams [`KvUpdate`]s to disk as they arrive via [`SnapshotWriter`],
4//! with no in-memory state beyond a file handle and byte counter.
//! On startup, [`load`] replays the log to reconstruct entries + cursor,
//! then compacts the file when replay found redundant records or a torn tail.
7//!
8//! ## File format
9//!
10//! 6-byte header followed by a sequence of CRC'd records:
11//!
12//! ```text
13//! Header:  b"PGSS" ++ version:u16le
//! Record:  crc32:u32le ++ type:u8 ++ payload (varies by type); the CRC covers type ++ payload
15//!
16//! Put:     key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version
17//! Delete:  key_len:u16le ++ key ++ ver_len:u8 ++ version
18//! Cursor:  cur_len:u8 ++ cursor
19//! ```
20//!
21//! `version` is the raw [`VersionToken`] bytes (≤10), not a fixed u64 — a
22//! 10-byte FDB versionstamp round-trips intact, where a `u64`-only field would
23//! silently flatten it to 0 and break every later CAS.
24//!
25//! Used by edge/tunnel services to survive restarts without a full
26//! NATS KV scan. The snapshot is a cache — delete it and the system
27//! falls back to `load_all()`.
28//!
29//! ## Blocking I/O
30//!
31//! Every function in this module performs **synchronous** file I/O and is
32//! deliberately runtime-agnostic — it pulls in no async runtime so a caller can
33//! place it on whichever executor (or none) it wants. The flip side is that none
34//! of these calls may run directly on an async executor thread: [`load`],
35//! [`SnapshotWriter::compact`], and friends `read`/`write`/`rename` whole files
36//! and will stall the reactor if awaited inline. Async callers must offload them
37//! with `tokio::task::spawn_blocking` (or the equivalent).
38//! [`SnapshotWriter::compact`] is the heaviest — it reads and rewrites the entire
39//! log — but the rule is the same for all of them.
40
41use std::collections::{HashMap, HashSet};
42use std::fs::{self, File, OpenOptions};
43use std::io::{self, Write};
44use std::path::{Path, PathBuf};
45
46use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
47
/// File magic: every snapshot log begins with these four bytes.
const MAGIC: &[u8; 4] = b"PGSS";

/// On-disk format revision, stored little-endian right after [`MAGIC`].
///
/// v2: Put/Delete records carry the version as length-prefixed raw bytes
/// rather than a fixed 8-byte u64, so non-u64 tokens (e.g. FDB versionstamps)
/// survive a round-trip. `replay_log` rejects v1 files; since the snapshot is
/// a cache, a rejected file only costs a fresh NATS scan + watch replay.
const FORMAT_VERSION: u16 = 2;

/// Header size: the magic followed by the u16 format version (6 bytes).
const HEADER_LEN: usize = MAGIC.len() + std::mem::size_of::<u16>();

// Record type tags — the byte that follows each record's CRC.
const REC_PUT: u8 = 0x01;
const REC_DELETE: u8 = 0x02;
const REC_CURSOR: u8 = 0x03;

/// Smallest complete cursor record: CRC (4) + type (1) + cur_len (1) with an
/// empty payload, i.e. 6 bytes.
const MIN_CURSOR_RECORD: usize = std::mem::size_of::<u32>() + 1 + 1;
62
63/// Errors from snapshot operations.
64///
65/// Uses `thiserror` to match [`crate::KvError`]; unlike `KvError` (which is
66/// `Clone` and so flattens its causes to strings), snapshot errors are observed
67/// by a single caller, so `Io` keeps its `#[source]` chain via `#[from]`.
68#[derive(Debug, thiserror::Error)]
69pub enum SnapshotError {
70    #[error("snapshot I/O error: {0}")]
71    Io(#[from] io::Error),
72    #[error("invalid snapshot format: {0}")]
73    InvalidFormat(String),
74    #[error("snapshot corrupted (CRC mismatch)")]
75    Corrupted,
76    /// A pluggable [`SnapshotStore`] backend (e.g. the `fjall` on-disk store)
77    /// reported an error. Kept backend-agnostic — no backend type leaks into this
78    /// enum's signature — so enabling a backend feature does not change the public
79    /// error surface.
80    #[error("snapshot backend error: {0}")]
81    Backend(String),
82}
83
84/// Result of loading a snapshot from disk.
85#[derive(Debug)]
86pub struct Snapshot {
87    /// Watch cursor at the time of the last checkpoint.
88    pub cursor: WatchCursor,
89    /// Live KV entries keyed by name (deduplicated, deletes applied).
90    pub entries: HashMap<String, KvEntry>,
91}
92
93impl Snapshot {
94    /// Keys present in this snapshot but absent from a fresh scan result.
95    ///
96    /// After a cursor-expired fallback to full `watch_all()`, callers should
97    /// compare the snapshot against the live key set and emit synthetic
98    /// `Delete` events for stale keys to ensure convergence.
99    pub fn stale_keys<'a, I>(&'a self, current_keys: I) -> Vec<&'a str>
100    where
101        I: IntoIterator<Item = &'a str>,
102    {
103        let current: HashSet<&str> = current_keys.into_iter().collect();
104        self.entries
105            .keys()
106            .filter(|k| !current.contains(k.as_str()))
107            .map(|k| k.as_str())
108            .collect()
109    }
110}
111
/// Append-only snapshot writer.
///
/// Streams [`KvUpdate`] records to disk through a buffered writer. It holds no
/// in-memory state beyond the file handle and a byte counter used to decide
/// when compaction is due.
///
/// Compaction is caller-driven, not automatic. Once the bytes written since
/// the last compaction exceed `compact_threshold`,
/// [`checkpoint`](Self::checkpoint) returns `true`, and the caller should then
/// run [`compact`](Self::compact), off the async executor since it blocks.
/// Compaction replays the log into a transient [`HashMap`], rewrites it via
/// tempfile+rename, and reopens the file for append.
pub struct SnapshotWriter {
    /// Location of the log; reread and replaced by `compact()`.
    path: PathBuf,
    // `None` only after a `compact()` rewrote the file (the atomic rename
    // succeeded) but failed to reopen it for append. The old handle would then
    // point at the renamed-away inode. Rather than keep writing into that
    // orphan — silently losing every later record on close — it is dropped and
    // the writer is poisoned, so `write_update`/`checkpoint`/`flush` return an
    // error until the writer is reconstructed.
    writer: Option<io::BufWriter<File>>,
    /// Record bytes appended since the header or the last compaction.
    bytes_since_compact: u64,
    /// `checkpoint()` requests compaction once `bytes_since_compact` exceeds this.
    compact_threshold: u64,
}
133
134impl SnapshotWriter {
135    /// Open or create a snapshot log.
136    ///
137    /// If the file doesn't exist, writes the header. If it exists, opens
138    /// for append. `compact_threshold` controls how many bytes accumulate
139    /// before an automatic compaction.
140    pub fn open(path: &Path, compact_threshold: u64) -> Result<Self, SnapshotError> {
141        // Open first, then size the file from its own handle. The previous
142        // `path.exists()` + `fs::metadata(path)` pair issued two extra `stat(2)`
143        // syscalls and left a TOCTOU window (the file could be created or removed
144        // between the checks). One `open(2)` plus a handle `metadata()` is both
145        // fewer syscalls and race-free — the length we read is the length of the
146        // file we hold open.
147        let file = OpenOptions::new().create(true).append(true).open(path)?;
148        let existing_len = file.metadata()?.len();
149
150        let mut writer = io::BufWriter::new(file);
151
152        // A file with at least a full header is an existing log: everything past
153        // the 6-byte header counts toward the compaction threshold. Anything
154        // shorter (brand-new, or a header torn by a crash mid-create) gets a
155        // fresh header written before the first record.
156        let bytes_since_compact = if existing_len >= HEADER_LEN as u64 {
157            existing_len - HEADER_LEN as u64
158        } else {
159            writer.write_all(MAGIC)?;
160            writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
161            writer.flush()?;
162            0
163        };
164
165        Ok(Self {
166            path: path.to_path_buf(),
167            writer: Some(writer),
168            bytes_since_compact,
169            compact_threshold,
170        })
171    }
172
173    /// Borrow the underlying writer, or fail if a prior `compact()` poisoned it
174    /// (see the `writer` field doc). Keeps the orphaned-fd failure mode a
175    /// surfaced error rather than a silent data loss.
176    fn writer(&mut self) -> Result<&mut io::BufWriter<File>, SnapshotError> {
177        self.writer.as_mut().ok_or_else(|| {
178            SnapshotError::Io(io::Error::other(
179                "snapshot writer poisoned: a prior compact() failed to reopen the log for append",
180            ))
181        })
182    }
183
184    /// Write a single [`KvUpdate`] record to the log.
185    ///
186    /// Buffered — does not flush to disk until [`checkpoint`](Self::checkpoint).
187    #[must_use = "I/O errors mean the write was lost"]
188    pub fn write_update(&mut self, update: &KvUpdate) -> Result<(), SnapshotError> {
189        let w = self.writer()?;
190        let bytes = match update {
191            KvUpdate::Put(entry) => write_put_record(w, &entry.key, &entry.value, &entry.version)?,
192            KvUpdate::Delete { key, version } | KvUpdate::Purge { key, version } => {
193                write_delete_record(w, key, version)?
194            }
195        };
196        self.bytes_since_compact += bytes as u64;
197        Ok(())
198    }
199
200    /// Write a cursor checkpoint and flush the buffer to the OS.
201    ///
202    /// The flush is a `write(2)` into the page cache — it survives a process
203    /// crash, but NOT power loss: there is no `fsync` on this path. The durable
204    /// `sync_all` happens in [`compact`](Self::compact). That's deliberate — the
205    /// snapshot is a cache backed by NATS and checkpoints are frequent, so an
206    /// fsync per checkpoint isn't worth its latency; a tail lost to power loss is
207    /// rebuilt from a NATS scan + watch replay.
208    ///
209    /// Returns `true` when the log has grown past the compaction threshold
210    /// and the caller should run [`compact`](Self::compact). Separating
211    /// the check from the I/O lets async callers offload compaction to a
212    /// blocking task instead of stalling the executor.
213    #[must_use = "returns true when compaction is needed"]
214    pub fn checkpoint(&mut self, cursor: &WatchCursor) -> Result<bool, SnapshotError> {
215        let w = self.writer()?;
216        let bytes = write_cursor_record(w, cursor)?;
217        w.flush()?;
218        self.bytes_since_compact += bytes as u64;
219        Ok(self.bytes_since_compact > self.compact_threshold)
220    }
221
222    /// Flush the buffer to disk without writing a cursor record.
223    #[must_use = "I/O errors mean the flush failed"]
224    pub fn flush(&mut self) -> Result<(), SnapshotError> {
225        self.writer()?.flush()?;
226        Ok(())
227    }
228
229    /// Compact the snapshot log: replay, deduplicate, and rewrite.
230    ///
231    /// Performs synchronous file I/O. In async contexts, run via
232    /// `spawn_blocking` to avoid stalling the executor.
233    #[must_use = "compaction errors leave the log uncompacted"]
234    pub fn compact(&mut self) -> Result<(), SnapshotError> {
235        // Flush buffered records to the file before reading it back. Records
236        // written since the last `checkpoint()` still sit in the BufWriter; if we
237        // read+rewrite without flushing, they're excluded from the replay, and
238        // the old BufWriter then flushes them on drop into the inode we just
239        // renamed away — silently losing them. Flushing first makes `compact()`
240        // safe regardless of whether a `checkpoint()` preceded it.
241        self.writer()?.flush()?;
242        let data = fs::read(&self.path)?;
243        let (entries, cursor, _already_compact) = replay_log(&data)?;
244        compact_to_file(&self.path, &entries, &cursor)?;
245
246        // The rename in `compact_to_file` has already replaced the file, so the
247        // current handle now points at the orphaned (renamed-away) inode. Drop it
248        // *before* reopening: if the reopen fails, `writer` stays `None` and the
249        // writer is poisoned (see field doc), turning a would-be silent loss into
250        // a surfaced error on the next write.
251        self.writer = None;
252        let file = OpenOptions::new().append(true).open(&self.path)?;
253        self.writer = Some(io::BufWriter::new(file));
254        self.bytes_since_compact = 0;
255
256        Ok(())
257    }
258}
259
260/// Load a snapshot from disk.
261///
262/// Replays the append log, deduplicates (last write wins per key),
263/// compacts the file, and returns the live entries + cursor.
264///
265/// Returns `Ok(None)` if the file doesn't exist or contains no entries.
266pub fn load(path: &Path) -> Result<Option<Snapshot>, SnapshotError> {
267    let data = match fs::read(path) {
268        Ok(d) => d,
269        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
270        Err(e) => return Err(SnapshotError::Io(e)),
271    };
272
273    let (entries, cursor, already_compact) = replay_log(&data)?;
274
275    if entries.is_empty() && cursor.is_none() {
276        return Ok(None);
277    }
278
279    // Skip the rewrite when the log is already in compact form (no duplicate
280    // keys, no deletes, no truncated tail). Avoids a full read+write on every
281    // startup after the first. A truncated tail forces a rewrite even when the
282    // surviving records are unique: leaving the partial record on disk would
283    // let the next append land after it, corrupting the log.
284    if !already_compact {
285        compact_to_file(path, &entries, &cursor)?;
286    }
287
288    Ok(Some(Snapshot { cursor, entries }))
289}
290
291// ---------------------------------------------------------------------------
292// The durable-fold contract
293// ---------------------------------------------------------------------------
294
295/// A durable, resumable, queryable fold of a KV watch stream.
296///
297/// This is the primitive slipstream owns: a place to apply a stream of
298/// [`KvUpdate`]s such that, after any restart, the fold resumes from exactly
299/// where it left off and stays a faithful function of the log. [`watch_applied`]
300/// drives it; consumers pick the backend.
301///
302/// [`watch_applied`]: crate::watch_applied
303///
304/// ## Invariants every implementation must hold
305///
306/// - **Pure function of the log.** Delete the store, replay every update with
307///   revision `>` the persisted cursor, and you get byte-identical state. The
308///   store caches the fold; it is never the source of truth (that is NATS).
309/// - **Cursor-after-apply.** A persisted cursor `C` implies every update with
310///   revision `≤ C` is durably folded in. [`apply`](Self::apply) writes data and
311///   cursor together so the cursor never names a revision whose data is absent —
312///   on a transactional backend in one txn, on the append log data-then-cursor
313///   (a torn write leaves data *ahead* of the cursor, which replay re-folds, never
314///   skips).
315/// - **Snapshot is a cache.** Any tail lost to power loss (under a no-sync
316///   durability mode) is rebuilt by resuming the watch from the recovered cursor;
317///   never a correctness failure.
318///
319/// ## Threading
320///
321/// Methods are **synchronous** and may block on I/O — the same runtime-agnostic
322/// discipline as the rest of this module. [`watch_applied`] offloads
323/// [`apply`](Self::apply) to a blocking task; a consumer calling
324/// [`get`](Self::get)/[`range`](Self::range) from an async context should do the
325/// same.
326pub trait SnapshotStore: Sized + Send {
327    /// Open (or resume) the store at `path`.
328    ///
329    /// Returns the persisted resume cursor — [`WatchCursor::none`] when the store
330    /// is fresh — and the store ready to [`apply`](Self::apply)/query. Pass the
331    /// returned cursor to [`watch_applied`](crate::watch_applied) as the resume
332    /// position.
333    ///
334    /// Backends with tuning knobs (compaction threshold, sync mode) expose them
335    /// on their own constructors; this uses safe defaults.
336    fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>;
337
338    /// Atomically fold `batch` into the store and advance the resume cursor.
339    ///
340    /// Data and cursor become durable together (see the cursor-after-apply
341    /// invariant). `batch` is the raw, revision-ordered updates received since the
342    /// last flush — including ones a consumer's `parse` rejected, since they are
343    /// still part of the bucket's state. `cursor` is the highest revision received
344    /// in the batch.
345    fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>;
346
347    /// Look up the live entry for `key`. `None` if absent or deleted.
348    fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>;
349
350    /// All live entries whose key starts with `prefix`, in ascending key order.
351    ///
352    /// Buffers the whole match set into a `Vec`. Convenient for the bounded
353    /// prefixes an in-RAM consumer scans, but a broad prefix against an on-disk
354    /// fold (the 1B-route case an on-disk backend exists for) materializes every
355    /// match at once — use [`for_each_in_range`](Self::for_each_in_range) there.
356    fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>;
357
358    /// Stream every live entry whose key starts with `prefix`, in ascending key
359    /// order, invoking `f` once per entry — without buffering the whole match
360    /// set in memory.
361    ///
362    /// This is the memory-bounded counterpart to [`range`](Self::range). The
363    /// reason to pick an on-disk backend is a fold too large for RAM; for such a
364    /// consumer a broad [`range`](Self::range) defeats the purpose by collecting
365    /// every match into one `Vec`. `f` returning `Err` stops the scan early and
366    /// propagates that error.
367    ///
368    /// The provided implementation delegates to [`range`](Self::range) — correct
369    /// for in-RAM backends, where the fold already fits in memory. On-disk
370    /// backends override it to stream straight from storage.
371    fn for_each_in_range(
372        &self,
373        prefix: &str,
374        mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
375    ) -> Result<(), SnapshotError> {
376        for entry in self.range(prefix)? {
377            f(entry)?;
378        }
379        Ok(())
380    }
381}
382
383// ---------------------------------------------------------------------------
384// AppendLogSnapshot: the default, pure-Rust backend
385// ---------------------------------------------------------------------------
386
/// Compaction threshold used by [`AppendLogSnapshot::load`]: 10 MiB.
///
/// Once more than this many record bytes have been appended since the last
/// compaction, the next checkpoint asks for one. This is the value the
/// hand-rolled callers used; choose another via [`AppendLogSnapshot::open`].
pub const DEFAULT_COMPACT_THRESHOLD: u64 = 10 << 20;
391
392/// The append-only CRC log as a [`SnapshotStore`] (the default backend).
393///
394/// Wraps the existing [`SnapshotWriter`] + [`load`] machinery — same v2 on-disk
395/// format, same CRC framing, same FDB-versionstamp-safe cursor encoding, so files
396/// written by either path are mutually loadable. Keeps the whole fold in a
397/// [`HashMap`] in RAM to serve [`get`](SnapshotStore::get)/[`range`](SnapshotStore::range),
398/// which is exactly what edge/tunnel-style consumers already do — small state,
399/// pure Rust. A consumer that cannot hold its fold in RAM wants an on-disk
400/// backend instead (e.g. the `fjall` feature).
401pub struct AppendLogSnapshot {
402    writer: SnapshotWriter,
403    entries: HashMap<String, KvEntry>,
404    cursor: WatchCursor,
405}
406
407impl AppendLogSnapshot {
408    /// Open or resume the log at `path` with an explicit compaction threshold.
409    ///
410    /// Replays any existing log into the in-RAM fold (and compacts it, exactly as
411    /// [`load`] does), then opens the writer for append. Returns the resume cursor
412    /// alongside the store.
413    pub fn open(path: &Path, compact_threshold: u64) -> Result<(WatchCursor, Self), SnapshotError> {
414        let (cursor, entries) = match load(path)? {
415            Some(snap) => (snap.cursor, snap.entries),
416            None => (WatchCursor::none(), HashMap::new()),
417        };
418        let writer = SnapshotWriter::open(path, compact_threshold)?;
419        Ok((
420            cursor.clone(),
421            Self {
422                writer,
423                entries,
424                cursor,
425            },
426        ))
427    }
428}
429
430impl SnapshotStore for AppendLogSnapshot {
431    fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
432        Self::open(path, DEFAULT_COMPACT_THRESHOLD)
433    }
434
435    fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
436        // Stream the raw records, then the cursor checkpoint — data-then-cursor,
437        // so a torn write leaves data ahead of the cursor (re-folded on resume),
438        // never a cursor ahead of its data. The in-RAM fold mirrors the same
439        // mutations so get/range stay consistent with what was just durably written.
440        for update in batch {
441            self.writer.write_update(update)?;
442            match update {
443                KvUpdate::Put(entry) => {
444                    self.entries.insert(entry.key.clone(), entry.clone());
445                }
446                KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => {
447                    self.entries.remove(key);
448                }
449            }
450        }
451        // checkpoint() flushes the buffered records + the cursor record to the OS;
452        // it returns true when the log has grown past the compaction threshold, in
453        // which case we compact inline (this whole call already runs off the hot
454        // path via spawn_blocking in watch_applied).
455        if self.writer.checkpoint(cursor)? {
456            self.writer.compact()?;
457        }
458        self.cursor = cursor.clone();
459        Ok(())
460    }
461
462    fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
463        Ok(self.entries.get(key).cloned())
464    }
465
466    fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
467        let mut out: Vec<KvEntry> = self
468            .entries
469            .values()
470            .filter(|e| e.key.starts_with(prefix))
471            .cloned()
472            .collect();
473        out.sort_unstable_by(|a, b| a.key.cmp(&b.key));
474        Ok(out)
475    }
476}
477
478// ---------------------------------------------------------------------------
479// Internal: log replay
480// ---------------------------------------------------------------------------
481
482/// Replay the append log into the live key set.
483///
484/// Records borrow directly from `data` — keys and values are not allocated
485/// until the surviving set is materialized at the end, so overwritten or
486/// deleted records cost nothing. Returns the entries, the latest cursor, and
487/// whether the log was already compact (no duplicate keys, no deletes, no
488/// truncated tail) so callers can skip a redundant rewrite.
489fn replay_log(data: &[u8]) -> Result<(HashMap<String, KvEntry>, WatchCursor, bool), SnapshotError> {
490    if data.len() < HEADER_LEN {
491        return Err(SnapshotError::InvalidFormat("file too short".into()));
492    }
493    if &data[0..4] != MAGIC {
494        return Err(SnapshotError::InvalidFormat("bad magic".into()));
495    }
496    let version = u16::from_le_bytes([data[4], data[5]]);
497    if version != FORMAT_VERSION {
498        return Err(SnapshotError::InvalidFormat(format!(
499            "unsupported version {version}"
500        )));
501    }
502
503    // Upper-bound the working set by data size to avoid rehashing on large
504    // snapshots, capped so a tiny-record file can't request a huge table.
505    let estimated = (data.len() - HEADER_LEN) / 30;
506    let mut live: HashMap<&str, (&[u8], VersionToken)> =
507        HashMap::with_capacity(estimated.min(4096));
508    let mut cursor = WatchCursor::none();
509    let mut pos = HEADER_LEN;
510
511    // The log is compact only if every record contributed a unique surviving
512    // entry and we consumed the file cleanly. Any overwrite, delete, or
513    // truncated tail means a rewrite would shrink or repair the file.
514    let mut redundant = false;
515    let mut clean_eof = true;
516
517    while pos < data.len() {
518        match parse_record(&data[pos..]) {
519            Ok((record, consumed)) => {
520                match record {
521                    Record::Put {
522                        key,
523                        value,
524                        version,
525                    } => {
526                        if live.insert(key, (value, version)).is_some() {
527                            redundant = true;
528                        }
529                    }
530                    Record::Delete { key } => {
531                        live.remove(key);
532                        redundant = true;
533                    }
534                    Record::Cursor(c) => {
535                        // Each new cursor record supersedes the previous one, so
536                        // any earlier cursor on disk is now dead weight. Mark the
537                        // log redundant so `load()` rewrites it — otherwise a
538                        // service that checkpoints frequently accumulates one
539                        // cursor record per checkpoint and `already_compact` stays
540                        // true (entries are unique, EOF is clean), leaving the
541                        // bloat in place until the writer's own threshold fires.
542                        if !cursor.is_none() {
543                            redundant = true;
544                        }
545                        cursor = c;
546                    }
547                }
548                pos += consumed;
549            }
550            Err(RecordError::Truncated) => {
551                // KNOWN LIMITATION: a record's length is read from its (CRC-but-
552                // not-yet-verified) length fields, so corruption that inflates a
553                // key_len/value_len makes the record look like it runs past EOF —
554                // indistinguishable here from a genuine torn final write. Both
555                // land as `Truncated`, so we stop and silently drop everything
556                // after this point. Detecting it would need a framed, separately-
557                // checksummed length (a format change). Acceptable because the
558                // snapshot is a cache: a short read just triggers a fuller NATS
559                // scan + watch replay, never data loss of record.
560                clean_eof = false;
561                break;
562            }
563            Err(RecordError::CrcMismatch { consumed }) => {
564                // Near EOF → crash recovery (partial final write).
565                // Otherwise → mid-file corruption.
566                if pos + consumed >= data.len() || data.len() - (pos + consumed) < MIN_CURSOR_RECORD
567                {
568                    clean_eof = false;
569                    break;
570                }
571                return Err(SnapshotError::Corrupted);
572            }
573            Err(RecordError::Invalid(msg)) => {
574                return Err(SnapshotError::InvalidFormat(msg));
575            }
576        }
577    }
578
579    // Materialize owned entries for the survivors only — one key + one value
580    // allocation per live key, instead of per record.
581    let mut entries: HashMap<String, KvEntry> = HashMap::with_capacity(live.len());
582    for (key, (value, version)) in live {
583        let key = key.to_string();
584        entries.insert(
585            key.clone(),
586            KvEntry {
587                key,
588                value: value.to_vec(),
589                version,
590            },
591        );
592    }
593
594    let already_compact = !redundant && clean_eof;
595    Ok((entries, cursor, already_compact))
596}
597
598enum Record<'a> {
599    Put {
600        key: &'a str,
601        value: &'a [u8],
602        version: VersionToken,
603    },
604    Delete {
605        key: &'a str,
606    },
607    Cursor(WatchCursor),
608}
609
/// Why a single record at the current position could not be decoded.
enum RecordError {
    /// The buffer ends before the record does. This is either a torn final
    /// write or a corrupted length field pointing past EOF.
    Truncated,
    /// The record's CRC did not match. `consumed` is the record's full length,
    /// which lets the caller tell whether it sits at the tail of the log.
    CrcMismatch { consumed: usize },
    /// The record is structurally invalid; the message explains how.
    Invalid(String),
}
615
616fn parse_record(data: &[u8]) -> Result<(Record<'_>, usize), RecordError> {
617    // Need at least CRC (4) + type (1)
618    if data.len() < 5 {
619        return Err(RecordError::Truncated);
620    }
621
622    let stored_crc = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
623
624    match data[4] {
625        REC_PUT => parse_put(data, stored_crc),
626        REC_DELETE => parse_delete(data, stored_crc),
627        REC_CURSOR => parse_cursor(data, stored_crc),
628        other => Err(RecordError::Invalid(format!(
629            "unknown record type: {other:#x}"
630        ))),
631    }
632}
633
634fn parse_put(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
635    // CRC(4) + type(1) + key_len(2) = 7 minimum to read key_len
636    if data.len() < 7 {
637        return Err(RecordError::Truncated);
638    }
639    let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
640    let vl_off = 7 + key_len;
641
642    if data.len() < vl_off + 4 {
643        return Err(RecordError::Truncated);
644    }
645    let value_len = u32::from_le_bytes([
646        data[vl_off],
647        data[vl_off + 1],
648        data[vl_off + 2],
649        data[vl_off + 3],
650    ]) as usize;
651
652    // ver_len byte sits right after the value.
653    let ver_len_off = vl_off + 4 + value_len;
654    if data.len() < ver_len_off + 1 {
655        return Err(RecordError::Truncated);
656    }
657    let ver_len = data[ver_len_off] as usize;
658    // A version token holds at most 10 bytes inline. A larger length means a
659    // corrupt or incompatible record — reject it rather than letting it reach
660    // `VersionToken::from_raw`, which would panic.
661    if ver_len > 10 {
662        return Err(RecordError::Invalid(format!(
663            "version length {ver_len} exceeds max version token size (10)"
664        )));
665    }
666
667    let total = ver_len_off + 1 + ver_len;
668    if data.len() < total {
669        return Err(RecordError::Truncated);
670    }
671
672    let computed = crc32fast::hash(&data[4..total]);
673    if computed != stored_crc {
674        return Err(RecordError::CrcMismatch { consumed: total });
675    }
676
677    let key = std::str::from_utf8(&data[7..7 + key_len])
678        .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
679    let value = &data[vl_off + 4..vl_off + 4 + value_len];
680    // The `ver_len > 10` check above bounds this to the inline capacity, so
681    // `from_raw` always returns `Some` here; the guard makes that explicit.
682    let version = VersionToken::from_raw(&data[ver_len_off + 1..total]).ok_or_else(|| {
683        RecordError::Invalid(format!(
684            "version length {ver_len} exceeds max version token size (10)"
685        ))
686    })?;
687
688    Ok((
689        Record::Put {
690            key,
691            value,
692            version,
693        },
694        total,
695    ))
696}
697
698fn parse_delete(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
699    if data.len() < 7 {
700        return Err(RecordError::Truncated);
701    }
702    let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
703    let ver_len_off = 7 + key_len;
704    if data.len() < ver_len_off + 1 {
705        return Err(RecordError::Truncated);
706    }
707    let ver_len = data[ver_len_off] as usize;
708    // See `parse_put`: reject oversized version lengths before `from_raw`.
709    if ver_len > 10 {
710        return Err(RecordError::Invalid(format!(
711            "version length {ver_len} exceeds max version token size (10)"
712        )));
713    }
714    let total = ver_len_off + 1 + ver_len;
715
716    if data.len() < total {
717        return Err(RecordError::Truncated);
718    }
719
720    let computed = crc32fast::hash(&data[4..total]);
721    if computed != stored_crc {
722        return Err(RecordError::CrcMismatch { consumed: total });
723    }
724
725    let key = std::str::from_utf8(&data[7..7 + key_len])
726        .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
727    // The delete record's version is written for format symmetry but unused on
728    // replay (a delete just removes the key from the live set).
729
730    Ok((Record::Delete { key }, total))
731}
732
733fn parse_cursor(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
734    if data.len() < 6 {
735        return Err(RecordError::Truncated);
736    }
737    let cursor_len = data[5] as usize;
738    // A version token holds at most 10 bytes inline. A larger length means a
739    // corrupt or incompatible record — reject it rather than letting it reach
740    // `VersionToken::from_raw`, which would panic.
741    if cursor_len > 10 {
742        return Err(RecordError::Invalid(format!(
743            "cursor length {cursor_len} exceeds max version token size (10)"
744        )));
745    }
746    let total = 6 + cursor_len;
747
748    if data.len() < total {
749        return Err(RecordError::Truncated);
750    }
751
752    let computed = crc32fast::hash(&data[4..total]);
753    if computed != stored_crc {
754        return Err(RecordError::CrcMismatch { consumed: total });
755    }
756
757    // The `cursor_len > 10` check above bounds this to the inline capacity, so
758    // `from_raw` always returns `Some` here; the guard makes that explicit.
759    let version = VersionToken::from_raw(&data[6..total]).ok_or_else(|| {
760        RecordError::Invalid(format!(
761            "cursor length {cursor_len} exceeds max version token size (10)"
762        ))
763    })?;
764
765    Ok((Record::Cursor(WatchCursor::from_version(version)), total))
766}
767
768// ---------------------------------------------------------------------------
769// Internal: record writing (incremental CRC, no allocations)
770// ---------------------------------------------------------------------------
771
772fn write_put_record(
773    w: &mut impl Write,
774    key: &str,
775    value: &[u8],
776    version: &VersionToken,
777) -> Result<usize, SnapshotError> {
778    let kb = key.as_bytes();
779    let vb = version.as_bytes();
780
781    // The wire format encodes key length as u16 and value length as u32.
782    // Reject anything that would truncate on cast — a silent truncation here
783    // produces a record whose CRC covers the full bytes but whose stored length
784    // is wrong, which the reader can only interpret as mid-file corruption.
785    let key_len = u16::try_from(kb.len()).map_err(|_| {
786        SnapshotError::InvalidFormat(format!(
787            "key too long: {} bytes (max {})",
788            kb.len(),
789            u16::MAX
790        ))
791    })?;
792    let value_len = u32::try_from(value.len()).map_err(|_| {
793        SnapshotError::InvalidFormat(format!(
794            "value too long: {} bytes (max {})",
795            value.len(),
796            u32::MAX
797        ))
798    })?;
799    // The version is stored as length-prefixed raw bytes so any backend's token
800    // (NATS u64, FDB 10-byte versionstamp) round-trips intact. `VersionToken`
801    // caps inline storage at 10 bytes, so this `u8` length never truncates today;
802    // checking surfaces a format error rather than corrupting the frame if a
803    // future token widens past 255 bytes.
804    let ver_len = u8::try_from(vb.len()).map_err(|_| {
805        SnapshotError::InvalidFormat(format!(
806            "version too long: {} bytes (max {})",
807            vb.len(),
808            u8::MAX
809        ))
810    })?;
811
812    let mut h = crc32fast::Hasher::new();
813    h.update(&[REC_PUT]);
814    h.update(&key_len.to_le_bytes());
815    h.update(kb);
816    h.update(&value_len.to_le_bytes());
817    h.update(value);
818    h.update(&[ver_len]);
819    h.update(vb);
820    let crc = h.finalize();
821
822    w.write_all(&crc.to_le_bytes())?;
823    w.write_all(&[REC_PUT])?;
824    w.write_all(&key_len.to_le_bytes())?;
825    w.write_all(kb)?;
826    w.write_all(&value_len.to_le_bytes())?;
827    w.write_all(value)?;
828    w.write_all(&[ver_len])?;
829    w.write_all(vb)?;
830
831    Ok(4 + 1 + 2 + kb.len() + 4 + value.len() + 1 + vb.len())
832}
833
834fn write_delete_record(
835    w: &mut impl Write,
836    key: &str,
837    version: &VersionToken,
838) -> Result<usize, SnapshotError> {
839    let kb = key.as_bytes();
840    let vb = version.as_bytes();
841
842    let key_len = u16::try_from(kb.len()).map_err(|_| {
843        SnapshotError::InvalidFormat(format!(
844            "key too long: {} bytes (max {})",
845            kb.len(),
846            u16::MAX
847        ))
848    })?;
849    // Length-prefixed version, matching `write_put_record`. See its comment for
850    // why this is bytes rather than a fixed u64.
851    let ver_len = u8::try_from(vb.len()).map_err(|_| {
852        SnapshotError::InvalidFormat(format!(
853            "version too long: {} bytes (max {})",
854            vb.len(),
855            u8::MAX
856        ))
857    })?;
858
859    let mut h = crc32fast::Hasher::new();
860    h.update(&[REC_DELETE]);
861    h.update(&key_len.to_le_bytes());
862    h.update(kb);
863    h.update(&[ver_len]);
864    h.update(vb);
865    let crc = h.finalize();
866
867    w.write_all(&crc.to_le_bytes())?;
868    w.write_all(&[REC_DELETE])?;
869    w.write_all(&key_len.to_le_bytes())?;
870    w.write_all(kb)?;
871    w.write_all(&[ver_len])?;
872    w.write_all(vb)?;
873
874    Ok(4 + 1 + 2 + kb.len() + 1 + vb.len())
875}
876
877fn write_cursor_record(w: &mut impl Write, cursor: &WatchCursor) -> Result<usize, SnapshotError> {
878    let cb = cursor.version().as_bytes();
879    // The record encodes the cursor length as a single byte. `VersionToken`
880    // caps inline storage at 10 bytes, so this never trips today — but checking
881    // here rather than casting means a future backend that widens the token
882    // surfaces a format error instead of silently truncating the length prefix
883    // (which the reader would then mis-frame as corruption).
884    let cb_len = u8::try_from(cb.len()).map_err(|_| {
885        SnapshotError::InvalidFormat(format!(
886            "cursor too long: {} bytes (max {})",
887            cb.len(),
888            u8::MAX
889        ))
890    })?;
891
892    let mut h = crc32fast::Hasher::new();
893    h.update(&[REC_CURSOR]);
894    h.update(&[cb_len]);
895    h.update(cb);
896    let crc = h.finalize();
897
898    w.write_all(&crc.to_le_bytes())?;
899    w.write_all(&[REC_CURSOR])?;
900    w.write_all(&[cb_len])?;
901    w.write_all(cb)?;
902
903    Ok(4 + 1 + 1 + cb.len())
904}
905
906// ---------------------------------------------------------------------------
907// Internal: compaction
908// ---------------------------------------------------------------------------
909
910fn compact_to_file(
911    path: &Path,
912    entries: &HashMap<String, KvEntry>,
913    cursor: &WatchCursor,
914) -> Result<(), SnapshotError> {
915    // The tempfile must live in the same directory as the target so the final
916    // `persist` is an atomic same-filesystem rename. Falling back to "." here
917    // would silently place it in the cwd, and a cross-filesystem rename then
918    // fails with EXDEV — a hard error masquerading as a path quirk. A snapshot
919    // path with no parent is a caller bug; surface it.
920    let dir = path.parent().ok_or_else(|| {
921        SnapshotError::InvalidFormat(format!("snapshot path has no parent: {}", path.display()))
922    })?;
923    // Collect the live entries once, then drive both the buffer-size estimate
924    // and the write pass off this slice — a single walk of `entries` instead of
925    // one to sum sizes and another to write.
926    //
927    // Write in sorted key order so a given logical state always serializes to
928    // identical bytes. `HashMap` iteration order is randomized per process,
929    // which would otherwise make every compaction emit a different layout —
930    // defeating byte-level snapshot comparison (e.g. an integrity checksum) and
931    // making file diffs noise. The sort is O(n log n) on the live key set, which
932    // is trivial next to the I/O it precedes.
933    let mut sorted: Vec<&KvEntry> = entries.values().collect();
934    sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
935
936    // Buffer the writes: each record is 5–8 individual `write_all` calls, so an
937    // unbuffered tempfile would issue thousands of `write(2)` syscalls per
938    // compaction. Size the buffer to the exact serialized length (clamped to
939    // [8 KiB, 1 MiB]) so the whole snapshot flushes in a handful of syscalls
940    // instead of one per default 8 KiB page — and a pathologically large table
941    // can't balloon the buffer past 1 MiB.
942    let estimated: usize = HEADER_LEN
943        + sorted
944            .iter()
945            .map(|e| 4 + 1 + 2 + e.key.len() + 4 + e.value.len() + 1 + e.version.as_bytes().len())
946            .sum::<usize>()
947        + if cursor.is_none() {
948            0
949        } else {
950            4 + 1 + 1 + cursor.version().as_bytes().len()
951        };
952    let capacity = estimated.clamp(8 * 1024, 1024 * 1024);
953    let mut buf = io::BufWriter::with_capacity(capacity, tempfile::NamedTempFile::new_in(dir)?);
954
955    buf.write_all(MAGIC)?;
956    buf.write_all(&FORMAT_VERSION.to_le_bytes())?;
957
958    for entry in sorted {
959        write_put_record(&mut buf, &entry.key, &entry.value, &entry.version)?;
960    }
961
962    if !cursor.is_none() {
963        write_cursor_record(&mut buf, cursor)?;
964    }
965
966    buf.flush()?;
967    let tmp = buf
968        .into_inner()
969        .map_err(|e| SnapshotError::Io(e.into_error()))?;
970
971    tmp.as_file().sync_all()?;
972    tmp.persist(path).map_err(|e| SnapshotError::Io(e.error))?;
973
974    Ok(())
975}
976
977// ---------------------------------------------------------------------------
978// Tests
979// ---------------------------------------------------------------------------
980
// Unit tests for the snapshot log: round-trips, corruption handling, tail
// repair, and compaction. Each test works inside its own `TempDir`, so no two
// tests share a file.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// `KvEntry` whose version is `VersionToken::from_u64(rev)`.
    fn entry(key: &str, value: &[u8], rev: u64) -> KvEntry {
        KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// `WatchCursor` at revision `rev`.
    fn cursor(rev: u64) -> WatchCursor {
        WatchCursor::from_u64(rev)
    }

    /// `KvUpdate::Put` wrapping `entry(key, value, rev)`.
    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(entry(key, value, rev))
    }

    /// `KvUpdate::Delete` of `key` at revision `rev`.
    fn delete(key: &str, rev: u64) -> KvUpdate {
        KvUpdate::Delete {
            key: key.to_string(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// Two puts plus a checkpoint reload with both values and the cursor intact.
    #[test]
    fn round_trip() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("node.us-east-1", b"val1", 1)).unwrap();
        w.write_update(&put("node.eu-west-1", b"val2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));

        assert_eq!(snap.entries["node.us-east-1"].value, b"val1");
        assert_eq!(snap.entries["node.eu-west-1"].value, b"val2");
    }

    /// Interleaved put/checkpoint batches: every put survives and the last
    /// checkpoint's cursor wins.
    #[test]
    fn multiple_batches() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    /// A delete removes its key on replay; other keys are untouched.
    #[test]
    fn delete_removes_entry() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        w.write_update(&delete("a", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert!(snap.entries.contains_key("b"));
    }

    /// A `KvUpdate::Purge` also removes its key on replay.
    #[test]
    fn purge_removes_entry() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&KvUpdate::Purge {
            key: "a".to_string(),
            version: VersionToken::from_u64(2),
        })
        .unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert!(!snap.entries.contains_key("a"));
    }

    /// A nonexistent snapshot means "no snapshot" (`Ok(None)`), not an error.
    #[test]
    fn missing_file_returns_none() {
        let dir = TempDir::new().unwrap();
        assert!(load(&dir.path().join("nope.snap")).unwrap().is_none());
    }

    /// Damage followed by still-valid records is reported as
    /// `SnapshotError::Corrupted` rather than repaired as a torn tail.
    #[test]
    fn corrupted_mid_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        // Write three entries so corruption in the middle has records on both sides
        w.write_update(&put("a", b"aaaa-long-value-here", 1))
            .unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"bbbb-long-value-here", 2))
            .unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        w.write_update(&put("c", b"cccc-long-value-here", 3))
            .unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let mut data = fs::read(&path).unwrap();
        // Flip one byte 40 bytes into the record area. Which record it lands in
        // depends on the encoded version width, but the assert below ensures at
        // least 60 bytes of later records follow it. load() therefore sees
        // damage with valid data after it (mid-file corruption), not a torn tail.
        let target = HEADER_LEN + 40;
        assert!(
            target < data.len() - 60,
            "need enough room after corruption for valid records"
        );
        data[target] ^= 0xFF;
        fs::write(&path, &data).unwrap();

        match load(&path) {
            Err(SnapshotError::Corrupted) => {}
            other => panic!("expected Corrupted, got {other:?}"),
        }
    }

    /// A partially written final record is tolerated; earlier data survives.
    #[test]
    fn truncated_final_record_recovered() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        // Chop a few bytes off the end (simulates crash mid-write)
        // The last write was the cursor(2) checkpoint, so the 3-byte chop
        // presumably tears only that record. `a` must survive either way.
        let mut data = fs::read(&path).unwrap();
        data.truncate(data.len() - 3);
        fs::write(&path, &data).unwrap();

        let snap = load(&path).unwrap().unwrap();
        assert!(snap.entries.contains_key("a"));
    }

    #[test]
    fn truncated_tail_repaired_then_appendable() {
        // The already-compact skip optimization must NOT skip the rewrite when
        // the log has a truncated tail: leaving the partial record on disk would
        // let the next append land after it and corrupt the log.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        // Simulate a crash mid-write: chop the final bytes.
        let mut data = fs::read(&path).unwrap();
        data.truncate(data.len() - 3);
        fs::write(&path, &data).unwrap();

        // load() repairs the file by rewriting without the partial tail.
        let snap = load(&path).unwrap().unwrap();
        assert!(snap.entries.contains_key("a"));
        assert!(snap.entries.contains_key("b"));

        // Appending after the repaired load must not corrupt the log.
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("c", b"v3", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 3);
        assert!(snap.entries.contains_key("c"));
        assert_eq!(snap.cursor.as_u64(), Some(3));
    }

    #[test]
    fn repeated_cursor_records_trigger_compaction() {
        // A service that checkpoints frequently writes one cursor record per
        // checkpoint. Even with unique keys and a clean EOF, the stale cursor
        // records are dead weight: load() must NOT treat the file as already
        // compact, or the bloat persists until the writer's own threshold fires
        // (and never, if the process crashes first).
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        for i in 1..=10u64 {
            w.checkpoint(&cursor(i)).unwrap();
        }
        drop(w);

        let size_before = fs::metadata(&path).unwrap().len();
        let snap = load(&path).unwrap().unwrap();
        let size_after = fs::metadata(&path).unwrap().len();

        // Single entry + only the latest cursor survive.
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.cursor.as_u64(), Some(10));
        assert!(
            size_after < size_before,
            "stale cursor records should be compacted away: {size_before} -> {size_after}"
        );

        // The rewritten file is now genuinely compact: a second load is a no-op.
        let after_first = fs::read(&path).unwrap();
        load(&path).unwrap().unwrap();
        let after_second = fs::read(&path).unwrap();
        assert_eq!(after_first, after_second);
    }

    #[test]
    fn already_compact_file_reloads_unchanged() {
        // A compact file (unique keys, no deletes, clean EOF) should reload
        // correctly even though load() skips the rewrite.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        // First load compacts. Capture the resulting bytes.
        load(&path).unwrap().unwrap();
        let after_first = fs::read(&path).unwrap();

        // Second load sees an already-compact file: skips the rewrite, so the
        // bytes are byte-for-byte identical, and the data still round-trips.
        let snap = load(&path).unwrap().unwrap();
        let after_second = fs::read(&path).unwrap();

        assert_eq!(after_first, after_second);
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    /// A header with the wrong magic is rejected as `InvalidFormat` with a
    /// message that mentions "magic".
    #[test]
    fn bad_magic() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");
        fs::write(&path, b"XXXX\x01\x00").unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => assert!(msg.contains("magic")),
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    #[test]
    fn wrong_version_rejected() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");
        // Valid magic, but a format version this build doesn't understand.
        // A future engineer who bumps FORMAT_VERSION must see old files rejected
        // rather than silently misparsed.
        let mut data = Vec::new();
        data.extend_from_slice(MAGIC);
        data.extend_from_slice(&(FORMAT_VERSION + 1).to_le_bytes());
        fs::write(&path, &data).unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => {
                assert!(
                    msg.contains("version"),
                    "message should mention version: {msg}"
                )
            }
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    /// A valid header with no records loads as `None`, the same as a missing
    /// file.
    #[test]
    fn empty_log_returns_none() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut f = File::create(&path).unwrap();
        f.write_all(MAGIC).unwrap();
        f.write_all(&FORMAT_VERSION.to_le_bytes()).unwrap();
        drop(f);

        assert!(load(&path).unwrap().is_none());
    }

    /// Ten overwrites of one key collapse to the latest value, and load()
    /// writes back a smaller file.
    #[test]
    fn compaction_on_load_shrinks_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        for i in 0..10u64 {
            w.write_update(&put("same-key", format!("v{i}").as_bytes(), i))
                .unwrap();
            w.checkpoint(&cursor(i)).unwrap();
        }
        drop(w);

        let size_before = fs::metadata(&path).unwrap().len();
        let snap = load(&path).unwrap().unwrap();
        let size_after = fs::metadata(&path).unwrap().len();

        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["same-key"].value, b"v9");
        assert!(
            size_after < size_before,
            "compaction should shrink: {size_before} -> {size_after}"
        );
    }

    /// With a tiny threshold, a `true` from `checkpoint` is used as the signal
    /// to call `compact`. After repeated in-place compactions the latest value
    /// is still the one that loads.
    #[test]
    fn compact_when_threshold_exceeded() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        // Threshold low enough to trigger after a few writes
        let mut w = SnapshotWriter::open(&path, 100).unwrap();
        for i in 0..20u64 {
            w.write_update(&put("key", format!("value-{i}").as_bytes(), i))
                .unwrap();
            if w.checkpoint(&cursor(i)).unwrap() {
                w.compact().unwrap();
            }
        }
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["key"].value, b"value-19");
    }

    /// A second writer opened on an existing file appends rather than
    /// discarding what the first writer wrote.
    #[test]
    fn reopen_appends() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        // First writer
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        // Second writer appends
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    /// A 100,000-byte value round-trips byte-for-byte.
    #[test]
    fn large_values() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let big = vec![0xABu8; 100_000];
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("big", &big, 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["big"].value.len(), 100_000);
        assert!(snap.entries["big"].value.iter().all(|&b| b == 0xAB));
    }

    #[test]
    fn cursor_only_snapshot_returns_some_with_empty_entries() {
        // A service that checkpoints before writing any entries produces a file
        // with only a cursor record. load() must return Some (not None) so callers
        // get the resume position even when there's nothing to preload. The guard
        // `entries.is_empty() && cursor.is_none()` must NOT fire when the cursor
        // is present.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.checkpoint(&cursor(42)).unwrap();
        drop(w);

        let snap = load(&path)
            .unwrap()
            .expect("cursor-only snapshot should return Some");
        assert!(snap.entries.is_empty(), "no entries written, none expected");
        assert_eq!(
            snap.cursor.as_u64(),
            Some(42),
            "cursor must survive round-trip"
        );
    }

    /// `stale_keys` returns the snapshot keys that are absent from a fresh key
    /// set. The sorts make the comparisons independent of result order.
    #[test]
    fn stale_keys_detects_removed_entries() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("node.a", b"v1", 1)).unwrap();
        w.write_update(&put("node.b", b"v2", 2)).unwrap();
        w.write_update(&put("node.c", b"v3", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();

        // Simulate a fresh scan that only has "node.a" and "node.c"
        let mut stale = snap.stale_keys(["node.a", "node.c"]);
        stale.sort();
        assert_eq!(stale, vec!["node.b"]);

        // All keys present → no stale
        let stale = snap.stale_keys(["node.a", "node.b", "node.c"]);
        assert!(stale.is_empty());

        // No keys present → all stale
        let mut stale: Vec<&str> = snap.stale_keys(std::iter::empty::<&str>());
        stale.sort();
        assert_eq!(stale, vec!["node.a", "node.b", "node.c"]);
    }

    #[test]
    fn non_u64_version_token_round_trips() {
        // Regression: Put/Delete records store the version as length-prefixed raw
        // bytes, not a fixed u64. A 10-byte FDB-style versionstamp must survive a
        // round-trip intact — the old u64-only field flattened it to 0, which
        // would make every later CAS on a restored entry fail RevisionMismatch.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let stamp = [9u8, 8, 7, 6, 5, 4, 3, 2, 1, 0];
        let token = VersionToken::from_fdb_versionstamp(&stamp);
        assert!(token.as_u64().is_none(), "10-byte token is not a u64");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&KvUpdate::Put(KvEntry {
            key: "fdb.key".to_string(),
            value: b"v".to_vec(),
            version: token.clone(),
        }))
        .unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(
            snap.entries["fdb.key"].version.as_bytes(),
            &stamp,
            "versionstamp must survive the snapshot round-trip byte-for-byte"
        );
    }

    /// Proves the `compact()` flush-before-read fix: records written but not yet
    /// checkpointed still sit in the BufWriter. Before the fix, `compact()` read
    /// the file (missing those records), rewrote it, and the old buffer then
    /// flushed into the renamed-away inode — silently dropping the writes. With
    /// the leading flush, an un-checkpointed write survives compaction.
    #[test]
    fn compact_preserves_uncheckpointed_writes() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        // Buffered only — deliberately NO checkpoint() before compacting.
        w.write_update(&put("node.a", b"survives", 1)).unwrap();
        w.compact().unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert!(
            snap.entries.contains_key("node.a"),
            "compact() must not drop buffered-but-uncheckpointed writes"
        );
        assert_eq!(snap.entries["node.a"].value, b"survives");
    }
}