Skip to main content

slipstream/
snapshot.rs

1//! Append-only snapshot log for KV state.
2//!
3//! Streams [`KvUpdate`]s to disk as they arrive via [`SnapshotWriter`],
4//! with no in-memory state beyond a file handle and byte counter.
5//! On startup, [`load`] replays the log to reconstruct entries + cursor,
6//! then compacts the file.
7//!
8//! ## File format
9//!
10//! 6-byte header followed by a sequence of CRC'd records:
11//!
12//! ```text
13//! Header:  b"PGSS" ++ version:u16le
14//! Record:  crc32:u32le ++ type:u8 ++ payload (varies by type)
15//!
16//! Put:     key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version
17//! Delete:  key_len:u16le ++ key ++ ver_len:u8 ++ version
18//! Cursor:  cur_len:u8 ++ cursor
19//! ```
20//!
21//! `version` is the raw [`VersionToken`] bytes (≤10), not a fixed u64 — a
22//! 10-byte FDB versionstamp round-trips intact, where a `u64`-only field would
23//! silently flatten it to 0 and break every later CAS.
24//!
25//! Used by edge/tunnel services to survive restarts without a full
26//! NATS KV scan. The snapshot is a cache — delete it and the system
27//! falls back to `load_all()`.
28//!
29//! ## Blocking I/O
30//!
31//! Every function in this module performs **synchronous** file I/O and is
32//! deliberately runtime-agnostic — it pulls in no async runtime so a caller can
33//! place it on whichever executor (or none) it wants. The flip side is that none
34//! of these calls may run directly on an async executor thread: [`load`],
35//! [`SnapshotWriter::compact`], and friends `read`/`write`/`rename` whole files
36//! and will stall the reactor if awaited inline. Async callers must offload them
37//! with `tokio::task::spawn_blocking` (or the equivalent).
38//! [`SnapshotWriter::compact`] is the heaviest — it reads and rewrites the entire
39//! log — but the rule is the same for all of them.
40
41use std::collections::{HashMap, HashSet};
42use std::fs::{self, File, OpenOptions};
43use std::io::{self, Write};
44use std::path::{Path, PathBuf};
45
46use crate::artifact::{ExportManifest, ExportStage, verify_and_stage_import};
47use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
48
/// Magic bytes that open every snapshot log.
const MAGIC: &[u8; 4] = b"PGSS";
// v2: Put/Delete records carry the version as length-prefixed raw bytes rather
// than a fixed 8-byte u64, so tokens that are not a u64 (e.g. FDB
// versionstamps) round-trip intact. `replay_log` rejects any other version; the
// snapshot is a cache, so a rejected file just triggers a fresh NATS scan +
// watch replay.
const FORMAT_VERSION: u16 = 2;
/// Header size in bytes: the magic followed by the little-endian `u16` version.
const HEADER_LEN: usize = MAGIC.len() + std::mem::size_of::<u16>();

// Record type tags: the `type:u8` byte that follows each record's CRC.
const REC_PUT: u8 = 1;
const REC_DELETE: u8 = 2;
const REC_CURSOR: u8 = 3;

// Smallest complete cursor record: crc32 (4 bytes) + type (1) + cur_len (1).
const MIN_CURSOR_RECORD: usize = std::mem::size_of::<u32>() + 1 + 1;
63
/// Errors from snapshot operations.
///
/// Uses `thiserror` to match [`crate::KvError`]; unlike `KvError` (which is
/// `Clone` and so flattens its causes to strings), snapshot errors are observed
/// by a single caller, so `Io` keeps its `#[source]` chain via `#[from]`.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// A file-system operation failed. Converts from [`io::Error`] via `?`, and
    /// the original error stays reachable as the source.
    #[error("snapshot I/O error: {0}")]
    Io(#[from] io::Error),
    /// The file is not a snapshot log this build can read — e.g. it is too short
    /// to hold a header, has the wrong magic, or carries an unsupported format
    /// version. The string says which.
    #[error("invalid snapshot format: {0}")]
    InvalidFormat(String),
    /// A CRC check over stored data failed.
    #[error("snapshot corrupted (CRC mismatch)")]
    Corrupted,
    /// A pluggable [`SnapshotStore`] backend (e.g. the `fjall` on-disk store)
    /// reported an error. Kept backend-agnostic — no backend type leaks into this
    /// enum's signature — so enabling a backend feature does not change the public
    /// error surface.
    #[error("snapshot backend error: {0}")]
    Backend(String),
    /// An export artifact failed validation: malformed/mismatched manifest,
    /// checksum mismatch, version-policy rejection, unavailable destination, or a
    /// post-import cursor that disagrees with the manifest.
    ///
    /// Distinct from [`InvalidFormat`](Self::InvalidFormat) (a *store-file*
    /// problem — "my local fold is broken") because the recovery is different: an
    /// invalid artifact means "fetch another artifact or fall back to a full
    /// scan", never "repair the local store".
    #[error("invalid snapshot artifact: {0}")]
    ArtifactInvalid(String),
}
94
95/// Result of loading a snapshot from disk.
96#[derive(Debug)]
97pub struct Snapshot {
98    /// Watch cursor at the time of the last checkpoint.
99    pub cursor: WatchCursor,
100    /// Live KV entries keyed by name (deduplicated, deletes applied).
101    pub entries: HashMap<String, KvEntry>,
102}
103
104impl Snapshot {
105    /// Keys present in this snapshot but absent from a fresh scan result.
106    ///
107    /// After a cursor-expired fallback to full `watch_all()`, callers should
108    /// compare the snapshot against the live key set and emit synthetic
109    /// `Delete` events for stale keys to ensure convergence.
110    pub fn stale_keys<'a, I>(&'a self, current_keys: I) -> Vec<&'a str>
111    where
112        I: IntoIterator<Item = &'a str>,
113    {
114        let current: HashSet<&str> = current_keys.into_iter().collect();
115        self.entries
116            .keys()
117            .filter(|k| !current.contains(k.as_str()))
118            .map(|k| k.as_str())
119            .collect()
120    }
121}
122
123/// Append-only snapshot writer.
124///
125/// Streams [`KvUpdate`] records to disk via a buffered writer. No
126/// in-memory state beyond the file handle and a byte counter for
127/// compaction triggering.
128///
129/// Compacts automatically when bytes written since last compaction
130/// exceeds `compact_threshold`. Compaction replays the log into a
131/// transient [`HashMap`], rewrites via tempfile+rename, and reopens
132/// for append.
133pub struct SnapshotWriter {
134    path: PathBuf,
135    // `None` only after a `compact()` rewrote the file (atomic rename succeeded)
136    // but failed to reopen it for append. The old handle then pointed at the
137    // renamed-away inode; rather than keep writing into that orphan — silently
138    // losing every later record on close — we drop it and poison the writer so
139    // `write_update`/`checkpoint`/`flush` return an error until reconstructed.
140    writer: Option<io::BufWriter<File>>,
141    bytes_since_compact: u64,
142    compact_threshold: u64,
143}
144
145impl SnapshotWriter {
146    /// Open or create a snapshot log.
147    ///
148    /// If the file doesn't exist, writes the header. If it exists, opens
149    /// for append. `compact_threshold` controls how many bytes accumulate
150    /// before an automatic compaction.
151    pub fn open(path: &Path, compact_threshold: u64) -> Result<Self, SnapshotError> {
152        // Open first, then size the file from its own handle. The previous
153        // `path.exists()` + `fs::metadata(path)` pair issued two extra `stat(2)`
154        // syscalls and left a TOCTOU window (the file could be created or removed
155        // between the checks). One `open(2)` plus a handle `metadata()` is both
156        // fewer syscalls and race-free — the length we read is the length of the
157        // file we hold open.
158        let file = OpenOptions::new().create(true).append(true).open(path)?;
159        let existing_len = file.metadata()?.len();
160
161        let mut writer = io::BufWriter::new(file);
162
163        // A file with at least a full header is an existing log: everything past
164        // the 6-byte header counts toward the compaction threshold. Anything
165        // shorter (brand-new, or a header torn by a crash mid-create) gets a
166        // fresh header written before the first record.
167        let bytes_since_compact = if existing_len >= HEADER_LEN as u64 {
168            existing_len - HEADER_LEN as u64
169        } else {
170            writer.write_all(MAGIC)?;
171            writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
172            writer.flush()?;
173            0
174        };
175
176        Ok(Self {
177            path: path.to_path_buf(),
178            writer: Some(writer),
179            bytes_since_compact,
180            compact_threshold,
181        })
182    }
183
184    /// Borrow the underlying writer, or fail if a prior `compact()` poisoned it
185    /// (see the `writer` field doc). Keeps the orphaned-fd failure mode a
186    /// surfaced error rather than a silent data loss.
187    fn writer(&mut self) -> Result<&mut io::BufWriter<File>, SnapshotError> {
188        self.writer.as_mut().ok_or_else(|| {
189            SnapshotError::Io(io::Error::other(
190                "snapshot writer poisoned: a prior compact() failed to reopen the log for append",
191            ))
192        })
193    }
194
195    /// Write a single [`KvUpdate`] record to the log.
196    ///
197    /// Buffered — does not flush to disk until [`checkpoint`](Self::checkpoint).
198    #[must_use = "I/O errors mean the write was lost"]
199    pub fn write_update(&mut self, update: &KvUpdate) -> Result<(), SnapshotError> {
200        let w = self.writer()?;
201        let bytes = match update {
202            KvUpdate::Put(entry) => write_put_record(w, &entry.key, &entry.value, &entry.version)?,
203            KvUpdate::Delete { key, version } | KvUpdate::Purge { key, version } => {
204                write_delete_record(w, key, version)?
205            }
206        };
207        self.bytes_since_compact += bytes as u64;
208        Ok(())
209    }
210
211    /// Write a cursor checkpoint and flush the buffer to the OS.
212    ///
213    /// The flush is a `write(2)` into the page cache — it survives a process
214    /// crash, but NOT power loss: there is no `fsync` on this path. The durable
215    /// `sync_all` happens in [`compact`](Self::compact). That's deliberate — the
216    /// snapshot is a cache backed by NATS and checkpoints are frequent, so an
217    /// fsync per checkpoint isn't worth its latency; a tail lost to power loss is
218    /// rebuilt from a NATS scan + watch replay.
219    ///
220    /// Returns `true` when the log has grown past the compaction threshold
221    /// and the caller should run [`compact`](Self::compact). Separating
222    /// the check from the I/O lets async callers offload compaction to a
223    /// blocking task instead of stalling the executor.
224    #[must_use = "returns true when compaction is needed"]
225    pub fn checkpoint(&mut self, cursor: &WatchCursor) -> Result<bool, SnapshotError> {
226        let w = self.writer()?;
227        let bytes = write_cursor_record(w, cursor)?;
228        w.flush()?;
229        self.bytes_since_compact += bytes as u64;
230        Ok(self.bytes_since_compact > self.compact_threshold)
231    }
232
233    /// Flush the buffer to disk without writing a cursor record.
234    #[must_use = "I/O errors mean the flush failed"]
235    pub fn flush(&mut self) -> Result<(), SnapshotError> {
236        self.writer()?.flush()?;
237        Ok(())
238    }
239
240    /// Compact the snapshot log: replay, deduplicate, and rewrite.
241    ///
242    /// Performs synchronous file I/O. In async contexts, run via
243    /// `spawn_blocking` to avoid stalling the executor.
244    #[must_use = "compaction errors leave the log uncompacted"]
245    pub fn compact(&mut self) -> Result<(), SnapshotError> {
246        // Flush buffered records to the file before reading it back. Records
247        // written since the last `checkpoint()` still sit in the BufWriter; if we
248        // read+rewrite without flushing, they're excluded from the replay, and
249        // the old BufWriter then flushes them on drop into the inode we just
250        // renamed away — silently losing them. Flushing first makes `compact()`
251        // safe regardless of whether a `checkpoint()` preceded it.
252        self.writer()?.flush()?;
253        let data = fs::read(&self.path)?;
254        let (entries, cursor, _already_compact) = replay_log(&data)?;
255        compact_to_file(&self.path, &entries, &cursor)?;
256
257        // The rename in `compact_to_file` has already replaced the file, so the
258        // current handle now points at the orphaned (renamed-away) inode. Drop it
259        // *before* reopening: if the reopen fails, `writer` stays `None` and the
260        // writer is poisoned (see field doc), turning a would-be silent loss into
261        // a surfaced error on the next write.
262        self.writer = None;
263        let file = OpenOptions::new().append(true).open(&self.path)?;
264        self.writer = Some(io::BufWriter::new(file));
265        self.bytes_since_compact = 0;
266
267        Ok(())
268    }
269}
270
271/// Load a snapshot from disk.
272///
273/// Replays the append log, deduplicates (last write wins per key),
274/// compacts the file, and returns the live entries + cursor.
275///
276/// Returns `Ok(None)` if the file doesn't exist or contains no entries.
277pub fn load(path: &Path) -> Result<Option<Snapshot>, SnapshotError> {
278    let data = match fs::read(path) {
279        Ok(d) => d,
280        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
281        Err(e) => return Err(SnapshotError::Io(e)),
282    };
283
284    let (entries, cursor, already_compact) = replay_log(&data)?;
285
286    if entries.is_empty() && cursor.is_none() {
287        return Ok(None);
288    }
289
290    // Skip the rewrite when the log is already in compact form (no duplicate
291    // keys, no deletes, no truncated tail). Avoids a full read+write on every
292    // startup after the first. A truncated tail forces a rewrite even when the
293    // surviving records are unique: leaving the partial record on disk would
294    // let the next append land after it, corrupting the log.
295    if !already_compact {
296        compact_to_file(path, &entries, &cursor)?;
297    }
298
299    Ok(Some(Snapshot { cursor, entries }))
300}
301
302// ---------------------------------------------------------------------------
303// The durable-fold contract
304// ---------------------------------------------------------------------------
305
306/// A durable, resumable, queryable fold of a KV watch stream.
307///
308/// This is the primitive slipstream owns: a place to apply a stream of
309/// [`KvUpdate`]s such that, after any restart, the fold resumes from exactly
310/// where it left off and stays a faithful function of the log. [`watch_applied`]
311/// drives it; consumers pick the backend.
312///
313/// [`watch_applied`]: crate::watch_applied
314///
315/// ## Invariants every implementation must hold
316///
317/// - **Pure function of the log.** Delete the store, replay every update with
318///   revision `>` the persisted cursor, and you get byte-identical state. The
319///   store caches the fold; it is never the source of truth (that is NATS).
320/// - **Cursor-after-apply.** A persisted cursor `C` implies every update with
321///   revision `≤ C` is durably folded in. [`apply`](Self::apply) writes data and
322///   cursor together so the cursor never names a revision whose data is absent —
323///   on a transactional backend in one txn, on the append log data-then-cursor
324///   (a torn write leaves data *ahead* of the cursor, which replay re-folds, never
325///   skips).
326/// - **Snapshot is a cache.** Any tail lost to power loss (under a no-sync
327///   durability mode) is rebuilt by resuming the watch from the recovered cursor;
328///   never a correctness failure.
329///
330/// ## Threading
331///
332/// Methods are **synchronous** and may block on I/O — the same runtime-agnostic
333/// discipline as the rest of this module. [`watch_applied`] offloads
334/// [`apply`](Self::apply) to a blocking task; a consumer calling
335/// [`get`](Self::get)/[`range`](Self::range) from an async context should do the
336/// same.
337pub trait SnapshotStore: Sized + Send {
338    /// Open (or resume) the store at `path`.
339    ///
340    /// Returns the persisted resume cursor — [`WatchCursor::none`] when the store
341    /// is fresh — and the store ready to [`apply`](Self::apply)/query. Pass the
342    /// returned cursor to [`watch_applied`](crate::watch_applied) as the resume
343    /// position.
344    ///
345    /// Backends with tuning knobs (compaction threshold, sync mode) expose them
346    /// on their own constructors; this uses safe defaults.
347    fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>;
348
349    /// Atomically fold `batch` into the store and advance the resume cursor.
350    ///
351    /// Data and cursor become durable together (see the cursor-after-apply
352    /// invariant). `batch` is the raw, revision-ordered updates received since the
353    /// last flush — including ones a consumer's `parse` rejected, since they are
354    /// still part of the bucket's state. `cursor` is the highest revision received
355    /// in the batch.
356    fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>;
357
358    /// Look up the live entry for `key`. `None` if absent or deleted.
359    fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>;
360
361    /// All live entries whose key starts with `prefix`, in ascending key order.
362    ///
363    /// Buffers the whole match set into a `Vec`. Convenient for the bounded
364    /// prefixes an in-RAM consumer scans, but a broad prefix against an on-disk
365    /// fold (the 1B-route case an on-disk backend exists for) materializes every
366    /// match at once — and an empty `prefix` is an unbounded full scan that
367    /// buffers the *entire* fold. Use
368    /// [`for_each_in_range`](Self::for_each_in_range) for either.
369    fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>;
370
371    /// Stream every live entry whose key starts with `prefix`, in ascending key
372    /// order, invoking `f` once per entry — without buffering the whole match
373    /// set in memory.
374    ///
375    /// This is the memory-bounded counterpart to [`range`](Self::range). The
376    /// reason to pick an on-disk backend is a fold too large for RAM; for such a
377    /// consumer a broad [`range`](Self::range) defeats the purpose by collecting
378    /// every match into one `Vec`. `f` returning `Err` stops the scan early and
379    /// propagates that error.
380    ///
381    /// The provided implementation delegates to [`range`](Self::range) — correct
382    /// for in-RAM backends, where the fold already fits in memory. On-disk
383    /// backends override it to stream straight from storage.
384    fn for_each_in_range(
385        &self,
386        prefix: &str,
387        mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
388    ) -> Result<(), SnapshotError> {
389        for entry in self.range(prefix)? {
390            f(entry)?;
391        }
392        Ok(())
393    }
394
395    /// The most recently applied (and durably persisted) resume cursor —
396    /// [`WatchCursor::none`] when nothing has been applied.
397    fn cursor(&self) -> WatchCursor;
398
399    /// Export this fold as a transferable **artifact directory** at `dest_dir`:
400    /// the backend's files under `data/` plus a `MANIFEST.json` carrying
401    /// per-file checksums, the backend's identity and on-disk format generation,
402    /// and the watch cursor the payload is exactly consistent with
403    /// (== [`cursor`](Self::cursor) at the moment of export).
404    ///
405    /// This is the bootstrap primitive for replicas of a **bounded** log: ship
406    /// the artifact to a new/rebuilding node, import it there, and resume the
407    /// watch from the embedded cursor — replaying only the log tail instead of
408    /// a full state that the log may no longer hold.
409    ///
410    /// ## Contract
411    ///
412    /// - `dest_dir` must not exist (or be an empty directory). Never overwrites.
413    /// - On `Ok`, the artifact at `dest_dir` is **complete and fsynced**: it was
414    ///   assembled in a temp directory, the manifest written last, and atomically
415    ///   renamed into place. A crash mid-export leaves no artifact at `dest_dir`.
416    /// - `&mut self` is deliberate: export must not run concurrently with
417    ///   [`apply`](Self::apply), which is what makes the embedded cursor exact.
418    ///   Inside [`watch_applied`](crate::watch_applied), use its export-request
419    ///   channel rather than calling this directly.
420    /// - **Artifacts are transient.** Directory-copy backends hardlink immutable
421    ///   files where possible, so a lingering artifact pins storage the source
422    ///   later compacts away — upload (or move) it, then delete it. Stage and
423    ///   destination should be on the same filesystem as the fold; a
424    ///   cross-filesystem destination silently degrades hardlinks to full copies.
425    /// - Blocking I/O, like everything on this trait.
426    fn export_to(&mut self, dest_dir: &Path) -> Result<ExportManifest, SnapshotError>;
427}
428
429// ---------------------------------------------------------------------------
430// AppendLogSnapshot: the default, pure-Rust backend
431// ---------------------------------------------------------------------------
432
/// Compaction threshold [`AppendLogSnapshot::load`] uses: 10 MiB of appended
/// records before a compaction is triggered. Same value the hand-rolled callers
/// used; pass a different one to [`AppendLogSnapshot::open`] to tune it.
pub const DEFAULT_COMPACT_THRESHOLD: u64 = 10 << 20;
437
438/// The append-only CRC log as a [`SnapshotStore`] (the default backend).
439///
440/// Wraps the existing [`SnapshotWriter`] + [`load`] machinery — same v2 on-disk
441/// format, same CRC framing, same FDB-versionstamp-safe cursor encoding, so files
442/// written by either path are mutually loadable. Keeps the whole fold in a
443/// [`HashMap`] in RAM to serve [`get`](SnapshotStore::get)/[`range`](SnapshotStore::range),
444/// which is exactly what edge/tunnel-style consumers already do — small state,
445/// pure Rust. A consumer that cannot hold its fold in RAM wants an on-disk
446/// backend instead (e.g. the `fjall` feature).
447pub struct AppendLogSnapshot {
448    writer: SnapshotWriter,
449    entries: HashMap<String, KvEntry>,
450    cursor: WatchCursor,
451}
452
453impl AppendLogSnapshot {
454    /// Backend identity in artifact manifests.
455    pub(crate) const BACKEND: &'static str = "append-log";
456    /// On-disk format generation in artifact manifests (the append log's
457    /// [`FORMAT_VERSION`]).
458    pub(crate) const BACKEND_VERSION: &'static str = "2";
459    /// The single payload file inside an artifact, relative to its `data/` dir.
460    const ARTIFACT_PAYLOAD: &'static str = "fold.snap";
461
462    /// Open or resume the log at `path` with an explicit compaction threshold.
463    ///
464    /// Replays any existing log into the in-RAM fold (and compacts it, exactly as
465    /// [`load`] does), then opens the writer for append. Returns the resume cursor
466    /// alongside the store.
467    pub fn open(path: &Path, compact_threshold: u64) -> Result<(WatchCursor, Self), SnapshotError> {
468        let (cursor, entries) = match load(path)? {
469            Some(snap) => (snap.cursor, snap.entries),
470            None => (WatchCursor::none(), HashMap::new()),
471        };
472        let writer = SnapshotWriter::open(path, compact_threshold)?;
473        Ok((
474            cursor.clone(),
475            Self {
476                writer,
477                entries,
478                cursor,
479            },
480        ))
481    }
482
483    /// Import an exported artifact (see [`SnapshotStore::export_to`]) as a new
484    /// fold at `dest_path`, returning the embedded resume cursor and the opened
485    /// store.
486    ///
487    /// `dest_path` is a **file** path (this backend is a single log file) and
488    /// must not already exist. The artifact is fully verified against its
489    /// manifest — checksums, backend identity, format generation — and the
490    /// staged copy is loaded and its cursor compared against the manifest's
491    /// before anything lands at `dest_path`; a bad artifact never becomes a
492    /// fold. A crash mid-import leaves nothing at `dest_path`; a crash after
493    /// the final rename leaves a valid fold (a retried import then refuses the
494    /// existing destination — just [`open`](Self::open) it).
495    pub fn import(
496        artifact_dir: &Path,
497        dest_path: &Path,
498        compact_threshold: u64,
499    ) -> Result<(WatchCursor, Self), SnapshotError> {
500        let (manifest, stage) =
501            verify_and_stage_import(artifact_dir, dest_path, Self::BACKEND, |v| {
502                if v == Self::BACKEND_VERSION {
503                    Ok(())
504                } else {
505                    Err(SnapshotError::ArtifactInvalid(format!(
506                        "append-log artifact has format generation {v:?}, this build reads {:?}",
507                        Self::BACKEND_VERSION
508                    )))
509                }
510            })?;
511
512        // This backend's artifact is exactly one payload file.
513        let expected = format!(
514            "{}/{}",
515            crate::artifact::PAYLOAD_DIR,
516            Self::ARTIFACT_PAYLOAD
517        );
518        if manifest.files.len() != 1 || manifest.files[0].path != expected {
519            return Err(SnapshotError::ArtifactInvalid(format!(
520                "append-log artifact must contain exactly {expected:?}"
521            )));
522        }
523
524        // Verify the staged payload opens and agrees with the manifest cursor
525        // BEFORE the rename — a bad artifact must never land at the destination.
526        let staged_file = stage.payload().join(Self::ARTIFACT_PAYLOAD);
527        let staged_cursor = match load(&staged_file)? {
528            Some(snap) => snap.cursor,
529            // An empty fold exports to an artifact whose log holds no entries
530            // and no cursor; `load` reports that as `None`.
531            None => WatchCursor::none(),
532        };
533        if staged_cursor != manifest.cursor {
534            return Err(SnapshotError::ArtifactInvalid(format!(
535                "payload cursor {staged_cursor:?} disagrees with manifest cursor {:?}",
536                manifest.cursor
537            )));
538        }
539
540        stage.finalize_file(Self::ARTIFACT_PAYLOAD)?;
541        Self::open(dest_path, compact_threshold)
542    }
543}
544
545impl SnapshotStore for AppendLogSnapshot {
546    fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
547        Self::open(path, DEFAULT_COMPACT_THRESHOLD)
548    }
549
550    fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
551        // Stream the raw records, then the cursor checkpoint — data-then-cursor,
552        // so a torn write leaves data ahead of the cursor (re-folded on resume),
553        // never a cursor ahead of its data. The in-RAM fold mirrors the same
554        // mutations so get/range stay consistent with what was just durably written.
555        for update in batch {
556            self.writer.write_update(update)?;
557            match update {
558                KvUpdate::Put(entry) => {
559                    self.entries.insert(entry.key.clone(), entry.clone());
560                }
561                KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => {
562                    self.entries.remove(key);
563                }
564            }
565        }
566        // checkpoint() flushes the buffered records + the cursor record to the OS;
567        // it returns true when the log has grown past the compaction threshold, in
568        // which case we compact inline (this whole call already runs off the hot
569        // path via spawn_blocking in watch_applied).
570        if self.writer.checkpoint(cursor)? {
571            self.writer.compact()?;
572        }
573        self.cursor = cursor.clone();
574        Ok(())
575    }
576
577    fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
578        Ok(self.entries.get(key).cloned())
579    }
580
581    fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
582        let mut out: Vec<KvEntry> = self
583            .entries
584            .values()
585            .filter(|e| e.key.starts_with(prefix))
586            .cloned()
587            .collect();
588        out.sort_unstable_by(|a, b| a.key.cmp(&b.key));
589        Ok(out)
590    }
591
592    fn cursor(&self) -> WatchCursor {
593        self.cursor.clone()
594    }
595
596    fn export_to(&mut self, dest_dir: &Path) -> Result<ExportManifest, SnapshotError> {
597        let stage = ExportStage::new(dest_dir)?;
598        fs::create_dir(stage.payload())?;
599        // The in-RAM fold is the complete live state, so the artifact payload is
600        // simply a compacted log written from it — same sorted, CRC'd, synced v2
601        // file `compact_to_file` produces for the store itself. No need to flush
602        // or read back the live log file.
603        let payload = stage.payload().join(Self::ARTIFACT_PAYLOAD);
604        compact_to_file(&payload, &self.entries, &self.cursor)?;
605
606        // Verify the payload by loading it back: cursor and entry count must
607        // match the live fold. Cheap for the in-RAM backend, and keeps the
608        // "an artifact that exists is trustworthy" guarantee uniform across
609        // backends.
610        let (loaded_cursor, loaded_len) = match load(&payload)? {
611            Some(snap) => (snap.cursor, snap.entries.len()),
612            None => (WatchCursor::none(), 0),
613        };
614        if loaded_cursor != self.cursor || loaded_len != self.entries.len() {
615            return Err(SnapshotError::ArtifactInvalid(format!(
616                "exported payload disagrees with live fold (cursor {loaded_cursor:?} vs {:?}, \
617                 {loaded_len} vs {} entries)",
618                self.cursor,
619                self.entries.len()
620            )));
621        }
622
623        stage.seal_and_finalize(Self::BACKEND, Self::BACKEND_VERSION, &self.cursor)
624    }
625}
626
627// ---------------------------------------------------------------------------
628// Internal: log replay
629// ---------------------------------------------------------------------------
630
631/// Replay the append log into the live key set.
632///
633/// Records borrow directly from `data` — keys and values are not allocated
634/// until the surviving set is materialized at the end, so overwritten or
635/// deleted records cost nothing. Returns the entries, the latest cursor, and
636/// whether the log was already compact (no duplicate keys, no deletes, no
637/// truncated tail) so callers can skip a redundant rewrite.
638fn replay_log(data: &[u8]) -> Result<(HashMap<String, KvEntry>, WatchCursor, bool), SnapshotError> {
639    if data.len() < HEADER_LEN {
640        return Err(SnapshotError::InvalidFormat("file too short".into()));
641    }
642    if &data[0..4] != MAGIC {
643        return Err(SnapshotError::InvalidFormat("bad magic".into()));
644    }
645    let version = u16::from_le_bytes([data[4], data[5]]);
646    if version != FORMAT_VERSION {
647        return Err(SnapshotError::InvalidFormat(format!(
648            "unsupported version {version}"
649        )));
650    }
651
652    // Upper-bound the working set by data size to avoid rehashing on large
653    // snapshots, capped so a tiny-record file can't request a huge table.
654    let estimated = (data.len() - HEADER_LEN) / 30;
655    let mut live: HashMap<&str, (&[u8], VersionToken)> =
656        HashMap::with_capacity(estimated.min(4096));
657    let mut cursor = WatchCursor::none();
658    let mut pos = HEADER_LEN;
659
660    // The log is compact only if every record contributed a unique surviving
661    // entry and we consumed the file cleanly. Any overwrite, delete, or
662    // truncated tail means a rewrite would shrink or repair the file.
663    let mut redundant = false;
664    let mut clean_eof = true;
665
666    while pos < data.len() {
667        match parse_record(&data[pos..]) {
668            Ok((record, consumed)) => {
669                match record {
670                    Record::Put {
671                        key,
672                        value,
673                        version,
674                    } => {
675                        if live.insert(key, (value, version)).is_some() {
676                            redundant = true;
677                        }
678                    }
679                    Record::Delete { key } => {
680                        live.remove(key);
681                        redundant = true;
682                    }
683                    Record::Cursor(c) => {
684                        // Each new cursor record supersedes the previous one, so
685                        // any earlier cursor on disk is now dead weight. Mark the
686                        // log redundant so `load()` rewrites it — otherwise a
687                        // service that checkpoints frequently accumulates one
688                        // cursor record per checkpoint and `already_compact` stays
689                        // true (entries are unique, EOF is clean), leaving the
690                        // bloat in place until the writer's own threshold fires.
691                        if !cursor.is_none() {
692                            redundant = true;
693                        }
694                        cursor = c;
695                    }
696                }
697                pos += consumed;
698            }
699            Err(RecordError::Truncated) => {
700                // KNOWN LIMITATION: a record's length is read from its (CRC-but-
701                // not-yet-verified) length fields, so corruption that inflates a
702                // key_len/value_len makes the record look like it runs past EOF —
703                // indistinguishable here from a genuine torn final write. Both
704                // land as `Truncated`, so we stop and silently drop everything
705                // after this point. Detecting it would need a framed, separately-
706                // checksummed length (a format change). Acceptable because the
707                // snapshot is a cache: a short read just triggers a fuller NATS
708                // scan + watch replay, never data loss of record.
709                clean_eof = false;
710                break;
711            }
712            Err(RecordError::CrcMismatch { consumed }) => {
713                // Near EOF → crash recovery (partial final write).
714                // Otherwise → mid-file corruption.
715                if pos + consumed >= data.len() || data.len() - (pos + consumed) < MIN_CURSOR_RECORD
716                {
717                    clean_eof = false;
718                    break;
719                }
720                return Err(SnapshotError::Corrupted);
721            }
722            Err(RecordError::Invalid(msg)) => {
723                return Err(SnapshotError::InvalidFormat(msg));
724            }
725        }
726    }
727
728    // Materialize owned entries for the survivors only — one key + one value
729    // allocation per live key, instead of per record.
730    let mut entries: HashMap<String, KvEntry> = HashMap::with_capacity(live.len());
731    for (key, (value, version)) in live {
732        let key = key.to_string();
733        entries.insert(
734            key.clone(),
735            KvEntry {
736                key,
737                value: value.to_vec(),
738                version,
739            },
740        );
741    }
742
743    let already_compact = !redundant && clean_eof;
744    Ok((entries, cursor, already_compact))
745}
746
747enum Record<'a> {
748    Put {
749        key: &'a str,
750        value: &'a [u8],
751        version: VersionToken,
752    },
753    Delete {
754        key: &'a str,
755    },
756    Cursor(WatchCursor),
757}
758
/// Why a single record could not be decoded.
enum RecordError {
    /// The buffer ends before the record's declared extent.
    Truncated,
    /// The record is fully present but its stored CRC does not match the
    /// computed one; `consumed` is the record's total length in bytes.
    CrcMismatch { consumed: usize },
    /// The record is structurally unacceptable; the string says why.
    Invalid(String),
}
764
765fn parse_record(data: &[u8]) -> Result<(Record<'_>, usize), RecordError> {
766    // Need at least CRC (4) + type (1)
767    if data.len() < 5 {
768        return Err(RecordError::Truncated);
769    }
770
771    let stored_crc = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
772
773    match data[4] {
774        REC_PUT => parse_put(data, stored_crc),
775        REC_DELETE => parse_delete(data, stored_crc),
776        REC_CURSOR => parse_cursor(data, stored_crc),
777        other => Err(RecordError::Invalid(format!(
778            "unknown record type: {other:#x}"
779        ))),
780    }
781}
782
783fn parse_put(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
784    // CRC(4) + type(1) + key_len(2) = 7 minimum to read key_len
785    if data.len() < 7 {
786        return Err(RecordError::Truncated);
787    }
788    let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
789    let vl_off = 7 + key_len;
790
791    if data.len() < vl_off + 4 {
792        return Err(RecordError::Truncated);
793    }
794    let value_len = u32::from_le_bytes([
795        data[vl_off],
796        data[vl_off + 1],
797        data[vl_off + 2],
798        data[vl_off + 3],
799    ]) as usize;
800
801    // ver_len byte sits right after the value.
802    let ver_len_off = vl_off + 4 + value_len;
803    if data.len() < ver_len_off + 1 {
804        return Err(RecordError::Truncated);
805    }
806    let ver_len = data[ver_len_off] as usize;
807    // A version token holds at most 10 bytes inline. A larger length means a
808    // corrupt or incompatible record — reject it rather than letting it reach
809    // `VersionToken::from_raw`, which would panic.
810    if ver_len > 10 {
811        return Err(RecordError::Invalid(format!(
812            "version length {ver_len} exceeds max version token size (10)"
813        )));
814    }
815
816    let total = ver_len_off + 1 + ver_len;
817    if data.len() < total {
818        return Err(RecordError::Truncated);
819    }
820
821    let computed = crc32fast::hash(&data[4..total]);
822    if computed != stored_crc {
823        return Err(RecordError::CrcMismatch { consumed: total });
824    }
825
826    let key = std::str::from_utf8(&data[7..7 + key_len])
827        .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
828    let value = &data[vl_off + 4..vl_off + 4 + value_len];
829    // The `ver_len > 10` check above bounds this to the inline capacity, so
830    // `from_raw` always returns `Some` here; the guard makes that explicit.
831    let version = VersionToken::from_raw(&data[ver_len_off + 1..total]).ok_or_else(|| {
832        RecordError::Invalid(format!(
833            "version length {ver_len} exceeds max version token size (10)"
834        ))
835    })?;
836
837    Ok((
838        Record::Put {
839            key,
840            value,
841            version,
842        },
843        total,
844    ))
845}
846
847fn parse_delete(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
848    if data.len() < 7 {
849        return Err(RecordError::Truncated);
850    }
851    let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
852    let ver_len_off = 7 + key_len;
853    if data.len() < ver_len_off + 1 {
854        return Err(RecordError::Truncated);
855    }
856    let ver_len = data[ver_len_off] as usize;
857    // See `parse_put`: reject oversized version lengths before `from_raw`.
858    if ver_len > 10 {
859        return Err(RecordError::Invalid(format!(
860            "version length {ver_len} exceeds max version token size (10)"
861        )));
862    }
863    let total = ver_len_off + 1 + ver_len;
864
865    if data.len() < total {
866        return Err(RecordError::Truncated);
867    }
868
869    let computed = crc32fast::hash(&data[4..total]);
870    if computed != stored_crc {
871        return Err(RecordError::CrcMismatch { consumed: total });
872    }
873
874    let key = std::str::from_utf8(&data[7..7 + key_len])
875        .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
876    // The delete record's version is written for format symmetry but unused on
877    // replay (a delete just removes the key from the live set).
878
879    Ok((Record::Delete { key }, total))
880}
881
882fn parse_cursor(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
883    if data.len() < 6 {
884        return Err(RecordError::Truncated);
885    }
886    let cursor_len = data[5] as usize;
887    // A version token holds at most 10 bytes inline. A larger length means a
888    // corrupt or incompatible record — reject it rather than letting it reach
889    // `VersionToken::from_raw`, which would panic.
890    if cursor_len > 10 {
891        return Err(RecordError::Invalid(format!(
892            "cursor length {cursor_len} exceeds max version token size (10)"
893        )));
894    }
895    let total = 6 + cursor_len;
896
897    if data.len() < total {
898        return Err(RecordError::Truncated);
899    }
900
901    let computed = crc32fast::hash(&data[4..total]);
902    if computed != stored_crc {
903        return Err(RecordError::CrcMismatch { consumed: total });
904    }
905
906    // The `cursor_len > 10` check above bounds this to the inline capacity, so
907    // `from_raw` always returns `Some` here; the guard makes that explicit.
908    let version = VersionToken::from_raw(&data[6..total]).ok_or_else(|| {
909        RecordError::Invalid(format!(
910            "cursor length {cursor_len} exceeds max version token size (10)"
911        ))
912    })?;
913
914    Ok((Record::Cursor(WatchCursor::from_version(version)), total))
915}
916
917// ---------------------------------------------------------------------------
918// Internal: record writing (incremental CRC, no allocations)
919// ---------------------------------------------------------------------------
920
921fn write_put_record(
922    w: &mut impl Write,
923    key: &str,
924    value: &[u8],
925    version: &VersionToken,
926) -> Result<usize, SnapshotError> {
927    let kb = key.as_bytes();
928    let vb = version.as_bytes();
929
930    // The wire format encodes key length as u16 and value length as u32.
931    // Reject anything that would truncate on cast — a silent truncation here
932    // produces a record whose CRC covers the full bytes but whose stored length
933    // is wrong, which the reader can only interpret as mid-file corruption.
934    let key_len = u16::try_from(kb.len()).map_err(|_| {
935        SnapshotError::InvalidFormat(format!(
936            "key too long: {} bytes (max {})",
937            kb.len(),
938            u16::MAX
939        ))
940    })?;
941    let value_len = u32::try_from(value.len()).map_err(|_| {
942        SnapshotError::InvalidFormat(format!(
943            "value too long: {} bytes (max {})",
944            value.len(),
945            u32::MAX
946        ))
947    })?;
948    // The version is stored as length-prefixed raw bytes so any backend's token
949    // (NATS u64, FDB 10-byte versionstamp) round-trips intact. `VersionToken`
950    // caps inline storage at 10 bytes, so this `u8` length never truncates today;
951    // checking surfaces a format error rather than corrupting the frame if a
952    // future token widens past 255 bytes.
953    let ver_len = u8::try_from(vb.len()).map_err(|_| {
954        SnapshotError::InvalidFormat(format!(
955            "version too long: {} bytes (max {})",
956            vb.len(),
957            u8::MAX
958        ))
959    })?;
960
961    let mut h = crc32fast::Hasher::new();
962    h.update(&[REC_PUT]);
963    h.update(&key_len.to_le_bytes());
964    h.update(kb);
965    h.update(&value_len.to_le_bytes());
966    h.update(value);
967    h.update(&[ver_len]);
968    h.update(vb);
969    let crc = h.finalize();
970
971    w.write_all(&crc.to_le_bytes())?;
972    w.write_all(&[REC_PUT])?;
973    w.write_all(&key_len.to_le_bytes())?;
974    w.write_all(kb)?;
975    w.write_all(&value_len.to_le_bytes())?;
976    w.write_all(value)?;
977    w.write_all(&[ver_len])?;
978    w.write_all(vb)?;
979
980    Ok(4 + 1 + 2 + kb.len() + 4 + value.len() + 1 + vb.len())
981}
982
983fn write_delete_record(
984    w: &mut impl Write,
985    key: &str,
986    version: &VersionToken,
987) -> Result<usize, SnapshotError> {
988    let kb = key.as_bytes();
989    let vb = version.as_bytes();
990
991    let key_len = u16::try_from(kb.len()).map_err(|_| {
992        SnapshotError::InvalidFormat(format!(
993            "key too long: {} bytes (max {})",
994            kb.len(),
995            u16::MAX
996        ))
997    })?;
998    // Length-prefixed version, matching `write_put_record`. See its comment for
999    // why this is bytes rather than a fixed u64.
1000    let ver_len = u8::try_from(vb.len()).map_err(|_| {
1001        SnapshotError::InvalidFormat(format!(
1002            "version too long: {} bytes (max {})",
1003            vb.len(),
1004            u8::MAX
1005        ))
1006    })?;
1007
1008    let mut h = crc32fast::Hasher::new();
1009    h.update(&[REC_DELETE]);
1010    h.update(&key_len.to_le_bytes());
1011    h.update(kb);
1012    h.update(&[ver_len]);
1013    h.update(vb);
1014    let crc = h.finalize();
1015
1016    w.write_all(&crc.to_le_bytes())?;
1017    w.write_all(&[REC_DELETE])?;
1018    w.write_all(&key_len.to_le_bytes())?;
1019    w.write_all(kb)?;
1020    w.write_all(&[ver_len])?;
1021    w.write_all(vb)?;
1022
1023    Ok(4 + 1 + 2 + kb.len() + 1 + vb.len())
1024}
1025
1026fn write_cursor_record(w: &mut impl Write, cursor: &WatchCursor) -> Result<usize, SnapshotError> {
1027    let cb = cursor.version().as_bytes();
1028    // The record encodes the cursor length as a single byte. `VersionToken`
1029    // caps inline storage at 10 bytes, so this never trips today — but checking
1030    // here rather than casting means a future backend that widens the token
1031    // surfaces a format error instead of silently truncating the length prefix
1032    // (which the reader would then mis-frame as corruption).
1033    let cb_len = u8::try_from(cb.len()).map_err(|_| {
1034        SnapshotError::InvalidFormat(format!(
1035            "cursor too long: {} bytes (max {})",
1036            cb.len(),
1037            u8::MAX
1038        ))
1039    })?;
1040
1041    let mut h = crc32fast::Hasher::new();
1042    h.update(&[REC_CURSOR]);
1043    h.update(&[cb_len]);
1044    h.update(cb);
1045    let crc = h.finalize();
1046
1047    w.write_all(&crc.to_le_bytes())?;
1048    w.write_all(&[REC_CURSOR])?;
1049    w.write_all(&[cb_len])?;
1050    w.write_all(cb)?;
1051
1052    Ok(4 + 1 + 1 + cb.len())
1053}
1054
1055// ---------------------------------------------------------------------------
1056// Internal: compaction
1057// ---------------------------------------------------------------------------
1058
1059fn compact_to_file(
1060    path: &Path,
1061    entries: &HashMap<String, KvEntry>,
1062    cursor: &WatchCursor,
1063) -> Result<(), SnapshotError> {
1064    // The tempfile must live in the same directory as the target so the final
1065    // `persist` is an atomic same-filesystem rename. Falling back to "." here
1066    // would silently place it in the cwd, and a cross-filesystem rename then
1067    // fails with EXDEV — a hard error masquerading as a path quirk. A snapshot
1068    // path with no parent is a caller bug; surface it.
1069    let dir = path.parent().ok_or_else(|| {
1070        SnapshotError::InvalidFormat(format!("snapshot path has no parent: {}", path.display()))
1071    })?;
1072    // Collect the live entries once, then drive both the buffer-size estimate
1073    // and the write pass off this slice — a single walk of `entries` instead of
1074    // one to sum sizes and another to write.
1075    //
1076    // Write in sorted key order so a given logical state always serializes to
1077    // identical bytes. `HashMap` iteration order is randomized per process,
1078    // which would otherwise make every compaction emit a different layout —
1079    // defeating byte-level snapshot comparison (e.g. an integrity checksum) and
1080    // making file diffs noise. The sort is O(n log n) on the live key set, which
1081    // is trivial next to the I/O it precedes.
1082    let mut sorted: Vec<&KvEntry> = entries.values().collect();
1083    sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1084
1085    // Buffer the writes: each record is 5–8 individual `write_all` calls, so an
1086    // unbuffered tempfile would issue thousands of `write(2)` syscalls per
1087    // compaction. Size the buffer to the exact serialized length (clamped to
1088    // [8 KiB, 1 MiB]) so the whole snapshot flushes in a handful of syscalls
1089    // instead of one per default 8 KiB page — and a pathologically large table
1090    // can't balloon the buffer past 1 MiB.
1091    let estimated: usize = HEADER_LEN
1092        + sorted
1093            .iter()
1094            .map(|e| 4 + 1 + 2 + e.key.len() + 4 + e.value.len() + 1 + e.version.as_bytes().len())
1095            .sum::<usize>()
1096        + if cursor.is_none() {
1097            0
1098        } else {
1099            4 + 1 + 1 + cursor.version().as_bytes().len()
1100        };
1101    let capacity = estimated.clamp(8 * 1024, 1024 * 1024);
1102    let mut buf = io::BufWriter::with_capacity(capacity, tempfile::NamedTempFile::new_in(dir)?);
1103
1104    buf.write_all(MAGIC)?;
1105    buf.write_all(&FORMAT_VERSION.to_le_bytes())?;
1106
1107    for entry in sorted {
1108        write_put_record(&mut buf, &entry.key, &entry.value, &entry.version)?;
1109    }
1110
1111    if !cursor.is_none() {
1112        write_cursor_record(&mut buf, cursor)?;
1113    }
1114
1115    buf.flush()?;
1116    let tmp = buf
1117        .into_inner()
1118        .map_err(|e| SnapshotError::Io(e.into_error()))?;
1119
1120    tmp.as_file().sync_all()?;
1121    tmp.persist(path).map_err(|e| SnapshotError::Io(e.error))?;
1122
1123    Ok(())
1124}
1125
1126// ---------------------------------------------------------------------------
1127// Tests
1128// ---------------------------------------------------------------------------
1129
1130#[cfg(test)]
1131mod tests {
1132    use super::*;
1133    use tempfile::TempDir;
1134
1135    fn entry(key: &str, value: &[u8], rev: u64) -> KvEntry {
1136        KvEntry {
1137            key: key.to_string(),
1138            value: value.to_vec(),
1139            version: VersionToken::from_u64(rev),
1140        }
1141    }
1142
1143    fn cursor(rev: u64) -> WatchCursor {
1144        WatchCursor::from_u64(rev)
1145    }
1146
1147    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
1148        KvUpdate::Put(entry(key, value, rev))
1149    }
1150
1151    fn delete(key: &str, rev: u64) -> KvUpdate {
1152        KvUpdate::Delete {
1153            key: key.to_string(),
1154            version: VersionToken::from_u64(rev),
1155        }
1156    }
1157
1158    #[test]
1159    fn round_trip() {
1160        let dir = TempDir::new().unwrap();
1161        let path = dir.path().join("test.snap");
1162
1163        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1164        w.write_update(&put("node.us-east-1", b"val1", 1)).unwrap();
1165        w.write_update(&put("node.eu-west-1", b"val2", 2)).unwrap();
1166        w.checkpoint(&cursor(2)).unwrap();
1167        drop(w);
1168
1169        let snap = load(&path).unwrap().unwrap();
1170        assert_eq!(snap.entries.len(), 2);
1171        assert_eq!(snap.cursor.as_u64(), Some(2));
1172
1173        assert_eq!(snap.entries["node.us-east-1"].value, b"val1");
1174        assert_eq!(snap.entries["node.eu-west-1"].value, b"val2");
1175    }
1176
1177    #[test]
1178    fn multiple_batches() {
1179        let dir = TempDir::new().unwrap();
1180        let path = dir.path().join("test.snap");
1181
1182        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1183        w.write_update(&put("a", b"v1", 1)).unwrap();
1184        w.checkpoint(&cursor(1)).unwrap();
1185        w.write_update(&put("b", b"v2", 2)).unwrap();
1186        w.checkpoint(&cursor(2)).unwrap();
1187        drop(w);
1188
1189        let snap = load(&path).unwrap().unwrap();
1190        assert_eq!(snap.entries.len(), 2);
1191        assert_eq!(snap.cursor.as_u64(), Some(2));
1192    }
1193
1194    #[test]
1195    fn delete_removes_entry() {
1196        let dir = TempDir::new().unwrap();
1197        let path = dir.path().join("test.snap");
1198
1199        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1200        w.write_update(&put("a", b"v1", 1)).unwrap();
1201        w.write_update(&put("b", b"v2", 2)).unwrap();
1202        w.checkpoint(&cursor(2)).unwrap();
1203        w.write_update(&delete("a", 3)).unwrap();
1204        w.checkpoint(&cursor(3)).unwrap();
1205        drop(w);
1206
1207        let snap = load(&path).unwrap().unwrap();
1208        assert_eq!(snap.entries.len(), 1);
1209        assert!(snap.entries.contains_key("b"));
1210    }
1211
1212    #[test]
1213    fn purge_removes_entry() {
1214        let dir = TempDir::new().unwrap();
1215        let path = dir.path().join("test.snap");
1216
1217        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1218        w.write_update(&put("a", b"v1", 1)).unwrap();
1219        w.checkpoint(&cursor(1)).unwrap();
1220        w.write_update(&KvUpdate::Purge {
1221            key: "a".to_string(),
1222            version: VersionToken::from_u64(2),
1223        })
1224        .unwrap();
1225        w.checkpoint(&cursor(2)).unwrap();
1226        drop(w);
1227
1228        let snap = load(&path).unwrap().unwrap();
1229        assert!(!snap.entries.contains_key("a"));
1230    }
1231
1232    #[test]
1233    fn missing_file_returns_none() {
1234        let dir = TempDir::new().unwrap();
1235        assert!(load(&dir.path().join("nope.snap")).unwrap().is_none());
1236    }
1237
1238    #[test]
1239    fn corrupted_mid_file() {
1240        let dir = TempDir::new().unwrap();
1241        let path = dir.path().join("test.snap");
1242
1243        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1244        // Write three entries so corruption in the middle has records on both sides
1245        w.write_update(&put("a", b"aaaa-long-value-here", 1))
1246            .unwrap();
1247        w.checkpoint(&cursor(1)).unwrap();
1248        w.write_update(&put("b", b"bbbb-long-value-here", 2))
1249            .unwrap();
1250        w.checkpoint(&cursor(2)).unwrap();
1251        w.write_update(&put("c", b"cccc-long-value-here", 3))
1252            .unwrap();
1253        w.checkpoint(&cursor(3)).unwrap();
1254        drop(w);
1255
1256        let mut data = fs::read(&path).unwrap();
1257        // Flip a byte in the second record area (well past the first, well before EOF)
1258        let target = HEADER_LEN + 40;
1259        assert!(
1260            target < data.len() - 60,
1261            "need enough room after corruption for valid records"
1262        );
1263        data[target] ^= 0xFF;
1264        fs::write(&path, &data).unwrap();
1265
1266        match load(&path) {
1267            Err(SnapshotError::Corrupted) => {}
1268            other => panic!("expected Corrupted, got {other:?}"),
1269        }
1270    }
1271
1272    #[test]
1273    fn truncated_final_record_recovered() {
1274        let dir = TempDir::new().unwrap();
1275        let path = dir.path().join("test.snap");
1276
1277        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1278        w.write_update(&put("a", b"v1", 1)).unwrap();
1279        w.checkpoint(&cursor(1)).unwrap();
1280        w.write_update(&put("b", b"v2", 2)).unwrap();
1281        w.checkpoint(&cursor(2)).unwrap();
1282        drop(w);
1283
1284        // Chop a few bytes off the end (simulates crash mid-write)
1285        let mut data = fs::read(&path).unwrap();
1286        data.truncate(data.len() - 3);
1287        fs::write(&path, &data).unwrap();
1288
1289        let snap = load(&path).unwrap().unwrap();
1290        assert!(snap.entries.contains_key("a"));
1291    }
1292
1293    #[test]
1294    fn truncated_tail_repaired_then_appendable() {
1295        // The already-compact skip optimization must NOT skip the rewrite when
1296        // the log has a truncated tail: leaving the partial record on disk would
1297        // let the next append land after it and corrupt the log.
1298        let dir = TempDir::new().unwrap();
1299        let path = dir.path().join("test.snap");
1300
1301        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1302        w.write_update(&put("a", b"v1", 1)).unwrap();
1303        w.write_update(&put("b", b"v2", 2)).unwrap();
1304        w.checkpoint(&cursor(2)).unwrap();
1305        drop(w);
1306
1307        // Simulate a crash mid-write: chop the final bytes.
1308        let mut data = fs::read(&path).unwrap();
1309        data.truncate(data.len() - 3);
1310        fs::write(&path, &data).unwrap();
1311
1312        // load() repairs the file by rewriting without the partial tail.
1313        let snap = load(&path).unwrap().unwrap();
1314        assert!(snap.entries.contains_key("a"));
1315        assert!(snap.entries.contains_key("b"));
1316
1317        // Appending after the repaired load must not corrupt the log.
1318        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1319        w.write_update(&put("c", b"v3", 3)).unwrap();
1320        w.checkpoint(&cursor(3)).unwrap();
1321        drop(w);
1322
1323        let snap = load(&path).unwrap().unwrap();
1324        assert_eq!(snap.entries.len(), 3);
1325        assert!(snap.entries.contains_key("c"));
1326        assert_eq!(snap.cursor.as_u64(), Some(3));
1327    }
1328
1329    #[test]
1330    fn repeated_cursor_records_trigger_compaction() {
1331        // A service that checkpoints frequently writes one cursor record per
1332        // checkpoint. Even with unique keys and a clean EOF, the stale cursor
1333        // records are dead weight: load() must NOT treat the file as already
1334        // compact, or the bloat persists until the writer's own threshold fires
1335        // (and never, if the process crashes first).
1336        let dir = TempDir::new().unwrap();
1337        let path = dir.path().join("test.snap");
1338
1339        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1340        w.write_update(&put("a", b"v1", 1)).unwrap();
1341        for i in 1..=10u64 {
1342            w.checkpoint(&cursor(i)).unwrap();
1343        }
1344        drop(w);
1345
1346        let size_before = fs::metadata(&path).unwrap().len();
1347        let snap = load(&path).unwrap().unwrap();
1348        let size_after = fs::metadata(&path).unwrap().len();
1349
1350        // Single entry + only the latest cursor survive.
1351        assert_eq!(snap.entries.len(), 1);
1352        assert_eq!(snap.cursor.as_u64(), Some(10));
1353        assert!(
1354            size_after < size_before,
1355            "stale cursor records should be compacted away: {size_before} -> {size_after}"
1356        );
1357
1358        // The rewritten file is now genuinely compact: a second load is a no-op.
1359        let after_first = fs::read(&path).unwrap();
1360        load(&path).unwrap().unwrap();
1361        let after_second = fs::read(&path).unwrap();
1362        assert_eq!(after_first, after_second);
1363    }
1364
1365    #[test]
1366    fn already_compact_file_reloads_unchanged() {
1367        // A compact file (unique keys, no deletes, clean EOF) should reload
1368        // correctly even though load() skips the rewrite.
1369        let dir = TempDir::new().unwrap();
1370        let path = dir.path().join("test.snap");
1371
1372        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1373        w.write_update(&put("a", b"v1", 1)).unwrap();
1374        w.write_update(&put("b", b"v2", 2)).unwrap();
1375        w.checkpoint(&cursor(2)).unwrap();
1376        drop(w);
1377
1378        // First load compacts. Capture the resulting bytes.
1379        load(&path).unwrap().unwrap();
1380        let after_first = fs::read(&path).unwrap();
1381
1382        // Second load sees an already-compact file: skips the rewrite, so the
1383        // bytes are byte-for-byte identical, and the data still round-trips.
1384        let snap = load(&path).unwrap().unwrap();
1385        let after_second = fs::read(&path).unwrap();
1386
1387        assert_eq!(after_first, after_second);
1388        assert_eq!(snap.entries.len(), 2);
1389        assert_eq!(snap.cursor.as_u64(), Some(2));
1390    }
1391
1392    #[test]
1393    fn bad_magic() {
1394        let dir = TempDir::new().unwrap();
1395        let path = dir.path().join("test.snap");
1396        fs::write(&path, b"XXXX\x01\x00").unwrap();
1397
1398        match load(&path) {
1399            Err(SnapshotError::InvalidFormat(msg)) => assert!(msg.contains("magic")),
1400            other => panic!("expected InvalidFormat, got {other:?}"),
1401        }
1402    }
1403
1404    #[test]
1405    fn wrong_version_rejected() {
1406        let dir = TempDir::new().unwrap();
1407        let path = dir.path().join("test.snap");
1408        // Valid magic, but a format version this build doesn't understand.
1409        // A future engineer who bumps FORMAT_VERSION must see old files rejected
1410        // rather than silently misparsed.
1411        let mut data = Vec::new();
1412        data.extend_from_slice(MAGIC);
1413        data.extend_from_slice(&(FORMAT_VERSION + 1).to_le_bytes());
1414        fs::write(&path, &data).unwrap();
1415
1416        match load(&path) {
1417            Err(SnapshotError::InvalidFormat(msg)) => {
1418                assert!(
1419                    msg.contains("version"),
1420                    "message should mention version: {msg}"
1421                )
1422            }
1423            other => panic!("expected InvalidFormat, got {other:?}"),
1424        }
1425    }
1426
1427    #[test]
1428    fn empty_log_returns_none() {
1429        let dir = TempDir::new().unwrap();
1430        let path = dir.path().join("test.snap");
1431
1432        let mut f = File::create(&path).unwrap();
1433        f.write_all(MAGIC).unwrap();
1434        f.write_all(&FORMAT_VERSION.to_le_bytes()).unwrap();
1435        drop(f);
1436
1437        assert!(load(&path).unwrap().is_none());
1438    }
1439
1440    #[test]
1441    fn compaction_on_load_shrinks_file() {
1442        let dir = TempDir::new().unwrap();
1443        let path = dir.path().join("test.snap");
1444
1445        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1446        for i in 0..10u64 {
1447            w.write_update(&put("same-key", format!("v{i}").as_bytes(), i))
1448                .unwrap();
1449            w.checkpoint(&cursor(i)).unwrap();
1450        }
1451        drop(w);
1452
1453        let size_before = fs::metadata(&path).unwrap().len();
1454        let snap = load(&path).unwrap().unwrap();
1455        let size_after = fs::metadata(&path).unwrap().len();
1456
1457        assert_eq!(snap.entries.len(), 1);
1458        assert_eq!(snap.entries["same-key"].value, b"v9");
1459        assert!(
1460            size_after < size_before,
1461            "compaction should shrink: {size_before} -> {size_after}"
1462        );
1463    }
1464
1465    #[test]
1466    fn compact_when_threshold_exceeded() {
1467        let dir = TempDir::new().unwrap();
1468        let path = dir.path().join("test.snap");
1469
1470        // Threshold low enough to trigger after a few writes
1471        let mut w = SnapshotWriter::open(&path, 100).unwrap();
1472        for i in 0..20u64 {
1473            w.write_update(&put("key", format!("value-{i}").as_bytes(), i))
1474                .unwrap();
1475            if w.checkpoint(&cursor(i)).unwrap() {
1476                w.compact().unwrap();
1477            }
1478        }
1479        drop(w);
1480
1481        let snap = load(&path).unwrap().unwrap();
1482        assert_eq!(snap.entries.len(), 1);
1483        assert_eq!(snap.entries["key"].value, b"value-19");
1484    }
1485
1486    #[test]
1487    fn reopen_appends() {
1488        let dir = TempDir::new().unwrap();
1489        let path = dir.path().join("test.snap");
1490
1491        // First writer
1492        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1493        w.write_update(&put("a", b"v1", 1)).unwrap();
1494        w.checkpoint(&cursor(1)).unwrap();
1495        drop(w);
1496
1497        // Second writer appends
1498        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1499        w.write_update(&put("b", b"v2", 2)).unwrap();
1500        w.checkpoint(&cursor(2)).unwrap();
1501        drop(w);
1502
1503        let snap = load(&path).unwrap().unwrap();
1504        assert_eq!(snap.entries.len(), 2);
1505        assert_eq!(snap.cursor.as_u64(), Some(2));
1506    }
1507
1508    #[test]
1509    fn large_values() {
1510        let dir = TempDir::new().unwrap();
1511        let path = dir.path().join("test.snap");
1512
1513        let big = vec![0xABu8; 100_000];
1514        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1515        w.write_update(&put("big", &big, 1)).unwrap();
1516        w.checkpoint(&cursor(1)).unwrap();
1517        drop(w);
1518
1519        let snap = load(&path).unwrap().unwrap();
1520        assert_eq!(snap.entries.len(), 1);
1521        assert_eq!(snap.entries["big"].value.len(), 100_000);
1522        assert!(snap.entries["big"].value.iter().all(|&b| b == 0xAB));
1523    }
1524
1525    #[test]
1526    fn cursor_only_snapshot_returns_some_with_empty_entries() {
1527        // A service that checkpoints before writing any entries produces a file
1528        // with only a cursor record. load() must return Some (not None) so callers
1529        // get the resume position even when there's nothing to preload. The guard
1530        // `entries.is_empty() && cursor.is_none()` must NOT fire when the cursor
1531        // is present.
1532        let dir = TempDir::new().unwrap();
1533        let path = dir.path().join("test.snap");
1534
1535        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1536        w.checkpoint(&cursor(42)).unwrap();
1537        drop(w);
1538
1539        let snap = load(&path)
1540            .unwrap()
1541            .expect("cursor-only snapshot should return Some");
1542        assert!(snap.entries.is_empty(), "no entries written, none expected");
1543        assert_eq!(
1544            snap.cursor.as_u64(),
1545            Some(42),
1546            "cursor must survive round-trip"
1547        );
1548    }
1549
1550    #[test]
1551    fn stale_keys_detects_removed_entries() {
1552        let dir = TempDir::new().unwrap();
1553        let path = dir.path().join("test.snap");
1554
1555        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1556        w.write_update(&put("node.a", b"v1", 1)).unwrap();
1557        w.write_update(&put("node.b", b"v2", 2)).unwrap();
1558        w.write_update(&put("node.c", b"v3", 3)).unwrap();
1559        w.checkpoint(&cursor(3)).unwrap();
1560        drop(w);
1561
1562        let snap = load(&path).unwrap().unwrap();
1563
1564        // Simulate a fresh scan that only has "node.a" and "node.c"
1565        let mut stale = snap.stale_keys(["node.a", "node.c"]);
1566        stale.sort();
1567        assert_eq!(stale, vec!["node.b"]);
1568
1569        // All keys present → no stale
1570        let stale = snap.stale_keys(["node.a", "node.b", "node.c"]);
1571        assert!(stale.is_empty());
1572
1573        // No keys present → all stale
1574        let mut stale: Vec<&str> = snap.stale_keys(std::iter::empty::<&str>());
1575        stale.sort();
1576        assert_eq!(stale, vec!["node.a", "node.b", "node.c"]);
1577    }
1578
1579    #[test]
1580    fn non_u64_version_token_round_trips() {
1581        // Regression: Put/Delete records store the version as length-prefixed raw
1582        // bytes, not a fixed u64. A 10-byte FDB-style versionstamp must survive a
1583        // round-trip intact — the old u64-only field flattened it to 0, which
1584        // would make every later CAS on a restored entry fail RevisionMismatch.
1585        let dir = TempDir::new().unwrap();
1586        let path = dir.path().join("test.snap");
1587
1588        let stamp = [9u8, 8, 7, 6, 5, 4, 3, 2, 1, 0];
1589        let token = VersionToken::from_fdb_versionstamp(&stamp);
1590        assert!(token.as_u64().is_none(), "10-byte token is not a u64");
1591
1592        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1593        w.write_update(&KvUpdate::Put(KvEntry {
1594            key: "fdb.key".to_string(),
1595            value: b"v".to_vec(),
1596            version: token.clone(),
1597        }))
1598        .unwrap();
1599        w.checkpoint(&cursor(1)).unwrap();
1600        drop(w);
1601
1602        let snap = load(&path).unwrap().unwrap();
1603        assert_eq!(
1604            snap.entries["fdb.key"].version.as_bytes(),
1605            &stamp,
1606            "versionstamp must survive the snapshot round-trip byte-for-byte"
1607        );
1608    }
1609
1610    /// Proves the `compact()` flush-before-read fix: records written but not yet
1611    /// checkpointed still sit in the BufWriter. Before the fix, `compact()` read
1612    /// the file (missing those records), rewrote it, and the old buffer then
1613    /// flushed into the renamed-away inode — silently dropping the writes. With
1614    /// the leading flush, an un-checkpointed write survives compaction.
1615    #[test]
1616    fn compact_preserves_uncheckpointed_writes() {
1617        let dir = TempDir::new().unwrap();
1618        let path = dir.path().join("test.snap");
1619
1620        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
1621        // Buffered only — deliberately NO checkpoint() before compacting.
1622        w.write_update(&put("node.a", b"survives", 1)).unwrap();
1623        w.compact().unwrap();
1624        drop(w);
1625
1626        let snap = load(&path).unwrap().unwrap();
1627        assert!(
1628            snap.entries.contains_key("node.a"),
1629            "compact() must not drop buffered-but-uncheckpointed writes"
1630        );
1631        assert_eq!(snap.entries["node.a"].value, b"survives");
1632    }
1633}