Skip to main content

slipstream/
kv.rs

1use async_trait::async_trait;
2use std::fmt;
3use tokio::sync::mpsc::Sender;
4
5/// Opaque position in a watch stream for resuming after disconnect.
6///
7/// Backends store whatever they need to resume (NATS: u64 revision).
8/// Callers should treat this as opaque and only pass it back to
9/// `watch_all_from` / `watch_prefix_from`.
10#[derive(Debug, Clone, Default, PartialEq, Eq)]
11pub struct WatchCursor(VersionToken);
12
13impl WatchCursor {
14    /// No cursor — forces a full watch on next connect.
15    pub fn none() -> Self {
16        Self(VersionToken::unknown())
17    }
18
19    /// Returns true if this cursor has no position (will trigger full watch).
20    pub fn is_none(&self) -> bool {
21        self.0.is_unknown()
22    }
23
24    /// Create a cursor from a version token.
25    pub fn from_version(token: VersionToken) -> Self {
26        Self(token)
27    }
28
29    /// Create a cursor from a u64 revision (convenience for NATS).
30    pub fn from_u64(rev: u64) -> Self {
31        Self(VersionToken::from_u64(rev))
32    }
33
34    /// Try to extract as u64 revision.
35    #[must_use]
36    pub fn as_u64(&self) -> Option<u64> {
37        self.0.as_u64()
38    }
39
40    /// Access the underlying version token.
41    pub(crate) fn version(&self) -> &VersionToken {
42        &self.0
43    }
44}
45
46/// Error type for KV operations.
47///
48/// `KvError` is `Clone` so a single failure can fan out to multiple waiters
49/// (e.g. callers blocked on a shared connect result). The underlying backend
50/// errors — `std::io::Error`, the `async-nats` error types — are *not* `Clone`,
51/// so their detail is flattened into the message string at this boundary rather
52/// than retained as a `#[source]` cause. Keeping `KvError: Clone` across the
53/// object-safe `async_trait` surface is the deliberate trade-off; the cost is a
54/// structured cause chain, which is why the `String` variants carry pre-rendered
55/// context instead of a nested error.
56#[derive(Debug, Clone, thiserror::Error)]
57pub enum KvError {
58    #[error("store not connected")]
59    NotConnected,
60    #[error("connection failed: {0}")]
61    ConnectionFailed(String),
62    #[error("key not found")]
63    KeyNotFound,
64    /// Key already exists (create-if-not-exists conflict).
65    #[error("key already exists")]
66    AlreadyExists,
67    /// CAS conflict: current version doesn't match expected.
68    #[error("revision mismatch")]
69    RevisionMismatch,
70    #[error("deserialization error: {0}")]
71    DeserializationError(String),
72    #[error("serialization error: {0}")]
73    SerializationError(String),
74    #[error("watch error: {0}")]
75    WatchError(String),
76    #[error("operation failed: {0}")]
77    OperationFailed(String),
78    #[error("operation timed out")]
79    Timeout,
80    /// The watch cursor/revision is too old — the backend has compacted past it.
81    /// Callers should fall back to a full scan + watch.
82    #[error("watch cursor expired (compacted)")]
83    CursorExpired,
84}
85
86/// Opaque version token that abstracts store-specific versioning.
87///
88/// Different stores use different versioning schemes:
89/// - NATS: 8-byte u64 revision
90/// - FDB: 10-byte versionstamp
91/// - Redis: could be stream ID + sequence
92///
93/// Stored inline (no heap allocation) — fits up to 10 bytes, which covers
94/// every current backend.
95#[derive(Clone, Default, PartialEq, Eq, Hash)]
96pub struct VersionToken {
97    len: u8,
98    buf: [u8; 10],
99}
100
101impl fmt::Debug for VersionToken {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        let bytes = self.as_bytes();
104        if let Some(v) = self.as_u64() {
105            write!(f, "VersionToken(u64: {v})")
106        } else if bytes.is_empty() {
107            write!(f, "VersionToken(unknown)")
108        } else {
109            write!(f, "VersionToken({bytes:?})")
110        }
111    }
112}
113
114impl VersionToken {
115    /// Create an empty/unknown version (for entries without version info).
116    pub fn unknown() -> Self {
117        Self::default()
118    }
119
120    /// Check if this is an unknown/empty version.
121    pub fn is_unknown(&self) -> bool {
122        self.len == 0
123    }
124
125    /// Create from NATS u64 revision.
126    pub fn from_u64(rev: u64) -> Self {
127        let mut buf = [0u8; 10];
128        buf[..8].copy_from_slice(&rev.to_be_bytes());
129        Self { len: 8, buf }
130    }
131
132    /// Create from FDB versionstamp (10 bytes).
133    ///
134    /// `cfg(test)` until a FoundationDB backend ships and the round-trip is
135    /// tested end-to-end: a 10-byte token has no `as_u64()`, so handing one to
136    /// the NATS backend's CAS path yields an unactionable `OperationFailed`.
137    /// Today it exists only for the snapshot length-prefixed-version tests; an
138    /// FDB backend should lift the gate (and the visibility) rather than add a
139    /// second constructor.
140    #[cfg(test)]
141    pub(crate) fn from_fdb_versionstamp(vs: &[u8; 10]) -> Self {
142        Self { len: 10, buf: *vs }
143    }
144
145    /// Try to extract as u64 (for NATS compatibility).
146    #[must_use]
147    pub fn as_u64(&self) -> Option<u64> {
148        if self.len == 8 {
149            Some(u64::from_be_bytes(self.buf[..8].try_into().unwrap_or_else(
150                |_| unreachable!("len == 8 guarantees an 8-byte slice"),
151            )))
152        } else {
153            None
154        }
155    }
156
157    /// Get the raw bytes.
158    pub fn as_bytes(&self) -> &[u8] {
159        &self.buf[..self.len as usize]
160    }
161
162    /// Create from raw bytes (crate-internal, e.g. snapshot deserialization).
163    ///
164    /// Returns `None` if `bytes` exceeds the 10-byte inline capacity. Silently
165    /// truncating instead would store a version that differs from the real
166    /// revision, causing every later CAS to fail with `RevisionMismatch` and no
167    /// actionable error — so an oversized token is rejected at the boundary
168    /// rather than absorbed. Callers parse a length-prefixed field that is
169    /// structurally bounded to 10 bytes, so `None` is unreachable in practice;
170    /// returning it (instead of panicking) keeps the failure mode a recoverable
171    /// format error for any future caller that lacks that guard.
172    #[must_use]
173    pub(crate) fn from_raw(bytes: &[u8]) -> Option<Self> {
174        if bytes.len() > 10 {
175            return None;
176        }
177        let len = bytes.len() as u8;
178        let mut buf = [0u8; 10];
179        buf[..len as usize].copy_from_slice(bytes);
180        Some(Self { len, buf })
181    }
182}
183
184/// A single key-value entry with metadata.
185#[derive(Debug, Clone)]
186pub struct KvEntry {
187    pub key: String,
188    pub value: Vec<u8>,
189    pub version: VersionToken,
190}
191
192/// Update event from a watch stream.
193#[derive(Debug, Clone)]
194pub enum KvUpdate {
195    /// Key was created or updated.
196    Put(KvEntry),
197    /// Key was deleted.
198    Delete { key: String, version: VersionToken },
199    /// Key was purged (NATS-specific: all history removed).
200    /// Stores without purge semantics should map this to Delete.
201    Purge { key: String, version: VersionToken },
202}
203
204impl KvUpdate {
205    /// Get the key affected by this update.
206    pub fn key(&self) -> &str {
207        match self {
208            KvUpdate::Put(e) => &e.key,
209            KvUpdate::Delete { key, .. } => key,
210            KvUpdate::Purge { key, .. } => key,
211        }
212    }
213
214    /// Get the version of this update.
215    pub fn version(&self) -> &VersionToken {
216        match self {
217            KvUpdate::Put(e) => &e.version,
218            KvUpdate::Delete { version, .. } => version,
219            KvUpdate::Purge { version, .. } => version,
220        }
221    }
222}
223
224/// Core read-only KV operations - the minimal interface every store must implement.
225#[async_trait]
226pub trait KvReader: Send + Sync {
227    /// Get a value by key. Returns `None` if the key doesn't exist.
228    ///
229    /// Backends that use empty-value tombstones (NATS: `delete_with_version`
230    /// writes an empty-value Put so concurrent CAS writers still conflict) also
231    /// return `None` for a *stored* empty value — `get()` cannot tell a real
232    /// `b""` apart from a tombstone. A caller using zero-length values as a
233    /// presence signal (locks, feature flags) must use [`entry`](Self::entry),
234    /// which exposes the raw record including empty-value Puts.
235    async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError>;
236
237    /// Get all keys matching a prefix. Returns keys only, not values.
238    async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError>;
239
240    /// Get multiple entries by prefix. Useful for bulk loading.
241    async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError>;
242
243    /// Get the raw entry for a key, including tombstones (empty-value Put
244    /// entries written by `delete_with_version`). Most callers should use
245    /// `get()` instead, which filters tombstones for consistency with `scan()`.
246    ///
247    /// REQUIRED (no default) — deliberately. A default delegating to `get()`
248    /// silently hid tombstones on any backend that forgot to override it,
249    /// which breaks CAS callers that need the tombstone's version: e.g.
250    /// [`ExportLease::try_acquire`](crate::ExportLease::try_acquire) reads an
251    /// abandoned (CAS-deleted) lease's version through `entry()` for its
252    /// takeover write — with a `get()` default it would see `None` and report
253    /// the round as live instead of stealing it. Backends without empty-value
254    /// tombstone semantics (where delete genuinely removes the key) should
255    /// implement this as a delegation to `get()` — explicitly, so the choice
256    /// is a reviewed decision rather than an inherited footgun.
257    async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError>;
258}
259
260/// Watch capability - optional, not all stores support real-time updates.
261///
262/// The non-`_from` watches are **state-sync** streams: they first deliver the
263/// current value of every matching key (the "re-list", as a stream of puts plus
264/// any surviving delete markers), then live updates. A consumer starting with
265/// no cursor therefore converges on the full bucket state without a separate
266/// scan — and without the scan-to-watch race a separate scan would open. The
267/// `_from` variants skip the re-list and deliver only the delta past the cursor.
268#[async_trait]
269pub trait KvWatcher: Send + Sync {
270    /// Watch all keys: current state first, then live changes. Sends updates
271    /// through the channel. Returns when the watch ends or an error occurs.
272    async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError>;
273
274    /// Watch keys matching a prefix: current state first, then live changes.
275    async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError>;
276
277    /// Watch keys matching ANY of `prefixes`, delivered through one channel.
278    ///
279    /// The contract is exactly the union of the prefixes — no other keys. A
280    /// backend with native multi-filter consumers (NATS server 2.10+) serves all
281    /// `prefixes` from a SINGLE consumer; that matters because consumers are a
282    /// per-stream resource (measured at ~tens of KB of server state each, growing
283    /// super-linearly past a few thousand on one stream), so a watcher scoped to N
284    /// prefixes must not cost N consumers.
285    async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError>;
286
287    /// Resume watching all keys from a previously saved cursor position.
288    ///
289    /// Returns `KvError::CursorExpired` if the backend has compacted past the
290    /// cursor — callers should fall back to a full `watch_all()`.
291    ///
292    /// Default implementation ignores the cursor and delegates to `watch_all()`.
293    async fn watch_all_from(
294        &self,
295        cursor: &WatchCursor,
296        tx: Sender<KvUpdate>,
297    ) -> Result<(), KvError> {
298        let _ = cursor;
299        self.watch_all(tx).await
300    }
301
302    /// Resume watching keys with a prefix from a previously saved cursor.
303    ///
304    /// Default implementation ignores the cursor and delegates to `watch_prefix()`.
305    async fn watch_prefix_from(
306        &self,
307        prefix: &str,
308        cursor: &WatchCursor,
309        tx: Sender<KvUpdate>,
310    ) -> Result<(), KvError> {
311        let _ = cursor;
312        self.watch_prefix(prefix, tx).await
313    }
314
315    /// Resume watching the union of `prefixes` from a previously saved cursor.
316    ///
317    /// Same single-consumer contract as [`watch_prefixes`](Self::watch_prefixes),
318    /// same delta semantics as the other `_from` variants: only updates past the
319    /// cursor are delivered, or [`KvError::CursorExpired`] if the backend has
320    /// compacted past it.
321    ///
322    /// Default implementation ignores the cursor and delegates to
323    /// `watch_prefixes()` — correct (the state-sync re-list is a superset of any
324    /// delta) but a full replay; backends that can seek a multi-filter stream
325    /// should override it.
326    async fn watch_prefixes_from(
327        &self,
328        prefixes: &[&str],
329        cursor: &WatchCursor,
330        tx: Sender<KvUpdate>,
331    ) -> Result<(), KvError> {
332        let _ = cursor;
333        self.watch_prefixes(prefixes, tx).await
334    }
335}
336
337/// Write operations - optional, edge proxy is primarily read-only.
338#[async_trait]
339pub trait KvWriter: Send + Sync {
340    /// Put a value. Returns the new version token.
341    async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError>;
342
343    /// Delete a key. Best-effort: may return `true` even if the key did not
344    /// exist (NATS does not report pre-existence). Use `get()` first if you
345    /// need to distinguish "deleted something" from "nothing to delete".
346    async fn delete(&self, key: &str) -> Result<bool, KvError>;
347
348    /// Create a key only if it doesn't exist.
349    /// Returns `AlreadyExists` if the key has a live value.
350    async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError>;
351
352    /// Compare-and-swap: update only if current version matches `expected`.
353    /// Returns `RevisionMismatch` on conflict.
354    async fn update(
355        &self,
356        key: &str,
357        value: &[u8],
358        expected: &VersionToken,
359    ) -> Result<VersionToken, KvError>;
360
361    /// CAS-gated delete: delete only if current version matches `expected`.
362    /// Returns `RevisionMismatch` on conflict.
363    /// Writes an empty value (logical delete) so concurrent writers get a conflict.
364    async fn delete_with_version(
365        &self,
366        key: &str,
367        expected: &VersionToken,
368    ) -> Result<bool, KvError>;
369}
370
371/// TTL support - optional, for stores that support key expiration.
372#[async_trait]
373pub trait KvTtl: KvWriter {
374    /// Put a value with TTL. Value expires after duration.
375    async fn put_with_ttl(
376        &self,
377        key: &str,
378        value: &[u8],
379        ttl: std::time::Duration,
380    ) -> Result<VersionToken, KvError>;
381}
382
383/// Purge support - optional, for stores that can reclaim a key's storage.
384///
385/// Unlike [`KvWriter::delete`] (which writes a delete marker) and
386/// [`KvWriter::delete_with_version`] (which writes an empty-value tombstone),
387/// `purge` removes a key *and reclaims its bytes*. On NATS this issues a
388/// rollup (`Nats-Rollup: sub`) that drops all prior revisions of the subject,
389/// so the bytes stop counting against the stream's `max_bytes`.
390///
391/// Use this to bound a bucket that has no `max_age`: dead keys deleted with
392/// `delete`/`delete_with_version` accumulate forever, but purged keys are
393/// reclaimed.
394#[async_trait]
395pub trait KvPurge: KvWriter {
396    /// Purge a key, reclaiming its storage. Idempotent: purging an absent key
397    /// is not an error.
398    async fn purge(&self, key: &str) -> Result<(), KvError>;
399}
400
401#[cfg(test)]
402mod tests {
403    use super::*;
404
405    #[test]
406    fn from_raw_roundtrips_within_capacity() {
407        // The largest token any backend uses is a 10-byte FDB versionstamp.
408        let bytes = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10];
409        let token = VersionToken::from_raw(&bytes).expect("10 bytes is within capacity");
410        assert_eq!(token.as_bytes(), &bytes);
411
412        // An 8-byte token is still interpretable as a NATS u64 revision.
413        let rev = 0x0102_0304_0506_0708u64;
414        let token = VersionToken::from_raw(&rev.to_be_bytes()).expect("8 bytes is within capacity");
415        assert_eq!(token.as_u64(), Some(rev));
416
417        // Empty input is the "unknown" token.
418        assert!(
419            VersionToken::from_raw(&[])
420                .expect("empty is within capacity")
421                .is_unknown()
422        );
423    }
424
425    #[test]
426    fn from_raw_rejects_above_capacity() {
427        // 11 bytes exceeds the 10-byte inline buffer. This guards against a
428        // loosened `parse_cursor` bound ever feeding oversized data through —
429        // returning `None` surfaces the format/backend mismatch at its origin
430        // instead of silently truncating into a wrong revision.
431        assert!(VersionToken::from_raw(&[0u8; 11]).is_none());
432    }
433}