beyond-slipstream 0.7.2

Watchable distributed config over NATS JetStream — cache locally, stream updates, resume from a sequence number after any restart.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
# Slipstream Architecture

Takes NATS JetStream KV buckets as input and produces a local durable fold — a `SnapshotStore` whose cursor is guaranteed to name only revisions whose `apply` has returned — so edge nodes survive restarts without full replay, and can bootstrap from a content-addressed artifact when the NATS log has compacted past their cursor.

## Data Flow

### Happy path: startup with snapshot — no export

```
Disk ──► load(path) ──► replay_log() ──► HashMap<key, KvEntry> + WatchCursor
                                                    │
                                    watcher.watch_all_from(cursor, tx)
                                                    │
                                              CursorExpired?
                                             /              \
                                           Yes               No
                                            │                 │
                            stale-key resync (synthetic      delta stream
                            deletes) + watch_all(tx)              │
                            (state-sync re-list)                  │
                                            └───────┬─────────────┘
                                                    │
                                   KvUpdate → cache.apply() + snap.write_update()
                                                    │
                                     snap.checkpoint(cursor) ──► compact() if due
```

### Read path

```
reader.get("key")    ──► NATS kv.entry() ──► filter tombstones ──► KvEntry | None
reader.entry("key")  ──► NATS kv.entry() ──► raw (includes tombstones, for CAS)
reader.scan("pfx.")  ──► ephemeral push consumer (DeliverPolicy::LastPerSubject) ──► Vec<KvEntry>
reader.keys("pfx.")  ──► same consumer, headers_only ──► Vec<String>
```

### CAS write path

```
writer.create("lock", val)           ──► kv.create()        ──► AlreadyExists | VersionToken
writer.update("node", val, ver)      ──► kv.update()        ──► RevisionMismatch | VersionToken
writer.delete_with_version("k", ver) ──► kv.update(key, []) ──► RevisionMismatch | bool
```

CAS tombstone (empty-value Put) is how `delete_with_version` works — it writes an empty value via a CAS operation so concurrent writers see a conflict. `get()` and `scan()` filter these out; `entry()` exposes them for CAS callers that need the version.

### Bootstrap via artifact (bounded log, cursor evicted)

```
ObjectStore (<prefix>/<key>.manifest.json)
  ↓ download pointer
ExportManifest (cursor, backend, file hashes)
  ↓ derive payload key from blake3(manifest)[..8]
ObjectStore (<prefix>/<key>.payloads/<hash>.tar)
  ↓ stream to temp dir (verified: embedded manifest == pointer bytes, every file hash re-checked)
Staged artifact dir
  ↓ rename into place (atomic; bad artifact never becomes the fold)
SnapshotStore (recovered cursor = manifest.cursor)
  ↓ watch_applied(resume=cursor)
NATS JetStream (tail delta from cursor+1)
  ↓ apply tail
Converged fold
```

### Export round (one winner per fleet, TTL-gated)

```
ExportLease::try_acquire(ttl)
  ↓ create-only CAS wins; expired/corrupt lease CAS-stolen
watch_applied ExportRequest channel
  ↓ pending batch flushed first
store.export_to(artifact_dir)
  ↓ backend snapshot + verify-by-reopen (cursor equality gate)
ExportManifest (cursor, backend, file BLAKE3 hashes)
  ↓
ObjectStoreTransport::upload(key, artifact_dir)
  ├─ tar data/ → multipart upload (8 MiB chunks, 8 concurrent)
  │    key = <prefix>/<key>.payloads/<blake3(manifest)[..8]>.tar
  └─ swap_pointer() → conditional put at <prefix>/<key>.manifest.json
       ├─ pointer_publish_allowed(current, candidate_rank)?
       │    true  → CAS put (max 8 retries on concurrent swap)
       │    false → SupersededByNewer (slow exporter gives up cleanly)
       └─ PublishOutcome::Published | SupersededByNewer
  ↓ Published
LeaseGuard::complete(cursor)
  ↓
ObjectStoreTransport::prune(key, grace=4×ttl)
  └─ delete payloads: payload_prunable(payload_rank, pointer_rank, is_target, aged_out)?
  ↓
delete local artifact dir (transience enforced — not hoped for)
```

### Watch resumption

```
watch_all_from(cursor, tx)
  cursor.is_none() ─────────────────────────► watch_all(tx)        (full replay)
  cursor has rev ──► kv.watch_all_from_revision(rev+1)
                       │                       │
                    Success                CursorExpired (NATS compacted past cursor)
                       │                       │
                   delta stream          caller falls back to watch_all(tx)
```

Every non-`_from` watch is a **state-sync** stream (NATS `DeliverPolicy::LastPerSubject`): the current value of every matching key is delivered first — the re-list — then live updates. A no-cursor consumer therefore converges on full bucket state from the watch alone, with no separate scan and no scan-to-watch race window. The `_from` variants skip the re-list and deliver only the delta.

## Concepts & Terminology

| Term                     | Definition                                                           | NOT                                              |
| ------------------------ | -------------------------------------------------------------------- | ------------------------------------------------ |
| `Connection`             | Socket lifecycle manager + store factory                             | Not a store; not the NATS client itself          |
| `KvStore`                | Named bucket; vends reader, watcher, writer                          | Not the connection; holds no socket              |
| `KvReader`               | Point-in-time reads: `get`, `entry`, `keys`, `scan`                  | Not a live stream; returns a snapshot moment     |
| `KvWatcher`              | Live update stream pushed via mpsc channel                           | Not a polling loop; push from NATS               |
| `KvWriter`               | Write, soft-delete, CAS (`create`, `update`, `delete_with_version`)  | Not multi-key transactions                       |
| `WatchCursor`            | Opaque resume position in a watch stream (NATS: u64 revision)        | Not a per-key version; only for watch resumption |
| `VersionToken`           | Opaque per-key version (NATS: 8-byte u64; FDB: 10-byte versionstamp) | Not a wall-clock timestamp; not globally ordered |
| `KvEntry`                | One key + value + version from a read                                | Not a watch event; immutable once returned       |
| `KvUpdate`               | One watch event: `Put`, `Delete`, or `Purge`                         | Not a read result; carries deletes too           |
| `Snapshot`               | Deduplicated KV state + cursor persisted to disk                     | Not the source of truth; a cache of NATS         |
| `SnapshotWriter`         | Append-only log of `KvUpdate`s; no in-memory state beyond a counter  | Not the in-memory cache itself                   |
| `SnapshotStore`          | Trait: the durable-fold contract — atomic `apply(batch, cursor)`, `load`, `get`, `range` | Not a serving index; stops at fold + cursor + query |
| `AppendLogSnapshot`      | Default `SnapshotStore`: append-only log + in-RAM fold (pure-Rust)   | Not for folds larger than RAM                     |
| `FjallSnapshot`          | On-disk `SnapshotStore` (fjall LSM, `feature = "fjall"`) for large folds | Not in the pure-Rust core; opt-in feature        |
| `RocksDbSnapshot`        | On-disk `SnapshotStore` (RocksDB, `feature = "rocksdb"`) for large folds | Not pure-Rust; opt-in feature with a C++ build dep |
| `watch_applied`          | Combinator: batch → apply → *then* advance cursor / fold into `SnapshotStore`; resyncs stale keys on cursor expiry | Not a raw watch; the cursor follows `apply`, not receipt |
| `WatchScope`             | What `watch_applied` watches: `All`, `Prefix`, or `Prefixes` (multi-filter union) | Not N consumers; `Prefixes` costs one consumer    |
| `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, …)        | Not enforced; purely advisory                    |
| `ExportRequest`          | One-shot channel message that asks `watch_applied` to flush + export the current fold | Not a store operation; handled between flushes   |
| `ExportLease`            | Fleet-wide at-most-one coordinator: CAS key that prevents N nodes exporting the same round | Not a correctness gate; only dedup. Pointer monotonicity is the gate |
| `LeaseGuard`             | RAII guard for a won export round: `complete(cursor)` stamps success, `abandon()` frees it early via CAS delete | Not held across restarts; process crash → expiry → next node steals |
| `LeaseRecord`            | Value stored in the lease key: holder, acquired/expires timestamps (wall-clock, embedded — no server TTL), optional completed cursor | Expiry is compared by callers, not the store; requires NTP-sane clocks |
| `ExportManifest`         | Artifact metadata committed to disk and object store: backend identity, format generation, cursor, per-file BLAKE3 hashes | The manifest bytes are their own content address (blake3[..8]) |
| `ArtifactTransport`      | Trait: `upload(key, dir) → ManifestAndOutcome`, `download(key) → ManifestAndDir`, `pointer(key)`, `prune(key, grace)` | Not an object store; adapts `ObjectStore` for the pointer-swap protocol |
| `ObjectStoreTransport`   | Concrete `ArtifactTransport` over any `object_store` backend (S3, GCS, Azure, local) | `file://` lacks CAS and FAILS CLOSED unless `with_non_atomic_pointer_fallback()` (dev only) |
| `PublishOutcome`         | `Published` (pointer advanced) or `SupersededByNewer` (refused — a newer pointer exists) | Not an error; a slow exporter's normal exit under concurrent rounds |
| `PointerState`           | What `swap_pointer` observes before deciding: `Absent`, `Present { rank: Some(n) }`, or `Present { rank: None }` (corrupt) | Used only by `pointer_publish_allowed`; never stored independently |

## Layer Architecture

```
┌─────────────────────────────────────────────────────────────┐
│        KvReader │ KvWatcher │ KvWriter │ KvTtl              │
│         (async_trait, object-safe, Arc<dyn Trait>)          │
├─────────────────────────────────────────────────────────────┤
│                        KvStore                              │
│          (named bucket — vends the three roles above)       │
├─────────────────────────────────────────────────────────────┤
│                       Connection                            │
│         (connect/shutdown/is_healthy + store factory)       │
├─────────────────────────────────────────────────────────────┤
│                    NatsConnection                           │
│   NatsKvStore │ NatsKvReader │ NatsKvWatcher │ NatsKvWriterImpl
│              (concrete NATS JetStream impl)                 │
└─────────────────────────────────────────────────────────────┘
                  snapshot.rs (orthogonal, optional)
┌─────────────────────────────────────────────────────────────┐
│   SnapshotStore trait: apply(batch, cursor) │ load │ get │ range
│   AppendLogSnapshot (default, in-RAM)                       │
│   FjallSnapshot │ RocksDbSnapshot (feature-gated, on-disk)  │
│          (append-only CRC log, tempfile+rename compact)     │
└─────────────────────────────────────────────────────────────┘
                  applied.rs (combinator over KvWatcher + snapshot)
┌──────────────────────────────────────────────────────────────┐
│   watch_applied(): batch → apply → advance cursor/checkpoint │
│   (cursor-after-apply; the safe default for resumable watch) │
└──────────────────────────────────────────────────────────────┘
```

## Core Mechanism

### Resumable Watch

The cursor is the NATS stream sequence number at the last checkpoint. On restart, pass it to `watch_all_from()` to subscribe at `cursor+1` — only the delta arrives, not the full history.

When the cursor expires (NATS retention window evicted those records), `CursorExpired` is returned. The fallback `watch_all()` re-list re-delivers the current value of every live key, but it cannot cover keys **deleted during the gap whose delete markers were also evicted** — those need synthetic `Delete` events diffed from prior state. `watch_applied` does this automatically when given a reader (see below). A raw-API caller hand-rolls the same diff with `Snapshot::stale_keys()`:

```rust
match watcher.watch_all_from(&snap.cursor, tx).await {
    Ok(()) => {}
    Err(KvError::CursorExpired) => {
        let live = reader.keys("").await?;
        for key in snap.stale_keys(live.iter().map(|s| s.as_str())) {
            cache.remove(key);
        }
        watcher.watch_all(tx).await?;
    }
    Err(e) => return Err(e.into()),
}
```

### Applied-Cursor Watch (`watch_applied`)

The resumable watch above hands the caller raw machinery — a channel of `KvUpdate`s, a cursor, a snapshot writer — and trusts each one to hand-roll the loop that batches updates, applies them, and advances the cursor. Every hand-rolled instance got the same step wrong: it advanced the cursor on **receipt** of an update (`high_water = rev` at `rx.recv()`) and applied the batch afterward. The combinator `watch_applied` exists to encode the correct discipline once.

**The resume guarantee.** This library's contract is "resume from a sequence number after any restart." `watch_applied` sharpens that into a single invariant:

> A persisted/reported cursor `C` ⟹ every update with revision ≤ `C` has been **applied** — the caller's `apply()` has returned for it.

The cursor is written from `apply()`'s completion, never from the channel's delivery. Concretely, on each flush the combinator runs `apply(batch)` to completion, *then* sets `cursor = batch_high`, *then* checkpoints the snapshot at that cursor, *then* fires `on_applied`. Nothing advances the cursor before `apply` returns.

**Why receipt is the wrong signal.** Bumping the cursor at `rx.recv()` and applying later opens a crash window: the persisted cursor claims "caught up to rev N" while rev N still sits in an unapplied batch buffer (or in flight to a separate apply task). On crash+resume the watch re-arms at `cursor+1`, *past* the unapplied rev N, and silently skips it. The data is gone with no error — a hole in the exact guarantee the crate advertises.

This is the lesson of Saltzer, Reed & Clark, *End-to-End Arguments in System Design* (1984): a checkpoint placed below the endpoint — here, at the transport's delivery rather than at the application of the update — can only ever be a performance hint, never a correctness guarantee. The "it happened" property can only be established at the endpoint that actually performs the work, so the cursor is sourced from `apply`, not from `recv`. The cursor-as-monotonic-index shape itself is the HashiCorp Consul anti-entropy / blocking-query lineage: a client re-arms its watch from the last index it *reconciled*, never from the index it merely *saw*.

**Cursor authority covers rejected entries.** `batch_high` tracks the highest revision *received* since the last flush, including updates that `parse` rejected (corrupt bytes, irrelevant keys). A rejected entry is still "nothing to apply," so it is covered by the cursor — and because NATS delivers in revision order, advancing to the max revision after one atomic `apply` is sound: having seen the max means every revision below it has been seen too. Without this, a run of irrelevant keys would pin the cursor in place and force redundant replay on every restart.

The one exception: an update carrying the **unknown** version (an unparseable ACK subject on the hand-built multi-prefix consumer path) never touches `batch_high` — it can neither mint a cursor position nor clobber the real high from earlier in the batch. The update is still applied; skipping it under-advances at worst, and re-delivery on resume is idempotent. (The pre-guard code fabricated revision 0 for these, which the loop adopted as a real position — regressing the persisted cursor to 0 and forcing a full replay on the next restart.)

**Snapshot consistency.** Raw `KvUpdate`s stream to the snapshot log as they arrive, but the *checkpoint* cursor is the post-apply cursor. A crash after a raw record is written but before its `apply`/checkpoint leaves the log holding data *ahead* of its cursor — which is safe: the cursor never names a revision whose `apply` had not returned, so resume re-delivers and re-applies that tail rather than skipping it. Compaction runs off the hot path via `spawn_blocking`, as everywhere else in the snapshot subsystem.

**Flush triggers.** A batch flushes when any of these fires: the `window` elapses, `batch.len()` reaches `config.max`, a shutdown is signalled, or the channel closes with a pending batch (the remainder is flushed before returning).

**Transient store failures re-queue, not drop.** If `store.apply()` returns an error, the raw batch is prepended to the next flush's accumulation and the watch continues. The streak counter increments; at 16 consecutive failures the watch fail-stops with `KvError::WatchError`. Dropping the failed batch and continuing was the shipped behavior until `transient_store_failure_never_leaves_a_cursor_gap` reproduced the bug: a transient failure followed by a successful flush advanced the cursor over a hole that survived every restart, because the restart re-folds from the advanced cursor, skipping the missing range. Cursor authority requires the store's cursor and contents to advance together, always (`applied.rs:303–394`, `tests/model_applied.rs`).

**Cursor-expired resync.** On `CursorExpired` from the resume path the combinator falls back to the full-scope watch (`watch_all` / `watch_prefix` / `watch_prefixes`), whose state-sync re-list re-delivers every live key as puts. The re-list cannot cover keys deleted during the gap whose markers were evicted with the cursor, so — when the combinator is given a `KvReader` and a store — it closes that hole first: the watch task lists the bucket's live keys, hands them to the main loop, and waits for an ack; the main loop flushes, diffs the fold's in-scope keys against the listing, and runs a synthetic `KvUpdate::Delete` (unknown version — never advances the cursor) through `parse`/`apply`/store for each key that vanished; only then does the fallback watch start. That ack ordering is the invariant: a synthetic delete always precedes the re-list put for the same key, so delete-then-recreate during the gap converges. Without a reader the fallback is re-list-only and logs the possible stale keys.

This is the layer the tunnel router (swap route table) and edge origin watcher (rebuild hashrings) both collapse onto: `parse` extracts the domain registration, `apply` swaps the live state, `on_applied` persists the cursor.

### Live retention floor guard (`stream_watch_floor_guarded`, `nats.rs:1042`)

NATS does **not** error when a live consumer's position is overrun by retention — it silently skips evicted messages (the same silent-clamp behaviour that `resume_window_ok` closes at resume time). For an All-scope watch, a skipped delete marker permanently diverges the fold, with zero log lines. The floor guard closes this mid-stream:

- **In-band (primary):** when a delivered revision jumps the frontier by more than 1, fetch `first_sequence` and call `resume_window_ok(frontier, first_seq)`. A benign interior gap — per-subject overwrites below the floor threshold — passes. Head eviction past the frontier fails the watch into the restart → `CursorExpired` → resync repair path. The check runs *before* the entry is processed: the fold never advances past unexamined evidence of loss.
- **Backstop (30 s):** when no deliveries arrive, the periodic probe catches the no-traffic case where there is no in-band evidence to act on.

A periodic-only design was rejected by the model checker: deliveries catching the frontier up past a gap between probes erase the evidence, leaving permanent silent divergence with the guard running. The in-band check is what makes the design correct (`tests/model_live_watch.rs`).

Scope: this guard is sound only for the All-scope unfiltered resume watch, where every stream message is deliverable and a delivery gap implies a missed message. Prefix-scoped watches deliver sparse revisions — gaps from non-matching subjects are indistinguishable from eviction gaps client-side — and retain the retention-outlives-lag operating axiom.

### scan() and keys() via Ephemeral Push Consumer

Both use `DeliverPolicy::LastPerSubject` — one ephemeral push consumer delivers the latest value per key in a single streaming operation, rather than N sequential `get()` calls. `keys()` adds `headers_only: true` so no value bytes cross the wire.

The consumer is always `AckPolicy::None`. The default `AckPolicy::Explicit` stops delivery after `max_ack_pending` (1000) un-acked messages, silently truncating any bucket with >1000 keys.

The consumer is created with **subscribe-before-create** ordering: the inbox subscription is registered before the consumer exists, closing a race in async-nats ≤0.46 where early messages arrive before the subscription is ready.

### ACK Subject Format Parsing in scan()

Each message delivered by the `scan()` push consumer carries the KV revision in its JetStream ACK subject (the message's reply subject). The revision is the stream sequence number, and it sits at a field offset that varies by NATS server version:

```
Legacy (9 tokens):  $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
Modern (11–12 tok): $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>[.<token>]
```

The stream sequence sits at index 5 (legacy) or index 7 (modern). The final token is always `num_pending` (typically 0), which looks like a sequence but is not. The previous implementation took the last token and produced a wrong version on every scanned entry; the current parser reads from the front and branches on token count.

The implementation uses a fixed 8-element stack array for the first 8 tokens (no heap allocation per message). An A/B against the previous `Vec`-collecting approach measured **~3.1× speedup** — 1.59 ms → 0.51 ms per 10k ACK parses. See `benches/ack.rs`.

### 30-Second Operation Timeout

Every NATS operation is wrapped in `timed()` (30 s). Without it, a CLOSE_WAIT connection (half-dead TCP) parks `await`s forever — async-nats does not fail in-flight requests when the TCP layer goes dead. 30 s is generous for legitimate slow ops (JetStream stream sync, leader election) while still being debuggable.

### VersionToken: Inline Multi-Backend Versioning

`VersionToken` is a 10-byte inline buffer — no heap allocation. It covers all current backends without widening:

| Backend  | Encoding                 | `as_u64()` |
| -------- | ------------------------ | ---------- |
| NATS     | 8-byte big-endian u64    | `Some(rev)` |
| FDB      | 10-byte versionstamp     | `None`     |
| Unknown  | len=0                    | `None`     |

### Machine-Checked Protocol Kernels (`protocol.rs`)

Three pure-function guards live in `protocol.rs`. Production code and the Stateright exhaustive model checker (`tests/model.rs`) call the **same functions** — not two hand-synchronized copies. A change to any guard is re-verified against the full bounded state space on the next `cargo test --test model`. Mutation tests prove each guard is load-bearing: substituting a broken variant produces a counterexample.

**`pointer_publish_allowed(current: &PointerState, candidate_rank: u64) → bool`**

The monotonic pointer guard. Returns `true` for an open slot, a corrupt pointer, or a candidate at or above the existing cursor — and `false` exactly when the existing pointer is parseable and strictly newer.

```
Absent                  → true   (open slot)
Present { rank: None }  → true   (corrupt pointer is replaceable, not a wedge)
Present { rank: Some(n) }
  candidate >= n        → true
  candidate < n         → false  (stale publish refused)
```

Machine-checked as: _"published cursor never regresses."_

**`payload_prunable(payload_rank, pointer_rank, is_pointer_target, aged_out) → bool`**

The prune guard. A payload is deletable only when all hold: it is not the pointer's target, its rank is parseable AND strictly below the pointer's, and its age has cleared the grace period.

```
!is_pointer_target && aged_out && payload_rank.is_some_and(|r| r < pointer_rank)
```

Strictly-below (not `<=`) is the structural fix that makes dangling pointers impossible: `pointer_publish_allowed` refuses any candidate strictly below the pointer, and the pointer is monotone — so anything this guard deletes can never be successfully published afterward. The model checker found a dangling-pointer counterexample under an earlier age-only rule; this guard is the fix.

Machine-checked as: _"pointer target always fetchable"_ under zero-grace pruning.

**`resume_window_ok(revision: u64, first_sequence: u64) → bool`**

The cursor-expiry guard. Resume reads `revision + 1` onward; it is sound iff `first_sequence ≤ revision + 1`. NATS does not error on a below-head start — it silently clamps to `first_sequence`, skipping evicted delete markers with no fallback. This check is ours to make.

```
first_sequence ≤ revision.saturating_add(1)  → sound
first_sequence >  revision + 1               → CursorExpired
```

Machine-checked as: _"bootstrap never silently diverges."_ Empirically pinned by `tests/resync.rs::nats_silently_clamps_resume_below_first_seq`.

### Export Round State Machine

```
[Idle] ──ExportRequest──► [Acquiring]
                              │
                    try_acquire(ttl)
                         /         \
                      Won          Lost (another holder)
                       │               │
                [Exporting]        [Idle]
                       │
             flush pending batch
             store.export_to(dir)
             verify-by-reopen (cursor eq)
                       │
                 [Uploading]
                       │
              tar + multipart put
              pointer_publish_allowed?
              ├─ true  → CAS swap (≤8 retries)
              └─ false → SupersededByNewer
                       │
              Published    SupersededByNewer
                 │                │
          [Completing]      [Abandoning]
                 │                │
         complete(cursor)   abandon() → CAS delete
         prune(grace=4×ttl)
         delete local dir
                 │
            [Idle]
```

| State        | Entry Condition                          | On Failure                             |
| ------------ | ---------------------------------------- | -------------------------------------- |
| Acquiring    | `ExportRequest` received by watch task   | —                                      |
| Exporting    | Lease won (CAS create or takeover)       | Abandon lease; delete local dir        |
| Uploading    | Export + verify-by-reopen succeeded      | Abandon lease; delete local dir        |
| Completing   | `Published`                              | Prune/complete failures are non-fatal  |
| Abandoning   | `SupersededByNewer` or upload failed     | CAS delete failure is non-fatal        |

### watch_applied loop state transitions

| From | Trigger | Guard | What Actually Happens |
|---|---|---|---|
| Watching | `rx.recv()` delivers a `KvUpdate` | version not unknown | `batch_high` updated; raw update pushed to `raw_batch`; `parse()` called; window timer armed on first update |
| Watching | `rx.recv()` delivers an unknown-version `KvUpdate` | — | Same, except `batch_high` untouched — an unparseable revision neither mints nor clobbers a cursor position |
| Batching | Window elapses | — | `apply(batch)`; cursor = `batch_high`; `store.apply(raw_batch, cursor)` on blocking task; `on_applied` fired |
| Batching | `batch.len() == config.max` | — | Same as window flush |
| Flush | `store.apply()` succeeds | streak < 16 | Streak reset; `on_applied` fired; continue watching |
| Flush | `store.apply()` transient error | streak < 16 | Raw batch prepended to next `raw_batch`; streak++; warn; continue |
| Flush | `store.apply()` error | streak ≥ 16 | `KvError::WatchError` returned; watch fail-stops |
| Watching | Watch task returns `CursorExpired` | first_seq > cursor+1 | Resync (if reader wired): list keys → synthetic deletes → ack; then `watch_all()` fallback |
| Resync | `reader.keys()` fails | — | Fatal: `KvError::WatchError`; restart re-runs the full resume → expiry → resync path |
| Resync | Fold range fails | — | Fatal: same |
| Watching | `GuardTrip` (in-band gap or 30s backstop) | `!resume_window_ok(frontier, first_seq)` | Watch task fails; `KvError::WatchError` surfaces; caller's restart hits `CursorExpired` and runs resync |
| Watching | `ExportRequest` | — | Flush pending batch; `store.export_to()` on blocking task; reply on oneshot; continue |
| Any | Shutdown signal | — | Flush pending batch; abort watch task; return applied cursor |
| Any | Channel closes cleanly | — | Flush remaining batch; return applied cursor |

## Snapshot Subsystem

### The durable-fold trait

The durable fold is a trait, `SnapshotStore`, so the backend is pluggable while the contract is fixed:

```rust
fn load(path) -> (WatchCursor, Self);          // resume position + store
fn apply(&mut self, batch, cursor);            // fold data AND advance cursor, atomically
fn get(key) -> Option<KvEntry>;                // point query
fn range(prefix) -> Vec<KvEntry>;              // ordered prefix scan
```

Three invariants bind every implementation:

- **Pure function of the log.** Delete the store, replay every update with revision `> cursor`, and the state is identical. The store caches the fold; NATS is the source of truth.
- **Cursor-after-apply.** `apply` makes data and cursor durable together, so the cursor never names a revision whose data is absent — one transaction on a transactional backend, data-then-cursor on the append log (a torn write leaves data *ahead* of the cursor, which replay re-folds, never skips).
- **Snapshot is a cache.** A tail lost to power loss (under a no-sync durability mode) is rebuilt by resuming the watch from the recovered cursor.

`watch_applied` is generic over `SnapshotStore`: on each flush, after `apply` returns, it hands the raw batch + post-apply cursor to `store.apply(...)` on a blocking task. The trait stops at fold + cursor + query; serving structures built from the fold (routing rings, hashrings) live in the consumer, which reads them out via `get`/`range`.

| Backend | Module | State | Durability |
| ------- | ------ | ----- | ---------- |
| `AppendLogSnapshot` (default) | `snapshot.rs` | Append-only CRC log + in-RAM `HashMap` fold | `checkpoint` flush (page cache); `fsync` only at `compact` |
| `FjallSnapshot` (`feature = "fjall"`) | `snapshot_fjall.rs` | On-disk fjall LSM (`data` + `meta` partitions) | One atomic batch per `apply` (data + cursor); per-commit `fsync` configurable (NO_SYNC default) |
| `RocksDbSnapshot` (`feature = "rocksdb"`) | `snapshot_rocksdb.rs` | On-disk RocksDB (`data` + `meta` column families), tuned for billion-key folds (hit-optimized ribbon filters, partitioned index, zstd bottommost — see the module's Tuning docs) | One atomic `WriteBatch` per `apply` (data + cursor); WAL always on, per-commit `fsync` configurable (NO_SYNC default) |

The two LSM backends are interchangeable in contract and share the value-record codec (`snapshot_record.rs`); fjall keeps the crate pure-Rust, RocksDB trades a C++ build dependency for the battle-tested engine and its operational tooling (`ldb`, `sst_dump`). Both keep the cursor in the same atomic batch as the data it names, so under NO_SYNC a crash can lose the un-synced tail but never desynchronize cursor from data — on reopen the recovered cursor is consistent and the watch re-folds the tail. The rest of this section describes the **append-log backend** (the default), whose on-disk format is below.

### File Format

```
Header:  b"PGSS" ++ version:u16le

Record:  crc32:u32le ++ type:u8 ++ payload

Put:     key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version_bytes
Delete:  key_len:u16le ++ key ++ ver_len:u8 ++ version_bytes
Cursor:  cur_len:u8 ++ cursor_bytes
```

Version bytes are stored as length-prefixed raw bytes, not a fixed `u64`. A 10-byte FDB versionstamp round-trips intact; a `u64`-only field would flatten it to 0 and break every subsequent CAS on a restored entry.

CRC covers from the type byte through the end of the record. A truncated final record (crash mid-write) is silently discarded. A CRC mismatch in the middle of the file returns `SnapshotError::Corrupted`.

### State Machine

```
APPENDING ──► checkpoint() returns true ──► NEEDS_COMPACT
    │                                              │
write_update()                           compact() [blocking: replay → dedup → tempfile → rename]
    │                                              │
    └──────────────────────────────────────────────┘
                bytes_since_compact = 0
```

| From          | Event                            | To            | Guard / Side-effect                              |
| ------------- | -------------------------------- | ------------- | ------------------------------------------------ |
| APPENDING     | `write_update()`                 | APPENDING     | Buffered; bytes_since_compact += n               |
| APPENDING     | `checkpoint()` → true            | NEEDS_COMPACT | Cursor + cursor record flushed to page cache     |
| APPENDING     | `checkpoint()` → false           | APPENDING     | Same flush; below threshold                      |
| NEEDS_COMPACT | `compact()` succeeds             | APPENDING     | Tempfile → sync_all → rename; counter reset to 0 |
| NEEDS_COMPACT | `compact()` fails on reopen      | POISONED      | `writer = None`; subsequent writes return `Io`   |
| POISONED      | any `write_update`/`checkpoint`  | POISONED      | `Err(Io("snapshot writer poisoned"))` returned   |

### Load + Compaction

`load()` replays the full log into a `HashMap` (last write wins per key, deletes remove entries), then rewrites to a compact file (no duplicates) via tempfile + `sync_all` + rename. It skips the rewrite when the log is already compact (no duplicate keys, no delete records, clean EOF).

`compact()` flushes the BufWriter first so un-checkpointed records survive. It reads the current file, replays it, writes to a same-directory tempfile (same filesystem = atomic rename, no `EXDEV`), `sync_all`s, then renames.

`checkpoint()` writes only a cursor record and calls `BufWriter::flush()` — a `write(2)` into the page cache. This survives a process crash but NOT a power loss. The only `fsync` is in `compact()`. The snapshot is a cache; a lost tail is rebuilt from a NATS scan + watch replay.

### Export Lease (`export_lease.rs`)

`ExportLease` coordinates fleet-wide at-most-one export rounds using a NATS KV key as a mutex, with expiry embedded in the value (no server TTL machinery required).

**Acquisition:**
- `create(key, value)` → wins if key absent; exactly one caller fleet-wide succeeds
- If key exists: parse `LeaseRecord` → compare `expires_at` to wall clock
  - Expired or unparseable → CAS `update(key, new_record, observed_version)` to steal
  - Live → return `None` (another holder is active)

**`LeaseRecord` wire format (JSON):**
```json
{ "holder_id": "node-42", "acquired_at_unix": 1718000000, "expires_at_unix": 1718000300,
  "completed_cursor_hex": null, "completed_at_unix": null }
```
After upload: `completed_cursor_hex` and `completed_at_unix` are filled in by `complete(cursor)`.

**`LeaseGuard` RAII discipline:**
- `complete(cursor)` → CAS-rewrite lease with cursor + timestamp (fleet-visible last-export record)
- `abandon()` → CAS-delete the key to free the round early (next trigger elects immediately)
- Drop without completing → lease expires naturally; next node steals after TTL

**Why embedded TTL over server TTL:** Portable to any `KvWriter`; no version/bucket-flag requirements. The cost is wall-clock comparison — acceptable because a premature steal merely produces a duplicate artifact (last-write-wins on the same key), never corruption. The lease is work-dedup, not a correctness gate.

**Why corrupt-lease stealing:** One unparseable value would otherwise wedge the fleet at `expires_at` forever. The same rule as `pointer_publish_allowed`'s treatment of corrupt pointers: unknown/corrupt state is replaceable, not a hard stop.

### Export / Import (replica bootstrap)

When the watched bucket is a **bounded** log (size-capped, history evicted), a fold is no longer rebuildable from NATS alone — the folds become the only full replicas. Export/import makes them transferable, which is what lets a new node, a node with a lost/corrupt fold, or a node whose cursor aged out of the log bootstrap at all.

**Artifact anatomy.** A directory: the backend's files under `data/`, plus `MANIFEST.json` carrying the artifact schema version, the backend identity and its on-disk format generation, per-file sizes + BLAKE3 digests, and — the load-bearing field — the **watch cursor the payload is exactly consistent with**. Import resumes the watch from that cursor and replays only the log tail. The manifest is written last and the whole stage is atomically renamed, so an artifact that exists is complete; a crash mid-export leaves only a hidden temp dir.

**The cursor-consistency invariant.** `export_to(&mut self)` cannot run concurrently with `apply` (exclusive borrow), and inside `watch_applied` exports run between flushes via the `ExportRequest` channel (pending batch flushed first) — so the embedded cursor equals the applied cursor, exactly. Every backend re-proves it at export time by **reopening the copy** and checking cursor equality: because every `apply` commits the cursor in the same atomic batch as its data, a recovered cursor that matches the live one is a complete tail-loss detector.

**Per-backend export mechanics.**

- **append-log**: write a compacted log from the in-RAM fold (`compact_to_file`), verify by `load()`.
- **RocksDB**: the engine's native `Checkpoint` (memtable flush + SST hardlinks) — consistent by construction; verify-open anyway for the uniform guarantee.
- **fjall** (no checkpoint API): `persist(SyncAll)` (journal complete), best-effort quiesce (rotate memtables, drain flushes/compactions, bounded), copy the DB dir — hardlink immutable `tables/`/`blobs/`, byte-copy journal + metadata + `lock` — with a bounded retry against background GC, then verify-by-reopen. Correctness rests on the exclusive borrow + the verify gate, never on the quiesce.

**Import** stages a verified copy beside the destination (every hash re-checked — the transport that delivered the artifact is untrusted), opens the staged copy and gates on manifest-cursor equality, then atomically renames. A bad artifact never becomes a fold; a crash mid-import leaves nothing at the destination.

**Storage accounting.** Exports are hardlink-dominant (extra disk ≈ journal + small metadata, not 2×), but a lingering artifact pins files the source later compacts away — artifacts are **transient**: upload, then delete (`run_export_round` enforces this). Stage and destination must be on the fold's filesystem; the EXDEV fallback silently degrades hardlinks to full copies.

**Fleet coordination.** `ExportLease` makes exactly one replica perform a given export round: create-only CAS to win, expiry embedded in the value (no server TTL machinery), CAS takeover of expired/corrupt leases, `complete()` publishes the exported cursor on the key (the fleet-visible last-export record), `abandon()` frees a failed round early. Clock skew at worst causes a duplicate export — safe; the lease is dedup, not a correctness gate.

**Transport** (feature `transport`): `ObjectStoreTransport` adapts any `object_store` backend (S3, GCS, Azure, local). Object layout:

```
<prefix>/<key>.payloads/<blake3(manifest)[0..8] hex>.tar   ← content-addressed payload (write-once)
<prefix>/<key>.manifest.json                               ← monotonic pointer (published last)
```

The payload key is derived from the manifest bytes, making collisions require hash breakage. The manifest is the single trusted object; the payload key is derived from it at download time — readers never trust a separately-stored pointer to the payload.

`run_export_round` composes the full at-most-once round: lease → export (via `ExportRequest`) → upload → complete → prune → delete-local. Per-backend `import_remote` composes: download pointer → derive payload key → download + verify → import.

## Connection Lifecycle

```
NEW (healthy=false, handle=None)
    │
    │ .connect()
    ▼
CONNECTED (healthy=true, handle=Some(NatsHandle))
    │                  │
    │ .shutdown()      │ .store() → NatsKvStore
    ▼                  │ .is_healthy() → AtomicBool::load (O(1), no lock)
SHUTDOWN (healthy=false, handle=None)
    │
    └─► .connect() can reconnect
```

`is_healthy()` for the `new()` + `connect()` path reads an `AtomicBool` driven by an installed NATS event callback (`Connected`/`Disconnected`). For the `from_client()` path (pre-connected client, no event callback), it reads the client's live `connection_state()` instead.

The double-check pattern in `connect()` guards a concurrent connect race: a second caller that wins the dial drops its handle (leaving `installed=false` on the event callback) so the teardown event does not clobber the winner's `healthy` flag.

## Design Decisions

### Why a `watch_applied` combinator instead of leaving the loop to callers?

The raw `KvWatcher` + `WatchCursor` + `SnapshotWriter` pieces let callers hand-roll the batch/apply/advance loop — and every known caller advanced the cursor on *receipt* rather than after *apply*, silently skipping un-applied updates on crash+resume. That is a footgun in the library's core guarantee, not a caller bug to be fixed N times. Encoding cursor-after-apply once, behind a combinator that callers can't get wrong, is cheaper and safer than documentation. `apply` stays the only domain logic; the cursor/snapshot/`on_applied` bookkeeping is the library's. See [Applied-Cursor Watch](#applied-cursor-watch-watch_applied).

### Why do watches deliver current state first (state-sync semantics)?

async-nats's bare `watch`/`watch_all`/`watch_many` ride `DeliverPolicy::New` — live updates only. A consumer built on that needs a separate `scan()` to seed, and any write landing between the scan and the watch attach is in neither — silently lost until the next reseed (the seed-then-watch race, demonstrated in `watch_prefix_relist_covers_seed_then_watch_gap`). Mapping every non-`_from` watch to `_with_history` (`LastPerSubject`) makes the watch itself deliver the seed, in revision order, with no race window: one primitive, correct by construction. The cost is a full re-list on every no-cursor watch start — which is what a no-cursor start *means*; consumers that have state resume with a `_from` variant and skip it.

### Why does the cursor-expired resync list keys instead of re-scanning values?

The fallback watch's re-list already carries every live key's value, so the resync only needs to learn which keys *no longer exist* — `reader.keys()` (headers only, no value bytes) is sufficient and cheap. The fold side of the diff is equally key-only: it streams via `for_each_in_range` rather than `range()`, so an All-scope resync against an on-disk backend never materializes the whole fold (values included) on the repair path. The synthetic deletes carry an unknown version and never advance the cursor: the fold's persisted cursor stays at its (expired) position until real re-list revisions move it, so a crash mid-resync just re-runs the same idempotent diff on the next start. Ordering, not versioning, provides correctness: the resync acks before the fallback watch is established, so a synthetic delete can never land after the re-list put that resurrects the same key.

### Why KvError: Clone instead of Box<dyn Error>?

A failed connect future may be observed by multiple concurrent callers waiting on a shared result. `Clone` lets the error fan out to N waiters without `Arc`. The cost: `std::io::Error` and `async-nats` error types are not `Clone`, so their structured cause chain is flattened into a pre-rendered `String` at the boundary. The trade-off is explicit: no `#[source]` chain, but the message carries context instead.

### Why object-safe async traits (Arc<dyn Trait>) instead of generics?

`KvStore` vends `Arc<dyn KvReader>` / `Arc<dyn KvWatcher>` / `Arc<dyn KvWriter>` so callers hold narrowed capabilities without knowing the backend type at compile time. This lets services swap NATS for an in-memory stub in tests, and lets the edge proxy hold only `Arc<dyn KvReader>` without dragging in write types. The `async_trait` macro desugars to `Pin<Box<dyn Future>>` to satisfy object safety.

### Why optional watcher() and writer()?

Not all backends support streaming watch or writes. Optional returns (`Option<Arc<dyn …>>`) let the read path — the hot path for config consumers — be free of watch/write complexity. The edge proxy, for example, only calls `reader()`. Callers check `ConnectionCapabilities` to branch on feature availability before attempting optional paths.

### Why a raw JetStream API fallback for bucket creation?

Synadia Cloud returns `$JS.API.STREAM.CREATE` response shapes that `async-nats`'s `create_key_value()` cannot parse. The `create_kv_bucket_raw()` fallback sends the JSON config as a plain request/reply and classifies responses by error code:

- `10058` in `code` or `err_code` → stream already exists (non-fatal)
- `400` + "maximum number of streams" → Synadia at-limit but bucket may exist (non-fatal)
- Anything else → hard failure

The standard async-nats path is tried first; raw is a fallback — taken on a parse-shaped error **or a timeout**. A timeout must not skip the fallback: Synadia Cloud, the deployment the raw path exists for, is also the most likely to be slow or distant, and the raw path's own per-op timeouts bound a genuinely dead connection anyway.

### Why subscribe-before-create in scan/keys?

async-nats ≤0.46 has a race: the server can deliver the first batch of push-consumer messages before the client's subscribe call completes. Creating the consumer first loses those early messages. Subscribing to the inbox first closes the race — the subscription is ready before the server can deliver.

### Why checkpoint() does not fsync?

Checkpoints are frequent (every N watch events). An fsync per checkpoint would add milliseconds of disk-sync latency to the hot watch path. Since the snapshot is a cache backed by NATS, a tail lost to power loss is rebuilt from a NATS replay — not a correctness failure. The only `fsync` is in `compact_to_file()`, where it guarantees the new compact file is durable before the atomic rename replaces the old one.

### Why raw backend-dir artifacts instead of a logical re-encode?

Export copies the backend's own files rather than re-encoding entries into a portable format. The engine's native integrity machinery (fjall journal CRCs + version checksums, RocksDB MANIFEST validation) then verifies the artifact on open — the same code path that validates the fold after a crash — and hardlinks make export cost ~O(metadata) instead of O(data). The trade is per-backend artifact formats; the manifest's backend identity + format generation make that explicit, and the engines' own format markers travel inside the payload as defense in depth.

### Why does export verify by reopening the copy?

fjall has no checkpoint API, so its copy is assembled from parts; rather than trusting that assembly, every backend opens the staged copy and requires the recovered cursor to equal the live fold's. Cursor-in-every-apply-batch makes this a complete tail-loss detector, and it reuses the engine's recovery as the oracle instead of reimplementing consistency checks. The cost — one extra open per export — is noise for a periodic exporter.

### Why is the export lease's expiry in the value instead of a server TTL?

Per-message TTLs need a new-enough NATS server, a bucket flag, and a backend that supports them; an `expires_at` inside the value works on any `KvWriter` and keeps acquisition/takeover as two plain CAS operations. The cost is wall-clock comparison across nodes — acceptable because a premature steal merely produces a duplicate artifact (last-write-wins on the same key), never corruption: the lease is work-dedup, not a correctness gate.

### Why a content-addressed, write-once payload + monotonic pointer instead of two fixed-key objects?

The legacy layout wrote the payload at `<key>.payload.tar` and the manifest at `<key>.manifest.json` as independent last-write-wins objects. The model checker proved two failure modes under that layout: (1) a cursor regression when a slow exporter's stale manifest overwrites a faster one's, and (2) a dangling pointer when the manifest advances past a payload that an older concurrent publisher still holds. The shipped layout fixes both structurally:

- **Content-addressed payload** — the key is `blake3(manifest)[..8].tar`. Two different artifacts can never share a key. Re-uploading the same artifact is an idempotent overwrite. A slow exporter's payload is valid until pruned; it just can't become the pointer.
- **Monotonic pointer via CAS** — `pointer_publish_allowed` refuses any candidate strictly below the current pointer. Combined with CAS (no torn reads between observe and write), the pointer is monotone non-decreasing. Slow exporters always get `SupersededByNewer`, never succeed at regression.
- **Strictly-below prune rule** — `payload_prunable` only deletes ranks strictly below the pointer. Because the pointer is monotone and `pointer_publish_allowed` refuses `< pointer`, a pruned payload can never become reachable again. The model checker found the dangling counterexample under an earlier age-only rule; this is the structural fix.

Machine-checked as three theorems in `tests/model.rs`: published cursor never regresses, pointer target always fetchable (zero-grace prune), bootstrap never silently diverges.

### Why extract protocol guards into `protocol.rs` and share them with the model?

Production code and an exhaustive model checker running the same logic closes the drift gap between proof and implementation. Previously the model had its own inline guard copies; a change to the production guard would not update the model, leaving the proof covering the old variant. Extracting the three guards into `protocol.rs` and importing them from both call sites means every `cargo test --test model` verifies the current production code, not a snapshot. The cost is a hard dependency: `tests/model.rs` imports `pub mod protocol` directly, so the module cannot go private.

### Why write in sorted key order during compaction?

`HashMap` iteration order is random per process. Sorting produces a deterministic byte layout for a given logical state, enabling byte-level snapshot comparison (integrity checksums, test assertions) and making file diffs readable. The O(n log n) sort is negligible relative to the I/O it precedes.

## NATS Mapping

| Concept                  | NATS primitive                                                                  |
| ------------------------ | ------------------------------------------------------------------------------- |
| `KvStore`                | JetStream KV bucket (`KV_{name}` stream, `$KV.{name}.>` subjects)              |
| `VersionToken`           | Per-key stream sequence number (u64, stored big-endian in the 8-byte token)    |
| `WatchCursor`            | Stream sequence number at last checkpoint                                       |
| `delete()`               | `kv.delete()` — writes `KV-Operation: DEL` marker; always returns `Ok(true)`   |
| `delete_with_version()`  | `kv.update(key, [], rev)` — CAS write of empty value as tombstone               |
| `KvUpdate::Purge`        | `KV-Operation: PURGE` — all history removed; treated same as Delete in snapshot |
| `scan()` / `keys()`      | Ephemeral push consumer with `DeliverPolicy::LastPerSubject`                    |
| `watch_all()`            | `kv.watch_with_history(">")` — `LastPerSubject`: state-sync re-list, then live |
| `watch_prefix()`         | `kv.watch_with_history("{prefix}>")` — server-side subject filter, same re-list |
| `watch_prefixes()`       | `kv.watch_many_with_history([..])` — ONE multi-filter consumer (NATS 2.10)      |
| `watch_all_from(cursor)` | `kv.watch_all_from_revision(cursor+1)` — server-side delta delivery            |
| `watch_prefixes_from()`  | Hand-built ordered push consumer: `filter_subjects` + `ByStartSequence(cursor+1)` (async-nats has no `watch_many_from_revision`) |

## Trust Model

**What the store layer verifies:**
- NATS credentials are valid (at `connect()`)
- Bucket exists or can be created (at `store()`)
- Snapshot CRC per record (at `load()` and `compact()`)
- Snapshot magic bytes and format version (at `load()`)
- Resume window soundness — `resume_window_ok(revision, first_sequence)` before attaching (NATS silently clamps; we error explicitly)

**What passes through unchecked:**
- Key names (no validation; NATS accepts any key)
- Value content (raw bytes; deserialization is the caller's responsibility)
- Bucket permissions (NATS auth rules govern access; the store layer does not re-check)
- Channel capacity (caller sets it; a full channel backpressures the watcher task)
- Snapshot cursor validity (stale cursors surface as `CursorExpired` from NATS, not from the snapshot layer)

**Why this is acceptable:** Applications own value encoding (JSON, proto, etc.). NATS owns authorization. The store layer is a transport adapter.

**What the transport layer verifies (import path):**
- Embedded manifest in the tar matches the pointer bytes downloaded from object store (content-address cross-check)
- Every payload file's BLAKE3 hash matches the manifest (per-file integrity)
- Recovered cursor of the staged copy equals the manifest cursor (tail-loss detector)
- Backend identity + format generation match the local node's backend

**What the transport layer does NOT verify:**
- Object store authorization (delegated to `object_store` credentials)
- Pointer freshness (callers decide whether the embedded cursor is recent enough)
- Whether the exporting node's fold was correct (garbage in, garbage out — the integrity guarantees cover transport, not source correctness)

## Failure Modes

| Failure                              | What Actually Happens                                                       | Recovery                                          |
| ------------------------------------ | --------------------------------------------------------------------------- | ------------------------------------------------- |
| Transient `store.apply()` failure    | Raw batch re-queued at front; next flush commits cumulatively; streak counter incremented | Automatic up to 16 consecutive failures |
| 16 consecutive `store.apply()` failures | Watch fail-stops: `KvError::WatchError` returned | Restart re-folds from last good cursor; investigate store (disk, permissions) |
| Live watch retention overrun (All scope) | Floor guard detects in-band (gapped delivery) or via 30 s backstop; watch fails with `KvError::WatchError` | Caller restarts; resume hits `CursorExpired`; resync repairs stale keys |
| Live watch retention overrun (Prefix scope) | **Undetected until the next restart** — prefix watches deliver sparse revisions, so eviction gaps are indistinguishable from non-matching subjects client-side; the floor guard is sound for All scope only (a periodic probe here would false-positive into spurious resyncs, a design the model checker rejected) | Next restart's resume-window check hits `CursorExpired` → resync repairs; operate prefix-scoped watches with retention window comfortably above the restart/redeploy interval |
| Resync `reader.keys()` failure       | Watch fail-stops (fail-stop, not degrade — degrade semantics proven to leave stale keys permanently) | Restart re-runs full resume → expiry → resync |
| `CursorExpired`                      | `watch_applied` lists live keys, diffs fold, applies synthetic deletes, then falls back to state-sync re-list | Automatic; raw callers use `stale_keys()` manually |
| `WatchError`                         | Watch stream dropped (NATS restart, reconnect)                              | Re-subscribe                                      |
| `Timeout` on any NATS op             | CLOSE_WAIT half-dead TCP parks the `await` without this guard              | Call `shutdown()` + `connect()`                   |
| `RevisionMismatch` on CAS            | Concurrent writer won the race                                              | Re-read with `entry()`, resolve, retry            |
| `AlreadyExists` on `create()`        | Key already present; caller's create was not exclusive                      | Read live value, decide whether to proceed        |
| Snapshot truncated tail              | `load()` discards partial final record; earlier records intact              | Resume from recovered cursor; tail re-folded      |
| Snapshot mid-file CRC mismatch       | `SnapshotError::Corrupted`                                                  | Delete snapshot, full NATS replay                 |
| Snapshot wrong format version        | `SnapshotError::InvalidFormat`                                              | Delete snapshot, full NATS replay                 |
| `compact()` I/O error                | Writer poisoned; subsequent writes return `Io`                              | Delete snapshot, rebuild from NATS                |
| Synadia Cloud stream limit           | Raw API path treats as non-fatal; verifies bucket with `get_key_value`      | Non-fatal if bucket exists                        |
| Crash between payload upload and pointer swap | Old pointer remains fully consistent; payload orphaned | Next export round publishes new pointer; stale payload pruned after grace |
| Slow exporter after newer round published | `pointer_publish_allowed` returns false → `SupersededByNewer`           | Lease abandoned, local artifact deleted; payload orphaned until prune |
| Tampered / torn artifact             | `ArtifactInvalid` at import (hash + cursor gate); nothing written to destination | Fetch another artifact                        |
| Artifact backend/format mismatch     | `ArtifactInvalid` before any open; engine format markers re-checked internally | Fetch another artifact                        |
| Export under churn — fjall copy      | File GC'd mid-copy; retry ×3; verify-by-reopen catches anything torn       | Abandon lease on persistent failure               |
| Export/upload fails mid-round        | Lease abandoned (CAS delete); local artifact deleted                       | Next trigger elects a new node                    |
| Artifact cursor older than NATS log  | `CursorExpired` on resume → full watch fallback + stale-key resync          | Checkpoint more often than log retention window   |
| Corrupt lease value                  | Treated as expired: CAS-stolen by next acquirer                             | Non-fatal; one bad write cannot wedge the fleet   |
| `prune()` I/O error                  | Stale payloads linger; warning logged                                       | Retried next export round; correctness unaffected |

## Package Structure

| File                       | What It Does                                                                         |
| -------------------------- | ------------------------------------------------------------------------------------ |
| `src/kv.rs`                | Core traits (`KvReader`, `KvWriter`, `KvWatcher`, `KvTtl`) and types (`KvEntry`, `KvUpdate`, `VersionToken`, `WatchCursor`, `KvError`) |
| `src/stores.rs`            | `Connection`, `KvStore`, `StoreConfig`, `StorageType`, `ConnectionCapabilities`      |
| `src/nats.rs`              | NATS JetStream implementation; bucket creation, scan consumer lifecycle, timeout wrapping, Synadia Cloud workarounds, `check_resume_window` |
| `src/protocol.rs`          | Pure-function protocol guards: `pointer_publish_allowed`, `payload_prunable`, `resume_window_ok` — called by both production code and the Stateright model |
| `src/snapshot.rs`          | `SnapshotStore` trait; append-only log + `AppendLogSnapshot` (default backend): `SnapshotWriter`, `load()`, `replay_log()`, `compact_to_file()` |
| `src/snapshot_fjall.rs`    | `FjallSnapshot`: on-disk `SnapshotStore` backed by fjall (`feature = "fjall"`)       |
| `src/snapshot_rocksdb.rs`  | `RocksDbSnapshot`: on-disk `SnapshotStore` backed by RocksDB (`feature = "rocksdb"`) |
| `src/snapshot_record.rs`   | Shared `[ver_len][version][value]` value-record codec for the LSM backends           |
| `src/artifact.rs`          | `ExportManifest`, `ArtifactFile`, BLAKE3 integrity; stage-then-rename discipline; backend `export_to` + `import` (append-log, fjall, RocksDB) |
| `src/export_lease.rs`      | `ExportLease`, `LeaseGuard`, `LeaseRecord`: fleet-wide at-most-one via embedded-expiry CAS |
| `src/transport.rs`         | `ObjectStoreTransport`, `ArtifactTransport` trait, `run_export_round`: monotonic pointer swap, multipart upload, prune, content-addressed keys (`feature = "transport"`) |
| `src/applied.rs`           | `watch_applied` cursor-after-apply combinator, generic over `SnapshotStore`: `WatchScope`, `BatchConfig`, cursor-expired stale-key resync, `ExportRequest` handling |
| `src/lib.rs`               | Re-exports all public types; no logic                                                |
| `benches/`                 | Criterion benchmarks: snapshot write/checkpoint/load throughput, batch throughput, ACK subject parsing |
| `tests/integration.rs`     | NATS JetStream backend integration suite: each test boots its own `nats-server` on a free port; covers bucket create, CAS, watch semantics, cursor resume, and delete reconciliation |
| `tests/snapshot_store.rs`  | Backend-agnostic `SnapshotStore` conformance suite: every check is generic over the backend, instantiated for `AppendLogSnapshot`, `FjallSnapshot`, and `RocksDbSnapshot`; new backends get the whole suite for free |
| `tests/transport.rs`       | Integration: upload/download/manifest round-trip, pointer swap, prune, non-CAS fail-closed |
| `tests/transport_s3.rs`    | Live MinIO: CAS semantics (create, If-Match update, refusal, crash-window availability, prune) verified against a real object store |
| `tests/bootstrap.rs`       | Tier-2 bootstrap proofs on live NATS + on-disk backends: exports from a live `watch_applied` loop under churn, imports as a second node, and asserts **delta-only resume** via delivery count (not just convergence — a full replay would produce the same end state) |
| `tests/multi_export.rs`    | Prevention proofs on live NATS + real fjall/RocksDB: slow-exporter clobber refused, crash window keeps bootstrap available, multi-SST post-compaction fidelity |
| `tests/resync.rs`          | Live-NATS conformance: NATS silent-clamp pinned; full expiry → resync chain e2e; no-reader divergence pinned |
| `tests/model.rs`           | Stateright exhaustive model (~250M states, fleet size 2–3, unbounded rounds): pointer swap theorems (no regression, no torn pair, no dangling pointer, no silent divergence, terminal liveness); mutation tests prove each protocol guard load-bearing |
| `tests/model_applied.rs`   | Stateright cursor-authority model: delivery → flush → transient failure → crash/restart; `DropFailedBatch` and `ResumeFromMemApplied` mutations pin both pre-fix bug classes |
| `tests/model_resync_order.rs` | Stateright resync ack-barrier model: proves synthetic deletes strictly precede re-list puts; `NoAckBarrier` mutation reaches lost-recreate divergence |
| `tests/model_live_watch.rs` | Stateright floor-guard model: guarded variant proves fold always converges; unguarded variant pins permanent silent divergence as the machine-checked record of the pre-guard code |
| `tests/common/mod.rs`      | Shared test helpers: ephemeral NATS server, MinIO harness, `ManifestPutCrash` transport injection |

## Configuration

### StoreConfig (bucket creation only)

Config applies only at creation. If the bucket already exists, the existing one is returned as-is — `max_bytes`, `num_replicas`, `max_history`, `max_age` are ignored. To change settings on a live bucket, alter the underlying JetStream stream out-of-band.

| Field          | Default    | Rationale                                                          |
| -------------- | ---------- | ------------------------------------------------------------------ |
| `max_bytes`    | 10 MiB     | Required by Synadia Cloud; omit only for self-hosted NATS          |
| `max_history`  | 1          | Config stores rarely need change history                           |
| `num_replicas` | 1          | Set to 3 for production HA clusters                                |
| `max_age`      | None       | Set to gate retention window (also determines when cursors expire) |

### NatsConnectionConfig

| Field        | Notes                                                               |
| ------------ | ------------------------------------------------------------------- |
| `url`        | `nats://` or `tls://`; may embed `user:pass@` for legacy auth       |
| `creds`      | Base64-encoded `.creds` content (containers, ECS — no file mount)  |
| `creds_file` | Path to `.creds` on disk (bare-metal, local dev)                    |

Credentials priority: `creds` > `creds_file` > URL-embedded > no auth.

### Snapshot Tuning

| Parameter           | Effect                                                                                      |
| ------------------- | ------------------------------------------------------------------------------------------- |
| `compact_threshold` | Bytes appended since last compaction before `checkpoint()` returns `true`. Typical: 1–10 MB |

### BatchConfig (watch_applied)

| Field    | Default | Effect                                                                     |
| -------- | ------- | -------------------------------------------------------------------------- |
| `window` | 10 ms   | Max time a batch stays open before flush                                   |
| `max`    | 100     | Max updates per batch before early flush                                   |
| `channel_capacity` | 256 | Depth of the watch-task → loop channel. A full channel backpressures the watch task (by design); during state-sync hydration of a large bucket it can become the effective batch boundary, so tune it together with `max` |

| Constant | Value | Effect |
|---|---|---|
| `MAX_STORE_APPLY_FAILURES` | 16 | Consecutive transient store-apply failures before the watch fail-stops. Prevents the re-queued batch backlog from growing unboundedly under a persistently failing store. |
| `FLOOR_GUARD_INTERVAL` | 30 s | Period of the no-traffic backstop probe in the live floor guard. Primary detection is in-band (gapped delivery); this only fires when no deliveries arrive. |

### ObjectStoreTransport / ExportLease

| Setting                              | Default | Effect                                                                              |
| ------------------------------------ | ------- | ----------------------------------------------------------------------------------- |
| `prefix`                             | —       | Object-store key namespace; all payloads + pointer live under this prefix           |
| `allow_non_atomic_pointer`           | false   | Opt-in for `file://` stores that lack CAS (dev/test only); FAILS CLOSED by default  |
| `ExportLease::ttl`                   | —       | Round period; embedded in `expires_at`; determines prune grace (`4 × ttl`)          |

**Prune grace is `4 × lease_ttl`:** protects payloads a concurrent publisher may still reference (one TTL) plus in-flight bootstrap readers (three more TTLs of headroom). Increase if bootstrap download latency approaches `ttl`.

### Per-backend durability

| Backend          | Config     | Default    | Effect                                                                 |
| ---------------- | ---------- | ---------- | ---------------------------------------------------------------------- |
| `AppendLogSnapshot` | —       | page-cache | `checkpoint()` flushes to OS page cache; `compact()` fsyncs the new file |
| `FjallSnapshot`  | `sync`     | false      | `true` → fsync on every `apply` batch; `false` → OS page cache         |
| `RocksDbSnapshot` | `sync`    | false      | `true` → fsync on every `apply` WriteBatch; WAL always on              |