slipstream/snapshot.rs
1//! Append-only snapshot log for KV state.
2//!
3//! Streams [`KvUpdate`]s to disk as they arrive via [`SnapshotWriter`],
4//! with no in-memory state beyond a file handle and byte counter.
//! On startup, [`load`] replays the log to reconstruct entries + cursor,
//! then compacts the file unless it is already in compact form.
7//!
8//! ## File format
9//!
10//! 6-byte header followed by a sequence of CRC'd records:
11//!
12//! ```text
13//! Header: b"PGSS" ++ version:u16le
14//! Record: crc32:u32le ++ type:u8 ++ payload (varies by type)
15//!
16//! Put: key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version
17//! Delete: key_len:u16le ++ key ++ ver_len:u8 ++ version
18//! Cursor: cur_len:u8 ++ cursor
19//! ```
20//!
21//! `version` is the raw [`VersionToken`] bytes (≤10), not a fixed u64 — a
22//! 10-byte FDB versionstamp round-trips intact, where a `u64`-only field would
23//! silently flatten it to 0 and break every later CAS.
24//!
25//! Used by edge/tunnel services to survive restarts without a full
26//! NATS KV scan. The snapshot is a cache — delete it and the system
27//! falls back to `load_all()`.
28//!
29//! ## Blocking I/O
30//!
31//! Every function in this module performs **synchronous** file I/O and is
32//! deliberately runtime-agnostic — it pulls in no async runtime so a caller can
33//! place it on whichever executor (or none) it wants. The flip side is that none
//! of these calls may run directly on an async executor thread: [`load`],
//! [`SnapshotWriter::compact`], and friends read, write, and rename whole files
//! synchronously and will stall the reactor if called inline from a task. Async
//! callers must offload them with `tokio::task::spawn_blocking` (or the
//! equivalent).
38//! [`SnapshotWriter::compact`] is the heaviest — it reads and rewrites the entire
39//! log — but the rule is the same for all of them.
40
41use std::collections::{HashMap, HashSet};
42use std::fs::{self, File, OpenOptions};
43use std::io::{self, Write};
44use std::path::{Path, PathBuf};
45
46use crate::artifact::{ExportManifest, ExportStage, verify_and_stage_import};
47use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
48
// File magic: the first four bytes of every snapshot log header.
const MAGIC: &[u8; 4] = b"PGSS";
// v2: Put/Delete records store the version as length-prefixed raw bytes instead
// of a fixed 8-byte u64, so non-u64 tokens (e.g. FDB versionstamps) survive a
// round-trip. v1 files are rejected by `replay_log`; the snapshot is a cache, so
// a rejected file just triggers a fresh NATS scan + watch replay.
const FORMAT_VERSION: u16 = 2;
// Header size in bytes: the 4-byte magic followed by the u16 (little-endian)
// format version.
const HEADER_LEN: usize = 6;

// Record type tags: the single `type` byte that follows each record's CRC.
const REC_PUT: u8 = 0x01;
const REC_DELETE: u8 = 0x02;
const REC_CURSOR: u8 = 0x03;

// Minimum complete record sizes (CRC + type + minimum payload).
// A cursor record is the smallest: CRC (4) + type (1) + cursor length byte (1).
// `replay_log` uses this to decide whether a CRC mismatch sits close enough to
// the end of the file to be a torn final write rather than mid-file corruption.
const MIN_CURSOR_RECORD: usize = 4 + 1 + 1; // 6
63
/// Errors from snapshot operations.
///
/// Uses `thiserror` to match [`crate::KvError`]; unlike `KvError` (which is
/// `Clone` and so flattens its causes to strings), snapshot errors are observed
/// by a single caller, so `Io` keeps its `#[source]` chain via `#[from]`.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// A filesystem operation failed. Also used for a poisoned
    /// [`SnapshotWriter`] (one whose `compact()` could not reopen the log).
    #[error("snapshot I/O error: {0}")]
    Io(#[from] io::Error),
    /// The log is structurally unreadable: too short to hold a header, wrong
    /// magic, an unsupported format version, or a record the parser rejects.
    /// The string says which.
    #[error("invalid snapshot format: {0}")]
    InvalidFormat(String),
    /// A record failed its CRC check away from the end of the log. A mismatch
    /// at the very tail is treated as a torn final write and tolerated instead.
    #[error("snapshot corrupted (CRC mismatch)")]
    Corrupted,
    /// A pluggable [`SnapshotStore`] backend (e.g. the `fjall` on-disk store)
    /// reported an error. Kept backend-agnostic — no backend type leaks into this
    /// enum's signature — so enabling a backend feature does not change the public
    /// error surface.
    #[error("snapshot backend error: {0}")]
    Backend(String),
    /// An export artifact failed validation: malformed/mismatched manifest,
    /// checksum mismatch, version-policy rejection, unavailable destination, or a
    /// post-import cursor that disagrees with the manifest.
    ///
    /// Distinct from [`InvalidFormat`](Self::InvalidFormat) (a *store-file*
    /// problem — "my local fold is broken") because the recovery is different: an
    /// invalid artifact means "fetch another artifact or fall back to a full
    /// scan", never "repair the local store".
    #[error("invalid snapshot artifact: {0}")]
    ArtifactInvalid(String),
}
94
/// Result of loading a snapshot from disk.
///
/// Returned by [`load`] after the log has been replayed; holds only the state
/// that survived the replay.
#[derive(Debug)]
pub struct Snapshot {
    /// Watch cursor at the time of the last checkpoint — the most recent cursor
    /// record in the log, or [`WatchCursor::none`] if the log held entries but
    /// no cursor record.
    pub cursor: WatchCursor,
    /// Live KV entries keyed by name (deduplicated, deletes applied).
    pub entries: HashMap<String, KvEntry>,
}
103
104impl Snapshot {
105 /// Keys present in this snapshot but absent from a fresh scan result.
106 ///
107 /// After a cursor-expired fallback to full `watch_all()`, callers should
108 /// compare the snapshot against the live key set and emit synthetic
109 /// `Delete` events for stale keys to ensure convergence.
110 pub fn stale_keys<'a, I>(&'a self, current_keys: I) -> Vec<&'a str>
111 where
112 I: IntoIterator<Item = &'a str>,
113 {
114 let current: HashSet<&str> = current_keys.into_iter().collect();
115 self.entries
116 .keys()
117 .filter(|k| !current.contains(k.as_str()))
118 .map(|k| k.as_str())
119 .collect()
120 }
121}
122
/// Append-only snapshot writer.
///
/// Streams [`KvUpdate`] records to disk via a buffered writer. No
/// in-memory state beyond the file handle and a byte counter for
/// compaction triggering.
///
/// Compaction is caller-driven rather than automatic:
/// [`checkpoint`](Self::checkpoint) returns `true` once the bytes written since
/// the last compaction exceed `compact_threshold`, and the caller then runs
/// [`compact`](Self::compact). Compaction replays the log into a transient
/// [`HashMap`], rewrites the file, and reopens it for append.
pub struct SnapshotWriter {
    // Location of the log file; `compact()` rereads and reopens it by path.
    path: PathBuf,
    // `None` only after a `compact()` rewrote the file (atomic rename succeeded)
    // but failed to reopen it for append. The old handle then pointed at the
    // renamed-away inode; rather than keep writing into that orphan — silently
    // losing every later record on close — we drop it and poison the writer so
    // `write_update`/`checkpoint`/`flush` return an error until reconstructed.
    writer: Option<io::BufWriter<File>>,
    // Record bytes appended since the last compaction. Right after `open` on an
    // existing log this starts at the file's size past the header.
    bytes_since_compact: u64,
    // `checkpoint()` reports that compaction is due once `bytes_since_compact`
    // exceeds this value.
    compact_threshold: u64,
}
144
145impl SnapshotWriter {
146 /// Open or create a snapshot log.
147 ///
148 /// If the file doesn't exist, writes the header. If it exists, opens
149 /// for append. `compact_threshold` controls how many bytes accumulate
150 /// before an automatic compaction.
151 pub fn open(path: &Path, compact_threshold: u64) -> Result<Self, SnapshotError> {
152 // Open first, then size the file from its own handle. The previous
153 // `path.exists()` + `fs::metadata(path)` pair issued two extra `stat(2)`
154 // syscalls and left a TOCTOU window (the file could be created or removed
155 // between the checks). One `open(2)` plus a handle `metadata()` is both
156 // fewer syscalls and race-free — the length we read is the length of the
157 // file we hold open.
158 let file = OpenOptions::new().create(true).append(true).open(path)?;
159 let existing_len = file.metadata()?.len();
160
161 let mut writer = io::BufWriter::new(file);
162
163 // A file with at least a full header is an existing log: everything past
164 // the 6-byte header counts toward the compaction threshold. Anything
165 // shorter (brand-new, or a header torn by a crash mid-create) gets a
166 // fresh header written before the first record.
167 let bytes_since_compact = if existing_len >= HEADER_LEN as u64 {
168 existing_len - HEADER_LEN as u64
169 } else {
170 writer.write_all(MAGIC)?;
171 writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
172 writer.flush()?;
173 0
174 };
175
176 Ok(Self {
177 path: path.to_path_buf(),
178 writer: Some(writer),
179 bytes_since_compact,
180 compact_threshold,
181 })
182 }
183
184 /// Borrow the underlying writer, or fail if a prior `compact()` poisoned it
185 /// (see the `writer` field doc). Keeps the orphaned-fd failure mode a
186 /// surfaced error rather than a silent data loss.
187 fn writer(&mut self) -> Result<&mut io::BufWriter<File>, SnapshotError> {
188 self.writer.as_mut().ok_or_else(|| {
189 SnapshotError::Io(io::Error::other(
190 "snapshot writer poisoned: a prior compact() failed to reopen the log for append",
191 ))
192 })
193 }
194
195 /// Write a single [`KvUpdate`] record to the log.
196 ///
197 /// Buffered — does not flush to disk until [`checkpoint`](Self::checkpoint).
198 #[must_use = "I/O errors mean the write was lost"]
199 pub fn write_update(&mut self, update: &KvUpdate) -> Result<(), SnapshotError> {
200 let w = self.writer()?;
201 let bytes = match update {
202 KvUpdate::Put(entry) => write_put_record(w, &entry.key, &entry.value, &entry.version)?,
203 KvUpdate::Delete { key, version } | KvUpdate::Purge { key, version } => {
204 write_delete_record(w, key, version)?
205 }
206 };
207 self.bytes_since_compact += bytes as u64;
208 Ok(())
209 }
210
211 /// Write a cursor checkpoint and flush the buffer to the OS.
212 ///
213 /// The flush is a `write(2)` into the page cache — it survives a process
214 /// crash, but NOT power loss: there is no `fsync` on this path. The durable
215 /// `sync_all` happens in [`compact`](Self::compact). That's deliberate — the
216 /// snapshot is a cache backed by NATS and checkpoints are frequent, so an
217 /// fsync per checkpoint isn't worth its latency; a tail lost to power loss is
218 /// rebuilt from a NATS scan + watch replay.
219 ///
220 /// Returns `true` when the log has grown past the compaction threshold
221 /// and the caller should run [`compact`](Self::compact). Separating
222 /// the check from the I/O lets async callers offload compaction to a
223 /// blocking task instead of stalling the executor.
224 #[must_use = "returns true when compaction is needed"]
225 pub fn checkpoint(&mut self, cursor: &WatchCursor) -> Result<bool, SnapshotError> {
226 let w = self.writer()?;
227 let bytes = write_cursor_record(w, cursor)?;
228 w.flush()?;
229 self.bytes_since_compact += bytes as u64;
230 Ok(self.bytes_since_compact > self.compact_threshold)
231 }
232
233 /// Flush the buffer to disk without writing a cursor record.
234 #[must_use = "I/O errors mean the flush failed"]
235 pub fn flush(&mut self) -> Result<(), SnapshotError> {
236 self.writer()?.flush()?;
237 Ok(())
238 }
239
240 /// Compact the snapshot log: replay, deduplicate, and rewrite.
241 ///
242 /// Performs synchronous file I/O. In async contexts, run via
243 /// `spawn_blocking` to avoid stalling the executor.
244 #[must_use = "compaction errors leave the log uncompacted"]
245 pub fn compact(&mut self) -> Result<(), SnapshotError> {
246 // Flush buffered records to the file before reading it back. Records
247 // written since the last `checkpoint()` still sit in the BufWriter; if we
248 // read+rewrite without flushing, they're excluded from the replay, and
249 // the old BufWriter then flushes them on drop into the inode we just
250 // renamed away — silently losing them. Flushing first makes `compact()`
251 // safe regardless of whether a `checkpoint()` preceded it.
252 self.writer()?.flush()?;
253 let data = fs::read(&self.path)?;
254 let (entries, cursor, _already_compact) = replay_log(&data)?;
255 compact_to_file(&self.path, &entries, &cursor)?;
256
257 // The rename in `compact_to_file` has already replaced the file, so the
258 // current handle now points at the orphaned (renamed-away) inode. Drop it
259 // *before* reopening: if the reopen fails, `writer` stays `None` and the
260 // writer is poisoned (see field doc), turning a would-be silent loss into
261 // a surfaced error on the next write.
262 self.writer = None;
263 let file = OpenOptions::new().append(true).open(&self.path)?;
264 self.writer = Some(io::BufWriter::new(file));
265 self.bytes_since_compact = 0;
266
267 Ok(())
268 }
269}
270
271/// Load a snapshot from disk.
272///
273/// Replays the append log, deduplicates (last write wins per key),
274/// compacts the file, and returns the live entries + cursor.
275///
276/// Returns `Ok(None)` if the file doesn't exist or contains no entries.
277pub fn load(path: &Path) -> Result<Option<Snapshot>, SnapshotError> {
278 let data = match fs::read(path) {
279 Ok(d) => d,
280 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
281 Err(e) => return Err(SnapshotError::Io(e)),
282 };
283
284 let (entries, cursor, already_compact) = replay_log(&data)?;
285
286 if entries.is_empty() && cursor.is_none() {
287 return Ok(None);
288 }
289
290 // Skip the rewrite when the log is already in compact form (no duplicate
291 // keys, no deletes, no truncated tail). Avoids a full read+write on every
292 // startup after the first. A truncated tail forces a rewrite even when the
293 // surviving records are unique: leaving the partial record on disk would
294 // let the next append land after it, corrupting the log.
295 if !already_compact {
296 compact_to_file(path, &entries, &cursor)?;
297 }
298
299 Ok(Some(Snapshot { cursor, entries }))
300}
301
302// ---------------------------------------------------------------------------
303// The durable-fold contract
304// ---------------------------------------------------------------------------
305
306/// A durable, resumable, queryable fold of a KV watch stream.
307///
308/// This is the primitive slipstream owns: a place to apply a stream of
309/// [`KvUpdate`]s such that, after any restart, the fold resumes from exactly
310/// where it left off and stays a faithful function of the log. [`watch_applied`]
311/// drives it; consumers pick the backend.
312///
313/// [`watch_applied`]: crate::watch_applied
314///
315/// ## Invariants every implementation must hold
316///
317/// - **Pure function of the log.** Delete the store, replay every update with
318/// revision `>` the persisted cursor, and you get byte-identical state. The
319/// store caches the fold; it is never the source of truth (that is NATS).
320/// - **Cursor-after-apply.** A persisted cursor `C` implies every update with
321/// revision `≤ C` is durably folded in. [`apply`](Self::apply) writes data and
322/// cursor together so the cursor never names a revision whose data is absent —
323/// on a transactional backend in one txn, on the append log data-then-cursor
324/// (a torn write leaves data *ahead* of the cursor, which replay re-folds, never
325/// skips).
326/// - **Snapshot is a cache.** Any tail lost to power loss (under a no-sync
327/// durability mode) is rebuilt by resuming the watch from the recovered cursor;
328/// never a correctness failure.
329///
330/// ## Threading
331///
332/// Methods are **synchronous** and may block on I/O — the same runtime-agnostic
333/// discipline as the rest of this module. [`watch_applied`] offloads
334/// [`apply`](Self::apply) to a blocking task; a consumer calling
335/// [`get`](Self::get)/[`range`](Self::range) from an async context should do the
336/// same.
337pub trait SnapshotStore: Sized + Send {
338 /// Open (or resume) the store at `path`.
339 ///
340 /// Returns the persisted resume cursor — [`WatchCursor::none`] when the store
341 /// is fresh — and the store ready to [`apply`](Self::apply)/query. Pass the
342 /// returned cursor to [`watch_applied`](crate::watch_applied) as the resume
343 /// position.
344 ///
345 /// Backends with tuning knobs (compaction threshold, sync mode) expose them
346 /// on their own constructors; this uses safe defaults.
347 fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>;
348
349 /// Atomically fold `batch` into the store and advance the resume cursor.
350 ///
351 /// Data and cursor become durable together (see the cursor-after-apply
352 /// invariant). `batch` is the raw, revision-ordered updates received since the
353 /// last flush — including ones a consumer's `parse` rejected, since they are
354 /// still part of the bucket's state. `cursor` is the highest revision received
355 /// in the batch.
356 fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>;
357
358 /// Look up the live entry for `key`. `None` if absent or deleted.
359 fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>;
360
361 /// All live entries whose key starts with `prefix`, in ascending key order.
362 ///
363 /// Buffers the whole match set into a `Vec`. Convenient for the bounded
364 /// prefixes an in-RAM consumer scans, but a broad prefix against an on-disk
365 /// fold (the 1B-route case an on-disk backend exists for) materializes every
366 /// match at once — and an empty `prefix` is an unbounded full scan that
367 /// buffers the *entire* fold. Use
368 /// [`for_each_in_range`](Self::for_each_in_range) for either.
369 fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>;
370
371 /// Stream every live entry whose key starts with `prefix`, in ascending key
372 /// order, invoking `f` once per entry — without buffering the whole match
373 /// set in memory.
374 ///
375 /// This is the memory-bounded counterpart to [`range`](Self::range). The
376 /// reason to pick an on-disk backend is a fold too large for RAM; for such a
377 /// consumer a broad [`range`](Self::range) defeats the purpose by collecting
378 /// every match into one `Vec`. `f` returning `Err` stops the scan early and
379 /// propagates that error.
380 ///
381 /// The provided implementation delegates to [`range`](Self::range) — correct
382 /// for in-RAM backends, where the fold already fits in memory. On-disk
383 /// backends override it to stream straight from storage.
384 fn for_each_in_range(
385 &self,
386 prefix: &str,
387 mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
388 ) -> Result<(), SnapshotError> {
389 for entry in self.range(prefix)? {
390 f(entry)?;
391 }
392 Ok(())
393 }
394
395 /// The most recently applied (and durably persisted) resume cursor —
396 /// [`WatchCursor::none`] when nothing has been applied.
397 fn cursor(&self) -> WatchCursor;
398
399 /// Export this fold as a transferable **artifact directory** at `dest_dir`:
400 /// the backend's files under `data/` plus a `MANIFEST.json` carrying
401 /// per-file checksums, the backend's identity and on-disk format generation,
402 /// and the watch cursor the payload is exactly consistent with
403 /// (== [`cursor`](Self::cursor) at the moment of export).
404 ///
405 /// This is the bootstrap primitive for replicas of a **bounded** log: ship
406 /// the artifact to a new/rebuilding node, import it there, and resume the
407 /// watch from the embedded cursor — replaying only the log tail instead of
408 /// a full state that the log may no longer hold.
409 ///
410 /// ## Contract
411 ///
412 /// - `dest_dir` must not exist (or be an empty directory). Never overwrites.
413 /// - On `Ok`, the artifact at `dest_dir` is **complete and fsynced**: it was
414 /// assembled in a temp directory, the manifest written last, and atomically
415 /// renamed into place. A crash mid-export leaves no artifact at `dest_dir`.
416 /// - `&mut self` is deliberate: export must not run concurrently with
417 /// [`apply`](Self::apply), which is what makes the embedded cursor exact.
418 /// Inside [`watch_applied`](crate::watch_applied), use its export-request
419 /// channel rather than calling this directly.
420 /// - **Artifacts are transient.** Directory-copy backends hardlink immutable
421 /// files where possible, so a lingering artifact pins storage the source
422 /// later compacts away — upload (or move) it, then delete it. Stage and
423 /// destination should be on the same filesystem as the fold; a
424 /// cross-filesystem destination silently degrades hardlinks to full copies.
425 /// - Blocking I/O, like everything on this trait.
426 fn export_to(&mut self, dest_dir: &Path) -> Result<ExportManifest, SnapshotError>;
427}
428
429// ---------------------------------------------------------------------------
430// AppendLogSnapshot: the default, pure-Rust backend
431// ---------------------------------------------------------------------------
432
/// Default compaction threshold for [`AppendLogSnapshot::load`]: 10 MiB of
/// appended records before a compaction is triggered. Matches the value the
/// hand-rolled callers used; tune via [`AppendLogSnapshot::open`].
///
/// The value is in bytes and is compared against the writer's count of bytes
/// appended since the last compaction.
pub const DEFAULT_COMPACT_THRESHOLD: u64 = 10 * 1024 * 1024;
437
/// The append-only CRC log as a [`SnapshotStore`] (the default backend).
///
/// Wraps the existing [`SnapshotWriter`] + [`load`] machinery — same v2 on-disk
/// format, same CRC framing, same FDB-versionstamp-safe cursor encoding, so files
/// written by either path are mutually loadable. Keeps the whole fold in a
/// [`HashMap`] in RAM to serve [`get`](SnapshotStore::get)/[`range`](SnapshotStore::range),
/// which is exactly what edge/tunnel-style consumers already do — small state,
/// pure Rust. A consumer that cannot hold its fold in RAM wants an on-disk
/// backend instead (e.g. the `fjall` feature).
pub struct AppendLogSnapshot {
    // Appends records and cursor checkpoints to the log file.
    writer: SnapshotWriter,
    // In-RAM copy of the live fold, keyed by entry key; serves `get`/`range`
    // and is the source for `export_to`.
    entries: HashMap<String, KvEntry>,
    // Cursor restored at open or passed to the most recent `apply` that
    // completed.
    cursor: WatchCursor,
}
452
453impl AppendLogSnapshot {
454 /// Backend identity in artifact manifests.
455 pub(crate) const BACKEND: &'static str = "append-log";
456 /// On-disk format generation in artifact manifests (the append log's
457 /// [`FORMAT_VERSION`]).
458 pub(crate) const BACKEND_VERSION: &'static str = "2";
459 /// The single payload file inside an artifact, relative to its `data/` dir.
460 const ARTIFACT_PAYLOAD: &'static str = "fold.snap";
461
462 /// Open or resume the log at `path` with an explicit compaction threshold.
463 ///
464 /// Replays any existing log into the in-RAM fold (and compacts it, exactly as
465 /// [`load`] does), then opens the writer for append. Returns the resume cursor
466 /// alongside the store.
467 pub fn open(path: &Path, compact_threshold: u64) -> Result<(WatchCursor, Self), SnapshotError> {
468 let (cursor, entries) = match load(path)? {
469 Some(snap) => (snap.cursor, snap.entries),
470 None => (WatchCursor::none(), HashMap::new()),
471 };
472 let writer = SnapshotWriter::open(path, compact_threshold)?;
473 Ok((
474 cursor.clone(),
475 Self {
476 writer,
477 entries,
478 cursor,
479 },
480 ))
481 }
482
483 /// Import an exported artifact (see [`SnapshotStore::export_to`]) as a new
484 /// fold at `dest_path`, returning the embedded resume cursor and the opened
485 /// store.
486 ///
487 /// `dest_path` is a **file** path (this backend is a single log file) and
488 /// must not already exist. The artifact is fully verified against its
489 /// manifest — checksums, backend identity, format generation — and the
490 /// staged copy is loaded and its cursor compared against the manifest's
491 /// before anything lands at `dest_path`; a bad artifact never becomes a
492 /// fold. A crash mid-import leaves nothing at `dest_path`; a crash after
493 /// the final rename leaves a valid fold (a retried import then refuses the
494 /// existing destination — just [`open`](Self::open) it).
495 pub fn import(
496 artifact_dir: &Path,
497 dest_path: &Path,
498 compact_threshold: u64,
499 ) -> Result<(WatchCursor, Self), SnapshotError> {
500 let (manifest, stage) =
501 verify_and_stage_import(artifact_dir, dest_path, Self::BACKEND, |v| {
502 if v == Self::BACKEND_VERSION {
503 Ok(())
504 } else {
505 Err(SnapshotError::ArtifactInvalid(format!(
506 "append-log artifact has format generation {v:?}, this build reads {:?}",
507 Self::BACKEND_VERSION
508 )))
509 }
510 })?;
511
512 // This backend's artifact is exactly one payload file.
513 let expected = format!(
514 "{}/{}",
515 crate::artifact::PAYLOAD_DIR,
516 Self::ARTIFACT_PAYLOAD
517 );
518 if manifest.files.len() != 1 || manifest.files[0].path != expected {
519 return Err(SnapshotError::ArtifactInvalid(format!(
520 "append-log artifact must contain exactly {expected:?}"
521 )));
522 }
523
524 // Verify the staged payload opens and agrees with the manifest cursor
525 // BEFORE the rename — a bad artifact must never land at the destination.
526 let staged_file = stage.payload().join(Self::ARTIFACT_PAYLOAD);
527 let staged_cursor = match load(&staged_file)? {
528 Some(snap) => snap.cursor,
529 // An empty fold exports to an artifact whose log holds no entries
530 // and no cursor; `load` reports that as `None`.
531 None => WatchCursor::none(),
532 };
533 if staged_cursor != manifest.cursor {
534 return Err(SnapshotError::ArtifactInvalid(format!(
535 "payload cursor {staged_cursor:?} disagrees with manifest cursor {:?}",
536 manifest.cursor
537 )));
538 }
539
540 stage.finalize_file(Self::ARTIFACT_PAYLOAD)?;
541 Self::open(dest_path, compact_threshold)
542 }
543}
544
545impl SnapshotStore for AppendLogSnapshot {
546 fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
547 Self::open(path, DEFAULT_COMPACT_THRESHOLD)
548 }
549
550 fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
551 // Stream the raw records, then the cursor checkpoint — data-then-cursor,
552 // so a torn write leaves data ahead of the cursor (re-folded on resume),
553 // never a cursor ahead of its data. The in-RAM fold mirrors the same
554 // mutations so get/range stay consistent with what was just durably written.
555 for update in batch {
556 self.writer.write_update(update)?;
557 match update {
558 KvUpdate::Put(entry) => {
559 self.entries.insert(entry.key.clone(), entry.clone());
560 }
561 KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => {
562 self.entries.remove(key);
563 }
564 }
565 }
566 // checkpoint() flushes the buffered records + the cursor record to the OS;
567 // it returns true when the log has grown past the compaction threshold, in
568 // which case we compact inline (this whole call already runs off the hot
569 // path via spawn_blocking in watch_applied).
570 if self.writer.checkpoint(cursor)? {
571 self.writer.compact()?;
572 }
573 self.cursor = cursor.clone();
574 Ok(())
575 }
576
577 fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
578 Ok(self.entries.get(key).cloned())
579 }
580
581 fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
582 let mut out: Vec<KvEntry> = self
583 .entries
584 .values()
585 .filter(|e| e.key.starts_with(prefix))
586 .cloned()
587 .collect();
588 out.sort_unstable_by(|a, b| a.key.cmp(&b.key));
589 Ok(out)
590 }
591
592 fn cursor(&self) -> WatchCursor {
593 self.cursor.clone()
594 }
595
596 fn export_to(&mut self, dest_dir: &Path) -> Result<ExportManifest, SnapshotError> {
597 let stage = ExportStage::new(dest_dir)?;
598 fs::create_dir(stage.payload())?;
599 // The in-RAM fold is the complete live state, so the artifact payload is
600 // simply a compacted log written from it — same sorted, CRC'd, synced v2
601 // file `compact_to_file` produces for the store itself. No need to flush
602 // or read back the live log file.
603 let payload = stage.payload().join(Self::ARTIFACT_PAYLOAD);
604 compact_to_file(&payload, &self.entries, &self.cursor)?;
605
606 // Verify the payload by loading it back: cursor and entry count must
607 // match the live fold. Cheap for the in-RAM backend, and keeps the
608 // "an artifact that exists is trustworthy" guarantee uniform across
609 // backends.
610 let (loaded_cursor, loaded_len) = match load(&payload)? {
611 Some(snap) => (snap.cursor, snap.entries.len()),
612 None => (WatchCursor::none(), 0),
613 };
614 if loaded_cursor != self.cursor || loaded_len != self.entries.len() {
615 return Err(SnapshotError::ArtifactInvalid(format!(
616 "exported payload disagrees with live fold (cursor {loaded_cursor:?} vs {:?}, \
617 {loaded_len} vs {} entries)",
618 self.cursor,
619 self.entries.len()
620 )));
621 }
622
623 stage.seal_and_finalize(Self::BACKEND, Self::BACKEND_VERSION, &self.cursor)
624 }
625}
626
627// ---------------------------------------------------------------------------
628// Internal: log replay
629// ---------------------------------------------------------------------------
630
631/// Replay the append log into the live key set.
632///
633/// Records borrow directly from `data` — keys and values are not allocated
634/// until the surviving set is materialized at the end, so overwritten or
635/// deleted records cost nothing. Returns the entries, the latest cursor, and
636/// whether the log was already compact (no duplicate keys, no deletes, no
637/// truncated tail) so callers can skip a redundant rewrite.
638fn replay_log(data: &[u8]) -> Result<(HashMap<String, KvEntry>, WatchCursor, bool), SnapshotError> {
639 if data.len() < HEADER_LEN {
640 return Err(SnapshotError::InvalidFormat("file too short".into()));
641 }
642 if &data[0..4] != MAGIC {
643 return Err(SnapshotError::InvalidFormat("bad magic".into()));
644 }
645 let version = u16::from_le_bytes([data[4], data[5]]);
646 if version != FORMAT_VERSION {
647 return Err(SnapshotError::InvalidFormat(format!(
648 "unsupported version {version}"
649 )));
650 }
651
652 // Upper-bound the working set by data size to avoid rehashing on large
653 // snapshots, capped so a tiny-record file can't request a huge table.
654 let estimated = (data.len() - HEADER_LEN) / 30;
655 let mut live: HashMap<&str, (&[u8], VersionToken)> =
656 HashMap::with_capacity(estimated.min(4096));
657 let mut cursor = WatchCursor::none();
658 let mut pos = HEADER_LEN;
659
660 // The log is compact only if every record contributed a unique surviving
661 // entry and we consumed the file cleanly. Any overwrite, delete, or
662 // truncated tail means a rewrite would shrink or repair the file.
663 let mut redundant = false;
664 let mut clean_eof = true;
665
666 while pos < data.len() {
667 match parse_record(&data[pos..]) {
668 Ok((record, consumed)) => {
669 match record {
670 Record::Put {
671 key,
672 value,
673 version,
674 } => {
675 if live.insert(key, (value, version)).is_some() {
676 redundant = true;
677 }
678 }
679 Record::Delete { key } => {
680 live.remove(key);
681 redundant = true;
682 }
683 Record::Cursor(c) => {
684 // Each new cursor record supersedes the previous one, so
685 // any earlier cursor on disk is now dead weight. Mark the
686 // log redundant so `load()` rewrites it — otherwise a
687 // service that checkpoints frequently accumulates one
688 // cursor record per checkpoint and `already_compact` stays
689 // true (entries are unique, EOF is clean), leaving the
690 // bloat in place until the writer's own threshold fires.
691 if !cursor.is_none() {
692 redundant = true;
693 }
694 cursor = c;
695 }
696 }
697 pos += consumed;
698 }
699 Err(RecordError::Truncated) => {
700 // KNOWN LIMITATION: a record's length is read from its (CRC-but-
701 // not-yet-verified) length fields, so corruption that inflates a
702 // key_len/value_len makes the record look like it runs past EOF —
703 // indistinguishable here from a genuine torn final write. Both
704 // land as `Truncated`, so we stop and silently drop everything
705 // after this point. Detecting it would need a framed, separately-
706 // checksummed length (a format change). Acceptable because the
707 // snapshot is a cache: a short read just triggers a fuller NATS
708 // scan + watch replay, never data loss of record.
709 clean_eof = false;
710 break;
711 }
712 Err(RecordError::CrcMismatch { consumed }) => {
713 // Near EOF → crash recovery (partial final write).
714 // Otherwise → mid-file corruption.
715 if pos + consumed >= data.len() || data.len() - (pos + consumed) < MIN_CURSOR_RECORD
716 {
717 clean_eof = false;
718 break;
719 }
720 return Err(SnapshotError::Corrupted);
721 }
722 Err(RecordError::Invalid(msg)) => {
723 return Err(SnapshotError::InvalidFormat(msg));
724 }
725 }
726 }
727
728 // Materialize owned entries for the survivors only — one key + one value
729 // allocation per live key, instead of per record.
730 let mut entries: HashMap<String, KvEntry> = HashMap::with_capacity(live.len());
731 for (key, (value, version)) in live {
732 let key = key.to_string();
733 entries.insert(
734 key.clone(),
735 KvEntry {
736 key,
737 value: value.to_vec(),
738 version,
739 },
740 );
741 }
742
743 let already_compact = !redundant && clean_eof;
744 Ok((entries, cursor, already_compact))
745}
746
/// One decoded log record.
///
/// Key and value are borrowed (lifetime `'a`) from the byte buffer the record
/// was parsed out of, so decoding a record copies neither.
enum Record<'a> {
    /// A key set to `value` at `version`.
    Put {
        key: &'a str,
        value: &'a [u8],
        version: VersionToken,
    },
    /// A key removal. Only the key is surfaced; the version stored in the
    /// on-disk record is not carried through.
    Delete {
        key: &'a str,
    },
    /// A watch-cursor checkpoint.
    Cursor(WatchCursor),
}
758
/// Why a record could not be decoded from the front of a buffer.
enum RecordError {
    /// The buffer ends before the length the record declares for itself.
    Truncated,
    /// The record is fully present but its stored CRC does not match the CRC
    /// computed over its bytes. `consumed` is the record's total length as
    /// derived from its length fields.
    CrcMismatch { consumed: usize },
    /// The record is structurally unacceptable (unknown type byte, oversized
    /// version/cursor length, non-UTF-8 key); the string describes the problem.
    Invalid(String),
}
764
765fn parse_record(data: &[u8]) -> Result<(Record<'_>, usize), RecordError> {
766 // Need at least CRC (4) + type (1)
767 if data.len() < 5 {
768 return Err(RecordError::Truncated);
769 }
770
771 let stored_crc = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
772
773 match data[4] {
774 REC_PUT => parse_put(data, stored_crc),
775 REC_DELETE => parse_delete(data, stored_crc),
776 REC_CURSOR => parse_cursor(data, stored_crc),
777 other => Err(RecordError::Invalid(format!(
778 "unknown record type: {other:#x}"
779 ))),
780 }
781}
782
783fn parse_put(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
784 // CRC(4) + type(1) + key_len(2) = 7 minimum to read key_len
785 if data.len() < 7 {
786 return Err(RecordError::Truncated);
787 }
788 let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
789 let vl_off = 7 + key_len;
790
791 if data.len() < vl_off + 4 {
792 return Err(RecordError::Truncated);
793 }
794 let value_len = u32::from_le_bytes([
795 data[vl_off],
796 data[vl_off + 1],
797 data[vl_off + 2],
798 data[vl_off + 3],
799 ]) as usize;
800
801 // ver_len byte sits right after the value.
802 let ver_len_off = vl_off + 4 + value_len;
803 if data.len() < ver_len_off + 1 {
804 return Err(RecordError::Truncated);
805 }
806 let ver_len = data[ver_len_off] as usize;
807 // A version token holds at most 10 bytes inline. A larger length means a
808 // corrupt or incompatible record — reject it rather than letting it reach
809 // `VersionToken::from_raw`, which would panic.
810 if ver_len > 10 {
811 return Err(RecordError::Invalid(format!(
812 "version length {ver_len} exceeds max version token size (10)"
813 )));
814 }
815
816 let total = ver_len_off + 1 + ver_len;
817 if data.len() < total {
818 return Err(RecordError::Truncated);
819 }
820
821 let computed = crc32fast::hash(&data[4..total]);
822 if computed != stored_crc {
823 return Err(RecordError::CrcMismatch { consumed: total });
824 }
825
826 let key = std::str::from_utf8(&data[7..7 + key_len])
827 .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
828 let value = &data[vl_off + 4..vl_off + 4 + value_len];
829 // The `ver_len > 10` check above bounds this to the inline capacity, so
830 // `from_raw` always returns `Some` here; the guard makes that explicit.
831 let version = VersionToken::from_raw(&data[ver_len_off + 1..total]).ok_or_else(|| {
832 RecordError::Invalid(format!(
833 "version length {ver_len} exceeds max version token size (10)"
834 ))
835 })?;
836
837 Ok((
838 Record::Put {
839 key,
840 value,
841 version,
842 },
843 total,
844 ))
845}
846
847fn parse_delete(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
848 if data.len() < 7 {
849 return Err(RecordError::Truncated);
850 }
851 let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
852 let ver_len_off = 7 + key_len;
853 if data.len() < ver_len_off + 1 {
854 return Err(RecordError::Truncated);
855 }
856 let ver_len = data[ver_len_off] as usize;
857 // See `parse_put`: reject oversized version lengths before `from_raw`.
858 if ver_len > 10 {
859 return Err(RecordError::Invalid(format!(
860 "version length {ver_len} exceeds max version token size (10)"
861 )));
862 }
863 let total = ver_len_off + 1 + ver_len;
864
865 if data.len() < total {
866 return Err(RecordError::Truncated);
867 }
868
869 let computed = crc32fast::hash(&data[4..total]);
870 if computed != stored_crc {
871 return Err(RecordError::CrcMismatch { consumed: total });
872 }
873
874 let key = std::str::from_utf8(&data[7..7 + key_len])
875 .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
876 // The delete record's version is written for format symmetry but unused on
877 // replay (a delete just removes the key from the live set).
878
879 Ok((Record::Delete { key }, total))
880}
881
882fn parse_cursor(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
883 if data.len() < 6 {
884 return Err(RecordError::Truncated);
885 }
886 let cursor_len = data[5] as usize;
887 // A version token holds at most 10 bytes inline. A larger length means a
888 // corrupt or incompatible record — reject it rather than letting it reach
889 // `VersionToken::from_raw`, which would panic.
890 if cursor_len > 10 {
891 return Err(RecordError::Invalid(format!(
892 "cursor length {cursor_len} exceeds max version token size (10)"
893 )));
894 }
895 let total = 6 + cursor_len;
896
897 if data.len() < total {
898 return Err(RecordError::Truncated);
899 }
900
901 let computed = crc32fast::hash(&data[4..total]);
902 if computed != stored_crc {
903 return Err(RecordError::CrcMismatch { consumed: total });
904 }
905
906 // The `cursor_len > 10` check above bounds this to the inline capacity, so
907 // `from_raw` always returns `Some` here; the guard makes that explicit.
908 let version = VersionToken::from_raw(&data[6..total]).ok_or_else(|| {
909 RecordError::Invalid(format!(
910 "cursor length {cursor_len} exceeds max version token size (10)"
911 ))
912 })?;
913
914 Ok((Record::Cursor(WatchCursor::from_version(version)), total))
915}
916
917// ---------------------------------------------------------------------------
918// Internal: record writing (incremental CRC, no allocations)
919// ---------------------------------------------------------------------------
920
921fn write_put_record(
922 w: &mut impl Write,
923 key: &str,
924 value: &[u8],
925 version: &VersionToken,
926) -> Result<usize, SnapshotError> {
927 let kb = key.as_bytes();
928 let vb = version.as_bytes();
929
930 // The wire format encodes key length as u16 and value length as u32.
931 // Reject anything that would truncate on cast — a silent truncation here
932 // produces a record whose CRC covers the full bytes but whose stored length
933 // is wrong, which the reader can only interpret as mid-file corruption.
934 let key_len = u16::try_from(kb.len()).map_err(|_| {
935 SnapshotError::InvalidFormat(format!(
936 "key too long: {} bytes (max {})",
937 kb.len(),
938 u16::MAX
939 ))
940 })?;
941 let value_len = u32::try_from(value.len()).map_err(|_| {
942 SnapshotError::InvalidFormat(format!(
943 "value too long: {} bytes (max {})",
944 value.len(),
945 u32::MAX
946 ))
947 })?;
948 // The version is stored as length-prefixed raw bytes so any backend's token
949 // (NATS u64, FDB 10-byte versionstamp) round-trips intact. `VersionToken`
950 // caps inline storage at 10 bytes, so this `u8` length never truncates today;
951 // checking surfaces a format error rather than corrupting the frame if a
952 // future token widens past 255 bytes.
953 let ver_len = u8::try_from(vb.len()).map_err(|_| {
954 SnapshotError::InvalidFormat(format!(
955 "version too long: {} bytes (max {})",
956 vb.len(),
957 u8::MAX
958 ))
959 })?;
960
961 let mut h = crc32fast::Hasher::new();
962 h.update(&[REC_PUT]);
963 h.update(&key_len.to_le_bytes());
964 h.update(kb);
965 h.update(&value_len.to_le_bytes());
966 h.update(value);
967 h.update(&[ver_len]);
968 h.update(vb);
969 let crc = h.finalize();
970
971 w.write_all(&crc.to_le_bytes())?;
972 w.write_all(&[REC_PUT])?;
973 w.write_all(&key_len.to_le_bytes())?;
974 w.write_all(kb)?;
975 w.write_all(&value_len.to_le_bytes())?;
976 w.write_all(value)?;
977 w.write_all(&[ver_len])?;
978 w.write_all(vb)?;
979
980 Ok(4 + 1 + 2 + kb.len() + 4 + value.len() + 1 + vb.len())
981}
982
983fn write_delete_record(
984 w: &mut impl Write,
985 key: &str,
986 version: &VersionToken,
987) -> Result<usize, SnapshotError> {
988 let kb = key.as_bytes();
989 let vb = version.as_bytes();
990
991 let key_len = u16::try_from(kb.len()).map_err(|_| {
992 SnapshotError::InvalidFormat(format!(
993 "key too long: {} bytes (max {})",
994 kb.len(),
995 u16::MAX
996 ))
997 })?;
998 // Length-prefixed version, matching `write_put_record`. See its comment for
999 // why this is bytes rather than a fixed u64.
1000 let ver_len = u8::try_from(vb.len()).map_err(|_| {
1001 SnapshotError::InvalidFormat(format!(
1002 "version too long: {} bytes (max {})",
1003 vb.len(),
1004 u8::MAX
1005 ))
1006 })?;
1007
1008 let mut h = crc32fast::Hasher::new();
1009 h.update(&[REC_DELETE]);
1010 h.update(&key_len.to_le_bytes());
1011 h.update(kb);
1012 h.update(&[ver_len]);
1013 h.update(vb);
1014 let crc = h.finalize();
1015
1016 w.write_all(&crc.to_le_bytes())?;
1017 w.write_all(&[REC_DELETE])?;
1018 w.write_all(&key_len.to_le_bytes())?;
1019 w.write_all(kb)?;
1020 w.write_all(&[ver_len])?;
1021 w.write_all(vb)?;
1022
1023 Ok(4 + 1 + 2 + kb.len() + 1 + vb.len())
1024}
1025
1026fn write_cursor_record(w: &mut impl Write, cursor: &WatchCursor) -> Result<usize, SnapshotError> {
1027 let cb = cursor.version().as_bytes();
1028 // The record encodes the cursor length as a single byte. `VersionToken`
1029 // caps inline storage at 10 bytes, so this never trips today — but checking
1030 // here rather than casting means a future backend that widens the token
1031 // surfaces a format error instead of silently truncating the length prefix
1032 // (which the reader would then mis-frame as corruption).
1033 let cb_len = u8::try_from(cb.len()).map_err(|_| {
1034 SnapshotError::InvalidFormat(format!(
1035 "cursor too long: {} bytes (max {})",
1036 cb.len(),
1037 u8::MAX
1038 ))
1039 })?;
1040
1041 let mut h = crc32fast::Hasher::new();
1042 h.update(&[REC_CURSOR]);
1043 h.update(&[cb_len]);
1044 h.update(cb);
1045 let crc = h.finalize();
1046
1047 w.write_all(&crc.to_le_bytes())?;
1048 w.write_all(&[REC_CURSOR])?;
1049 w.write_all(&[cb_len])?;
1050 w.write_all(cb)?;
1051
1052 Ok(4 + 1 + 1 + cb.len())
1053}
1054
1055// ---------------------------------------------------------------------------
1056// Internal: compaction
1057// ---------------------------------------------------------------------------
1058
1059fn compact_to_file(
1060 path: &Path,
1061 entries: &HashMap<String, KvEntry>,
1062 cursor: &WatchCursor,
1063) -> Result<(), SnapshotError> {
1064 // The tempfile must live in the same directory as the target so the final
1065 // `persist` is an atomic same-filesystem rename. Falling back to "." here
1066 // would silently place it in the cwd, and a cross-filesystem rename then
1067 // fails with EXDEV — a hard error masquerading as a path quirk. A snapshot
1068 // path with no parent is a caller bug; surface it.
1069 let dir = path.parent().ok_or_else(|| {
1070 SnapshotError::InvalidFormat(format!("snapshot path has no parent: {}", path.display()))
1071 })?;
1072 // Collect the live entries once, then drive both the buffer-size estimate
1073 // and the write pass off this slice — a single walk of `entries` instead of
1074 // one to sum sizes and another to write.
1075 //
1076 // Write in sorted key order so a given logical state always serializes to
1077 // identical bytes. `HashMap` iteration order is randomized per process,
1078 // which would otherwise make every compaction emit a different layout —
1079 // defeating byte-level snapshot comparison (e.g. an integrity checksum) and
1080 // making file diffs noise. The sort is O(n log n) on the live key set, which
1081 // is trivial next to the I/O it precedes.
1082 let mut sorted: Vec<&KvEntry> = entries.values().collect();
1083 sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1084
1085 // Buffer the writes: each record is 5–8 individual `write_all` calls, so an
1086 // unbuffered tempfile would issue thousands of `write(2)` syscalls per
1087 // compaction. Size the buffer to the exact serialized length (clamped to
1088 // [8 KiB, 1 MiB]) so the whole snapshot flushes in a handful of syscalls
1089 // instead of one per default 8 KiB page — and a pathologically large table
1090 // can't balloon the buffer past 1 MiB.
1091 let estimated: usize = HEADER_LEN
1092 + sorted
1093 .iter()
1094 .map(|e| 4 + 1 + 2 + e.key.len() + 4 + e.value.len() + 1 + e.version.as_bytes().len())
1095 .sum::<usize>()
1096 + if cursor.is_none() {
1097 0
1098 } else {
1099 4 + 1 + 1 + cursor.version().as_bytes().len()
1100 };
1101 let capacity = estimated.clamp(8 * 1024, 1024 * 1024);
1102 let mut buf = io::BufWriter::with_capacity(capacity, tempfile::NamedTempFile::new_in(dir)?);
1103
1104 buf.write_all(MAGIC)?;
1105 buf.write_all(&FORMAT_VERSION.to_le_bytes())?;
1106
1107 for entry in sorted {
1108 write_put_record(&mut buf, &entry.key, &entry.value, &entry.version)?;
1109 }
1110
1111 if !cursor.is_none() {
1112 write_cursor_record(&mut buf, cursor)?;
1113 }
1114
1115 buf.flush()?;
1116 let tmp = buf
1117 .into_inner()
1118 .map_err(|e| SnapshotError::Io(e.into_error()))?;
1119
1120 tmp.as_file().sync_all()?;
1121 tmp.persist(path).map_err(|e| SnapshotError::Io(e.error))?;
1122
1123 Ok(())
1124}
1125
1126// ---------------------------------------------------------------------------
1127// Tests
1128// ---------------------------------------------------------------------------
1129
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `KvEntry` for `key`/`value` whose version is the u64 `rev`.
    fn entry(key: &str, value: &[u8], rev: u64) -> KvEntry {
        KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// Builds a `WatchCursor` from the u64 `rev`.
    fn cursor(rev: u64) -> WatchCursor {
        WatchCursor::from_u64(rev)
    }

    /// Builds a `KvUpdate::Put` wrapping `entry(key, value, rev)`.
    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(entry(key, value, rev))
    }

    /// Builds a `KvUpdate::Delete` for `key` at the u64 version `rev`.
    fn delete(key: &str, rev: u64) -> KvUpdate {
        KvUpdate::Delete {
            key: key.to_string(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// Two puts plus a checkpoint come back from `load` with both values and
    /// the checkpointed cursor.
    #[test]
    fn round_trip() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("node.us-east-1", b"val1", 1)).unwrap();
        w.write_update(&put("node.eu-west-1", b"val2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));

        assert_eq!(snap.entries["node.us-east-1"].value, b"val1");
        assert_eq!(snap.entries["node.eu-west-1"].value, b"val2");
    }

    /// Writes interleaved with checkpoints: both entries load and the cursor
    /// is the last one written.
    #[test]
    fn multiple_batches() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    /// A `Delete` update written after a put removes that key from the loaded
    /// entries.
    #[test]
    fn delete_removes_entry() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        w.write_update(&delete("a", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert!(snap.entries.contains_key("b"));
    }

    /// A `Purge` update written after a put removes that key from the loaded
    /// entries.
    #[test]
    fn purge_removes_entry() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&KvUpdate::Purge {
            key: "a".to_string(),
            version: VersionToken::from_u64(2),
        })
        .unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert!(!snap.entries.contains_key("a"));
    }

    /// Loading a path that does not exist yields `Ok(None)`, not an error.
    #[test]
    fn missing_file_returns_none() {
        let dir = TempDir::new().unwrap();
        assert!(load(&dir.path().join("nope.snap")).unwrap().is_none());
    }

    /// A byte flipped far from EOF must surface as `SnapshotError::Corrupted`
    /// rather than being treated as a recoverable torn tail.
    #[test]
    fn corrupted_mid_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        // Write three entries so corruption in the middle has records on both sides
        w.write_update(&put("a", b"aaaa-long-value-here", 1))
            .unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"bbbb-long-value-here", 2))
            .unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        w.write_update(&put("c", b"cccc-long-value-here", 3))
            .unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let mut data = fs::read(&path).unwrap();
        // Flip a byte 40 bytes past the header; the assert below guarantees
        // more than 60 bytes follow it, so it is well before EOF.
        // NOTE(review): this was described as landing in the second record;
        // if `from_u64` versions serialize to 8 bytes the first Put record is
        // 41 bytes long and this offset is its final byte — confirm.
        let target = HEADER_LEN + 40;
        assert!(
            target < data.len() - 60,
            "need enough room after corruption for valid records"
        );
        data[target] ^= 0xFF;
        fs::write(&path, &data).unwrap();

        match load(&path) {
            Err(SnapshotError::Corrupted) => {}
            other => panic!("expected Corrupted, got {other:?}"),
        }
    }

    /// A file whose last few bytes are missing still loads, keeping the
    /// records that precede the damaged tail.
    #[test]
    fn truncated_final_record_recovered() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        // Chop a few bytes off the end (simulates crash mid-write)
        let mut data = fs::read(&path).unwrap();
        data.truncate(data.len() - 3);
        fs::write(&path, &data).unwrap();

        let snap = load(&path).unwrap().unwrap();
        assert!(snap.entries.contains_key("a"));
    }

    /// After loading a file with a truncated tail, a new writer can append and
    /// the combined log still loads completely.
    #[test]
    fn truncated_tail_repaired_then_appendable() {
        // The already-compact skip optimization must NOT skip the rewrite when
        // the log has a truncated tail: leaving the partial record on disk would
        // let the next append land after it and corrupt the log.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        // Simulate a crash mid-write: chop the final bytes.
        let mut data = fs::read(&path).unwrap();
        data.truncate(data.len() - 3);
        fs::write(&path, &data).unwrap();

        // load() repairs the file by rewriting without the partial tail.
        let snap = load(&path).unwrap().unwrap();
        assert!(snap.entries.contains_key("a"));
        assert!(snap.entries.contains_key("b"));

        // Appending after the repaired load must not corrupt the log.
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("c", b"v3", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 3);
        assert!(snap.entries.contains_key("c"));
        assert_eq!(snap.cursor.as_u64(), Some(3));
    }

    /// Ten checkpoints after a single put: `load` must shrink the file, keep
    /// only the latest cursor, and leave a file that a second `load` does not
    /// change.
    #[test]
    fn repeated_cursor_records_trigger_compaction() {
        // A service that checkpoints frequently writes one cursor record per
        // checkpoint. Even with unique keys and a clean EOF, the stale cursor
        // records are dead weight: load() must NOT treat the file as already
        // compact, or the bloat persists until the writer's own threshold fires
        // (and never, if the process crashes first).
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        for i in 1..=10u64 {
            w.checkpoint(&cursor(i)).unwrap();
        }
        drop(w);

        let size_before = fs::metadata(&path).unwrap().len();
        let snap = load(&path).unwrap().unwrap();
        let size_after = fs::metadata(&path).unwrap().len();

        // Single entry + only the latest cursor survive.
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.cursor.as_u64(), Some(10));
        assert!(
            size_after < size_before,
            "stale cursor records should be compacted away: {size_before} -> {size_after}"
        );

        // The rewritten file is now genuinely compact: a second load is a no-op.
        let after_first = fs::read(&path).unwrap();
        load(&path).unwrap().unwrap();
        let after_second = fs::read(&path).unwrap();
        assert_eq!(after_first, after_second);
    }

    /// Loading twice in a row leaves the file bytes identical after the second
    /// load, and the data still round-trips.
    #[test]
    fn already_compact_file_reloads_unchanged() {
        // A compact file (unique keys, no deletes, clean EOF) should reload
        // correctly even though load() skips the rewrite.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        // First load compacts. Capture the resulting bytes.
        load(&path).unwrap().unwrap();
        let after_first = fs::read(&path).unwrap();

        // Second load sees an already-compact file: skips the rewrite, so the
        // bytes are byte-for-byte identical, and the data still round-trips.
        let snap = load(&path).unwrap().unwrap();
        let after_second = fs::read(&path).unwrap();

        assert_eq!(after_first, after_second);
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    /// A file that does not start with the expected magic is rejected with an
    /// `InvalidFormat` error whose message mentions "magic".
    #[test]
    fn bad_magic() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");
        fs::write(&path, b"XXXX\x01\x00").unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => assert!(msg.contains("magic")),
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    /// A header with the right magic but `FORMAT_VERSION + 1` is rejected with
    /// an `InvalidFormat` error whose message mentions "version".
    #[test]
    fn wrong_version_rejected() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");
        // Valid magic, but a format version this build doesn't understand.
        // A future engineer who bumps FORMAT_VERSION must see old files rejected
        // rather than silently misparsed.
        let mut data = Vec::new();
        data.extend_from_slice(MAGIC);
        data.extend_from_slice(&(FORMAT_VERSION + 1).to_le_bytes());
        fs::write(&path, &data).unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => {
                assert!(
                    msg.contains("version"),
                    "message should mention version: {msg}"
                )
            }
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    /// A file containing only a valid header (no records) loads as `None`.
    #[test]
    fn empty_log_returns_none() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut f = File::create(&path).unwrap();
        f.write_all(MAGIC).unwrap();
        f.write_all(&FORMAT_VERSION.to_le_bytes()).unwrap();
        drop(f);

        assert!(load(&path).unwrap().is_none());
    }

    /// Ten overwrites of one key: `load` returns only the last value and the
    /// file on disk is smaller afterwards.
    #[test]
    fn compaction_on_load_shrinks_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        for i in 0..10u64 {
            w.write_update(&put("same-key", format!("v{i}").as_bytes(), i))
                .unwrap();
            w.checkpoint(&cursor(i)).unwrap();
        }
        drop(w);

        let size_before = fs::metadata(&path).unwrap().len();
        let snap = load(&path).unwrap().unwrap();
        let size_after = fs::metadata(&path).unwrap().len();

        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["same-key"].value, b"v9");
        assert!(
            size_after < size_before,
            "compaction should shrink: {size_before} -> {size_after}"
        );
    }

    /// With a small threshold, calling `compact()` whenever `checkpoint`
    /// returns `true` still leaves the latest value loadable.
    #[test]
    fn compact_when_threshold_exceeded() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        // Threshold low enough to trigger after a few writes
        let mut w = SnapshotWriter::open(&path, 100).unwrap();
        for i in 0..20u64 {
            w.write_update(&put("key", format!("value-{i}").as_bytes(), i))
                .unwrap();
            if w.checkpoint(&cursor(i)).unwrap() {
                w.compact().unwrap();
            }
        }
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["key"].value, b"value-19");
    }

    /// Opening a second writer on an existing file appends to it: entries from
    /// both writers load, with the second writer's cursor.
    #[test]
    fn reopen_appends() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        // First writer
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        // Second writer appends
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    /// A 100 000-byte value round-trips with its length and contents intact.
    #[test]
    fn large_values() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let big = vec![0xABu8; 100_000];
        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("big", &big, 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["big"].value.len(), 100_000);
        assert!(snap.entries["big"].value.iter().all(|&b| b == 0xAB));
    }

    /// A file holding only a cursor record loads as `Some` with no entries and
    /// the cursor preserved.
    #[test]
    fn cursor_only_snapshot_returns_some_with_empty_entries() {
        // A service that checkpoints before writing any entries produces a file
        // with only a cursor record. load() must return Some (not None) so callers
        // get the resume position even when there's nothing to preload. The guard
        // `entries.is_empty() && cursor.is_none()` must NOT fire when the cursor
        // is present.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.checkpoint(&cursor(42)).unwrap();
        drop(w);

        let snap = load(&path)
            .unwrap()
            .expect("cursor-only snapshot should return Some");
        assert!(snap.entries.is_empty(), "no entries written, none expected");
        assert_eq!(
            snap.cursor.as_u64(),
            Some(42),
            "cursor must survive round-trip"
        );
    }

    /// `stale_keys` returns the snapshot keys absent from the supplied set:
    /// one missing, none missing, and all missing.
    #[test]
    fn stale_keys_detects_removed_entries() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&put("node.a", b"v1", 1)).unwrap();
        w.write_update(&put("node.b", b"v2", 2)).unwrap();
        w.write_update(&put("node.c", b"v3", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();

        // Simulate a fresh scan that only has "node.a" and "node.c"
        let mut stale = snap.stale_keys(["node.a", "node.c"]);
        stale.sort();
        assert_eq!(stale, vec!["node.b"]);

        // All keys present → no stale
        let stale = snap.stale_keys(["node.a", "node.b", "node.c"]);
        assert!(stale.is_empty());

        // No keys present → all stale
        let mut stale: Vec<&str> = snap.stale_keys(std::iter::empty::<&str>());
        stale.sort();
        assert_eq!(stale, vec!["node.a", "node.b", "node.c"]);
    }

    /// A 10-byte versionstamp token (one with no u64 form) survives a
    /// write/load cycle byte-for-byte.
    #[test]
    fn non_u64_version_token_round_trips() {
        // Regression: Put/Delete records store the version as length-prefixed raw
        // bytes, not a fixed u64. A 10-byte FDB-style versionstamp must survive a
        // round-trip intact — the old u64-only field flattened it to 0, which
        // would make every later CAS on a restored entry fail RevisionMismatch.
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let stamp = [9u8, 8, 7, 6, 5, 4, 3, 2, 1, 0];
        let token = VersionToken::from_fdb_versionstamp(&stamp);
        assert!(token.as_u64().is_none(), "10-byte token is not a u64");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        w.write_update(&KvUpdate::Put(KvEntry {
            key: "fdb.key".to_string(),
            value: b"v".to_vec(),
            version: token.clone(),
        }))
        .unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(
            snap.entries["fdb.key"].version.as_bytes(),
            &stamp,
            "versionstamp must survive the snapshot round-trip byte-for-byte"
        );
    }

    /// Proves the `compact()` flush-before-read fix: records written but not yet
    /// checkpointed still sit in the BufWriter. Before the fix, `compact()` read
    /// the file (missing those records), rewrote it, and the old buffer then
    /// flushed into the renamed-away inode — silently dropping the writes. With
    /// the leading flush, an un-checkpointed write survives compaction.
    #[test]
    fn compact_preserves_uncheckpointed_writes() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");

        let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
        // Buffered only — deliberately NO checkpoint() before compacting.
        w.write_update(&put("node.a", b"survives", 1)).unwrap();
        w.compact().unwrap();
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert!(
            snap.entries.contains_key("node.a"),
            "compact() must not drop buffered-but-uncheckpointed writes"
        );
        assert_eq!(snap.entries["node.a"].value, b"survives");
    }
}