beyond-slipstream 0.5.0

Watchable distributed config over NATS JetStream — cache locally, stream updates, resume from a sequence number after any restart.
# Slipstream Architecture

Trait-based KV abstraction over NATS JetStream: read, write, and watch distributed config with a resumable cursor so services replay only the delta after any restart.

## Data Flow

### Happy path: startup with snapshot

```
Disk ──► load(path) ──► replay_log() ──► HashMap<key, KvEntry> + WatchCursor
                                                    │
                                    watcher.watch_all_from(cursor, tx)
                                                    │
                                              CursorExpired?
                                             /              \
                                           Yes               No
                                            │                 │
                            stale-key resync (synthetic      delta stream
                            deletes) + watch_all(tx)              │
                            (state-sync re-list)                  │
                                            └───────┬─────────────┘
                                                    │
                                   KvUpdate → cache.apply() + snap.write_update()
                                                    │
                                     snap.checkpoint(cursor) ──► compact() if due
```

### Read path

```
reader.get("key")    ──► NATS kv.entry() ──► filter tombstones ──► KvEntry | None
reader.entry("key")  ──► NATS kv.entry() ──► raw (includes tombstones, for CAS)
reader.scan("pfx.")  ──► ephemeral push consumer (DeliverPolicy::LastPerSubject) ──► Vec<KvEntry>
reader.keys("pfx.")  ──► same consumer, headers_only ──► Vec<String>
```

### CAS write path

```
writer.create("lock", val)           ──► kv.create()        ──► AlreadyExists | VersionToken
writer.update("node", val, ver)      ──► kv.update()        ──► RevisionMismatch | VersionToken
writer.delete_with_version("k", ver) ──► kv.update(key, []) ──► RevisionMismatch | bool
```

CAS tombstone (empty-value Put) is how `delete_with_version` works — it writes an empty value via a CAS operation so concurrent writers see a conflict. `get()` and `scan()` filter these out; `entry()` exposes them for CAS callers that need the version.
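A minimal sketch of the tombstone rule, using a hypothetical in-memory `RawEntry` type and two hypothetical helper functions rather than the crate's own `KvEntry` and reader methods. It only illustrates the filtering: an empty value is treated as a tombstone, the `get()`-style view hides it, and the `entry()`-style view passes it through so a CAS caller can still read the revision.

```rust
/// Hypothetical stand-in for a raw KV record; not the crate's `KvEntry`.
#[derive(Debug, Clone, PartialEq)]
pub struct RawEntry {
    pub key: String,
    pub value: Vec<u8>,
    pub revision: u64,
}

impl RawEntry {
    /// A CAS tombstone is a Put whose value is empty.
    pub fn is_tombstone(&self) -> bool {
        self.value.is_empty()
    }
}

/// `entry()`-style view: return the record as stored, tombstones included.
pub fn entry_view(raw: Option<RawEntry>) -> Option<RawEntry> {
    raw
}

/// `get()`-style view: hide tombstones so a deleted key reads as absent.
pub fn get_view(raw: Option<RawEntry>) -> Option<RawEntry> {
    raw.filter(|e| !e.is_tombstone())
}
```

One consequence of this encoding is that a legitimately empty value cannot be told apart from a tombstone by the filtered view.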

### Watch resumption

```
watch_all_from(cursor, tx)
  cursor.is_none() ─────────────────────────► watch_all(tx)        (full replay)
  cursor has rev ──► kv.watch_all_from_revision(rev+1)
                       │                       │
                    Success                CursorExpired (NATS compacted past cursor)
                       │                       │
                   delta stream          caller falls back to watch_all(tx)
```

Every non-`_from` watch is a **state-sync** stream (NATS `DeliverPolicy::LastPerSubject`): the current value of every matching key is delivered first — the re-list — then live updates. A no-cursor consumer therefore converges on full bucket state from the watch alone, with no separate scan and no scan-to-watch race window. The `_from` variants skip the re-list and deliver only the delta.

## Concepts & Terminology

| Term                     | Definition                                                           | NOT                                              |
| ------------------------ | -------------------------------------------------------------------- | ------------------------------------------------ |
| `Connection`             | Socket lifecycle manager + store factory                             | Not a store; not the NATS client itself          |
| `KvStore`                | Named bucket; vends reader, watcher, writer                          | Not the connection; holds no socket              |
| `KvReader`               | Point-in-time reads: `get`, `entry`, `keys`, `scan`                  | Not a live stream; returns a snapshot moment     |
| `KvWatcher`              | Live update stream pushed via mpsc channel                           | Not a polling loop; push from NATS               |
| `KvWriter`               | Write, soft-delete, CAS (`create`, `update`, `delete_with_version`)  | Not multi-key transactions                       |
| `WatchCursor`            | Opaque resume position in a watch stream (NATS: u64 revision)        | Not a per-key version; only for watch resumption |
| `VersionToken`           | Opaque per-key version (NATS: 8-byte u64; FDB: 10-byte versionstamp) | Not a wall-clock timestamp; not globally ordered |
| `KvEntry`                | One key + value + version from a read                                | Not a watch event; immutable once returned       |
| `KvUpdate`               | One watch event: `Put`, `Delete`, or `Purge`                         | Not a read result; carries deletes too           |
| `Snapshot`               | Deduplicated KV state + cursor persisted to disk                     | Not the source of truth; a cache of NATS         |
| `SnapshotWriter`         | Append-only log of `KvUpdate`s; no in-memory state beyond a counter  | Not the in-memory cache itself                   |
| `SnapshotStore`          | Trait: the durable-fold contract — atomic `apply(batch, cursor)`, `load`, `get`, `range` | Not a serving index; stops at fold + cursor + query |
| `AppendLogSnapshot`      | Default `SnapshotStore`: append-only log + in-RAM fold (pure-Rust)   | Not for folds larger than RAM                     |
| `FjallSnapshot`          | On-disk `SnapshotStore` (fjall LSM, `feature = "fjall"`) for large folds | Not in the pure-Rust core; opt-in feature        |
| `RocksDbSnapshot`        | On-disk `SnapshotStore` (RocksDB, `feature = "rocksdb"`) for large folds | Not pure-Rust; opt-in feature with a C++ build dep |
| `watch_applied`          | Combinator: batch → apply → *then* advance cursor / fold into `SnapshotStore`; resyncs stale keys on cursor expiry | Not a raw watch; the cursor follows `apply`, not receipt |
| `WatchScope`             | What `watch_applied` watches: `All`, `Prefix`, or `Prefixes` (multi-filter union) | Not N consumers; `Prefixes` costs one consumer    |
| `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, …)        | Not enforced; purely advisory                    |

## Layer Architecture

```
┌─────────────────────────────────────────────────────────────┐
│        KvReader │ KvWatcher │ KvWriter │ KvTtl              │
│         (async_trait, object-safe, Arc<dyn Trait>)          │
├─────────────────────────────────────────────────────────────┤
│                        KvStore                              │
│          (named bucket — vends the three roles above)       │
├─────────────────────────────────────────────────────────────┤
│                       Connection                            │
│         (connect/shutdown/is_healthy + store factory)       │
├─────────────────────────────────────────────────────────────┤
│                    NatsConnection                           │
│   NatsKvStore │ NatsKvReader │ NatsKvWatcher │ NatsKvWriterImpl
│              (concrete NATS JetStream impl)                 │
└─────────────────────────────────────────────────────────────┘
                  snapshot.rs (orthogonal, optional)
┌─────────────────────────────────────────────────────────────┐
│   SnapshotStore trait: apply(batch, cursor) │ load │ get │ range
│   AppendLogSnapshot (default, in-RAM)                       │
│   FjallSnapshot │ RocksDbSnapshot (feature-gated, on-disk)  │
│          (append-only CRC log, tempfile+rename compact)     │
└─────────────────────────────────────────────────────────────┘
                  applied.rs (combinator over KvWatcher + snapshot)
┌──────────────────────────────────────────────────────────────┐
│   watch_applied(): batch → apply → advance cursor/checkpoint │
│   (cursor-after-apply; the safe default for resumable watch) │
└──────────────────────────────────────────────────────────────┘
```

## Core Mechanism

### Resumable Watch

The cursor is the NATS stream sequence number at the last checkpoint. On restart, pass it to `watch_all_from()` to subscribe at `cursor+1` — only the delta arrives, not the full history.

When the cursor expires (NATS retention window evicted those records), `CursorExpired` is returned. The fallback `watch_all()` re-list re-delivers the current value of every live key, but it cannot cover keys **deleted during the gap whose delete markers were also evicted** — those need synthetic `Delete` events diffed from prior state. `watch_applied` does this automatically when given a reader (see below). A raw-API caller hand-rolls the same diff with `Snapshot::stale_keys()`:

```rust
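// Clone the sender: the resume call consumes it and the fallback needs it again
// (assumes `watch_all_from` takes the sender by value).
match watcher.watch_all_from(&snap.cursor, tx.clone()).await {
    Ok(()) => {}
    Err(KvError::CursorExpired) => {
        // Diff the snapshot's keys against the live listing; anything missing
        // was deleted during the gap and must be removed locally.
        let live = reader.keys("").await?;
        for key in snap.stale_keys(live.iter().map(|s| s.as_str())) {
            cache.remove(key);
        }
        // Full state-sync watch: re-list every live key, then stream updates.
        watcher.watch_all(tx).await?;
    }
    Err(e) => return Err(e.into()),
}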
```
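The diff behind the stale-key step can be sketched as a plain set difference. The function below is a hypothetical free function written for illustration, not the signature of `Snapshot::stale_keys()`: it takes the restored fold and the live key listing and returns the keys that exist locally but no longer exist in the bucket.

```rust
use std::collections::{HashMap, HashSet};

/// Keys present in the restored fold but absent from the live listing.
/// Each one was deleted while the cursor was expired.
pub fn stale_keys_sketch<'a, 'b, V>(
    fold: &'a HashMap<String, V>,
    live: impl IntoIterator<Item = &'b str>,
) -> Vec<&'a str> {
    let live: HashSet<&str> = live.into_iter().collect();
    let mut stale: Vec<&str> = fold
        .keys()
        .map(String::as_str)
        .filter(|k| !live.contains(*k))
        .collect();
    // Sorted only to make logging and tests deterministic.
    stale.sort_unstable();
    stale
}
```

Keys that appear only in the live listing are ignored here on purpose: the re-list delivers them as ordinary puts.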

### Applied-Cursor Watch (`watch_applied`)

The resumable watch above hands the caller raw machinery — a channel of `KvUpdate`s, a cursor, a snapshot writer — and trusts each one to hand-roll the loop that batches updates, applies them, and advances the cursor. Every hand-rolled instance got the same step wrong: it advanced the cursor on **receipt** of an update (`high_water = rev` at `rx.recv()`) and applied the batch afterward. The combinator `watch_applied` exists to encode the correct discipline once.

**The resume guarantee.** This library's contract is "resume from a sequence number after any restart." `watch_applied` sharpens that into a single invariant:

> A persisted/reported cursor `C` ⟹ every update with revision ≤ `C` has been **applied** — the caller's `apply()` has returned for it.

The cursor is written from `apply()`'s completion, never from the channel's delivery. Concretely, on each flush the combinator runs `apply(batch)` to completion, *then* sets `cursor = batch_high`, *then* checkpoints the snapshot at that cursor, *then* fires `on_applied`. Nothing advances the cursor before `apply` returns.

**Why receipt is the wrong signal.** Bumping the cursor at `rx.recv()` and applying later opens a crash window: the persisted cursor claims "caught up to rev N" while rev N still sits in an unapplied batch buffer (or in flight to a separate apply task). On crash+resume the watch re-arms at `cursor+1`, *past* the unapplied rev N, and silently skips it. The data is gone with no error — a hole in the exact guarantee the crate advertises.

This is the lesson of Saltzer, Reed & Clark, *End-to-End Arguments in System Design* (1984): a checkpoint placed below the endpoint — here, at the transport's delivery rather than at the application of the update — can only ever be a performance hint, never a correctness guarantee. The "it happened" property can only be established at the endpoint that actually performs the work, so the cursor is sourced from `apply`, not from `recv`. The cursor-as-monotonic-index shape itself is the HashiCorp Consul anti-entropy / blocking-query lineage: a client re-arms its watch from the last index it *reconciled*, never from the index it merely *saw*.

**Cursor authority covers rejected entries.** `batch_high` tracks the highest revision *received* since the last flush, including updates that `parse` rejected (corrupt bytes, irrelevant keys). A rejected entry is still "nothing to apply," so it is covered by the cursor — and because NATS delivers in revision order, advancing to the max revision after one atomic `apply` is sound: having seen the max means every revision below it has been seen too. Without this, a run of irrelevant keys would pin the cursor in place and force redundant replay on every restart.

**Snapshot consistency.** Raw `KvUpdate`s stream to the snapshot log as they arrive, but the *checkpoint* cursor is the post-apply cursor. A crash after a raw record is written but before its `apply`/checkpoint leaves the log holding data *ahead* of its cursor — which is safe: the cursor never names a revision whose `apply` had not returned, so resume re-delivers and re-applies that tail rather than skipping it. Compaction runs off the hot path via `spawn_blocking`, as everywhere else in the snapshot subsystem.

**Flush triggers.** A batch flushes when any of these fires: the `window` elapses, `batch.len()` reaches `config.max`, a shutdown is signalled, or the channel closes with a pending batch (the remainder is flushed before returning).
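The discipline described so far (batch, apply, then advance the cursor, with rejected entries still covered) can be sketched with the standard library's synchronous channel. This is an illustration under stated assumptions, not the combinator's real signature: `Update`, `run_applied`, and the closure parameters are hypothetical, the real API is async, and snapshot checkpointing and shutdown signalling are left out.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical update: a revision plus raw bytes.
pub struct Update {
    pub revision: u64,
    pub bytes: Vec<u8>,
}

/// Drain `rx` in batches and return the final applied cursor.
/// The cursor moves only after `apply` has returned for the batch covering it.
pub fn run_applied<T>(
    rx: Receiver<Update>,
    max: usize,
    window: Duration,
    parse: impl Fn(&Update) -> Option<T>,
    mut apply: impl FnMut(Vec<T>),
    mut on_applied: impl FnMut(u64),
) -> Option<u64> {
    let mut cursor = None;
    let mut batch: Vec<T> = Vec::new();
    let mut batch_high: Option<u64> = None; // highest revision received since last flush
    let mut deadline = Instant::now() + window;

    loop {
        let timeout = deadline.saturating_duration_since(Instant::now());
        let (flush, closed) = match rx.recv_timeout(timeout) {
            Ok(update) => {
                // A rejected update still raises batch_high: nothing to apply for it.
                let rev = update.revision;
                batch_high = Some(batch_high.map_or(rev, |h: u64| h.max(rev)));
                if let Some(item) = parse(&update) {
                    batch.push(item);
                }
                (batch.len() >= max, false)
            }
            Err(RecvTimeoutError::Timeout) => (true, false),
            Err(RecvTimeoutError::Disconnected) => (true, true),
        };

        if flush {
            if let Some(high) = batch_high.take() {
                if !batch.is_empty() {
                    apply(std::mem::take(&mut batch)); // 1. apply to completion
                }
                cursor = Some(high); // 2. only then advance the cursor
                on_applied(high); // 3. report or persist it
            }
            deadline = Instant::now() + window;
        }
        if closed {
            return cursor;
        }
    }
}
```

With five updates where the even revisions are rejected by `parse` and `max = 2`, this sketch applies `[1, 3]`, reports cursor 3, then on channel close applies `[5]` and reports cursor 5. Revision 4 is never applied yet is covered by the final cursor.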

**Cursor-expired resync.** On `CursorExpired` from the resume path the combinator falls back to the full-scope watch (`watch_all` / `watch_prefix` / `watch_prefixes`), whose state-sync re-list re-delivers every live key as puts. The re-list cannot cover keys deleted during the gap whose markers were evicted with the cursor, so — when the combinator is given a `KvReader` and a store — it closes that hole first: the watch task lists the bucket's live keys, hands them to the main loop, and waits for an ack; the main loop flushes, diffs the fold's in-scope keys against the listing, and runs a synthetic `KvUpdate::Delete` (unknown version — never advances the cursor) through `parse`/`apply`/store for each key that vanished; only then does the fallback watch start. That ack ordering is the invariant: a synthetic delete always precedes the re-list put for the same key, so delete-then-recreate during the gap converges. Without a reader the fallback is re-list-only and logs the possible stale keys.

This is the layer the tunnel router (swap route table) and edge origin watcher (rebuild hashrings) both collapse onto: `parse` extracts the domain registration, `apply` swaps the live state, `on_applied` persists the cursor.

### scan() and keys() via Ephemeral Push Consumer

Both use `DeliverPolicy::LastPerSubject` — one ephemeral push consumer delivers the latest value per key in a single streaming operation, rather than N sequential `get()` calls. `keys()` adds `headers_only: true` so no value bytes cross the wire.

The consumer is always `AckPolicy::None`. The default `AckPolicy::Explicit` stops delivery after `max_ack_pending` (1000) un-acked messages, silently truncating any bucket with >1000 keys.

The consumer is created with **subscribe-before-create** ordering: the inbox subscription is registered before the consumer exists, closing a race in async-nats ≤0.46 where early messages arrive before the subscription is ready.

### ACK Subject Format Parsing in scan()

Each message delivered by the `scan()` push consumer carries the KV revision in its JetStream ACK subject (the message's reply subject). The revision is the stream sequence number, and it sits at a field offset that varies by NATS server version:

```
Legacy (9 tokens):  $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
Modern (11–12 tok): $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>[.<token>]
```

The stream sequence sits at index 5 (legacy) or index 7 (modern). The last numeric field is `num_pending` (typically 0), which looks like a sequence but is not; in the 9-token and 11-token forms it is also the final token. The previous implementation took the last token and produced a wrong version on every scanned entry; the current parser reads from the front and branches on token count.

The implementation uses a fixed 8-element stack array for the first 8 tokens (no heap allocation per message). An A/B against the previous `Vec`-collecting approach measured **~3.1× speedup** — 1.59 ms → 0.51 ms per 10k ACK parses. See `benches/ack.rs`.
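A front-reading parse of this shape could look roughly like the following. It is a sketch, not the crate's code: the function name `stream_seq_from_ack` is hypothetical, and returning `None` for any token count other than 9, 11, or 12 is an assumption about how unrecognised shapes are handled.

```rust
/// Extract the stream sequence from a JetStream ACK subject.
/// Returns `None` for an unrecognised shape.
pub fn stream_seq_from_ack(subject: &str) -> Option<u64> {
    // Only the first 8 tokens are ever inspected, so a fixed array suffices
    // and no per-message heap allocation is needed.
    let mut head: [&str; 8] = [""; 8];
    let mut count = 0usize;
    for token in subject.split('.') {
        if count < head.len() {
            head[count] = token;
        }
        count += 1;
    }
    if head[0] != "$JS" || head[1] != "ACK" {
        return None;
    }
    let index = match count {
        9 => 5,       // legacy: stream and consumer follow $JS.ACK directly
        11 | 12 => 7, // modern: domain and account precede the stream name
        _ => return None,
    };
    head[index].parse().ok()
}
```

Branching on the total token count while storing only the first eight tokens keeps the parse independent of whatever trails the sequence fields.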

### 30-Second Operation Timeout

Every NATS operation is wrapped in `timed()` (30 s). Without it, a CLOSE_WAIT connection (half-dead TCP) parks `await`s forever — async-nats does not fail in-flight requests when the TCP layer goes dead. 30 s is generous for legitimate slow ops (JetStream stream sync, leader election) while still being debuggable.

### VersionToken: Inline Multi-Backend Versioning

`VersionToken` is a 10-byte inline buffer — no heap allocation. It covers all current backends without widening:

| Backend  | Encoding                 | `as_u64()` |
| -------- | ------------------------ | ---------- |
| NATS     | 8-byte big-endian u64    | `Some(rev)` |
| FDB      | 10-byte versionstamp     | `None`     |
| Unknown  | len=0                    | `None`     |
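The table can be read as a small inline type. The sketch below uses a hypothetical `Token` struct (a 10-byte buffer plus a length byte) to show how the three rows behave; the field layout and constructor names are assumptions, not the crate's definition of `VersionToken`.

```rust
/// Hypothetical inline token: up to 10 bytes plus a length, no heap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Token {
    len: u8,
    buf: [u8; 10],
}

impl Token {
    /// The "unknown version" row: zero length.
    pub const UNKNOWN: Token = Token { len: 0, buf: [0; 10] };

    /// NATS row: the revision as 8 big-endian bytes.
    pub fn from_u64(rev: u64) -> Self {
        let mut buf = [0u8; 10];
        buf[..8].copy_from_slice(&rev.to_be_bytes());
        Token { len: 8, buf }
    }

    /// Any backend encoding of at most 10 bytes (for example a versionstamp).
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() > 10 {
            return None;
        }
        let mut buf = [0u8; 10];
        buf[..bytes.len()].copy_from_slice(bytes);
        Some(Token { len: bytes.len() as u8, buf })
    }

    pub fn as_bytes(&self) -> &[u8] {
        &self.buf[..self.len as usize]
    }

    /// Only an 8-byte token is interpreted as a u64 revision.
    pub fn as_u64(&self) -> Option<u64> {
        if self.len != 8 {
            return None;
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&self.buf[..8]);
        Some(u64::from_be_bytes(raw))
    }
}
```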

## Snapshot Subsystem

### The durable-fold trait

The durable fold is a trait, `SnapshotStore`, so the backend is pluggable while the contract is fixed:

```rust
fn load(path) -> (WatchCursor, Self);          // resume position + store
fn apply(&mut self, batch, cursor);            // fold data AND advance cursor, atomically
fn get(key) -> Option<KvEntry>;                // point query
fn range(prefix) -> Vec<KvEntry>;              // ordered prefix scan
```

Three invariants bind every implementation:

- **Pure function of the log.** Delete the store, replay every update with revision `> cursor`, and the state is identical. The store caches the fold; NATS is the source of truth.
- **Cursor-after-apply.** `apply` makes data and cursor durable together, so the cursor never names a revision whose data is absent — one transaction on a transactional backend, data-then-cursor on the append log (a torn write leaves data *ahead* of the cursor, which replay re-folds, never skips).
- **Snapshot is a cache.** A tail lost to power loss (under a no-sync durability mode) is rebuilt by resuming the watch from the recovered cursor.

`watch_applied` is generic over `SnapshotStore`: on each flush, after `apply` returns, it hands the raw batch + post-apply cursor to `store.apply(...)` on a blocking task. The trait stops at fold + cursor + query; serving structures built from the fold (routing rings, hashrings) live in the consumer, which reads them out via `get`/`range`.

| Backend | Module | State | Durability |
| ------- | ------ | ----- | ---------- |
| `AppendLogSnapshot` (default) | `snapshot.rs` | Append-only CRC log + in-RAM `HashMap` fold | `checkpoint` flush (page cache); `fsync` only at `compact` |
| `FjallSnapshot` (`feature = "fjall"`) | `snapshot_fjall.rs` | On-disk fjall LSM (`data` + `meta` partitions) | One atomic batch per `apply` (data + cursor); per-commit `fsync` configurable (NO_SYNC default) |
| `RocksDbSnapshot` (`feature = "rocksdb"`) | `snapshot_rocksdb.rs` | On-disk RocksDB (`data` + `meta` column families), tuned for billion-key folds (hit-optimized ribbon filters, partitioned index, zstd bottommost — see the module's Tuning docs) | One atomic `WriteBatch` per `apply` (data + cursor); WAL always on, per-commit `fsync` configurable (NO_SYNC default) |

The two LSM backends are interchangeable in contract and share the value-record codec (`snapshot_record.rs`); fjall keeps the crate pure-Rust, RocksDB trades a C++ build dependency for the battle-tested engine and its operational tooling (`ldb`, `sst_dump`). Both keep the cursor in the same atomic batch as the data it names, so under NO_SYNC a crash can lose the un-synced tail but never desynchronize cursor from data — on reopen the recovered cursor is consistent and the watch re-folds the tail. The rest of this section describes the **append-log backend** (the default), whose on-disk format is below.

### File Format

```
Header:  b"PGSS" ++ version:u16le

Record:  crc32:u32le ++ type:u8 ++ payload

Put:     key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version_bytes
Delete:  key_len:u16le ++ key ++ ver_len:u8 ++ version_bytes
Cursor:  cur_len:u8 ++ cursor_bytes
```

Version bytes are stored as length-prefixed raw bytes, not a fixed `u64`. A 10-byte FDB versionstamp round-trips intact; a `u64`-only field would flatten it to 0 and break every subsequent CAS on a restored entry.

CRC covers from the type byte through the end of the record. A truncated final record (crash mid-write) is silently discarded. A CRC mismatch in the middle of the file returns `SnapshotError::Corrupted`.
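To make the framing concrete, here is a sketch that encodes one `Put` record in the layout above and verifies its CRC. Several details are assumptions made for the example: the type tag value `1` is hypothetical, the checksum is taken to be the common reflected CRC-32 (polynomial `0xEDB88320`), and lengths are assumed to fit their fields.

```rust
/// Bitwise reflected CRC-32 (assumed variant; the document only says "crc32").
pub fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

const TYPE_PUT: u8 = 1; // hypothetical tag value

/// Encode `crc32 ++ type ++ key_len ++ key ++ value_len ++ value ++ ver_len ++ version`.
/// Callers are assumed to keep lengths within u16 / u32 / u8 respectively.
pub fn encode_put(key: &[u8], value: &[u8], version: &[u8]) -> Vec<u8> {
    let mut body = vec![TYPE_PUT];
    body.extend_from_slice(&(key.len() as u16).to_le_bytes());
    body.extend_from_slice(key);
    body.extend_from_slice(&(value.len() as u32).to_le_bytes());
    body.extend_from_slice(value);
    body.push(version.len() as u8);
    body.extend_from_slice(version);

    // The CRC covers the type byte through the end of the record.
    let mut record = crc32(&body).to_le_bytes().to_vec();
    record.extend_from_slice(&body);
    record
}

/// Check the CRC of one complete record.
pub fn verify(record: &[u8]) -> bool {
    if record.len() < 5 {
        return false; // too short to hold a CRC and a type byte
    }
    let (crc, body) = record.split_at(4);
    let stored = u32::from_le_bytes([crc[0], crc[1], crc[2], crc[3]]);
    stored == crc32(body)
}
```

Because the version is written as length-prefixed bytes, the same encoder handles an 8-byte revision and a 10-byte versionstamp without a format change.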

### State Machine

```
APPENDING ──► checkpoint() returns true ──► NEEDS_COMPACT
    │                                              │
write_update()                           compact() [blocking: replay → dedup → tempfile → rename]
    │                                              │
    └──────────────────────────────────────────────┘
                bytes_since_compact = 0
```

| From          | Event                            | To            | Guard / Side-effect                              |
| ------------- | -------------------------------- | ------------- | ------------------------------------------------ |
| APPENDING     | `write_update()`                 | APPENDING     | Buffered; bytes_since_compact += n               |
| APPENDING     | `checkpoint()` → true            | NEEDS_COMPACT | Cursor + cursor record flushed to page cache     |
| APPENDING     | `checkpoint()` → false           | APPENDING     | Same flush; below threshold                      |
| NEEDS_COMPACT | `compact()` succeeds             | APPENDING     | Tempfile → sync_all → rename; counter reset to 0 |
| NEEDS_COMPACT | `compact()` fails on reopen      | POISONED      | `writer = None`; subsequent writes return `Io`   |
| POISONED      | any `write_update`/`checkpoint`  | POISONED      | `Err(Io("snapshot writer poisoned"))` returned   |

### Load + Compaction

`load()` replays the full log into a `HashMap` (last write wins per key, deletes remove entries), then rewrites to a compact file (no duplicates) via tempfile + `sync_all` + rename. It skips the rewrite when the log is already compact (no duplicate keys, no delete records, clean EOF).
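The replay step amounts to a fold over the log. The sketch below uses a hypothetical `Record` enum with a `u64` cursor for brevity (the on-disk cursor is length-prefixed bytes) and shows last-write-wins per key, deletes removing entries, and the most recent cursor record winning.

```rust
use std::collections::HashMap;

/// Hypothetical decoded log record; the real format also carries version bytes.
pub enum Record {
    Put { key: String, value: Vec<u8> },
    Delete { key: String },
    Cursor(u64),
}

/// Fold a log into deduplicated state plus the last recorded cursor.
pub fn replay(log: impl IntoIterator<Item = Record>) -> (HashMap<String, Vec<u8>>, Option<u64>) {
    let mut state = HashMap::new();
    let mut cursor = None;
    for record in log {
        match record {
            // Last write wins: a later Put overwrites an earlier one.
            Record::Put { key, value } => {
                state.insert(key, value);
            }
            // Deletes remove the entry entirely.
            Record::Delete { key } => {
                state.remove(&key);
            }
            Record::Cursor(c) => cursor = Some(c),
        }
    }
    (state, cursor)
}
```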

`compact()` flushes the BufWriter first so un-checkpointed records survive. It reads the current file, replays it, writes to a same-directory tempfile (same filesystem = atomic rename, no `EXDEV`), `sync_all`s, then renames.
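The tempfile, `sync_all`, rename sequence can be sketched with the standard library alone. The helper name `replace_atomically` and the fixed `.tmp` sibling name are assumptions for the example; a real implementation would more likely use a unique temporary name.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Replace `dest` with `bytes` so readers see either the old file or the new one.
pub fn replace_atomically(dest: &Path, bytes: &[u8]) -> io::Result<()> {
    // Same directory as `dest`, so the rename stays on one filesystem (no EXDEV).
    let tmp = dest.with_extension("tmp");
    let mut file = File::create(&tmp)?;
    file.write_all(bytes)?;
    // Make the new contents durable before they become visible under `dest`.
    file.sync_all()?;
    drop(file);
    // On POSIX filesystems the rename replaces `dest` in a single step.
    fs::rename(&tmp, dest)
}
```

The ordering matters: syncing after the rename would leave a window in which `dest` names a file whose contents are not yet on disk.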

`checkpoint()` writes only a cursor record and calls `BufWriter::flush()` — a `write(2)` into the page cache. This survives a process crash but NOT a power loss. The only `fsync` is in `compact()`. The snapshot is a cache; a lost tail is rebuilt from a NATS scan + watch replay.

### Export / Import (replica bootstrap)

When the watched bucket is a **bounded** log (size-capped, history evicted), a fold is no longer rebuildable from NATS alone — the folds become the only full replicas. Export/import makes them transferable, which is what lets a new node, a node with a lost/corrupt fold, or a node whose cursor aged out of the log bootstrap at all.

**Artifact anatomy.** A directory: the backend's files under `data/`, plus `MANIFEST.json` carrying the artifact schema version, the backend identity and its on-disk format generation, per-file sizes + BLAKE3 digests, and — the load-bearing field — the **watch cursor the payload is exactly consistent with**. Import resumes the watch from that cursor and replays only the log tail. The manifest is written last and the whole stage is atomically renamed, so an artifact that exists is complete; a crash mid-export leaves only a hidden temp dir.

**The cursor-consistency invariant.** `export_to(&mut self)` cannot run concurrently with `apply` (exclusive borrow), and inside `watch_applied` exports run between flushes via the `ExportRequest` channel (pending batch flushed first) — so the embedded cursor equals the applied cursor, exactly. Every backend re-proves it at export time by **reopening the copy** and checking cursor equality: because every `apply` commits the cursor in the same atomic batch as its data, a recovered cursor that matches the live one is a complete tail-loss detector.

**Per-backend export mechanics.**

- **append-log**: write a compacted log from the in-RAM fold (`compact_to_file`), verify by `load()`.
- **RocksDB**: the engine's native `Checkpoint` (memtable flush + SST hardlinks) — consistent by construction; verify-open anyway for the uniform guarantee.
- **fjall** (no checkpoint API): `persist(SyncAll)` (journal complete), best-effort quiesce (rotate memtables, drain flushes/compactions, bounded), copy the DB dir — hardlink immutable `tables/`/`blobs/`, byte-copy journal + metadata + `lock` — with a bounded retry against background GC, then verify-by-reopen. Correctness rests on the exclusive borrow + the verify gate, never on the quiesce.

**Import** stages a verified copy beside the destination (every hash re-checked — the transport that delivered the artifact is untrusted), opens the staged copy and gates on manifest-cursor equality, then atomically renames. A bad artifact never becomes a fold; a crash mid-import leaves nothing at the destination.

**Storage accounting.** Exports are hardlink-dominant (extra disk ≈ journal + small metadata, not 2×), but a lingering artifact pins files the source later compacts away — artifacts are **transient**: upload, then delete (`run_export_round` enforces this). Stage and destination must be on the fold's filesystem; the EXDEV fallback silently degrades hardlinks to full copies.

**Fleet coordination.** `ExportLease` makes exactly one replica perform a given export round: create-only CAS to win, expiry embedded in the value (no server TTL machinery), CAS takeover of expired/corrupt leases, `complete()` publishes the exported cursor on the key (the fleet-visible last-export record), `abandon()` frees a failed round early. Clock skew at worst causes a duplicate export — safe; the lease is dedup, not a correctness gate.
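The lease acquisition rule (create-only CAS to win, expiry carried in the value, CAS takeover once expired) can be illustrated against a toy in-memory store. Everything here is hypothetical: `CasStore`, `try_acquire`, and the `(revision, expiry)` value shape stand in for the real `ExportLease` and KV bucket, and `complete()` / `abandon()` are not modelled.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a CAS-capable KV bucket.
#[derive(Default)]
pub struct CasStore {
    entries: HashMap<String, (u64, u64)>, // key -> (revision, lease expiry in seconds)
    next_rev: u64,
}

impl CasStore {
    /// Create-only write: fails if the key already exists.
    fn create(&mut self, key: &str, expiry: u64) -> bool {
        if self.entries.contains_key(key) {
            return false;
        }
        self.next_rev += 1;
        self.entries.insert(key.to_string(), (self.next_rev, expiry));
        true
    }

    /// Compare-and-swap: succeeds only if `expected_rev` is still current.
    fn update(&mut self, key: &str, expiry: u64, expected_rev: u64) -> bool {
        match self.entries.get(key) {
            Some(&(rev, _)) if rev == expected_rev => {
                self.next_rev += 1;
                self.entries.insert(key.to_string(), (self.next_rev, expiry));
                true
            }
            _ => false,
        }
    }
}

/// Try to win the lease for one export round. `now` and `ttl` are in seconds.
pub fn try_acquire(store: &mut CasStore, key: &str, now: u64, ttl: u64) -> bool {
    if store.create(key, now + ttl) {
        return true; // fresh lease: the create-only write won
    }
    let current = store.entries.get(key).copied();
    match current {
        // Expired lease: take it over, but only if nobody rewrote it in between.
        Some((rev, expiry)) if expiry <= now => store.update(key, now + ttl, rev),
        _ => false,
    }
}
```

Because expiry is compared against the caller's own clock, two replicas with skewed clocks can both believe a lease has expired; the CAS on the revision lets only one takeover succeed, and a duplicate export in the remaining edge cases is harmless.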

**Transport** (feature `transport`): plain tar of the artifact at `<prefix>/<key>` via any `object_store` backend, manifest duplicated as a sibling `<key>.manifest.json` uploaded last (remote completeness marker + cheap cursor peek). `run_export_round` composes lease → export → upload → complete → delete-local; per-backend `import_remote` composes download → verify → import.

## Connection Lifecycle

```
NEW (healthy=false, handle=None)
    │
    │ .connect()
    ▼
CONNECTED (healthy=true, handle=Some(NatsHandle))
    │                  │
    │ .shutdown()      │ .store() → NatsKvStore
    ▼                  │ .is_healthy() → AtomicBool::load (O(1), no lock)
SHUTDOWN (healthy=false, handle=None)
    │
    └─► .connect() can reconnect
```

`is_healthy()` for the `new()` + `connect()` path reads an `AtomicBool` driven by an installed NATS event callback (`Connected`/`Disconnected`). For the `from_client()` path (pre-connected client, no event callback), it reads the client's live `connection_state()` instead.

The double-check pattern in `connect()` guards a concurrent connect race: a second caller that completes its dial after another caller has already installed a handle drops its own handle (leaving `installed=false` on its event callback), so its teardown event does not clobber the winner's `healthy` flag.

## Design Decisions

### Why a `watch_applied` combinator instead of leaving the loop to callers?

The raw `KvWatcher` + `WatchCursor` + `SnapshotWriter` pieces let callers hand-roll the batch/apply/advance loop — and every known caller advanced the cursor on *receipt* rather than after *apply*, silently skipping un-applied updates on crash+resume. That is a footgun in the library's core guarantee, not a caller bug to be fixed N times. Encoding cursor-after-apply once, behind a combinator that callers can't get wrong, is cheaper and safer than documentation. `apply` stays the only domain logic; the cursor/snapshot/`on_applied` bookkeeping is the library's. See [Applied-Cursor Watch](#applied-cursor-watch-watch_applied).

### Why do watches deliver current state first (state-sync semantics)?

async-nats's bare `watch`/`watch_all`/`watch_many` ride `DeliverPolicy::New` — live updates only. A consumer built on that needs a separate `scan()` to seed, and any write landing between the scan and the watch attach is in neither — silently lost until the next reseed (the seed-then-watch race, demonstrated in `watch_prefix_relist_covers_seed_then_watch_gap`). Mapping every non-`_from` watch to `_with_history` (`LastPerSubject`) makes the watch itself deliver the seed, in revision order, with no race window: one primitive, correct by construction. The cost is a full re-list on every no-cursor watch start — which is what a no-cursor start *means*; consumers that have state resume with a `_from` variant and skip it.

### Why does the cursor-expired resync list keys instead of re-scanning values?

The fallback watch's re-list already carries every live key's value, so the resync only needs to learn which keys *no longer exist* — `reader.keys()` (headers only, no value bytes) is sufficient and cheap. The synthetic deletes carry an unknown version and never advance the cursor: the fold's persisted cursor stays at its (expired) position until real re-list revisions move it, so a crash mid-resync just re-runs the same idempotent diff on the next start. Ordering, not versioning, provides correctness: the resync acks before the fallback watch is established, so a synthetic delete can never land after the re-list put that resurrects the same key.

### Why `KvError: Clone` instead of `Box<dyn Error>`?

A failed connect future may be observed by multiple concurrent callers waiting on a shared result. `Clone` lets the error fan out to N waiters without `Arc`. The cost: `std::io::Error` and `async-nats` error types are not `Clone`, so their structured cause chain is flattened into a pre-rendered `String` at the boundary. The trade-off is explicit: no `#[source]` chain, but the message carries context instead.
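A sketch of that trade-off, using a hypothetical `SketchError` enum rather than the crate's `KvError`: the non-`Clone` `std::io::Error` is rendered into a `String` at the conversion boundary, after which the error can be cloned freely for each waiter.

```rust
use std::fmt;
use std::io;

/// Hypothetical cloneable error; the variant names are illustrative.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchError {
    CursorExpired,
    /// Pre-rendered message; the structured source chain is dropped here.
    Io(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::CursorExpired => write!(f, "watch cursor expired"),
            SketchError::Io(msg) => write!(f, "io error: {}", msg),
        }
    }
}

impl std::error::Error for SketchError {}

impl From<io::Error> for SketchError {
    fn from(e: io::Error) -> Self {
        // Keep the kind in the message, since `source()` will no longer expose it.
        SketchError::Io(format!("{} ({:?})", e, e.kind()))
    }
}

/// Hand the same failure to every waiter without wrapping it in `Arc`.
pub fn fan_out(err: &SketchError, waiters: usize) -> Vec<SketchError> {
    (0..waiters).map(|_| err.clone()).collect()
}
```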

### Why object-safe async traits (`Arc<dyn Trait>`) instead of generics?

`KvStore` vends `Arc<dyn KvReader>` / `Arc<dyn KvWatcher>` / `Arc<dyn KvWriter>` so callers hold narrowed capabilities without knowing the backend type at compile time. This lets services swap NATS for an in-memory stub in tests, and lets the edge proxy hold only `Arc<dyn KvReader>` without dragging in write types. The `async_trait` macro desugars to `Pin<Box<dyn Future>>` to satisfy object safety.

### Why optional watcher() and writer()?

Not all backends support streaming watch or writes. Optional returns (`Option<Arc<dyn …>>`) let the read path — the hot path for config consumers — be free of watch/write complexity. The edge proxy, for example, only calls `reader()`. Callers check `ConnectionCapabilities` to branch on feature availability before attempting optional paths.

### Why a raw JetStream API fallback for bucket creation?

Synadia Cloud returns `$JS.API.STREAM.CREATE` response shapes that `async-nats`'s `create_key_value()` cannot parse. The `create_kv_bucket_raw()` fallback sends the JSON config as a plain request/reply and classifies responses by error code:

- `10058` in `code` or `err_code` → stream already exists (non-fatal)
- `400` + "maximum number of streams" → the account is at Synadia's stream limit, but the bucket may already exist (non-fatal)
- Anything else → hard failure

The standard async-nats path is tried first; raw is a fallback.
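
The classification rules above can be sketched as a pure function over an already-parsed error object. The `ApiError` struct, its field types, and the case-sensitive substring match are assumptions for illustration; the real `create_kv_bucket_raw()` works on the raw JSON reply:

```rust
#[derive(Debug, PartialEq)]
enum CreateOutcome {
    /// Stream already exists: non-fatal.
    AlreadyExists,
    /// Account is at its stream limit, bucket may exist: non-fatal.
    AtStreamLimit,
    /// Anything else: hard failure.
    Failed,
}

/// Hypothetical parsed view of the error portion of the reply.
struct ApiError<'a> {
    code: u64,
    err_code: u64,
    description: &'a str,
}

fn classify(err: &ApiError) -> CreateOutcome {
    if err.code == 10058 || err.err_code == 10058 {
        CreateOutcome::AlreadyExists
    } else if err.code == 400 && err.description.contains("maximum number of streams") {
        CreateOutcome::AtStreamLimit
    } else {
        CreateOutcome::Failed
    }
}
```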

### Why subscribe-before-create in scan/keys?

async-nats ≤0.46 has a race: the server can deliver the first batch of push-consumer messages before the client's subscribe call completes. Creating the consumer first loses those early messages. Subscribing to the inbox first closes the race — the subscription is ready before the server can deliver.

### Why does checkpoint() not fsync?

Checkpoints are frequent (every N watch events). An fsync per checkpoint would add milliseconds of disk-sync latency to the hot watch path. Since the snapshot is a cache backed by NATS, a tail lost to power loss is rebuilt from a NATS replay — not a correctness failure. The only `fsync` is in `compact_to_file()`, where it guarantees the new compact file is durable before the atomic rename replaces the old one.
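
The two durability levels can be sketched with the standard library alone. `append_checkpoint` and `compact_atomic` are hypothetical helpers, not the crate's `checkpoint()` or `compact_to_file()`, whose signatures are not shown here:

```rust
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Hot path: append without fsync. The bytes reach the OS page cache only,
/// so a power loss may drop the tail, which is then rebuilt by replay.
fn append_checkpoint(log: &Path, record: &[u8]) -> io::Result<()> {
    let mut f = OpenOptions::new().create(true).append(true).open(log)?;
    f.write_all(record)
}

/// Cold path: write the compacted state to a temporary file, fsync it,
/// then rename it over the target so readers see the old or the new file.
fn compact_atomic(target: &Path, compacted: &[u8]) -> io::Result<()> {
    let tmp = target.with_extension("tmp");
    let mut f = File::create(&tmp)?;
    f.write_all(compacted)?;
    f.sync_all()?; // the only fsync in this sketch
    fs::rename(&tmp, target)
}
```

The rename is atomic on POSIX filesystems when source and target are on the same filesystem, which is why the temporary file is created next to the target.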

### Why raw backend-dir artifacts instead of a logical re-encode?

Export copies the backend's own files rather than re-encoding entries into a portable format. The engine's native integrity machinery (fjall journal CRCs + version checksums, RocksDB MANIFEST validation) then verifies the artifact on open — the same code path that validates the fold after a crash — and hardlinks make export cost ~O(metadata) instead of O(data). The trade is per-backend artifact formats; the manifest's backend identity + format generation make that explicit, and the engines' own format markers travel inside the payload as defense in depth.

### Why does export verify by reopening the copy?

fjall has no checkpoint API, so its copy is assembled from parts; rather than trusting that assembly, every backend opens the staged copy and requires the recovered cursor to equal the live fold's. Cursor-in-every-apply-batch makes this a complete tail-loss detector, and it reuses the engine's recovery as the oracle instead of reimplementing consistency checks. The cost — one extra open per export — is noise for a periodic exporter.

### Why is the export lease's expiry in the value instead of a server TTL?

Per-message TTLs need a new-enough NATS server, a bucket flag, and a backend that supports them; an `expires_at` inside the value works on any `KvWriter` and keeps acquisition/takeover as two plain CAS operations. The cost is wall-clock comparison across nodes — acceptable because a premature steal merely produces a duplicate artifact (last-write-wins on the same key), never corruption: the lease is work-dedup, not a correctness gate.
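
A sketch of acquisition and takeover as two CAS operations, run against a hypothetical in-memory store (`MemKv`) rather than a real `KvWriter`. The `Lease` layout and the second-granularity clock are assumptions:

```rust
use std::collections::HashMap;

/// Lease value with its expiry carried inline (unix seconds, assumed).
#[derive(Debug, Clone, PartialEq)]
struct Lease {
    holder: String,
    expires_at: u64,
}

/// Hypothetical in-memory store with create-if-absent and revision CAS.
#[derive(Default)]
struct MemKv {
    entries: HashMap<String, (u64, Lease)>,
    next_rev: u64,
}

impl MemKv {
    fn create(&mut self, key: &str, lease: Lease) -> bool {
        if self.entries.contains_key(key) {
            return false;
        }
        self.next_rev += 1;
        self.entries.insert(key.to_string(), (self.next_rev, lease));
        true
    }

    fn update(&mut self, key: &str, lease: Lease, expected_rev: u64) -> bool {
        let current_rev = self.entries.get(key).map(|(rev, _)| *rev);
        if current_rev != Some(expected_rev) {
            return false;
        }
        self.next_rev += 1;
        self.entries.insert(key.to_string(), (self.next_rev, lease));
        true
    }
}

/// First try to create the lease; if one exists and has expired by the
/// caller's clock, steal it with a CAS on the revision that was read.
fn try_acquire(kv: &mut MemKv, key: &str, node: &str, now: u64, ttl: u64) -> bool {
    let lease = Lease { holder: node.to_string(), expires_at: now + ttl };
    if kv.create(key, lease.clone()) {
        return true;
    }
    match kv.entries.get(key).cloned() {
        Some((rev, current)) if current.expires_at <= now => kv.update(key, lease, rev),
        _ => false,
    }
}
```

If two nodes see the same expired lease, only one CAS succeeds; a node with a fast clock can still steal early, which the paragraph above accepts as a duplicate artifact.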

### Why write in sorted key order during compaction?

`HashMap` iteration order is unspecified and, with the default randomly seeded hasher, differs between runs. Sorting produces a deterministic byte layout for a given logical state, enabling byte-level snapshot comparison (integrity checksums, test assertions) and making file diffs readable. The O(n log n) sort is negligible relative to the I/O it precedes.
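
A small sketch of the idea. The length-prefixed record layout here is invented for the example and is not the crate's snapshot format:

```rust
use std::collections::HashMap;

/// Serialize the map in sorted key order so equal logical states
/// always produce identical bytes. (Hypothetical layout:
/// [key_len u32 LE][key][value_len u32 LE][value] per entry.)
fn encode_sorted(state: &HashMap<String, Vec<u8>>) -> Vec<u8> {
    let mut keys: Vec<&String> = state.keys().collect();
    keys.sort();

    let mut out = Vec::new();
    for key in keys {
        let value = &state[key];
        out.extend_from_slice(&(key.len() as u32).to_le_bytes());
        out.extend_from_slice(key.as_bytes());
        out.extend_from_slice(&(value.len() as u32).to_le_bytes());
        out.extend_from_slice(value);
    }
    out
}
```

Two maps built in different insertion orders encode to the same bytes, so a checksum or a plain `assert_eq!` on the output is meaningful.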

## NATS Mapping

| Concept                  | NATS primitive                                                                  |
| ------------------------ | ------------------------------------------------------------------------------- |
| `KvStore`                | JetStream KV bucket (`KV_{name}` stream, `$KV.{name}.>` subjects)              |
| `VersionToken`           | Per-key stream sequence number (u64, stored big-endian in the 8-byte token)    |
| `WatchCursor`            | Stream sequence number at last checkpoint                                       |
| `delete()`               | `kv.delete()` — writes `KV-Operation: DEL` marker; always returns `Ok(true)`   |
| `delete_with_version()`  | `kv.update(key, [], rev)` — CAS write of empty value as tombstone               |
| `KvUpdate::Purge`        | `KV-Operation: PURGE` — all history removed; treated same as Delete in snapshot |
| `scan()` / `keys()`      | Ephemeral push consumer with `DeliverPolicy::LastPerSubject`                    |
| `watch_all()`            | `kv.watch_with_history(">")` — `LastPerSubject`: state-sync re-list, then live |
| `watch_prefix()`         | `kv.watch_with_history("{prefix}>")` — server-side subject filter, same re-list |
| `watch_prefixes()`       | `kv.watch_many_with_history([..])` — ONE multi-filter consumer (NATS 2.10)      |
| `watch_all_from(cursor)` | `kv.watch_all_from_revision(cursor+1)` — server-side delta delivery            |
| `watch_prefixes_from()`  | Hand-built ordered push consumer: `filter_subjects` + `ByStartSequence(cursor+1)` (async-nats has no `watch_many_from_revision`) |
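
The `VersionToken` and resume rows amount to the following arithmetic. The function names are hypothetical, and the real `VersionToken` and `WatchCursor` are opaque types rather than bare integers:

```rust
/// Store a stream sequence number big-endian in an 8-byte token.
fn encode_token(seq: u64) -> [u8; 8] {
    seq.to_be_bytes()
}

/// Recover the sequence number from the token.
fn decode_token(token: [u8; 8]) -> u64 {
    u64::from_be_bytes(token)
}

/// A resumed watch starts one past the last applied sequence,
/// so the event at `cursor` itself is not delivered again.
fn resume_sequence(cursor: u64) -> u64 {
    cursor + 1
}
```

One property of the big-endian layout is that comparing tokens byte by byte gives the same order as comparing the sequence numbers.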

## Trust Model

**What the store layer verifies:**
- NATS credentials are valid (at `connect()`)
- Bucket exists or can be created (at `store()`)
- Snapshot CRC per record (at `load()` and `compact()`)
- Snapshot magic bytes and format version (at `load()`)

**What passes through unchecked:**
- Key names (no validation; NATS accepts any key)
- Value content (raw bytes; deserialization is the caller's responsibility)
- Bucket permissions (NATS auth rules govern access; the store layer does not re-check)
- Channel capacity (caller sets it; a full channel backpressures the watcher task)
- Snapshot cursor validity (stale cursors surface as `CursorExpired` from NATS, not from the snapshot layer)

**Why this is acceptable:** Applications own value encoding (JSON, proto, etc.). NATS owns authorization. The store layer is a transport adapter.

## Failure Modes

| Failure                         | Recovery                                                       |
| ------------------------------- | -------------------------------------------------------------- |
| `CursorExpired`                 | `watch_applied` resyncs (synthetic deletes) + falls back to the state-sync re-list; raw callers use `stale_keys()` |
| `WatchError`                    | Re-subscribe; watch stream dropped (NATS restart, reconnect)   |
| `Timeout` on any op             | Connection likely stuck in CLOSE_WAIT; call `shutdown()`, then `connect()` |
| `RevisionMismatch` on CAS       | Re-read with `entry()`, resolve conflict, retry                |
| `AlreadyExists` on `create()`   | Read the live value, decide whether to proceed                 |
| Snapshot truncated tail         | `load()` discards partial record; earlier state preserved      |
| Snapshot mid-file CRC mismatch  | `SnapshotError::Corrupted`; delete file, do full NATS replay   |
| Snapshot wrong format version   | `SnapshotError::InvalidFormat`; delete file, full NATS replay  |
| `compact()` I/O error           | Retry; if persistent, delete file and rebuild from NATS        |
| Synadia Cloud stream limit      | Raw API path treats as non-fatal; verifies with `get_key_value` |
| Tampered/torn artifact          | `ArtifactInvalid` at import (hash/cursor gate); nothing lands at the destination — fetch another artifact |
| Artifact backend/format mismatch | `ArtifactInvalid` before any open; payload format markers re-checked by the engine itself |
| Export under churn (fjall)      | Copy retries on GC'd files (×3); verify-by-reopen catches anything torn |
| Export/upload fails mid-round   | Lease abandoned, local artifact deleted; next trigger elects a new node |
| Artifact cursor older than the log | `CursorExpired` on resume → full watch fallback; checkpoint more often than log retention |

## Package Structure

| File              | Purpose                                                                              |
| ----------------- | ------------------------------------------------------------------------------------ |
| `src/kv.rs`       | Core traits (`KvReader`, `KvWriter`, `KvWatcher`, `KvTtl`) and types (`KvEntry`, `KvUpdate`, `VersionToken`, `WatchCursor`, `KvError`) |
| `src/stores.rs`   | `Connection`, `KvStore`, `StoreConfig`, `StorageType`, `ConnectionCapabilities`      |
| `src/nats.rs`     | NATS JetStream implementation; bucket creation, scan consumer lifecycle, timeout wrapping, Synadia Cloud workarounds |
| `src/snapshot.rs` | `SnapshotStore` trait; append-only log + `AppendLogSnapshot` (default backend): `SnapshotWriter`, `load()`, `replay_log()`, `compact_to_file()` |
| `src/snapshot_fjall.rs` | `FjallSnapshot`: on-disk `SnapshotStore` backed by fjall (`feature = "fjall"`)  |
| `src/snapshot_rocksdb.rs` | `RocksDbSnapshot`: on-disk `SnapshotStore` backed by RocksDB (`feature = "rocksdb"`) |
| `src/snapshot_record.rs` | Shared `[ver_len][version][value]` value-record codec for the LSM backends |
| `src/applied.rs`  | `watch_applied` cursor-after-apply combinator, generic over `SnapshotStore`: `WatchScope`, `BatchConfig`, cursor-expired stale-key resync |
| `src/lib.rs`      | Re-exports all public types; no logic                                                |
| `benches/`        | Criterion benchmarks for snapshot write/checkpoint/load throughput and batch throughput |
| `tests/`          | Integration tests (require live NATS)                                                |

## Configuration

### StoreConfig (bucket creation only)

Config applies only at creation. If the bucket already exists, the existing one is returned as-is — `max_bytes`, `num_replicas`, `max_history`, `max_age` are ignored. To change settings on a live bucket, alter the underlying JetStream stream out-of-band.

| Field          | Default    | Rationale                                                          |
| -------------- | ---------- | ------------------------------------------------------------------ |
| `max_bytes`    | 10 MiB     | Required by Synadia Cloud; omit only for self-hosted NATS          |
| `max_history`  | 1          | Config stores rarely need change history                           |
| `num_replicas` | 1          | Set to 3 for production HA clusters                                |
| `max_age`      | None       | Bounds the retention window (and therefore when cursors expire)    |

### NatsConnectionConfig

| Field        | Notes                                                               |
| ------------ | ------------------------------------------------------------------- |
| `url`        | `nats://` or `tls://`; may embed `user:pass@` for legacy auth       |
| `creds`      | Base64-encoded `.creds` content (containers, ECS — no file mount)  |
| `creds_file` | Path to `.creds` on disk (bare-metal, local dev)                    |

Credentials priority: `creds` > `creds_file` > URL-embedded > no auth.
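
The priority order can be sketched as a small resolver. `ConnCfg`, `Auth`, and the userinfo check are hypothetical simplifications; the real `NatsConnectionConfig` field types may differ:

```rust
#[derive(Debug, PartialEq)]
enum Auth<'a> {
    CredsInline(&'a str),
    CredsFile(&'a str),
    UrlEmbedded,
    Anonymous,
}

/// Hypothetical mirror of the three config fields.
struct ConnCfg<'a> {
    url: &'a str,
    creds: Option<&'a str>,
    creds_file: Option<&'a str>,
}

/// True if the URL authority contains `user:pass@` style userinfo.
fn url_has_userinfo(url: &str) -> bool {
    let rest = url.split_once("://").map(|(_, r)| r).unwrap_or(url);
    let authority = rest.split('/').next().unwrap_or("");
    authority.contains('@')
}

/// Resolve in priority order: creds > creds_file > URL-embedded > no auth.
fn resolve_auth<'a>(cfg: &ConnCfg<'a>) -> Auth<'a> {
    if let Some(c) = cfg.creds {
        return Auth::CredsInline(c);
    }
    if let Some(p) = cfg.creds_file {
        return Auth::CredsFile(p);
    }
    if url_has_userinfo(cfg.url) {
        return Auth::UrlEmbedded;
    }
    Auth::Anonymous
}
```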

### Snapshot Tuning

| Parameter           | Effect                                                                                      |
| ------------------- | ------------------------------------------------------------------------------------------- |
| `compact_threshold` | Bytes appended since last compaction before `checkpoint()` returns `true`. Typical: 1–10 MB |