# Slipstream
You have config in NATS JetStream: routing tables, TLS certs, WASM configs. Edge nodes need a local copy, kept in sync, that survives restarts without replaying the full stream.
Slipstream materializes a NATS JetStream KV bucket into a local fold on each consumer. A watch cursor (a stream sequence number) tracks position in the change stream; on restart, only the delta since the last checkpoint arrives from NATS.
NATS is a bounded log. Entries are evicted past `max_bytes` and `max_age`. Once retention compacts past a cursor, there is no replay path from NATS. The local fold is the durable state; folds across the fleet are the only full replicas.
```
NATS JetStream KV
┌──────────────────────────────────────────────────────┐
│ [evicted] ◄──── seq 998   seq 999   seq 1000 ──────► │  max_bytes / max_age
└───────────────────────────┬──────────────────────────┘
                            │ KvUpdate stream
                            ▼
                     watch_applied()
                 ┌─────────────────────┐
                 │ parse               │
                 │ apply()             │ ← your domain logic
                 │ cursor = seq 1000   │   advances after apply() returns
                 └──────────┬──────────┘
                            │
                 ┌──────────▼──────────┐
                 │ local fold          │   folds are the only full replicas
                 │ cursor = 1000       │   once NATS evicts past cursor
                 └──────────┬──────────┘
            ┌───────────────┴──────────────────┐
            │                                  │
            ▼                                  ▼
         restart                   new node / cursor expired
  resume from cursor = 1000        export fold to object storage
  NATS delivers seq 1001+ only     import + resume from embedded cursor
```
The cursor advances after `apply()` returns, not on receipt. A crash between delivery and application re-delivers on the next start instead of silently skipping. `watch_applied` enforces this invariant.
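To make the ordering concrete, here is a minimal std-only sketch of cursor-after-apply. `Delivered`, `Consumer`, and `run` are hypothetical names, not slipstream's API, and the in-memory `Consumer` stands in for state that is persisted together (fold plus cursor). The simulated crash happens after an update is delivered but before it is applied, so the cursor still names the last applied sequence and the resumed run redelivers the missed update.

```rust
use std::collections::BTreeMap;

/// One update as delivered by the stream (hypothetical shape).
#[derive(Clone, Debug)]
pub struct Delivered {
    pub seq: u64,
    pub key: String,
    pub value: String,
}

/// The consumer's durable state: the fold and the cursor that describes it.
#[derive(Debug, Default)]
pub struct Consumer {
    pub fold: BTreeMap<String, String>,
    pub cursor: u64,
}

impl Consumer {
    /// Apply every delivered update past the cursor, advancing the cursor only
    /// after the update is applied. `crash_at` simulates a crash after that
    /// sequence is delivered but before it is applied.
    pub fn run(&mut self, stream: &[Delivered], crash_at: Option<u64>) -> Result<(), u64> {
        let start = self.cursor;
        for u in stream.iter().filter(|u| u.seq > start) {
            if crash_at == Some(u.seq) {
                return Err(u.seq); // delivered but never applied: cursor unchanged
            }
            self.fold.insert(u.key.clone(), u.value.clone()); // apply()
            self.cursor = u.seq; // advance only after apply() returns
        }
        Ok(())
    }
}
```

Had the cursor been advanced on receipt instead, the crashed run would have recorded `cursor = 2` and the resumed run would have skipped `k2` for good.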
For folds that outgrow RAM, `fjall` (pure Rust) and `rocksdb` backends hold state on disk.
## Install
```toml
[dependencies]
beyond-slipstream = "0.5"
```
On-disk snapshot backends and the object-storage transport are opt-in cargo features:
```toml
# A dependency key may appear only once; to enable several features, list them
# in one entry, e.g. features = ["fjall", "transport"].
beyond-slipstream = { version = "0.5", features = ["fjall"] }        # pure-Rust LSM, no C toolchain
# beyond-slipstream = { version = "0.5", features = ["rocksdb"] }    # RocksDB (requires C++ toolchain + libclang)
# beyond-slipstream = { version = "0.5", features = ["transport"] }  # export/import via object_store (S3, GCS, local)
```
## Concepts
| Name | Role |
|---|---|
| `Connection` | NATS connection lifecycle + store factory |
| `KvStore` | Named bucket. Vends reader, watcher, writer |
| `KvReader` | Point-in-time reads: `get`, `entry`, `keys`, `scan` |
| `KvWatcher` | Live update stream via channel |
| `KvWriter` | Write, soft-delete, CAS (`create`, `update`, `delete_with_version`) |
| `WatchCursor` | Opaque position in a watch stream. Save it; pass it back on reconnect |
| `VersionToken` | Opaque version — NATS: u64 revision; FDB: 10-byte versionstamp |
| `KvEntry` | One key + value + version from a read |
| `KvUpdate` | One watch event: `Put`, `Delete`, or `Purge` |
| `Snapshot` | Deduplicated KV state + cursor at a point in time. Disk cache, not source of truth |
| `SnapshotWriter` | Append-only log of `KvUpdate`s; survives restarts without a full NATS scan |
| `SnapshotStore` | Trait: the durable-fold contract — `apply` (data + cursor, atomically), `load`, `get`, `range` |
| `AppendLogSnapshot` | Default `SnapshotStore`: the append-only log + an in-RAM fold (pure-Rust, small state) |
| `FjallSnapshot` | On-disk `SnapshotStore` for folds too large for RAM; queryable (`feature = "fjall"`) |
| `RocksDbSnapshot` | Same contract on RocksDB, for consumers who prefer the C++ LSM (`feature = "rocksdb"`) |
| `watch_applied` | Watch loop that advances the cursor only after your `apply` returns, folding into any `SnapshotStore` |
| `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, global ordering) |
## Usage
### Connect
```rust
use slipstream::{Connection, NatsConnection, NatsConnectionConfig};
let conn = NatsConnection::new(NatsConnectionConfig {
url: "nats://localhost:4222".into(),
creds: None,
creds_file: None,
});
conn.connect().await?;
```
### Open a store
```rust
use slipstream::{StoreConfig, StorageType};
use std::time::Duration;
let store = conn.store_with_config(StoreConfig {
    name: "nodes".into(),
    storage: StorageType::Persistent,
    max_bytes: Some(512 * 1024 * 1024), // required by Synadia Cloud
    max_history: Some(1),
    max_age: Some(Duration::from_secs(30 * 24 * 3600)),
    num_replicas: Some(3), // HA clusters
    ..Default::default()
}).await?;
```
`max_bytes` is required on Synadia Cloud. Omit only for self-hosted NATS.
### Read
```rust
use slipstream::KvReader;
let reader = store.reader();
// Single key: tombstones are filtered out; use entry() to include them for CAS
if let Some(entry) = reader.get("node.us-east-1").await? {
    println!("{}: {:?}", entry.key, entry.version);
}
// All entries under prefix
// Uses DeliverPolicy::LastPerSubject: one NATS consumer, not N round-trips.
let entries = reader.scan("node.").await?;
// Key names only (no value transfer)
let keys = reader.keys("node.").await?;
```
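A rough model of what a `LastPerSubject` prefix scan returns, assuming a toy stream of `(seq, key, op)` records. `Record`, `Op`, and `scan_last_per_subject` are illustrative names only; the real scan runs server-side on one NATS consumer. Only the newest record per key counts, and keys whose newest record is a delete marker drop out, the same way `get()` filters tombstones.

```rust
use std::collections::BTreeMap;

/// Toy stream operation: a value, or a delete marker.
pub enum Op {
    Put(String),
    Delete,
}

/// One record in the bucket's underlying stream (hypothetical shape).
pub struct Record {
    pub seq: u64,
    pub key: String,
    pub op: Op,
}

/// Keep the newest record per key under `prefix`, then drop keys whose
/// newest record is a delete marker. Returns key -> (seq, value).
pub fn scan_last_per_subject<'a>(
    stream: &'a [Record],
    prefix: &str,
) -> BTreeMap<&'a str, (u64, &'a str)> {
    let mut last: BTreeMap<&'a str, &'a Record> = BTreeMap::new();
    for r in stream.iter().filter(|r| r.key.starts_with(prefix)) {
        // Stream order is sequence order, so a later record replaces an earlier one.
        last.insert(r.key.as_str(), r);
    }
    last.into_iter()
        .filter_map(|(k, r)| match &r.op {
            Op::Put(v) => Some((k, (r.seq, v.as_str()))),
            Op::Delete => None, // tombstone: not a live entry
        })
        .collect()
}
```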
### Write
```rust
use slipstream::KvWriter;
let writer = store.writer().expect("store is writable");
// Unconditional write
let version = writer.put("node.us-east-1", &payload).await?;
// Create only. Returns AlreadyExists if the key has a live value.
let lock_version = writer.create("lock.migration", &payload).await?;
// CAS update. Returns RevisionMismatch if `version` is no longer the key's latest.
let new_version = writer.update("node.us-east-1", &payload, &version).await?;
// CAS delete: pass the version from the most recent write. Returns RevisionMismatch on conflict.
writer.delete_with_version("node.us-east-1", &new_version).await?;
// Best-effort delete: returns Ok(true) even if the key didn't exist.
writer.delete("node.us-east-1").await?;
```
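The CAS rules can be modeled with a toy in-memory bucket. `ToyBucket` and `CasError` are hypothetical, and this is a sketch rather than the crate's implementation. Revisions come from one bucket-wide counter, much as every write to a JetStream stream takes the next sequence number. The model shows why each CAS call must pass the version returned by the most recent write to that key.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum CasError {
    AlreadyExists,
    RevisionMismatch,
}

/// Toy bucket: key -> (revision of latest entry, value or None for a delete marker).
#[derive(Default)]
pub struct ToyBucket {
    seq: u64,
    keys: HashMap<String, (u64, Option<Vec<u8>>)>,
}

impl ToyBucket {
    /// Every write, including a delete marker, takes the next sequence number.
    fn next_rev(&mut self) -> u64 {
        self.seq += 1;
        self.seq
    }

    fn latest_rev(&self, key: &str) -> Option<u64> {
        self.keys.get(key).map(|(rev, _)| *rev)
    }

    pub fn put(&mut self, key: &str, value: &[u8]) -> u64 {
        let rev = self.next_rev();
        self.keys.insert(key.to_string(), (rev, Some(value.to_vec())));
        rev
    }

    /// Fails if the key currently holds a live value.
    pub fn create(&mut self, key: &str, value: &[u8]) -> Result<u64, CasError> {
        if matches!(self.keys.get(key), Some((_, Some(_)))) {
            return Err(CasError::AlreadyExists);
        }
        Ok(self.put(key, value))
    }

    /// Fails unless `expected` is the key's latest revision.
    pub fn update(&mut self, key: &str, value: &[u8], expected: u64) -> Result<u64, CasError> {
        if self.latest_rev(key) != Some(expected) {
            return Err(CasError::RevisionMismatch);
        }
        Ok(self.put(key, value))
    }

    /// Soft delete guarded by `expected`; the marker gets its own revision.
    pub fn delete_with_version(&mut self, key: &str, expected: u64) -> Result<u64, CasError> {
        if self.latest_rev(key) != Some(expected) {
            return Err(CasError::RevisionMismatch);
        }
        let rev = self.next_rev();
        self.keys.insert(key.to_string(), (rev, None));
        Ok(rev)
    }
}
```

A stale version fails rather than overwriting, which is what makes the "re-read, retry" recovery safe.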
### Watch
```rust
use slipstream::{KvUpdate, KvWatcher};
let watcher = store.watcher().expect("store supports streaming");
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
// Watches are state-sync streams: the current value of every matching key is
// delivered first (as puts), then live updates. No separate scan is needed, and
// there is no scan-to-watch race window.
//
// watch_all blocks until the stream ends, so run it in a separate task.
tokio::spawn(async move {
    watcher.watch_all(tx).await.unwrap();
});
while let Some(update) = rx.recv().await {
    match update {
        KvUpdate::Put(entry) => { /* ... */ }
        KvUpdate::Delete { key, version } => { /* ... */ }
        KvUpdate::Purge { key, version } => { /* ... */ }
    }
}
```
Dropping `rx` cancels the watch. The watcher task exits and unsubscribes automatically.
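Cancellation by dropping the receiver follows from channel semantics: once the receiving half is gone, every send fails, and the producer can treat that failure as its signal to stop. The minimal sketch below uses `std::thread` and `std::sync::mpsc::sync_channel` as stand-ins for the tokio task and `tokio::sync::mpsc::channel(128)`. `spawn_watch` is a hypothetical name, and the sketch does not show how slipstream's watcher actually unsubscribes from NATS.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical watcher: sends current state first, then live updates, and
/// stops as soon as a send fails because the receiver was dropped.
/// The handle yields the number of successful sends.
pub fn spawn_watch(initial: Vec<String>, live: Vec<String>) -> (Receiver<String>, JoinHandle<usize>) {
    let (tx, rx) = mpsc::sync_channel(1); // bounded, like a tokio mpsc channel
    let handle = thread::spawn(move || {
        let mut sent = 0;
        for update in initial.into_iter().chain(live) {
            if tx.send(update).is_err() {
                break; // receiver gone: the watch is cancelled, so exit
            }
            sent += 1;
        }
        sent // tx is dropped here
    });
    (rx, handle)
}
```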
### Resumable watch
The cursor is a sequence number. Persist it; pass it back on reconnect. NATS delivers only the delta since that position.
```rust
use slipstream::{KvError, WatchCursor};
// load_cursor() is your own persistence (file, DB row, ...).
let cursor = load_cursor().unwrap_or(WatchCursor::none());
match watcher.watch_all_from(&cursor, tx.clone()).await {
    Ok(()) => {}
    Err(KvError::CursorExpired) => {
        // NATS compacted past the cursor. Full replay required.
        watcher.watch_all(tx).await?;
    }
    Err(e) => return Err(e.into()),
}
```
`watch_prefix_from()` works the same way for prefix-filtered streams, and
`watch_prefixes_from()` resumes the union of several prefixes on one
multi-filter consumer.
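A toy model of why resuming can fail: a bounded log that evicts its oldest entries, standing in for `max_bytes` / `max_age` retention. `BoundedLog`, `ReadError`, and `resume` are hypothetical. Reading from a cursor works while the entry right after the cursor is still retained; once that entry is evicted the read fails with `CursorExpired` and the caller falls back to a full read. In slipstream that fallback is a fresh `watch_all` state sync; here it is approximated by returning everything the log still holds.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub enum ReadError {
    CursorExpired,
}

/// Toy retention-bounded stream: keeps the newest `max_entries` updates.
pub struct BoundedLog {
    max_entries: usize,
    next_seq: u64,
    entries: VecDeque<(u64, String)>,
}

impl BoundedLog {
    pub fn new(max_entries: usize) -> Self {
        Self { max_entries, next_seq: 1, entries: VecDeque::new() }
    }

    pub fn append(&mut self, value: &str) -> u64 {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.entries.push_back((seq, value.to_string()));
        if self.entries.len() > self.max_entries {
            self.entries.pop_front(); // evicted: no longer replayable
        }
        seq
    }

    /// The delta after `cursor` (0 = nothing seen yet). Fails if the entry
    /// right after `cursor` has already been evicted.
    pub fn read_from(&self, cursor: u64) -> Result<Vec<(u64, String)>, ReadError> {
        let oldest = self.entries.front().map_or(self.next_seq, |(seq, _)| *seq);
        if cursor + 1 < oldest {
            return Err(ReadError::CursorExpired);
        }
        Ok(self.entries.iter().filter(|(seq, _)| *seq > cursor).cloned().collect())
    }

    /// Everything still retained (stand-in for a full state-sync watch).
    pub fn retained(&self) -> Vec<(u64, String)> {
        self.entries.iter().cloned().collect()
    }
}

/// Resume from `cursor`, falling back to a full read on expiry.
/// Returns (fell_back, updates).
pub fn resume(log: &BoundedLog, cursor: u64) -> (bool, Vec<(u64, String)>) {
    match log.read_from(cursor) {
        Ok(delta) => (false, delta),
        Err(ReadError::CursorExpired) => (true, log.retained()),
    }
}
```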
## Snapshot
For services that cache KV state locally, the snapshot persists both state and cursor to disk. On restart, load the snapshot and resume the watch from its cursor — only the delta since the last checkpoint arrives from NATS.
### Startup
```rust
use slipstream::snapshot;
use std::path::Path;
if let Some(snap) = snapshot::load(Path::new("/var/lib/svc/state.snap"))? {
    for (key, entry) in snap.entries {
        cache.insert(key, entry.value);
    }
    watcher.watch_all_from(&snap.cursor, tx).await?;
} else {
    watcher.watch_all(tx).await?;
}
```
### Runtime
```rust
use slipstream::snapshot::SnapshotWriter;
use std::path::Path;
let mut snap = SnapshotWriter::open(
    Path::new("/var/lib/svc/state.snap"),
    10 * 1024 * 1024, // compact after 10MB of appended records
)?;
while let Some(update) = rx.recv().await {
    cache.apply(&update);
    snap.write_update(&update); // buffered, no I/O
    // current_cursor must reflect what cache.apply() has consumed (see below).
    // checkpoint() flushes + syncs to disk; returns true when compaction is due
    if snap.checkpoint(&current_cursor)? {
        // compact() is blocking I/O; run it via spawn_blocking in async contexts.
        // Move the writer into the task and take it back when compaction finishes.
        snap = tokio::task::spawn_blocking(move || snap.compact().map(|_| snap)).await??;
    }
}
```
This loop has a trap: `current_cursor` must track what `cache.apply()` has consumed, not what `rx.recv()` delivered. Get it wrong and a crash skips updates on resume. [`watch_applied`](#applied-watch) runs this loop for you with that invariant enforced.
The snapshot is a cache. Delete it and the service falls back to full replay on next start.
### File format
```
Header: b"PGSS" ++ version:u16le
Record: crc32:u32le ++ type:u8 ++ payload
```
| Record | Type | Payload |
|---|---|---|
| `REC_PUT` | 0x01 | key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version_bytes |
| `REC_DELETE` | 0x02 | key_len:u16le ++ key ++ ver_len:u8 ++ version_bytes |
| `REC_CURSOR` | 0x03 | cursor_len:u8 ++ cursor_bytes |
`version_bytes` is the raw [`VersionToken`] bytes (≤10), not a fixed u64, so NATS revisions (8 bytes) and FDB versionstamps (10 bytes) both round-trip intact.
A truncated final record (crash mid-write) is discarded; earlier records are intact. A CRC failure mid-file returns `SnapshotError::Corrupted`.
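A std-only sketch of an encoder and decoder for this layout, showing how a reader can tell a torn tail from mid-file corruption. Two details are assumptions not stated above: the checksum is taken to be CRC-32 (IEEE, the zlib/PNG variant) computed over `type ++ payload`, and only `REC_PUT` gets an encoder. `encode_put`, `decode`, and `DecodeError` are hypothetical names. The decoder drops a final record that runs past end-of-file and reports a checksum mismatch as corruption.

```rust
pub const MAGIC: &[u8; 4] = b"PGSS";
pub const REC_PUT: u8 = 0x01;
pub const REC_DELETE: u8 = 0x02;
pub const REC_CURSOR: u8 = 0x03;

#[derive(Debug, PartialEq)]
pub enum DecodeError {
    BadHeader,
    Corrupted { offset: usize },
}

/// Bitwise CRC-32 (IEEE, reflected polynomial 0xEDB88320).
pub fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg(); // all ones if the low bit is set
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Encode a REC_PUT record: crc32 ++ type ++ payload.
pub fn encode_put(key: &str, value: &[u8], version: &[u8]) -> Vec<u8> {
    let mut body = vec![REC_PUT];
    body.extend_from_slice(&(key.len() as u16).to_le_bytes());
    body.extend_from_slice(key.as_bytes());
    body.extend_from_slice(&(value.len() as u32).to_le_bytes());
    body.extend_from_slice(value);
    body.push(version.len() as u8); // raw VersionToken bytes, at most 10
    body.extend_from_slice(version);
    let mut record = crc32(&body).to_le_bytes().to_vec();
    record.extend_from_slice(&body);
    record
}

/// Length of `type ++ payload` at the start of `b`, or None if the buffer
/// ends before the record does (a torn final write).
fn body_len(b: &[u8]) -> Option<usize> {
    let u8_at = |i: usize| b.get(i).map(|&x| x as usize);
    let u16_at = |i: usize| b.get(i..i + 2).map(|s| u16::from_le_bytes([s[0], s[1]]) as usize);
    let u32_at = |i: usize| b.get(i..i + 4).map(|s| u32::from_le_bytes([s[0], s[1], s[2], s[3]]) as usize);
    let end = match *b.first()? {
        REC_PUT => {
            let k = u16_at(1)?;
            let v = u32_at(3 + k)?;
            8 + k + v + u8_at(7 + k + v)?
        }
        REC_DELETE => {
            let k = u16_at(1)?;
            4 + k + u8_at(3 + k)?
        }
        REC_CURSOR => 2 + u8_at(1)?,
        _ => return None,
    };
    (end <= b.len()).then_some(end)
}

/// Decode records as (type, payload). A torn final record is dropped;
/// a checksum mismatch is reported as corruption.
pub fn decode(file: &[u8]) -> Result<Vec<(u8, Vec<u8>)>, DecodeError> {
    if file.len() < 6 || !file.starts_with(MAGIC) {
        return Err(DecodeError::BadHeader);
    }
    let mut records = Vec::new();
    let mut pos = 6; // magic (4) + version:u16le (2)
    while file.len() - pos >= 5 {
        let body = &file[pos + 4..];
        if !matches!(body[0], REC_PUT | REC_DELETE | REC_CURSOR) {
            return Err(DecodeError::Corrupted { offset: pos });
        }
        let Some(len) = body_len(body) else { break }; // torn tail: discard
        let stored = u32::from_le_bytes(file[pos..pos + 4].try_into().unwrap());
        if crc32(&body[..len]) != stored {
            return Err(DecodeError::Corrupted { offset: pos });
        }
        records.push((body[0], body[1..len].to_vec()));
        pos += 4 + len;
    }
    Ok(records)
}
```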
### Pluggable backends
The durable fold is a trait, [`SnapshotStore`], so a consumer picks where its fold lives. The contract is small — apply a batch and advance the cursor *atomically*, resume from the cursor on restart, and query the result:
```rust
pub trait SnapshotStore: Sized + Send {
    fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>;
    fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>;
    fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>;
    fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>;
}
```
Every backend keeps the same invariants: the fold is a pure function of the log (delete the store, replay from the cursor, get identical state), the cursor never names a revision whose data isn't durable (cursor-after-apply), and the store is a cache — a tail lost to power loss is rebuilt by resuming the watch.
| Backend | Use when | Details |
|---|---|---|
| `AppendLogSnapshot` | **Default.** Fold fits in RAM (edge/tunnel-style services) | Pure-Rust, the append-only log above plus an in-RAM map serving `get`/`range`. No extra dependencies. |
| `FjallSnapshot` | Fold too large for RAM (e.g. routing at ~1B keys) | On-disk [fjall](https://docs.rs/fjall) LSM, `feature = "fjall"`. Pure-Rust. Each `apply` is one atomic batch (data **and** cursor); durability (NO_SYNC vs fsync) is configurable. |
| `RocksDbSnapshot` | Same as `FjallSnapshot`, preferring the battle-tested C++ LSM and its tooling (`ldb`, `sst_dump`) | On-disk [RocksDB](https://docs.rs/rust-rocksdb), `feature = "rocksdb"`. Each `apply` is one atomic `WriteBatch` (data **and** cursor); WAL always on, per-commit fsync configurable. Tuned for billion-key route folds (hit-optimized ribbon filters, partitioned index, zstd bottommost, batched `multi_get`). Builds C++ (needs a toolchain + libclang). |
Pick a backend, then hand it to [`watch_applied`](#applied-watch) — `load` returns the resume cursor alongside the store:
```rust
use slipstream::{watch_applied, AppendLogSnapshot, BatchConfig, SnapshotStore, WatchScope};
use std::path::Path;
// Default in-RAM backend:
let (resume, store) = AppendLogSnapshot::load(Path::new("/var/lib/svc/state.snap"))?;
// Or, behind `feature = "fjall"`, an on-disk fold for a large consumer:
// let (resume, store) = FjallSnapshot::open(dir, FjallConfig { sync: false, ..Default::default() })?;
// Or the same on RocksDB, behind `feature = "rocksdb"`:
// let (resume, store) = RocksDbSnapshot::open(dir, RocksDbConfig { sync: false, ..Default::default() })?;
let final_cursor = watch_applied(
    watcher, WatchScope::All, Some(resume),
    Some(reader), // arms the cursor-expired stale-key resync; None to skip
    Some(store), None, // store; export-request channel
    BatchConfig::default(),
    parse, apply, on_applied, shutdown,
).await?;
```
The trait stops at *durable fold + cursor + query*. Serving structures built from the fold (routing rings, hashrings, indexes) live in the consumer — query them out of the store with `get`/`range`. A consumer with a different engine can implement `SnapshotStore` itself; the rest of slipstream is unchanged.
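A consumer implementing the contract on its own engine mostly has to keep data and cursor moving together. The sketch below is deliberately simplified: `Update` and `MemFold` are hypothetical stand-ins rather than `KvUpdate` and the `SnapshotStore` trait, and everything lives in a `BTreeMap`. It also checks the pure-function property from the invariants above: replaying the same batches from nothing yields an identical fold.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for `KvUpdate` (hypothetical; not the crate's type).
pub enum Update {
    Put { key: String, value: String, rev: u64 },
    Delete { key: String },
}

/// A consumer-owned fold: state plus the cursor that names it.
#[derive(Debug, Default, PartialEq)]
pub struct MemFold {
    cursor: u64,
    map: BTreeMap<String, (String, u64)>,
}

impl MemFold {
    /// Fold a batch and record its cursor in the same call, so the cursor is
    /// never visible without the data it names.
    pub fn apply(&mut self, batch: &[Update], cursor: u64) {
        for u in batch {
            match u {
                Update::Put { key, value, rev } => {
                    self.map.insert(key.clone(), (value.clone(), *rev));
                }
                Update::Delete { key } => {
                    self.map.remove(key);
                }
            }
        }
        self.cursor = cursor;
    }

    pub fn cursor(&self) -> u64 {
        self.cursor
    }

    pub fn get(&self, key: &str) -> Option<&str> {
        self.map.get(key).map(|(v, _)| v.as_str())
    }

    /// All live entries under `prefix`, in key order.
    pub fn range(&self, prefix: &str) -> Vec<(&str, &str)> {
        self.map
            .iter()
            .filter(|(k, _)| k.starts_with(prefix))
            .map(|(k, (v, _))| (k.as_str(), v.as_str()))
            .collect()
    }
}

/// Rebuild from nothing by replaying the same batches.
pub fn replay(batches: &[(Vec<Update>, u64)]) -> MemFold {
    let mut fold = MemFold::default();
    for (batch, cursor) in batches {
        fold.apply(batch, *cursor);
    }
    fold
}
```

An on-disk engine would replace the map with a write batch that carries both the data and the cursor, which is what the fjall and RocksDB rows above describe.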
## Applied watch
`watch_applied` drives the watch-batch-apply-checkpoint loop and enforces one rule the hand-rolled version can't: the cursor advances only after your `apply` returns, never on receipt. It is generic over the [`SnapshotStore`](#pluggable-backends) backend, so the consumer chooses where the durable fold lives (or `None` to run without persistence).
```rust
use slipstream::{watch_applied, AppendLogSnapshot, BatchConfig, KvUpdate, WatchCursor, WatchScope};
let final_cursor = watch_applied(
    watcher,
    WatchScope::All,          // or WatchScope::Prefix("node.".into()) / WatchScope::Prefixes(vec![...])
    Some(resume),             // Option<WatchCursor>: resume here, or None
    Some(reader),             // Option<Arc<dyn KvReader>>: arms the
                              // cursor-expired stale-key resync, or None
    Some(store),              // any SnapshotStore (e.g. AppendLogSnapshot), or None
    None,                     // Option<mpsc::Receiver<ExportRequest>>: live exports
    BatchConfig::default(),   // 10ms window, 100 updates per batch
    |update: &KvUpdate| parse(update),        // KvUpdate -> Option<U>; None just drops it
    |batch: Vec<U>| cache.apply_batch(batch), // your only domain logic
    |cursor: WatchCursor| persist(cursor),    // fires after apply returns
    shutdown,                 // tokio::sync::watch::Receiver<bool>
).await?;
```
A batch closes when `window` elapses or it hits `max` updates, whichever comes first. Then, in order: `apply(batch)` runs to completion, the cursor advances to the batch's highest revision, the batch + cursor are folded into the `store` atomically (on a blocking task), and `on_applied` fires.
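The batching rule and its ordering can be sketched with std channels. `next_batch` and `drive` are hypothetical names, and the assumption that the window starts when a batch's first update arrives is ours, since the text does not say. The store fold step is left as a comment.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Gather one batch: block for the first update, then keep taking updates
/// until `max` is reached or `window` has elapsed since the first arrived.
/// Returns None once the channel is closed and drained.
pub fn next_batch<T>(rx: &Receiver<T>, window: Duration, max: usize) -> Option<Vec<T>> {
    let first = rx.recv().ok()?;
    let deadline = Instant::now() + window;
    let mut batch = vec![first];
    while batch.len() < max {
        let left = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(left) {
            Ok(update) => batch.push(update),
            Err(RecvTimeoutError::Timeout | RecvTimeoutError::Disconnected) => break,
        }
    }
    Some(batch)
}

/// For each batch: apply it, advance the cursor to its highest revision,
/// then notify. Updates are (revision, payload). Returns the final cursor.
pub fn drive(
    rx: &Receiver<(u64, String)>,
    window: Duration,
    max: usize,
    mut apply: impl FnMut(&[(u64, String)]),
    mut on_applied: impl FnMut(u64),
) -> u64 {
    let mut cursor = 0;
    while let Some(batch) = next_batch(rx, window, max) {
        apply(batch.as_slice()); // 1. domain logic runs to completion
        let top = batch.iter().map(|(rev, _)| *rev).max().unwrap_or(cursor);
        cursor = cursor.max(top); // 2. cursor advances only now
        // 3. (a SnapshotStore fold of batch + cursor would go here)
        on_applied(cursor); // 4. notify
    }
    cursor
}
```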
Persist the cursor on receipt instead and a crash between receive and apply loses data: the cursor reads "caught up to rev N" while rev N sits in an unapplied buffer, and the next resume starts past it. `watch_applied` checkpoints at the applied cursor, so a persisted cursor always means every update up to it has been applied.
- `parse` returning `None` (corrupt bytes, irrelevant key) still advances the cursor — nothing to apply means nothing to skip.
- On `CursorExpired`, it falls back to a full watch automatically. With a `reader` wired, it first diffs the fold against the bucket's live keys and applies synthetic deletes for keys that vanished during the gap (their delete markers were evicted with the cursor) — the one case the fallback re-list can't cover.
- It returns the final applied cursor on shutdown or stream close.
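The stale-key resync amounts to a set difference between the keys in the local fold and the keys still live in the bucket. A minimal sketch over plain std collections; `resync_stale` is a hypothetical helper, and how slipstream lists live keys and builds its synthetic deletes is not shown here.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Remove every fold entry whose key is no longer live in the bucket and
/// return those keys. Each removal plays the role of a synthetic delete.
pub fn resync_stale(fold: &mut BTreeMap<String, String>, live_keys: &BTreeSet<String>) -> Vec<String> {
    let stale: Vec<String> = fold
        .keys()
        .filter(|k| !live_keys.contains(*k))
        .cloned()
        .collect();
    for key in &stale {
        fold.remove(key); // its real delete marker was evicted with the cursor
    }
    stale
}
```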
`apply` runs inline. If it panics, the panic aborts the watch.
## NATS mapping
| slipstream | NATS |
|---|---|
| Store | JetStream KV bucket (`KV_{name}` stream) |
| `VersionToken` | Per-key revision (u64, big-endian) |
| `WatchCursor` | NATS revision at last checkpoint |
| `delete()` | Writes empty value (soft delete). Always returns `Ok(true)` |
| `KvUpdate::Purge` | Hard delete: all history removed from stream |
| `scan()` | `DeliverPolicy::LastPerSubject`: one entry per key, one consumer |
| `watch_*()` | `DeliverPolicy::LastPerSubject`: current state, then live updates |
| `watch_prefix()` | Native NATS subject filter (`{prefix}>` wildcard) |
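The `VersionToken` row says NATS revisions are carried as big-endian u64. A short sketch of that encoding; `nats_token` and `nats_revision` are hypothetical helpers. One useful property, whether or not it motivated the choice: big-endian bytes sort in the same order as the numbers, so code that compares tokens as opaque byte strings still orders revisions correctly, which little-endian would not.

```rust
/// Encode a NATS revision as an 8-byte big-endian token.
pub fn nats_token(rev: u64) -> Vec<u8> {
    rev.to_be_bytes().to_vec()
}

/// Decode a token back to a revision; None if it is not exactly 8 bytes
/// (for example a 10-byte FDB versionstamp).
pub fn nats_revision(token: &[u8]) -> Option<u64> {
    Some(u64::from_be_bytes(token.try_into().ok()?))
}
```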
## Feature detection
```rust
let caps = conn.capabilities();
if caps.cas { /* safe to call create/update/delete_with_version */ }
if caps.streaming_watch { /* watcher() is Some */ }
if caps.prefix_watch { /* watch_prefix() uses a server-side filter */ }
if caps.global_ordering { /* VersionToken is globally ordered across keys */ }
```
## Errors
| Error | Cause | Recovery |
|---|---|---|
| `NotConnected` | Operation before `connect()` | Call `connect()` |
| `AlreadyExists` | `create()` on a live key | Read current state, decide |
| `RevisionMismatch` | CAS conflict on `update()` / `delete_with_version()` | Re-read, retry |
| `CursorExpired` | `watch_*_from()` cursor compacted by NATS | Fall back to `watch_all()` |
| `WatchError` | NATS stream dropped | Re-subscribe |
## Credentials
Priority order, first match wins:
1. `creds`: base64-encoded `.creds` content (containers, ECS)
2. `creds_file`: path to `.creds` on disk (bare-metal, local dev)
3. URL-embedded `user:pass@host`
4. No auth
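The priority order reads naturally as a chain of early returns. A sketch with `Auth` and `resolve_auth` as hypothetical names: it only selects the source (decoding the base64 `creds` or reading the file is out of scope) and takes the userinfo between `://` and `@` in the URL.

```rust
/// Which credential source wins (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Auth<'a> {
    CredsInline(&'a str),
    CredsFile(&'a str),
    UserPass { user: &'a str, pass: &'a str },
    NoAuth,
}

/// First match wins: inline creds, then creds file, then URL userinfo.
pub fn resolve_auth<'a>(creds: Option<&'a str>, creds_file: Option<&'a str>, url: &'a str) -> Auth<'a> {
    if let Some(c) = creds {
        return Auth::CredsInline(c);
    }
    if let Some(path) = creds_file {
        return Auth::CredsFile(path);
    }
    // nats://user:pass@host:4222 -> userinfo sits between "://" and "@"
    let rest = url.split_once("://").map_or(url, |(_, r)| r);
    if let Some((userinfo, _host)) = rest.split_once('@') {
        if let Some((user, pass)) = userinfo.split_once(':') {
            return Auth::UserPass { user, pass };
        }
    }
    Auth::NoAuth
}
```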