slipstream/kv.rs
1use async_trait::async_trait;
2use std::fmt;
3use tokio::sync::mpsc::Sender;
4
5/// Opaque position in a watch stream for resuming after disconnect.
6///
7/// Backends store whatever they need to resume (NATS: u64 revision).
8/// Callers should treat this as opaque and only pass it back to
9/// `watch_all_from` / `watch_prefix_from`.
10#[derive(Debug, Clone, Default)]
11pub struct WatchCursor(VersionToken);
12
13impl WatchCursor {
14 /// No cursor — forces a full watch on next connect.
15 pub fn none() -> Self {
16 Self(VersionToken::unknown())
17 }
18
19 /// Returns true if this cursor has no position (will trigger full watch).
20 pub fn is_none(&self) -> bool {
21 self.0.is_unknown()
22 }
23
24 /// Create a cursor from a version token.
25 pub fn from_version(token: VersionToken) -> Self {
26 Self(token)
27 }
28
29 /// Create a cursor from a u64 revision (convenience for NATS).
30 pub fn from_u64(rev: u64) -> Self {
31 Self(VersionToken::from_u64(rev))
32 }
33
34 /// Try to extract as u64 revision.
35 #[must_use]
36 pub fn as_u64(&self) -> Option<u64> {
37 self.0.as_u64()
38 }
39
40 /// Access the underlying version token.
41 pub(crate) fn version(&self) -> &VersionToken {
42 &self.0
43 }
44}
45
46/// Error type for KV operations.
47///
48/// `KvError` is `Clone` so a single failure can fan out to multiple waiters
49/// (e.g. callers blocked on a shared connect result). The underlying backend
50/// errors — `std::io::Error`, the `async-nats` error types — are *not* `Clone`,
51/// so their detail is flattened into the message string at this boundary rather
52/// than retained as a `#[source]` cause. Keeping `KvError: Clone` across the
53/// object-safe `async_trait` surface is the deliberate trade-off; the cost is a
54/// structured cause chain, which is why the `String` variants carry pre-rendered
55/// context instead of a nested error.
56#[derive(Debug, Clone, thiserror::Error)]
57pub enum KvError {
58 #[error("store not connected")]
59 NotConnected,
60 #[error("connection failed: {0}")]
61 ConnectionFailed(String),
62 #[error("key not found")]
63 KeyNotFound,
64 /// Key already exists (create-if-not-exists conflict).
65 #[error("key already exists")]
66 AlreadyExists,
67 /// CAS conflict: current version doesn't match expected.
68 #[error("revision mismatch")]
69 RevisionMismatch,
70 #[error("deserialization error: {0}")]
71 DeserializationError(String),
72 #[error("serialization error: {0}")]
73 SerializationError(String),
74 #[error("watch error: {0}")]
75 WatchError(String),
76 #[error("operation failed: {0}")]
77 OperationFailed(String),
78 #[error("operation timed out")]
79 Timeout,
80 /// The watch cursor/revision is too old — the backend has compacted past it.
81 /// Callers should fall back to a full scan + watch.
82 #[error("watch cursor expired (compacted)")]
83 CursorExpired,
84}
85
/// Store-agnostic version token.
///
/// Each store has its own versioning scheme:
/// - NATS: 8-byte u64 revision
/// - FDB: 10-byte versionstamp
/// - Redis: could be stream ID + sequence
///
/// The bytes live inline (no heap allocation) in a 10-byte buffer, which is
/// enough for every current backend. Only the first `len` bytes are
/// meaningful; every constructor leaves the remainder zeroed, which keeps the
/// derived `PartialEq`/`Hash` (which look at the whole buffer) consistent.
#[derive(Clone, Default, PartialEq, Eq, Hash)]
pub struct VersionToken {
    len: u8,
    buf: [u8; 10],
}

impl fmt::Debug for VersionToken {
    /// Renders as a u64 when the token is u64-shaped, as `unknown` when it is
    /// empty, and as the raw byte list otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match (self.as_u64(), self.as_bytes()) {
            (Some(rev), _) => write!(f, "VersionToken(u64: {rev})"),
            (None, []) => write!(f, "VersionToken(unknown)"),
            (None, raw) => write!(f, "VersionToken({raw:?})"),
        }
    }
}

impl VersionToken {
    /// The empty/unknown version (for entries that carry no version info).
    pub fn unknown() -> Self {
        Self {
            len: 0,
            buf: [0u8; 10],
        }
    }

    /// Whether this is the unknown/empty version.
    pub fn is_unknown(&self) -> bool {
        matches!(self.len, 0)
    }

    /// Build a token from a NATS u64 revision (stored big-endian in 8 bytes).
    pub fn from_u64(rev: u64) -> Self {
        let mut buf = [0u8; 10];
        let (revision_bytes, _padding) = buf.split_at_mut(8);
        revision_bytes.copy_from_slice(&rev.to_be_bytes());
        Self { len: 8, buf }
    }

    /// Build a token from a 10-byte FDB versionstamp.
    ///
    /// `pub(crate)` until a FoundationDB backend ships and the round-trip is
    /// tested end-to-end: a 10-byte token has no `as_u64()`, so handing one to
    /// the NATS backend's CAS path yields an unactionable `OperationFailed`.
    /// Exposed within the crate for the snapshot length-prefixed-version tests.
    // Currently only the test suite constructs one; it is kept as the seam the
    // FDB backend will use, hence `allow(dead_code)` instead of deletion.
    #[allow(dead_code)]
    pub(crate) fn from_fdb_versionstamp(vs: &[u8; 10]) -> Self {
        let buf = *vs;
        Self { len: 10, buf }
    }

    /// The token as a u64 (for NATS compatibility); `None` unless the token is
    /// exactly 8 bytes long.
    #[must_use]
    pub fn as_u64(&self) -> Option<u64> {
        if self.len != 8 {
            return None;
        }
        let mut revision = [0u8; 8];
        revision.copy_from_slice(&self.buf[..8]);
        Some(u64::from_be_bytes(revision))
    }

    /// The meaningful bytes of the token (the first `len` bytes of the buffer).
    pub fn as_bytes(&self) -> &[u8] {
        let used = usize::from(self.len);
        &self.buf[..used]
    }

    /// Build a token from raw bytes (crate-internal, e.g. snapshot
    /// deserialization).
    ///
    /// Returns `None` when `bytes` is longer than the 10-byte inline capacity.
    /// Truncating silently would store a version that differs from the real
    /// revision, so every later CAS would fail with `RevisionMismatch` and no
    /// actionable error; an oversized token is therefore rejected at the
    /// boundary rather than absorbed. Callers parse a length-prefixed field
    /// that is structurally bounded to 10 bytes, so `None` is unreachable in
    /// practice; returning it (instead of panicking) keeps the failure mode a
    /// recoverable format error for any future caller that lacks that guard.
    #[must_use]
    pub(crate) fn from_raw(bytes: &[u8]) -> Option<Self> {
        let mut buf = [0u8; 10];
        // `get_mut` is `None` exactly when `bytes` does not fit in the buffer.
        buf.get_mut(..bytes.len())?.copy_from_slice(bytes);
        // The cast cannot truncate: the length was just shown to be <= 10.
        let len = bytes.len() as u8;
        Some(Self { len, buf })
    }
}
183
184/// A single key-value entry with metadata.
185#[derive(Debug, Clone)]
186pub struct KvEntry {
187 pub key: String,
188 pub value: Vec<u8>,
189 pub version: VersionToken,
190}
191
192/// Update event from a watch stream.
193#[derive(Debug, Clone)]
194pub enum KvUpdate {
195 /// Key was created or updated.
196 Put(KvEntry),
197 /// Key was deleted.
198 Delete { key: String, version: VersionToken },
199 /// Key was purged (NATS-specific: all history removed).
200 /// Stores without purge semantics should map this to Delete.
201 Purge { key: String, version: VersionToken },
202}
203
204impl KvUpdate {
205 /// Get the key affected by this update.
206 pub fn key(&self) -> &str {
207 match self {
208 KvUpdate::Put(e) => &e.key,
209 KvUpdate::Delete { key, .. } => key,
210 KvUpdate::Purge { key, .. } => key,
211 }
212 }
213
214 /// Get the version of this update.
215 pub fn version(&self) -> &VersionToken {
216 match self {
217 KvUpdate::Put(e) => &e.version,
218 KvUpdate::Delete { version, .. } => version,
219 KvUpdate::Purge { version, .. } => version,
220 }
221 }
222}
223
224/// Core read-only KV operations - the minimal interface every store must implement.
225#[async_trait]
226pub trait KvReader: Send + Sync {
227 /// Get a value by key. Returns `None` if the key doesn't exist.
228 ///
229 /// Backends that use empty-value tombstones (NATS: `delete_with_version`
230 /// writes an empty-value Put so concurrent CAS writers still conflict) also
231 /// return `None` for a *stored* empty value — `get()` cannot tell a real
232 /// `b""` apart from a tombstone. A caller using zero-length values as a
233 /// presence signal (locks, feature flags) must use [`entry`](Self::entry),
234 /// which exposes the raw record including empty-value Puts.
235 async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError>;
236
237 /// Get all keys matching a prefix. Returns keys only, not values.
238 async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError>;
239
240 /// Get multiple entries by prefix. Useful for bulk loading.
241 async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError>;
242
243 /// Get the raw entry for a key, including tombstones (empty-value Put
244 /// entries written by `delete_with_version`). Most callers should use
245 /// `get()` instead, which filters tombstones for consistency with `scan()`.
246 ///
247 /// Override in backends where tombstone version access is needed for
248 /// CAS conflict detection.
249 async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
250 self.get(key).await
251 }
252}
253
254/// Watch capability - optional, not all stores support real-time updates.
255#[async_trait]
256pub trait KvWatcher: Send + Sync {
257 /// Watch all keys for changes. Sends updates through the channel.
258 /// Returns when the watch ends or an error occurs.
259 async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError>;
260
261 /// Watch keys matching a prefix.
262 async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError>;
263
264 /// Resume watching all keys from a previously saved cursor position.
265 ///
266 /// Returns `KvError::CursorExpired` if the backend has compacted past the
267 /// cursor — callers should fall back to a full `watch_all()`.
268 ///
269 /// Default implementation ignores the cursor and delegates to `watch_all()`.
270 async fn watch_all_from(
271 &self,
272 cursor: &WatchCursor,
273 tx: Sender<KvUpdate>,
274 ) -> Result<(), KvError> {
275 let _ = cursor;
276 self.watch_all(tx).await
277 }
278
279 /// Resume watching keys with a prefix from a previously saved cursor.
280 ///
281 /// Default implementation ignores the cursor and delegates to `watch_prefix()`.
282 async fn watch_prefix_from(
283 &self,
284 prefix: &str,
285 cursor: &WatchCursor,
286 tx: Sender<KvUpdate>,
287 ) -> Result<(), KvError> {
288 let _ = cursor;
289 self.watch_prefix(prefix, tx).await
290 }
291}
292
293/// Write operations - optional, edge proxy is primarily read-only.
294#[async_trait]
295pub trait KvWriter: Send + Sync {
296 /// Put a value. Returns the new version token.
297 async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError>;
298
299 /// Delete a key. Best-effort: may return `true` even if the key did not
300 /// exist (NATS does not report pre-existence). Use `get()` first if you
301 /// need to distinguish "deleted something" from "nothing to delete".
302 async fn delete(&self, key: &str) -> Result<bool, KvError>;
303
304 /// Create a key only if it doesn't exist.
305 /// Returns `AlreadyExists` if the key has a live value.
306 async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError>;
307
308 /// Compare-and-swap: update only if current version matches `expected`.
309 /// Returns `RevisionMismatch` on conflict.
310 async fn update(
311 &self,
312 key: &str,
313 value: &[u8],
314 expected: &VersionToken,
315 ) -> Result<VersionToken, KvError>;
316
317 /// CAS-gated delete: delete only if current version matches `expected`.
318 /// Returns `RevisionMismatch` on conflict.
319 /// Writes an empty value (logical delete) so concurrent writers get a conflict.
320 async fn delete_with_version(
321 &self,
322 key: &str,
323 expected: &VersionToken,
324 ) -> Result<bool, KvError>;
325}
326
327/// TTL support - optional, for stores that support key expiration.
328#[async_trait]
329pub trait KvTtl: KvWriter {
330 /// Put a value with TTL. Value expires after duration.
331 async fn put_with_ttl(
332 &self,
333 key: &str,
334 value: &[u8],
335 ttl: std::time::Duration,
336 ) -> Result<VersionToken, KvError>;
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn from_raw_roundtrips_within_capacity() {
        // A 10-byte FDB versionstamp is the largest token any backend uses.
        let stamp: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let full = VersionToken::from_raw(&stamp).expect("10 bytes is within capacity");
        assert_eq!(full.as_bytes(), &stamp[..]);

        // Eight bytes must remain readable as a NATS u64 revision.
        let revision: u64 = 0x0102_0304_0506_0708;
        let encoded = revision.to_be_bytes();
        let nats = VersionToken::from_raw(&encoded).expect("8 bytes is within capacity");
        assert_eq!(Some(revision), nats.as_u64());

        // No bytes at all yields the "unknown" token.
        let empty = VersionToken::from_raw(&[]).expect("empty is within capacity");
        assert!(empty.is_unknown());
    }

    #[test]
    fn from_raw_rejects_above_capacity() {
        // Eleven bytes is one more than the 10-byte inline buffer holds. This
        // protects against a loosened `parse_cursor` bound ever passing
        // oversized data through: `None` surfaces the format/backend mismatch
        // where it originates rather than silently truncating into a wrong
        // revision.
        let oversized = [0u8; 11];
        assert!(VersionToken::from_raw(&oversized).is_none());
    }
}