beyond-slipstream 0.3.1

Watchable distributed config over NATS JetStream — cache locally, stream updates, resume from a sequence number after any restart.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
//! On-disk [`SnapshotStore`] backed by [fjall](https://docs.rs/fjall) — for
//! consumers whose fold is too large to hold in RAM (e.g. routing at ~1B routes).
//!
//! fjall is a pure-Rust LSM-tree (no C toolchain), so the crate core stays
//! pure-Rust and this backend is opt-in behind `feature = "fjall"`. The same
//! engine is already used by `../objects`.
//!
//! ## How it honors the [`SnapshotStore`] invariants
//!
//! - **Atomic data + cursor.** Each [`apply`](SnapshotStore::apply) is a single
//!   fjall write batch: every put/delete *and* the resume cursor land under one
//!   sequence number and commit together. There is no window where the cursor
//!   names a revision whose data is missing.
//! - **Self-sufficient under NO_SYNC.** The durability mode is configurable. With
//!   sync off (the default — same cache philosophy as the append log's
//!   no-fsync-per-checkpoint path), a commit is not fsync'd; a power-loss crash can
//!   lose the un-synced *tail*. That is safe precisely because data and cursor are
//!   one atomic batch: whatever survived has its matching cursor, so on reopen the
//!   consumer resumes the watch from the recovered cursor and re-folds the tail
//!   from NATS. Set `sync = true` to fsync every commit.
//! - **Queryable.** [`get`](SnapshotStore::get) and [`range`](SnapshotStore::range)
//!   read straight from fjall's block-cached, `Slice`-backed storage — no full-DB
//!   deserialization — so a 1B-route consumer can build its serving index from a
//!   prefix scan.
//!
//! ## Threading
//!
//! fjall is synchronous; [`watch_applied`](crate::watch_applied) already offloads
//! [`apply`](SnapshotStore::apply) to a blocking task, and async callers querying
//! [`get`](SnapshotStore::get)/[`range`](SnapshotStore::range) should use
//! `spawn_blocking` likewise.

use std::path::Path;

use fjall::{Config, Database, Keyspace, KeyspaceCreateOptions, PersistMode};

use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
use crate::snapshot::{SnapshotError, SnapshotStore};

/// Name of the keyspace ("partition" in pre-3 fjall terminology) holding the
/// folded KV state: `key` → encoded `[ver_len:u8][version bytes][value bytes]`
/// record (see `encode_value_into` / `decode_entry`).
const DATA_PARTITION: &str = "data";
/// Name of the keyspace holding fold metadata (just the resume cursor today).
const META_PARTITION: &str = "meta";
/// Key under [`META_PARTITION`] storing the resume cursor's raw version bytes.
/// It is written by `apply` in the same batch as the data it names and read
/// back by `open`.
const CURSOR_KEY: &[u8] = b"cursor";

/// Durability and read-cache configuration for [`FjallSnapshot`].
///
/// Defaults to NO_SYNC (`sync: false`) — same cache philosophy as the append
/// log's no-fsync-per-checkpoint path.
///
/// This is a small plain-data value: it is `Copy` and compares by value
/// (`PartialEq`/`Eq`), so configurations can be compared directly in callers
/// and tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FjallConfig {
    /// `fsync` every [`apply`](SnapshotStore::apply) commit when `true`. When
    /// `false` (the default), commits are not fsync'd (NO_SYNC): faster, and a
    /// tail lost to power loss is rebuilt by resuming the watch from the recovered
    /// cursor — the snapshot is a cache.
    pub sync: bool,

    /// Block-cache capacity in bytes for the LSM. fjall's own default is 32 MiB,
    /// which starves reads against a multi-hundred-MB fold: a working-set hydration
    /// (a prefix range over one service's keys) then misses the cache and hits disk,
    /// and the miss rate climbs as the fold grows (measured: 32 MiB → p50 174 us /
    /// p99 1.45 ms at 4M routes; a 2 GiB cache → 7 us / 13 us). The `Default`
    /// value (1 GiB) sizes the cache to the hot set so hydrations stay
    /// cache-resident. `0` falls back to fjall's 32 MiB default. Set this to
    /// roughly the resident working-set size.
    pub cache_size_bytes: u64,
}

impl Default for FjallConfig {
    /// NO_SYNC commits backed by a 1 GiB block cache.
    fn default() -> Self {
        // 1 GiB (2^30 bytes): holds index/data blocks for a ~1e6-service working
        // set resident, matching the routing registries' default resident cap.
        const DEFAULT_CACHE_BYTES: u64 = 1 << 30;

        Self {
            cache_size_bytes: DEFAULT_CACHE_BYTES,
            sync: false,
        }
    }
}

/// On-disk durable fold backed by fjall. See the [module docs](self).
///
/// Holds the fjall database root, the two keyspaces the fold uses (data and
/// metadata), the durability configuration, and an in-memory copy of the most
/// recently applied resume cursor.
pub struct FjallSnapshot {
    // fjall 3 renamed its types: the database root is `Database` (was `Keyspace`)
    // and each named partition is a `Keyspace` (was `PartitionHandle`).
    /// Database root; `apply` creates its atomic write batch from this.
    db: Database,
    /// Keyspace named by `DATA_PARTITION`: the folded key → record state.
    data: Keyspace,
    /// Keyspace named by `META_PARTITION`: holds the persisted resume cursor.
    meta: Keyspace,
    /// Configuration the store was opened with; `sync` selects per-commit durability.
    config: FjallConfig,
    /// Cursor loaded at open and replaced after every successful `apply`.
    cursor: WatchCursor,
}

impl FjallSnapshot {
    /// Open (or resume) the store rooted at `path` using the given `config`.
    ///
    /// `path` is the fjall database directory; it is created when missing. On
    /// success the persisted resume cursor is returned alongside the store —
    /// [`WatchCursor::none`] when no cursor has been written yet.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be created, if fjall cannot open the
    /// database or either keyspace, or with [`SnapshotError::InvalidFormat`]
    /// when the stored cursor bytes are rejected by `VersionToken::from_raw`.
    pub fn open(path: &Path, config: FjallConfig) -> Result<(WatchCursor, Self), SnapshotError> {
        std::fs::create_dir_all(path)?;

        // A non-zero capacity replaces fjall's built-in (32 MiB) block cache;
        // zero leaves fjall's default in place — see `FjallConfig::cache_size_bytes`.
        let mut db_config = Config::new(path);
        if config.cache_size_bytes != 0 {
            let block_cache = lsm_tree::Cache::with_capacity_bytes(config.cache_size_bytes);
            db_config.cache = std::sync::Arc::new(block_cache);
        }

        let db = Database::open(db_config).map_err(map_fjall)?;
        let data = db
            .keyspace(DATA_PARTITION, KeyspaceCreateOptions::default)
            .map_err(map_fjall)?;
        let meta = db
            .keyspace(META_PARTITION, KeyspaceCreateOptions::default)
            .map_err(map_fjall)?;

        // Recover the resume position: no stored key means a fresh store.
        let cursor = match meta.get(CURSOR_KEY).map_err(map_fjall)? {
            None => WatchCursor::none(),
            Some(raw) => match VersionToken::from_raw(&raw) {
                Some(version) => WatchCursor::from_version(version),
                None => {
                    return Err(SnapshotError::InvalidFormat(format!(
                        "stored cursor is {} bytes, exceeds version token capacity",
                        raw.len()
                    )));
                }
            },
        };

        let resume_from = cursor.clone();
        let store = Self {
            db,
            data,
            meta,
            config,
            cursor,
        };
        Ok((resume_from, store))
    }

    /// Borrow the resume cursor of the most recent successful apply (or the one
    /// loaded at open, if nothing has been applied since).
    pub fn cursor(&self) -> &WatchCursor {
        &self.cursor
    }

    /// Hand out a read handle onto the fold's data keyspace.
    ///
    /// fjall serves readers concurrently with the writer, so a consumer can take
    /// a reader *before* passing the fold to [`watch_applied`](crate::watch_applied)
    /// (which takes the store by value, `apply` being `&mut self`) and keep
    /// calling `get`/`range` from a separate serving task. This is the
    /// working-set-serving pattern for a fold too large to keep resident: seed
    /// the hot set, serve it from RAM, and `range` the cold tail from the fold on
    /// a cache miss — the serving path never touches the writer.
    pub fn reader(&self) -> FjallReader {
        let data = self.data.clone();
        FjallReader { data }
    }
}

/// A concurrent read handle over a [`FjallSnapshot`]'s data partition, cloned via
/// [`FjallSnapshot::reader`]. Reads share the same on-disk fold as the writer and
/// are safe to run concurrently with it.
///
/// The handle only exposes reads (`get`, `for_each_in_range`, `range`); it has
/// no access to the metadata keyspace or the resume cursor.
#[derive(Clone)]
pub struct FjallReader {
    /// Clone of the snapshot's data keyspace handle.
    data: Keyspace,
}

impl FjallReader {
    /// Look up `key`, returning its live entry or `None` when the key is
    /// absent/deleted.
    ///
    /// # Errors
    ///
    /// Returns a mapped fjall error if the read fails, or
    /// [`SnapshotError::InvalidFormat`] if the stored record cannot be decoded.
    pub fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
        self.data
            .get(key.as_bytes())
            .map_err(map_fjall)?
            .map(|raw| decode_entry(key, &raw))
            .transpose()
    }

    /// Visit every live entry whose key starts with `prefix`, in ascending key
    /// order, handing each one to `f` as it is decoded. Nothing is buffered, so
    /// this is the memory-bounded scan for an on-disk fold.
    ///
    /// The scan stops at the first error — from fjall, from decoding (including
    /// a non-UTF-8 key), or returned by `f` — and that error is propagated.
    pub fn for_each_in_range(
        &self,
        prefix: &str,
        mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
    ) -> Result<(), SnapshotError> {
        for item in self.data.prefix(prefix.as_bytes()) {
            let (key_bytes, value_bytes) = item.into_inner().map_err(map_fjall)?;
            let key = match std::str::from_utf8(&key_bytes) {
                Ok(text) => text,
                Err(e) => {
                    return Err(SnapshotError::InvalidFormat(format!(
                        "non-UTF-8 key in fjall store: {e}"
                    )));
                }
            };
            let entry = decode_entry(key, &value_bytes)?;
            f(entry)?;
        }
        Ok(())
    }

    /// Collect the results of [`for_each_in_range`](Self::for_each_in_range) into
    /// a `Vec`. Intended for bounded prefixes (e.g. one service's routes, or the
    /// whole `node.` map).
    pub fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
        let mut entries = Vec::new();
        self.for_each_in_range(prefix, |entry| {
            entries.push(entry);
            Ok(())
        })?;
        Ok(entries)
    }
}

impl SnapshotStore for FjallSnapshot {
    /// Open the store at `path` with [`FjallConfig::default`].
    fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
        Self::open(path, FjallConfig::default())
    }

    /// Fold `batch` into the store and advance the resume cursor to `cursor`.
    ///
    /// All puts/deletes and the cursor write go into one fjall write batch, so
    /// they commit together under a single sequence number: the persisted cursor
    /// can never name a revision whose data is missing. The in-memory cursor is
    /// updated only after the commit succeeds.
    fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
        let mut write_batch = self.db.batch().durability(self.durability());

        // A single encode buffer serves every `Put`. `insert` converts its value
        // into fjall's owned `Slice` eagerly — copying the bytes before it
        // returns — so the buffer can be overwritten for the next entry. That
        // replaces N per-`Put` assembly allocations with one amortized allocation.
        let mut record = Vec::new();
        for update in batch {
            match update {
                KvUpdate::Put(entry) => {
                    encode_value_into(&mut record, &entry.value, &entry.version)?;
                    write_batch.insert(&self.data, entry.key.as_bytes(), record.as_slice());
                }
                KvUpdate::Delete { key, .. } => {
                    write_batch.remove(&self.data, key.as_bytes());
                }
                KvUpdate::Purge { key, .. } => {
                    write_batch.remove(&self.data, key.as_bytes());
                }
            }
        }

        // The cursor rides in the SAME batch as the data it describes.
        write_batch.insert(&self.meta, CURSOR_KEY, cursor.version().as_bytes());
        write_batch.commit().map_err(map_fjall)?;

        self.cursor = cursor.clone();
        Ok(())
    }

    /// Live entry stored under `key`, or `None` when the key is not present.
    fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
        self.data
            .get(key.as_bytes())
            .map_err(map_fjall)?
            .map(|raw| decode_entry(key, &raw))
            .transpose()
    }

    /// Buffered prefix scan: every live entry whose key starts with `prefix`.
    ///
    /// Shares the decode path of `for_each_in_range`. fjall yields keys in
    /// ascending byte order, so the returned vector is already sorted (unlike
    /// the HashMap-backed append log).
    fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
        let mut entries = Vec::new();
        self.for_each_in_range(prefix, |entry| {
            entries.push(entry);
            Ok(())
        })?;
        Ok(entries)
    }

    /// Streaming prefix scan: decode each matching entry and pass it to `f`.
    ///
    /// fjall's prefix iterator is lazy, so entries are produced one at a time
    /// and a 1B-route consumer building a serving index never holds more than a
    /// single `KvEntry` in memory at once. The first error (fjall, decode, or
    /// from `f`) ends the scan and is returned.
    fn for_each_in_range(
        &self,
        prefix: &str,
        mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
    ) -> Result<(), SnapshotError> {
        for item in self.data.prefix(prefix.as_bytes()) {
            // fjall 3 yields a lazy `Guard` per entry; `into_inner` resolves it
            // to the `(key, value)` pair (loading the value at this point is what
            // keeps key-only iterations elsewhere lazy).
            let (key_bytes, value_bytes) = item.into_inner().map_err(map_fjall)?;
            let key = match std::str::from_utf8(&key_bytes) {
                Ok(text) => text,
                Err(e) => {
                    return Err(SnapshotError::InvalidFormat(format!(
                        "non-UTF-8 key in fjall store: {e}"
                    )));
                }
            };
            let entry = decode_entry(key, &value_bytes)?;
            f(entry)?;
        }
        Ok(())
    }
}

impl FjallSnapshot {
    /// Durability to request for each commit, chosen from `config.sync`:
    /// `SyncAll` (fsync) when it is set, `Buffer` (NO_SYNC) otherwise.
    fn durability(&self) -> Option<PersistMode> {
        // `Buffer` is the explicit NO_SYNC mode: data is flushed to OS buffers
        // only, so it survives a process crash but not a power loss — exactly the
        // cache semantics the module docs promise. Always returning `Some(..)`
        // instead of `None` pins that behavior regardless of the default
        // durability the keyspace was opened with, so a future change to fjall's
        // default can't silently make `sync: false` durable (or weaker).
        let mode = if self.config.sync {
            PersistMode::SyncAll
        } else {
            PersistMode::Buffer
        };
        Some(mode)
    }
}

/// Write the stored form of a value — `[ver_len:u8][version bytes][value bytes]` —
/// into `buf`.
///
/// `buf` is emptied and then refilled, so its capacity can be reused across a
/// batch. The version travels as length-prefixed raw bytes for the same reason
/// the append-log format does it: a backend's token (NATS u64, FDB 10-byte
/// versionstamp) must round-trip intact.
///
/// `VersionToken` caps inline storage at 10 bytes, so the `u8` length prefix
/// never truncates today. Converting with `try_from` instead of casting means
/// that, should a future token ever widen past 255 bytes, the caller gets a
/// format error rather than a record with a wrong length that `decode_entry`
/// would mis-parse. This mirrors `write_put_record` in `snapshot.rs`.
///
/// # Errors
///
/// Returns [`SnapshotError::InvalidFormat`] when the version is longer than
/// `u8::MAX` bytes; `buf` is left untouched in that case.
fn encode_value_into(
    buf: &mut Vec<u8>,
    value: &[u8],
    version: &VersionToken,
) -> Result<(), SnapshotError> {
    let version_bytes = version.as_bytes();
    let ver_len = match u8::try_from(version_bytes.len()) {
        Ok(len) => len,
        Err(_) => {
            return Err(SnapshotError::InvalidFormat(format!(
                "version too long: {} bytes (max {})",
                version_bytes.len(),
                u8::MAX
            )));
        }
    };

    // Length byte + version + payload, sized up front so the pushes don't regrow.
    let record_len = 1 + version_bytes.len() + value.len();
    buf.clear();
    buf.reserve(record_len);
    buf.push(ver_len);
    buf.extend_from_slice(version_bytes);
    buf.extend_from_slice(value);
    Ok(())
}

/// Parse a `[ver_len:u8][version][value]` record stored under `key` into a
/// [`KvEntry`].
///
/// # Errors
///
/// Returns [`SnapshotError::InvalidFormat`] when the record is empty, when it is
/// shorter than its declared version length, or when `VersionToken::from_raw`
/// rejects the version bytes.
fn decode_entry(key: &str, raw: &[u8]) -> Result<KvEntry, SnapshotError> {
    // Peel off the one-byte version length; an empty record has none.
    let (ver_len, body) = match raw.split_first() {
        Some((&len_byte, body)) => (usize::from(len_byte), body),
        None => {
            return Err(SnapshotError::InvalidFormat(
                "fjall value record is empty (no version length)".into(),
            ));
        }
    };

    // `body` is everything after the length byte, so it must hold at least
    // `ver_len` bytes for the version to be complete.
    if body.len() < ver_len {
        let value_off = 1 + ver_len;
        return Err(SnapshotError::InvalidFormat(format!(
            "fjall value record truncated: need {value_off} bytes for version, have {}",
            raw.len()
        )));
    }

    let (version_bytes, value_bytes) = body.split_at(ver_len);
    let version = match VersionToken::from_raw(version_bytes) {
        Some(token) => token,
        None => {
            return Err(SnapshotError::InvalidFormat(format!(
                "version length {ver_len} exceeds version token capacity"
            )));
        }
    };

    Ok(KvEntry {
        key: key.to_owned(),
        value: value_bytes.to_vec(),
        version,
    })
}

/// Map a [`fjall::Error`] into the backend-agnostic [`SnapshotError`].
fn map_fjall(e: fjall::Error) -> SnapshotError {
    match e {
        // Surface I/O failures (disk full, permission denied, …) as a real
        // `io::Error` so the OS errno and the `#[source]` chain survive for
        // operators, instead of being flattened into an opaque backend string.
        fjall::Error::Io(io) => SnapshotError::Io(io),
        // Everything else keeps fjall's own variant name — its `Display` renders
        // as `FjallError: {variant:?}`, so `Poisoned` (a flush/commit failure
        // that should crash the app), journal recovery, decode, etc. stay legible
        // in logs without leaking the `fjall` type into this error enum.
        other => SnapshotError::Backend(other.to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// The length-prefixed value format must carry a 10-byte FDB versionstamp
    /// unchanged. Such a token has no `u64` form; a `u64`-only field would
    /// flatten it to 0 and silently break every later CAS — the load-bearing
    /// reason the record stores a length-prefixed token, not a fixed 8 bytes.
    #[test]
    fn encode_decode_round_trips_fdb_versionstamp() {
        let stamp: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let token = VersionToken::from_fdb_versionstamp(&stamp);

        let mut record = Vec::new();
        encode_value_into(&mut record, b"payload", &token).expect("encode");
        let decoded = decode_entry("k", &record).expect("decode");

        assert_eq!(decoded.version.as_bytes(), &stamp);
        assert!(
            decoded.version.as_u64().is_none(),
            "a 10-byte token has no u64 form — it must not be flattened"
        );
        assert_eq!(decoded.value, b"payload");
    }

    /// An empty value (the CAS-tombstone shape) encodes to the version prefix
    /// alone and decodes to a present entry with an empty value and the same
    /// version.
    #[test]
    fn encode_decode_round_trips_empty_value() {
        let token = VersionToken::from_u64(7);

        let mut record = Vec::new();
        encode_value_into(&mut record, b"", &token).expect("encode");
        let decoded = decode_entry("k", &record).expect("decode");

        assert!(decoded.value.is_empty());
        assert_eq!(decoded.version.as_u64(), Some(7));
    }

    /// A zero-byte record lacks even the version-length byte: that is
    /// corruption, and it must come back as a recoverable `InvalidFormat`
    /// instead of a panic.
    #[test]
    fn decode_entry_rejects_empty_record() {
        let empty: [u8; 0] = [];
        let err = decode_entry("k", &empty).unwrap_err();
        assert!(
            matches!(err, SnapshotError::InvalidFormat(_)),
            "empty record must be a format error, got {err:?}"
        );
    }

    /// A record declaring a longer version than the bytes that follow is
    /// truncated on-disk corruption; it must be rejected, not read past the
    /// end of the buffer.
    #[test]
    fn decode_entry_rejects_truncated_version() {
        // Length prefix says 5 version bytes, but only 2 bytes come after it.
        let record = [5u8, 0xAA, 0xBB];
        let err = decode_entry("k", &record).unwrap_err();
        assert!(
            matches!(err, SnapshotError::InvalidFormat(_)),
            "truncated version must be a format error, got {err:?}"
        );
    }

    /// A version longer than `VersionToken`'s 10-byte capacity cannot
    /// round-trip; `from_raw` refuses it and `decode_entry` reports
    /// `InvalidFormat` instead of silently truncating to a wrong
    /// (CAS-breaking) version.
    #[test]
    fn decode_entry_rejects_oversized_version() {
        // ver_len = 11 followed by 11 zero bytes: long enough to pass the
        // truncation check, then rejected by the capacity check inside
        // `VersionToken::from_raw`.
        let mut record = [0u8; 12];
        record[0] = 11;
        let err = decode_entry("k", &record).unwrap_err();
        assert!(
            matches!(err, SnapshotError::InvalidFormat(_)),
            "oversized version must be a format error, got {err:?}"
        );
    }

    /// A persisted cursor blob larger than the version-token capacity must be
    /// reported by `open` as a recoverable `InvalidFormat` — never a panic, and
    /// never a silently truncated cursor that would resume the watch from the
    /// wrong position.
    #[test]
    fn open_rejects_corrupted_cursor() {
        let tmp = TempDir::new().unwrap();
        let store_path = tmp.path().join("store");

        // First open: plant a bad cursor, then drop the store so it can be reopened.
        {
            let (_cursor, store) =
                FjallSnapshot::open(&store_path, FjallConfig::default()).expect("initial open");
            // Put an 11-byte blob directly into the meta partition under the
            // cursor key, sidestepping the apply path's bounded encoding.
            store
                .meta
                .insert(CURSOR_KEY, [0u8; 11])
                .expect("insert oversized cursor");
            store.db.persist(PersistMode::SyncAll).expect("persist");
        }

        // `FjallSnapshot` isn't `Debug`, so the result is matched rather than
        // passed through `unwrap_err`.
        let reopened = FjallSnapshot::open(&store_path, FjallConfig::default());
        match reopened {
            Ok(_) => panic!("expected open to reject the oversized cursor"),
            Err(SnapshotError::InvalidFormat(_)) => {}
            Err(other) => panic!("expected InvalidFormat, got {other:?}"),
        }
    }
}