Skip to main content

slipstream/
snapshot.rs

//! Append-only snapshot log for KV state.
//!
//! Streams [`KvUpdate`]s to disk as they arrive via [`SnapshotWriter`],
//! with no in-memory state beyond a file handle and byte counter.
//! On startup, [`load`] replays the log to reconstruct entries + cursor,
//! then compacts the file if it is not already in compact form.
//!
//! ## File format
//!
//! 6-byte header followed by a sequence of CRC'd records:
//!
//! ```text
//! Header:  b"PGSS" ++ version:u16le
//! Record:  crc32:u32le ++ type:u8 ++ payload (varies by type)
//!
//! Put:     key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version
//! Delete:  key_len:u16le ++ key ++ ver_len:u8 ++ version
//! Cursor:  cur_len:u8 ++ cursor
//! ```
//!
//! `version` is the raw [`VersionToken`] bytes (≤10), not a fixed u64 — a
//! 10-byte FDB versionstamp round-trips intact, where a `u64`-only field would
//! silently flatten it to 0 and break every later CAS.
//!
//! Used by edge/tunnel services to survive restarts without a full
//! NATS KV scan. The snapshot is a cache — delete it and the system
//! falls back to `load_all()`.
//!
//! ## Blocking I/O
//!
//! Every function in this module performs **synchronous** file I/O and is
//! deliberately runtime-agnostic — it pulls in no async runtime so a caller can
//! place it on whichever executor (or none) it wants. The flip side is that none
//! of these calls may run directly on an async executor thread: [`load`],
//! [`SnapshotWriter::compact`], and friends `read`/`write`/`rename` whole files
//! and will stall the reactor if called inline from a task. Async callers must
//! offload them with `tokio::task::spawn_blocking` (or the equivalent).
//! [`SnapshotWriter::compact`] is the heaviest — it reads and rewrites the entire
//! log — but the rule is the same for all of them.
40
41use std::collections::{HashMap, HashSet};
42use std::fs::{self, File, OpenOptions};
43use std::io::{self, Write};
44use std::path::{Path, PathBuf};
45
46use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
47
// File magic: the first four bytes of every snapshot log.
const MAGIC: &[u8; 4] = b"PGSS";
// v2: Put/Delete records store the version as length-prefixed raw bytes instead
// of a fixed 8-byte u64, so non-u64 tokens (e.g. FDB versionstamps) survive a
// round-trip. v1 files are rejected by `replay_log`; the snapshot is a cache, so
// a rejected file just triggers a fresh NATS scan + watch replay.
const FORMAT_VERSION: u16 = 2;
// Header size in bytes: 4-byte magic + u16 format version.
const HEADER_LEN: usize = 6;

// Record type tags: the single byte that follows each record's CRC.
const REC_PUT: u8 = 0x01;
const REC_DELETE: u8 = 0x02;
const REC_CURSOR: u8 = 0x03;

// Minimum complete record sizes (CRC + type + minimum payload)
// `replay_log` uses this to decide whether a CRC failure sits close enough to
// EOF to be a torn final write rather than mid-file corruption.
const MIN_CURSOR_RECORD: usize = 4 + 1 + 1; // 6
62
/// Errors from snapshot operations.
///
/// Uses `thiserror` to match [`crate::KvError`]; unlike `KvError` (which is
/// `Clone` and so flattens its causes to strings), snapshot errors are observed
/// by a single caller, so `Io` keeps its `#[source]` chain via `#[from]`.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// An underlying file operation failed. Also used for a writer that was
    /// poisoned by a failed reopen after compaction.
    #[error("snapshot I/O error: {0}")]
    Io(#[from] io::Error),
    /// The data does not match the expected format: short/bad header,
    /// unsupported version, an invalid record, a field too long to encode, or
    /// a snapshot path with no parent directory.
    #[error("invalid snapshot format: {0}")]
    InvalidFormat(String),
    /// A record failed its CRC check somewhere other than the tail of the log.
    #[error("snapshot corrupted (CRC mismatch)")]
    Corrupted,
}
77
78/// Result of loading a snapshot from disk.
79#[derive(Debug)]
80pub struct Snapshot {
81    /// Watch cursor at the time of the last checkpoint.
82    pub cursor: WatchCursor,
83    /// Live KV entries keyed by name (deduplicated, deletes applied).
84    pub entries: HashMap<String, KvEntry>,
85}
86
87impl Snapshot {
88    /// Keys present in this snapshot but absent from a fresh scan result.
89    ///
90    /// After a cursor-expired fallback to full `watch_all()`, callers should
91    /// compare the snapshot against the live key set and emit synthetic
92    /// `Delete` events for stale keys to ensure convergence.
93    pub fn stale_keys<'a, I>(&'a self, current_keys: I) -> Vec<&'a str>
94    where
95        I: IntoIterator<Item = &'a str>,
96    {
97        let current: HashSet<&str> = current_keys.into_iter().collect();
98        self.entries
99            .keys()
100            .filter(|k| !current.contains(k.as_str()))
101            .map(|k| k.as_str())
102            .collect()
103    }
104}
105
106/// Append-only snapshot writer.
107///
108/// Streams [`KvUpdate`] records to disk via a buffered writer. No
109/// in-memory state beyond the file handle and a byte counter for
110/// compaction triggering.
111///
112/// Compacts automatically when bytes written since last compaction
113/// exceeds `compact_threshold`. Compaction replays the log into a
114/// transient [`HashMap`], rewrites via tempfile+rename, and reopens
115/// for append.
116pub struct SnapshotWriter {
117    path: PathBuf,
118    // `None` only after a `compact()` rewrote the file (atomic rename succeeded)
119    // but failed to reopen it for append. The old handle then pointed at the
120    // renamed-away inode; rather than keep writing into that orphan — silently
121    // losing every later record on close — we drop it and poison the writer so
122    // `write_update`/`checkpoint`/`flush` return an error until reconstructed.
123    writer: Option<io::BufWriter<File>>,
124    bytes_since_compact: u64,
125    compact_threshold: u64,
126}
127
128impl SnapshotWriter {
129    /// Open or create a snapshot log.
130    ///
131    /// If the file doesn't exist, writes the header. If it exists, opens
132    /// for append. `compact_threshold` controls how many bytes accumulate
133    /// before an automatic compaction.
134    pub fn open(path: &Path, compact_threshold: u64) -> Result<Self, SnapshotError> {
135        // Open first, then size the file from its own handle. The previous
136        // `path.exists()` + `fs::metadata(path)` pair issued two extra `stat(2)`
137        // syscalls and left a TOCTOU window (the file could be created or removed
138        // between the checks). One `open(2)` plus a handle `metadata()` is both
139        // fewer syscalls and race-free — the length we read is the length of the
140        // file we hold open.
141        let file = OpenOptions::new().create(true).append(true).open(path)?;
142        let existing_len = file.metadata()?.len();
143
144        let mut writer = io::BufWriter::new(file);
145
146        // A file with at least a full header is an existing log: everything past
147        // the 6-byte header counts toward the compaction threshold. Anything
148        // shorter (brand-new, or a header torn by a crash mid-create) gets a
149        // fresh header written before the first record.
150        let bytes_since_compact = if existing_len >= HEADER_LEN as u64 {
151            existing_len - HEADER_LEN as u64
152        } else {
153            writer.write_all(MAGIC)?;
154            writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
155            writer.flush()?;
156            0
157        };
158
159        Ok(Self {
160            path: path.to_path_buf(),
161            writer: Some(writer),
162            bytes_since_compact,
163            compact_threshold,
164        })
165    }
166
167    /// Borrow the underlying writer, or fail if a prior `compact()` poisoned it
168    /// (see the `writer` field doc). Keeps the orphaned-fd failure mode a
169    /// surfaced error rather than a silent data loss.
170    fn writer(&mut self) -> Result<&mut io::BufWriter<File>, SnapshotError> {
171        self.writer.as_mut().ok_or_else(|| {
172            SnapshotError::Io(io::Error::other(
173                "snapshot writer poisoned: a prior compact() failed to reopen the log for append",
174            ))
175        })
176    }
177
178    /// Write a single [`KvUpdate`] record to the log.
179    ///
180    /// Buffered — does not flush to disk until [`checkpoint`](Self::checkpoint).
181    #[must_use = "I/O errors mean the write was lost"]
182    pub fn write_update(&mut self, update: &KvUpdate) -> Result<(), SnapshotError> {
183        let w = self.writer()?;
184        let bytes = match update {
185            KvUpdate::Put(entry) => write_put_record(w, &entry.key, &entry.value, &entry.version)?,
186            KvUpdate::Delete { key, version } | KvUpdate::Purge { key, version } => {
187                write_delete_record(w, key, version)?
188            }
189        };
190        self.bytes_since_compact += bytes as u64;
191        Ok(())
192    }
193
194    /// Write a cursor checkpoint and flush the buffer to the OS.
195    ///
196    /// The flush is a `write(2)` into the page cache — it survives a process
197    /// crash, but NOT power loss: there is no `fsync` on this path. The durable
198    /// `sync_all` happens in [`compact`](Self::compact). That's deliberate — the
199    /// snapshot is a cache backed by NATS and checkpoints are frequent, so an
200    /// fsync per checkpoint isn't worth its latency; a tail lost to power loss is
201    /// rebuilt from a NATS scan + watch replay.
202    ///
203    /// Returns `true` when the log has grown past the compaction threshold
204    /// and the caller should run [`compact`](Self::compact). Separating
205    /// the check from the I/O lets async callers offload compaction to a
206    /// blocking task instead of stalling the executor.
207    #[must_use = "returns true when compaction is needed"]
208    pub fn checkpoint(&mut self, cursor: &WatchCursor) -> Result<bool, SnapshotError> {
209        let w = self.writer()?;
210        let bytes = write_cursor_record(w, cursor)?;
211        w.flush()?;
212        self.bytes_since_compact += bytes as u64;
213        Ok(self.bytes_since_compact > self.compact_threshold)
214    }
215
216    /// Flush the buffer to disk without writing a cursor record.
217    #[must_use = "I/O errors mean the flush failed"]
218    pub fn flush(&mut self) -> Result<(), SnapshotError> {
219        self.writer()?.flush()?;
220        Ok(())
221    }
222
223    /// Compact the snapshot log: replay, deduplicate, and rewrite.
224    ///
225    /// Performs synchronous file I/O. In async contexts, run via
226    /// `spawn_blocking` to avoid stalling the executor.
227    #[must_use = "compaction errors leave the log uncompacted"]
228    pub fn compact(&mut self) -> Result<(), SnapshotError> {
229        // Flush buffered records to the file before reading it back. Records
230        // written since the last `checkpoint()` still sit in the BufWriter; if we
231        // read+rewrite without flushing, they're excluded from the replay, and
232        // the old BufWriter then flushes them on drop into the inode we just
233        // renamed away — silently losing them. Flushing first makes `compact()`
234        // safe regardless of whether a `checkpoint()` preceded it.
235        self.writer()?.flush()?;
236        let data = fs::read(&self.path)?;
237        let (entries, cursor, _already_compact) = replay_log(&data)?;
238        compact_to_file(&self.path, &entries, &cursor)?;
239
240        // The rename in `compact_to_file` has already replaced the file, so the
241        // current handle now points at the orphaned (renamed-away) inode. Drop it
242        // *before* reopening: if the reopen fails, `writer` stays `None` and the
243        // writer is poisoned (see field doc), turning a would-be silent loss into
244        // a surfaced error on the next write.
245        self.writer = None;
246        let file = OpenOptions::new().append(true).open(&self.path)?;
247        self.writer = Some(io::BufWriter::new(file));
248        self.bytes_since_compact = 0;
249
250        Ok(())
251    }
252}
253
254/// Load a snapshot from disk.
255///
256/// Replays the append log, deduplicates (last write wins per key),
257/// compacts the file, and returns the live entries + cursor.
258///
259/// Returns `Ok(None)` if the file doesn't exist or contains no entries.
260pub fn load(path: &Path) -> Result<Option<Snapshot>, SnapshotError> {
261    let data = match fs::read(path) {
262        Ok(d) => d,
263        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
264        Err(e) => return Err(SnapshotError::Io(e)),
265    };
266
267    let (entries, cursor, already_compact) = replay_log(&data)?;
268
269    if entries.is_empty() && cursor.is_none() {
270        return Ok(None);
271    }
272
273    // Skip the rewrite when the log is already in compact form (no duplicate
274    // keys, no deletes, no truncated tail). Avoids a full read+write on every
275    // startup after the first. A truncated tail forces a rewrite even when the
276    // surviving records are unique: leaving the partial record on disk would
277    // let the next append land after it, corrupting the log.
278    if !already_compact {
279        compact_to_file(path, &entries, &cursor)?;
280    }
281
282    Ok(Some(Snapshot { cursor, entries }))
283}
284
285// ---------------------------------------------------------------------------
286// Internal: log replay
287// ---------------------------------------------------------------------------
288
289/// Replay the append log into the live key set.
290///
291/// Records borrow directly from `data` — keys and values are not allocated
292/// until the surviving set is materialized at the end, so overwritten or
293/// deleted records cost nothing. Returns the entries, the latest cursor, and
294/// whether the log was already compact (no duplicate keys, no deletes, no
295/// truncated tail) so callers can skip a redundant rewrite.
296fn replay_log(data: &[u8]) -> Result<(HashMap<String, KvEntry>, WatchCursor, bool), SnapshotError> {
297    if data.len() < HEADER_LEN {
298        return Err(SnapshotError::InvalidFormat("file too short".into()));
299    }
300    if &data[0..4] != MAGIC {
301        return Err(SnapshotError::InvalidFormat("bad magic".into()));
302    }
303    let version = u16::from_le_bytes([data[4], data[5]]);
304    if version != FORMAT_VERSION {
305        return Err(SnapshotError::InvalidFormat(format!(
306            "unsupported version {version}"
307        )));
308    }
309
310    // Upper-bound the working set by data size to avoid rehashing on large
311    // snapshots, capped so a tiny-record file can't request a huge table.
312    let estimated = (data.len() - HEADER_LEN) / 30;
313    let mut live: HashMap<&str, (&[u8], VersionToken)> =
314        HashMap::with_capacity(estimated.min(4096));
315    let mut cursor = WatchCursor::none();
316    let mut pos = HEADER_LEN;
317
318    // The log is compact only if every record contributed a unique surviving
319    // entry and we consumed the file cleanly. Any overwrite, delete, or
320    // truncated tail means a rewrite would shrink or repair the file.
321    let mut redundant = false;
322    let mut clean_eof = true;
323
324    while pos < data.len() {
325        match parse_record(&data[pos..]) {
326            Ok((record, consumed)) => {
327                match record {
328                    Record::Put {
329                        key,
330                        value,
331                        version,
332                    } => {
333                        if live.insert(key, (value, version)).is_some() {
334                            redundant = true;
335                        }
336                    }
337                    Record::Delete { key } => {
338                        live.remove(key);
339                        redundant = true;
340                    }
341                    Record::Cursor(c) => {
342                        // Each new cursor record supersedes the previous one, so
343                        // any earlier cursor on disk is now dead weight. Mark the
344                        // log redundant so `load()` rewrites it — otherwise a
345                        // service that checkpoints frequently accumulates one
346                        // cursor record per checkpoint and `already_compact` stays
347                        // true (entries are unique, EOF is clean), leaving the
348                        // bloat in place until the writer's own threshold fires.
349                        if !cursor.is_none() {
350                            redundant = true;
351                        }
352                        cursor = c;
353                    }
354                }
355                pos += consumed;
356            }
357            Err(RecordError::Truncated) => {
358                // KNOWN LIMITATION: a record's length is read from its (CRC-but-
359                // not-yet-verified) length fields, so corruption that inflates a
360                // key_len/value_len makes the record look like it runs past EOF —
361                // indistinguishable here from a genuine torn final write. Both
362                // land as `Truncated`, so we stop and silently drop everything
363                // after this point. Detecting it would need a framed, separately-
364                // checksummed length (a format change). Acceptable because the
365                // snapshot is a cache: a short read just triggers a fuller NATS
366                // scan + watch replay, never data loss of record.
367                clean_eof = false;
368                break;
369            }
370            Err(RecordError::CrcMismatch { consumed }) => {
371                // Near EOF → crash recovery (partial final write).
372                // Otherwise → mid-file corruption.
373                if pos + consumed >= data.len() || data.len() - (pos + consumed) < MIN_CURSOR_RECORD
374                {
375                    clean_eof = false;
376                    break;
377                }
378                return Err(SnapshotError::Corrupted);
379            }
380            Err(RecordError::Invalid(msg)) => {
381                return Err(SnapshotError::InvalidFormat(msg));
382            }
383        }
384    }
385
386    // Materialize owned entries for the survivors only — one key + one value
387    // allocation per live key, instead of per record.
388    let mut entries: HashMap<String, KvEntry> = HashMap::with_capacity(live.len());
389    for (key, (value, version)) in live {
390        let key = key.to_string();
391        entries.insert(
392            key.clone(),
393            KvEntry {
394                key,
395                value: value.to_vec(),
396                version,
397            },
398        );
399    }
400
401    let already_compact = !redundant && clean_eof;
402    Ok((entries, cursor, already_compact))
403}
404
/// One decoded log record. Keys and values borrow from the buffer being
/// parsed (`'a`), so decoding a record allocates nothing for them.
enum Record<'a> {
    /// Upsert of `key` with its value and version token.
    Put {
        key: &'a str,
        value: &'a [u8],
        version: VersionToken,
    },
    /// Removal of `key`. The version bytes stored on disk are covered by the
    /// CRC but are not carried in the decoded record.
    Delete {
        key: &'a str,
    },
    /// Watch-cursor checkpoint.
    Cursor(WatchCursor),
}

/// Why a record could not be decoded.
enum RecordError {
    /// The buffer ends before the record (as described by its own length
    /// fields) is complete.
    Truncated,
    /// The record is fully present but its CRC does not match. `consumed` is
    /// the record's total length as computed from its length fields.
    CrcMismatch { consumed: usize },
    /// The record is structurally invalid; the string describes the problem.
    Invalid(String),
}
422
423fn parse_record(data: &[u8]) -> Result<(Record<'_>, usize), RecordError> {
424    // Need at least CRC (4) + type (1)
425    if data.len() < 5 {
426        return Err(RecordError::Truncated);
427    }
428
429    let stored_crc = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
430
431    match data[4] {
432        REC_PUT => parse_put(data, stored_crc),
433        REC_DELETE => parse_delete(data, stored_crc),
434        REC_CURSOR => parse_cursor(data, stored_crc),
435        other => Err(RecordError::Invalid(format!(
436            "unknown record type: {other:#x}"
437        ))),
438    }
439}
440
441fn parse_put(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
442    // CRC(4) + type(1) + key_len(2) = 7 minimum to read key_len
443    if data.len() < 7 {
444        return Err(RecordError::Truncated);
445    }
446    let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
447    let vl_off = 7 + key_len;
448
449    if data.len() < vl_off + 4 {
450        return Err(RecordError::Truncated);
451    }
452    let value_len = u32::from_le_bytes([
453        data[vl_off],
454        data[vl_off + 1],
455        data[vl_off + 2],
456        data[vl_off + 3],
457    ]) as usize;
458
459    // ver_len byte sits right after the value.
460    let ver_len_off = vl_off + 4 + value_len;
461    if data.len() < ver_len_off + 1 {
462        return Err(RecordError::Truncated);
463    }
464    let ver_len = data[ver_len_off] as usize;
465    // A version token holds at most 10 bytes inline. A larger length means a
466    // corrupt or incompatible record — reject it rather than letting it reach
467    // `VersionToken::from_raw`, which would panic.
468    if ver_len > 10 {
469        return Err(RecordError::Invalid(format!(
470            "version length {ver_len} exceeds max version token size (10)"
471        )));
472    }
473
474    let total = ver_len_off + 1 + ver_len;
475    if data.len() < total {
476        return Err(RecordError::Truncated);
477    }
478
479    let computed = crc32fast::hash(&data[4..total]);
480    if computed != stored_crc {
481        return Err(RecordError::CrcMismatch { consumed: total });
482    }
483
484    let key = std::str::from_utf8(&data[7..7 + key_len])
485        .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
486    let value = &data[vl_off + 4..vl_off + 4 + value_len];
487    // The `ver_len > 10` check above bounds this to the inline capacity, so
488    // `from_raw` always returns `Some` here; the guard makes that explicit.
489    let version = VersionToken::from_raw(&data[ver_len_off + 1..total]).ok_or_else(|| {
490        RecordError::Invalid(format!(
491            "version length {ver_len} exceeds max version token size (10)"
492        ))
493    })?;
494
495    Ok((
496        Record::Put {
497            key,
498            value,
499            version,
500        },
501        total,
502    ))
503}
504
505fn parse_delete(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
506    if data.len() < 7 {
507        return Err(RecordError::Truncated);
508    }
509    let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
510    let ver_len_off = 7 + key_len;
511    if data.len() < ver_len_off + 1 {
512        return Err(RecordError::Truncated);
513    }
514    let ver_len = data[ver_len_off] as usize;
515    // See `parse_put`: reject oversized version lengths before `from_raw`.
516    if ver_len > 10 {
517        return Err(RecordError::Invalid(format!(
518            "version length {ver_len} exceeds max version token size (10)"
519        )));
520    }
521    let total = ver_len_off + 1 + ver_len;
522
523    if data.len() < total {
524        return Err(RecordError::Truncated);
525    }
526
527    let computed = crc32fast::hash(&data[4..total]);
528    if computed != stored_crc {
529        return Err(RecordError::CrcMismatch { consumed: total });
530    }
531
532    let key = std::str::from_utf8(&data[7..7 + key_len])
533        .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
534    // The delete record's version is written for format symmetry but unused on
535    // replay (a delete just removes the key from the live set).
536
537    Ok((Record::Delete { key }, total))
538}
539
540fn parse_cursor(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
541    if data.len() < 6 {
542        return Err(RecordError::Truncated);
543    }
544    let cursor_len = data[5] as usize;
545    // A version token holds at most 10 bytes inline. A larger length means a
546    // corrupt or incompatible record — reject it rather than letting it reach
547    // `VersionToken::from_raw`, which would panic.
548    if cursor_len > 10 {
549        return Err(RecordError::Invalid(format!(
550            "cursor length {cursor_len} exceeds max version token size (10)"
551        )));
552    }
553    let total = 6 + cursor_len;
554
555    if data.len() < total {
556        return Err(RecordError::Truncated);
557    }
558
559    let computed = crc32fast::hash(&data[4..total]);
560    if computed != stored_crc {
561        return Err(RecordError::CrcMismatch { consumed: total });
562    }
563
564    // The `cursor_len > 10` check above bounds this to the inline capacity, so
565    // `from_raw` always returns `Some` here; the guard makes that explicit.
566    let version = VersionToken::from_raw(&data[6..total]).ok_or_else(|| {
567        RecordError::Invalid(format!(
568            "cursor length {cursor_len} exceeds max version token size (10)"
569        ))
570    })?;
571
572    Ok((Record::Cursor(WatchCursor::from_version(version)), total))
573}
574
575// ---------------------------------------------------------------------------
576// Internal: record writing (incremental CRC, no allocations)
577// ---------------------------------------------------------------------------
578
579fn write_put_record(
580    w: &mut impl Write,
581    key: &str,
582    value: &[u8],
583    version: &VersionToken,
584) -> Result<usize, SnapshotError> {
585    let kb = key.as_bytes();
586    let vb = version.as_bytes();
587
588    // The wire format encodes key length as u16 and value length as u32.
589    // Reject anything that would truncate on cast — a silent truncation here
590    // produces a record whose CRC covers the full bytes but whose stored length
591    // is wrong, which the reader can only interpret as mid-file corruption.
592    let key_len = u16::try_from(kb.len()).map_err(|_| {
593        SnapshotError::InvalidFormat(format!(
594            "key too long: {} bytes (max {})",
595            kb.len(),
596            u16::MAX
597        ))
598    })?;
599    let value_len = u32::try_from(value.len()).map_err(|_| {
600        SnapshotError::InvalidFormat(format!(
601            "value too long: {} bytes (max {})",
602            value.len(),
603            u32::MAX
604        ))
605    })?;
606    // The version is stored as length-prefixed raw bytes so any backend's token
607    // (NATS u64, FDB 10-byte versionstamp) round-trips intact. `VersionToken`
608    // caps inline storage at 10 bytes, so this `u8` length never truncates today;
609    // checking surfaces a format error rather than corrupting the frame if a
610    // future token widens past 255 bytes.
611    let ver_len = u8::try_from(vb.len()).map_err(|_| {
612        SnapshotError::InvalidFormat(format!(
613            "version too long: {} bytes (max {})",
614            vb.len(),
615            u8::MAX
616        ))
617    })?;
618
619    let mut h = crc32fast::Hasher::new();
620    h.update(&[REC_PUT]);
621    h.update(&key_len.to_le_bytes());
622    h.update(kb);
623    h.update(&value_len.to_le_bytes());
624    h.update(value);
625    h.update(&[ver_len]);
626    h.update(vb);
627    let crc = h.finalize();
628
629    w.write_all(&crc.to_le_bytes())?;
630    w.write_all(&[REC_PUT])?;
631    w.write_all(&key_len.to_le_bytes())?;
632    w.write_all(kb)?;
633    w.write_all(&value_len.to_le_bytes())?;
634    w.write_all(value)?;
635    w.write_all(&[ver_len])?;
636    w.write_all(vb)?;
637
638    Ok(4 + 1 + 2 + kb.len() + 4 + value.len() + 1 + vb.len())
639}
640
641fn write_delete_record(
642    w: &mut impl Write,
643    key: &str,
644    version: &VersionToken,
645) -> Result<usize, SnapshotError> {
646    let kb = key.as_bytes();
647    let vb = version.as_bytes();
648
649    let key_len = u16::try_from(kb.len()).map_err(|_| {
650        SnapshotError::InvalidFormat(format!(
651            "key too long: {} bytes (max {})",
652            kb.len(),
653            u16::MAX
654        ))
655    })?;
656    // Length-prefixed version, matching `write_put_record`. See its comment for
657    // why this is bytes rather than a fixed u64.
658    let ver_len = u8::try_from(vb.len()).map_err(|_| {
659        SnapshotError::InvalidFormat(format!(
660            "version too long: {} bytes (max {})",
661            vb.len(),
662            u8::MAX
663        ))
664    })?;
665
666    let mut h = crc32fast::Hasher::new();
667    h.update(&[REC_DELETE]);
668    h.update(&key_len.to_le_bytes());
669    h.update(kb);
670    h.update(&[ver_len]);
671    h.update(vb);
672    let crc = h.finalize();
673
674    w.write_all(&crc.to_le_bytes())?;
675    w.write_all(&[REC_DELETE])?;
676    w.write_all(&key_len.to_le_bytes())?;
677    w.write_all(kb)?;
678    w.write_all(&[ver_len])?;
679    w.write_all(vb)?;
680
681    Ok(4 + 1 + 2 + kb.len() + 1 + vb.len())
682}
683
684fn write_cursor_record(w: &mut impl Write, cursor: &WatchCursor) -> Result<usize, SnapshotError> {
685    let cb = cursor.version().as_bytes();
686    // The record encodes the cursor length as a single byte. `VersionToken`
687    // caps inline storage at 10 bytes, so this never trips today — but checking
688    // here rather than casting means a future backend that widens the token
689    // surfaces a format error instead of silently truncating the length prefix
690    // (which the reader would then mis-frame as corruption).
691    let cb_len = u8::try_from(cb.len()).map_err(|_| {
692        SnapshotError::InvalidFormat(format!(
693            "cursor too long: {} bytes (max {})",
694            cb.len(),
695            u8::MAX
696        ))
697    })?;
698
699    let mut h = crc32fast::Hasher::new();
700    h.update(&[REC_CURSOR]);
701    h.update(&[cb_len]);
702    h.update(cb);
703    let crc = h.finalize();
704
705    w.write_all(&crc.to_le_bytes())?;
706    w.write_all(&[REC_CURSOR])?;
707    w.write_all(&[cb_len])?;
708    w.write_all(cb)?;
709
710    Ok(4 + 1 + 1 + cb.len())
711}
712
713// ---------------------------------------------------------------------------
714// Internal: compaction
715// ---------------------------------------------------------------------------
716
717fn compact_to_file(
718    path: &Path,
719    entries: &HashMap<String, KvEntry>,
720    cursor: &WatchCursor,
721) -> Result<(), SnapshotError> {
722    // The tempfile must live in the same directory as the target so the final
723    // `persist` is an atomic same-filesystem rename. Falling back to "." here
724    // would silently place it in the cwd, and a cross-filesystem rename then
725    // fails with EXDEV — a hard error masquerading as a path quirk. A snapshot
726    // path with no parent is a caller bug; surface it.
727    let dir = path.parent().ok_or_else(|| {
728        SnapshotError::InvalidFormat(format!("snapshot path has no parent: {}", path.display()))
729    })?;
730    // Collect the live entries once, then drive both the buffer-size estimate
731    // and the write pass off this slice — a single walk of `entries` instead of
732    // one to sum sizes and another to write.
733    //
734    // Write in sorted key order so a given logical state always serializes to
735    // identical bytes. `HashMap` iteration order is randomized per process,
736    // which would otherwise make every compaction emit a different layout —
737    // defeating byte-level snapshot comparison (e.g. an integrity checksum) and
738    // making file diffs noise. The sort is O(n log n) on the live key set, which
739    // is trivial next to the I/O it precedes.
740    let mut sorted: Vec<&KvEntry> = entries.values().collect();
741    sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
742
743    // Buffer the writes: each record is 5–8 individual `write_all` calls, so an
744    // unbuffered tempfile would issue thousands of `write(2)` syscalls per
745    // compaction. Size the buffer to the exact serialized length (clamped to
746    // [8 KiB, 1 MiB]) so the whole snapshot flushes in a handful of syscalls
747    // instead of one per default 8 KiB page — and a pathologically large table
748    // can't balloon the buffer past 1 MiB.
749    let estimated: usize = HEADER_LEN
750        + sorted
751            .iter()
752            .map(|e| 4 + 1 + 2 + e.key.len() + 4 + e.value.len() + 1 + e.version.as_bytes().len())
753            .sum::<usize>()
754        + if cursor.is_none() {
755            0
756        } else {
757            4 + 1 + 1 + cursor.version().as_bytes().len()
758        };
759    let capacity = estimated.clamp(8 * 1024, 1024 * 1024);
760    let mut buf = io::BufWriter::with_capacity(capacity, tempfile::NamedTempFile::new_in(dir)?);
761
762    buf.write_all(MAGIC)?;
763    buf.write_all(&FORMAT_VERSION.to_le_bytes())?;
764
765    for entry in sorted {
766        write_put_record(&mut buf, &entry.key, &entry.value, &entry.version)?;
767    }
768
769    if !cursor.is_none() {
770        write_cursor_record(&mut buf, cursor)?;
771    }
772
773    buf.flush()?;
774    let tmp = buf
775        .into_inner()
776        .map_err(|e| SnapshotError::Io(e.into_error()))?;
777
778    tmp.as_file().sync_all()?;
779    tmp.persist(path).map_err(|e| SnapshotError::Io(e.error))?;
780
781    Ok(())
782}
783
784// ---------------------------------------------------------------------------
785// Tests
786// ---------------------------------------------------------------------------
787
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};
    use tempfile::TempDir;

    /// Threshold handed to [`SnapshotWriter::open`] by tests that do not want the
    /// writer to request a compaction on its own (contrast with
    /// `compact_when_threshold_exceeded`, which passes a small value on purpose).
    const NEVER_COMPACT: u64 = u64::MAX;

    // -- value builders ------------------------------------------------------

    /// Builds a `KvEntry` whose version is the u64 revision `rev`.
    fn entry(key: &str, value: &[u8], rev: u64) -> KvEntry {
        KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// Builds a `WatchCursor` from the u64 revision `rev`.
    fn cursor(rev: u64) -> WatchCursor {
        WatchCursor::from_u64(rev)
    }

    /// `KvUpdate::Put` for `key` / `value` at revision `rev`.
    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(entry(key, value, rev))
    }

    /// `KvUpdate::Delete` for `key` at revision `rev`.
    fn delete(key: &str, rev: u64) -> KvUpdate {
        KvUpdate::Delete {
            key: key.to_string(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// `KvUpdate::Purge` for `key` at revision `rev`.
    fn purge(key: &str, rev: u64) -> KvUpdate {
        KvUpdate::Purge {
            key: key.to_string(),
            version: VersionToken::from_u64(rev),
        }
    }

    // -- file fixtures -------------------------------------------------------

    /// Creates a fresh temporary directory and returns it together with the
    /// snapshot path (`test.snap`) inside it. Nothing is written to the path.
    ///
    /// The `TempDir` guard must stay alive for as long as the path is used:
    /// bind it to a named variable such as `_dir`, never to a bare `_`, which
    /// would drop it (and delete the directory) immediately.
    fn scratch() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");
        (dir, path)
    }

    /// Opens a writer on `path` with [`NEVER_COMPACT`], runs `body` against it,
    /// then drops the writer so the caller can read the file back.
    fn write_log(path: &Path, body: impl FnOnce(&mut SnapshotWriter)) {
        let mut w = SnapshotWriter::open(path, NEVER_COMPACT).unwrap();
        body(&mut w);
        drop(w);
    }

    /// Writes every update in `updates`, in order, then checkpoints at `rev`.
    fn commit(w: &mut SnapshotWriter, updates: &[KvUpdate], rev: u64) {
        for update in updates {
            w.write_update(update).unwrap();
        }
        w.checkpoint(&cursor(rev)).unwrap();
    }

    /// Removes the last `n` bytes of the file at `path` (simulates a crash
    /// mid-write). Asserts the file is longer than `n` so a too-short fixture
    /// fails with a clear message instead of an arithmetic-overflow panic.
    fn chop_tail(path: &Path, n: usize) {
        let mut data = fs::read(path).unwrap();
        assert!(
            data.len() > n,
            "file too short ({} bytes) to chop {n} bytes",
            data.len()
        );
        data.truncate(data.len() - n);
        fs::write(path, &data).unwrap();
    }

    /// Current on-disk size of `path` in bytes.
    fn file_len(path: &Path) -> u64 {
        fs::metadata(path).unwrap().len()
    }

    /// `MAGIC` followed by the given little-endian version bytes: a file header
    /// with no records after it.
    fn header_bytes(version_le: &[u8]) -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(MAGIC);
        bytes.extend_from_slice(version_le);
        bytes
    }

    /// Loads `path`, expecting `SnapshotError::InvalidFormat`, and returns its
    /// message. Panics on any other outcome.
    fn invalid_format_message(path: &Path) -> String {
        match load(path) {
            Err(SnapshotError::InvalidFormat(msg)) => msg,
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    // -- tests ---------------------------------------------------------------

    #[test]
    fn round_trip() {
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            commit(
                w,
                &[
                    put("node.us-east-1", b"val1", 1),
                    put("node.eu-west-1", b"val2", 2),
                ],
                2,
            );
        });

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));

        assert_eq!(snap.entries["node.us-east-1"].value, b"val1");
        assert_eq!(snap.entries["node.eu-west-1"].value, b"val2");
    }

    #[test]
    fn multiple_batches() {
        let (_dir, path) = scratch();

        // Two checkpointed batches through the same writer.
        write_log(&path, |w| {
            commit(w, &[put("a", b"v1", 1)], 1);
            commit(w, &[put("b", b"v2", 2)], 2);
        });

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    #[test]
    fn delete_removes_entry() {
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            commit(w, &[put("a", b"v1", 1), put("b", b"v2", 2)], 2);
            commit(w, &[delete("a", 3)], 3);
        });

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert!(snap.entries.contains_key("b"));
        assert!(
            !snap.entries.contains_key("a"),
            "deleted key must not be restored"
        );
    }

    #[test]
    fn purge_removes_entry() {
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            commit(w, &[put("a", b"v1", 1)], 1);
            commit(w, &[purge("a", 2)], 2);
        });

        let snap = load(&path).unwrap().unwrap();
        assert!(!snap.entries.contains_key("a"));
    }

    #[test]
    fn missing_file_returns_none() {
        // `scratch` only computes the path; no file exists there yet.
        let (_dir, path) = scratch();
        assert!(load(&path).unwrap().is_none());
    }

    #[test]
    fn corrupted_mid_file() {
        let (_dir, path) = scratch();

        // Three checkpointed entries so the flipped byte is followed by plenty
        // of complete records.
        write_log(&path, |w| {
            commit(w, &[put("a", b"aaaa-long-value-here", 1)], 1);
            commit(w, &[put("b", b"bbbb-long-value-here", 2)], 2);
            commit(w, &[put("c", b"cccc-long-value-here", 3)], 3);
        });

        let mut data = fs::read(&path).unwrap();
        // Flip one byte in the interior of the log: past the header and at
        // least 60 bytes before EOF. The guard is written as an addition so a
        // short file trips the assertion message rather than underflowing.
        let target = HEADER_LEN + 40;
        assert!(
            target + 60 < data.len(),
            "need enough room after corruption for valid records"
        );
        data[target] ^= 0xFF;
        fs::write(&path, &data).unwrap();

        match load(&path) {
            Err(SnapshotError::Corrupted) => {}
            other => panic!("expected Corrupted, got {other:?}"),
        }
    }

    #[test]
    fn truncated_final_record_recovered() {
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            commit(w, &[put("a", b"v1", 1)], 1);
            commit(w, &[put("b", b"v2", 2)], 2);
        });

        // Chop a few bytes off the end (simulates crash mid-write). Only the
        // final cursor record is damaged; both put records are still complete.
        chop_tail(&path, 3);

        let snap = load(&path).unwrap().unwrap();
        assert!(snap.entries.contains_key("a"));
        assert!(
            snap.entries.contains_key("b"),
            "complete records before the truncated tail must survive"
        );
    }

    #[test]
    fn truncated_tail_repaired_then_appendable() {
        // The already-compact skip optimization must NOT skip the rewrite when
        // the log has a truncated tail: leaving the partial record on disk would
        // let the next append land after it and corrupt the log.
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            commit(w, &[put("a", b"v1", 1), put("b", b"v2", 2)], 2);
        });

        // Simulate a crash mid-write: chop the final bytes.
        chop_tail(&path, 3);

        // load() repairs the file by rewriting without the partial tail.
        let snap = load(&path).unwrap().unwrap();
        assert!(snap.entries.contains_key("a"));
        assert!(snap.entries.contains_key("b"));

        // Appending after the repaired load must not corrupt the log.
        write_log(&path, |w| commit(w, &[put("c", b"v3", 3)], 3));

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 3);
        assert!(snap.entries.contains_key("c"));
        assert_eq!(snap.cursor.as_u64(), Some(3));
    }

    #[test]
    fn repeated_cursor_records_trigger_compaction() {
        // A service that checkpoints frequently writes one cursor record per
        // checkpoint. Even with unique keys and a clean EOF, the stale cursor
        // records are dead weight: load() must NOT treat the file as already
        // compact, or the bloat persists until the writer's own threshold fires
        // (and never, if the process crashes first).
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            w.write_update(&put("a", b"v1", 1)).unwrap();
            for i in 1..=10u64 {
                w.checkpoint(&cursor(i)).unwrap();
            }
        });

        let size_before = file_len(&path);
        let snap = load(&path).unwrap().unwrap();
        let size_after = file_len(&path);

        // Single entry + only the latest cursor survive.
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.cursor.as_u64(), Some(10));
        assert!(
            size_after < size_before,
            "stale cursor records should be compacted away: {size_before} -> {size_after}"
        );

        // The rewritten file is now genuinely compact: a second load is a no-op.
        let after_first = fs::read(&path).unwrap();
        load(&path).unwrap().unwrap();
        let after_second = fs::read(&path).unwrap();
        assert_eq!(after_first, after_second);
    }

    #[test]
    fn already_compact_file_reloads_unchanged() {
        // A compact file (unique keys, no deletes, clean EOF) should reload
        // correctly even though load() skips the rewrite.
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            commit(w, &[put("a", b"v1", 1), put("b", b"v2", 2)], 2);
        });

        // After the first load the file is in compact form (rewritten if that
        // was needed). Capture the resulting bytes.
        load(&path).unwrap().unwrap();
        let after_first = fs::read(&path).unwrap();

        // Second load sees an already-compact file: the bytes must stay
        // byte-for-byte identical, and the data must still round-trip.
        let snap = load(&path).unwrap().unwrap();
        let after_second = fs::read(&path).unwrap();

        assert_eq!(after_first, after_second);
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    #[test]
    fn bad_magic() {
        let (_dir, path) = scratch();
        fs::write(&path, b"XXXX\x01\x00").unwrap();

        let msg = invalid_format_message(&path);
        assert!(msg.contains("magic"));
    }

    #[test]
    fn wrong_version_rejected() {
        let (_dir, path) = scratch();
        // Valid magic, but a format version this build doesn't understand.
        // A future engineer who bumps FORMAT_VERSION must see old files rejected
        // rather than silently misparsed.
        let data = header_bytes(&(FORMAT_VERSION + 1).to_le_bytes());
        fs::write(&path, &data).unwrap();

        let msg = invalid_format_message(&path);
        assert!(
            msg.contains("version"),
            "message should mention version: {msg}"
        );
    }

    #[test]
    fn empty_log_returns_none() {
        let (_dir, path) = scratch();

        // A valid header with no records after it.
        let data = header_bytes(&FORMAT_VERSION.to_le_bytes());
        fs::write(&path, &data).unwrap();

        assert!(load(&path).unwrap().is_none());
    }

    #[test]
    fn compaction_on_load_shrinks_file() {
        let (_dir, path) = scratch();

        // Ten overwrites of one key: nine of the put records are dead weight.
        write_log(&path, |w| {
            for i in 0..10u64 {
                commit(w, &[put("same-key", format!("v{i}").as_bytes(), i)], i);
            }
        });

        let size_before = file_len(&path);
        let snap = load(&path).unwrap().unwrap();
        let size_after = file_len(&path);

        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["same-key"].value, b"v9");
        assert!(
            size_after < size_before,
            "compaction should shrink: {size_before} -> {size_after}"
        );
    }

    #[test]
    fn compact_when_threshold_exceeded() {
        let (_dir, path) = scratch();

        // Threshold low enough to trigger after a few writes. Opened directly
        // (not via `write_log`) because this test needs a non-default threshold
        // and acts on the boolean that `checkpoint` returns.
        let mut w = SnapshotWriter::open(&path, 100).unwrap();
        for i in 0..20u64 {
            w.write_update(&put("key", format!("value-{i}").as_bytes(), i))
                .unwrap();
            if w.checkpoint(&cursor(i)).unwrap() {
                w.compact().unwrap();
            }
        }
        drop(w);

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["key"].value, b"value-19");
    }

    #[test]
    fn reopen_appends() {
        let (_dir, path) = scratch();

        // First writer
        write_log(&path, |w| commit(w, &[put("a", b"v1", 1)], 1));

        // Second writer appends
        write_log(&path, |w| commit(w, &[put("b", b"v2", 2)], 2));

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    #[test]
    fn large_values() {
        let (_dir, path) = scratch();

        let big = vec![0xABu8; 100_000];
        write_log(&path, |w| commit(w, &[put("big", &big, 1)], 1));

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["big"].value.len(), 100_000);
        assert!(snap.entries["big"].value.iter().all(|&b| b == 0xAB));
    }

    #[test]
    fn cursor_only_snapshot_returns_some_with_empty_entries() {
        // A service that checkpoints before writing any entries produces a file
        // with only a cursor record. load() must return Some (not None) so callers
        // get the resume position even when there's nothing to preload. The guard
        // `entries.is_empty() && cursor.is_none()` must NOT fire when the cursor
        // is present.
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            w.checkpoint(&cursor(42)).unwrap();
        });

        let snap = load(&path)
            .unwrap()
            .expect("cursor-only snapshot should return Some");
        assert!(snap.entries.is_empty(), "no entries written, none expected");
        assert_eq!(
            snap.cursor.as_u64(),
            Some(42),
            "cursor must survive round-trip"
        );
    }

    #[test]
    fn stale_keys_detects_removed_entries() {
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            commit(
                w,
                &[
                    put("node.a", b"v1", 1),
                    put("node.b", b"v2", 2),
                    put("node.c", b"v3", 3),
                ],
                3,
            );
        });

        let snap = load(&path).unwrap().unwrap();

        // Simulate a fresh scan that only has "node.a" and "node.c"
        let mut stale = snap.stale_keys(["node.a", "node.c"]);
        stale.sort();
        assert_eq!(stale, vec!["node.b"]);

        // All keys present → no stale
        let stale = snap.stale_keys(["node.a", "node.b", "node.c"]);
        assert!(stale.is_empty());

        // No keys present → all stale
        let mut stale: Vec<&str> = snap.stale_keys(std::iter::empty::<&str>());
        stale.sort();
        assert_eq!(stale, vec!["node.a", "node.b", "node.c"]);
    }

    #[test]
    fn non_u64_version_token_round_trips() {
        // Regression: Put/Delete records store the version as length-prefixed raw
        // bytes, not a fixed u64. A 10-byte FDB-style versionstamp must survive a
        // round-trip intact — the old u64-only field flattened it to 0, which
        // would make every later CAS on a restored entry fail RevisionMismatch.
        let (_dir, path) = scratch();

        let stamp = [9u8, 8, 7, 6, 5, 4, 3, 2, 1, 0];
        let token = VersionToken::from_fdb_versionstamp(&stamp);
        assert!(token.as_u64().is_none(), "10-byte token is not a u64");

        // `token` is not needed again, so it is moved into the entry.
        let update = KvUpdate::Put(KvEntry {
            key: "fdb.key".to_string(),
            value: b"v".to_vec(),
            version: token,
        });
        write_log(&path, |w| {
            w.write_update(&update).unwrap();
            w.checkpoint(&cursor(1)).unwrap();
        });

        let snap = load(&path).unwrap().unwrap();
        assert_eq!(
            snap.entries["fdb.key"].version.as_bytes(),
            &stamp,
            "versionstamp must survive the snapshot round-trip byte-for-byte"
        );
    }

    /// Proves the `compact()` flush-before-read fix: records written but not yet
    /// checkpointed still sit in the BufWriter. Before the fix, `compact()` read
    /// the file (missing those records), rewrote it, and the old buffer then
    /// flushed into the renamed-away inode — silently dropping the writes. With
    /// the leading flush, an un-checkpointed write survives compaction.
    #[test]
    fn compact_preserves_uncheckpointed_writes() {
        let (_dir, path) = scratch();

        write_log(&path, |w| {
            // Buffered only — deliberately NO checkpoint() before compacting.
            w.write_update(&put("node.a", b"survives", 1)).unwrap();
            w.compact().unwrap();
        });

        let snap = load(&path).unwrap().unwrap();
        assert!(
            snap.entries.contains_key("node.a"),
            "compact() must not drop buffered-but-uncheckpointed writes"
        );
        assert_eq!(snap.entries["node.a"].value, b"survives");
    }
}