Skip to main content

zlayer_agent/
kv.rs

1//! Reusable in-memory key-value store
2//!
3//! This module provides [`KvStore`], a thread-safe, cloneable, in-memory
4//! key-value store with optional per-key TTL, quota enforcement, atomic
5//! increment / compare-and-swap, and a broadcast-based watch capability.
6//!
7//! It was extracted from the WASM `DefaultHost` so external consumers (daemons,
8//! tests, other runtimes) can use the same store natively. The WASM host now
9//! delegates to this type, so behaviour is identical across both paths.
10//!
11//! # Sharing
12//!
13//! [`KvStore`] holds its state behind an [`Arc`], so cloning a `KvStore`
14//! produces a handle to the *same* underlying store. Writes go through interior
15//! mutability (a [`parking_lot::RwLock`]), which is why the mutating methods
16//! take `&self` rather than `&mut self` — a clone can write through to the
17//! shared store concurrently.
18//!
19//! # Watch
20//!
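//! Every successful local write (`set`, `set_with_ttl`, `delete`, `increment`,
//! `compare_and_swap`) publishes a [`KvEvent`] over a
//! [`tokio::sync::broadcast`] channel. Bulk removals ([`KvStore::clear`],
//! [`KvStore::clean_expired`]) and operations delegated to a [`KvBackend`] do
//! not publish events. Use [`KvStore::subscribe`] for the raw receiver or
//! [`KvStore::watch_prefix`] for a filtered [`futures_util::Stream`].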
24
25use std::collections::HashMap;
26use std::sync::Arc;
27
28use futures_util::{Stream, StreamExt};
29use parking_lot::RwLock;
30use tokio::sync::broadcast;
31use tokio_stream::wrappers::BroadcastStream;
32
/// Largest value accepted by default, in bytes (1 MiB).
const DEFAULT_MAX_VALUE_SIZE: usize = 1 << 20;

/// Number of keys a store may hold by default.
const DEFAULT_MAX_KEYS: usize = 10_000;

/// Capacity handed to the watch broadcast channel.
const WATCH_CHANNEL_CAPACITY: usize = 1_024;
41
/// Errors produced by key-value operations; the variants mirror the WIT
/// `kv-error` variant.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum KvError {
    /// The requested key does not exist.
    NotFound,
    /// The value is larger than the store allows.
    ValueTooLarge,
    /// The storage quota has been exhausted.
    QuotaExceeded,
    /// The key does not have a valid format.
    InvalidKey,
    /// Any other storage failure, carrying a human-readable description.
    Storage(String),
}

impl std::fmt::Display for KvError {
    /// Write the human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only `Storage` needs formatting; every other variant is a fixed string.
        let fixed = match self {
            Self::Storage(detail) => return write!(f, "storage error: {detail}"),
            Self::NotFound => "key not found",
            Self::ValueTooLarge => "value too large",
            Self::QuotaExceeded => "storage quota exceeded",
            Self::InvalidKey => "invalid key format",
        };
        f.write_str(fixed)
    }
}

impl std::error::Error for KvError {}
70
/// A stored value together with its optional expiry time.
#[derive(Debug, Clone)]
pub struct KvEntry {
    /// Raw stored bytes
    pub value: Vec<u8>,
    /// Optional expiry instant; `None` means the entry never expires
    pub expires_at: Option<std::time::Instant>,
}

impl KvEntry {
    /// Create a new entry without an expiry.
    #[must_use]
    pub fn new(value: Vec<u8>) -> Self {
        Self {
            value,
            expires_at: None,
        }
    }

    /// Create a new entry that expires `ttl_ns` nanoseconds from now.
    ///
    /// If the deadline cannot be represented as an [`std::time::Instant`]
    /// (the addition would overflow), the entry is created without an expiry
    /// instead of panicking.
    #[must_use]
    pub fn with_ttl(value: Vec<u8>, ttl_ns: u64) -> Self {
        // `Instant + Duration` panics on overflow, and `ttl_ns` is
        // caller-supplied. `checked_add` yields `None` in that case, which
        // this type already interprets as "never expires".
        let expires_at =
            std::time::Instant::now().checked_add(std::time::Duration::from_nanos(ttl_ns));
        Self { value, expires_at }
    }

    /// Whether this entry's TTL has elapsed.
    ///
    /// Entries without an expiry never report as expired. An entry counts as
    /// expired from the exact expiry instant onwards.
    #[must_use]
    pub fn is_expired(&self) -> bool {
        self.expires_at
            .is_some_and(|exp| std::time::Instant::now() >= exp)
    }
}
104
/// Classifies the mutation that a [`KvEvent`] reports.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum KvEventKind {
    /// The key was created or its value was replaced.
    Set,
    /// The key was removed.
    Delete,
}

/// Notification delivered to watchers when the store is mutated.
#[derive(Clone, Debug)]
pub struct KvEvent {
    /// Key the change applies to.
    pub key: String,
    /// Which kind of change happened.
    pub kind: KvEventKind,
    /// New value for `Set` events; `None` for `Delete` events.
    pub value: Option<Vec<u8>>,
}
124
125/// A pluggable asynchronous backend for [`KvStore`].
126///
127/// When a backend is installed via [`KvStore::with_backend`], the store's
128/// `*_async` methods delegate their data operations to the backend instead of
129/// the local in-memory map. This is how "cluster mode" works: a Raft-backed
130/// implementation can route reads and writes through consensus while the
131/// existing synchronous local path (used by the WASM host) stays untouched.
132///
133/// Implementors are responsible for their own key validation, quota
134/// enforcement, and TTL semantics — the local fallback continues to apply the
135/// store's own rules, but a clustered backend defines its own.
136#[async_trait::async_trait]
137pub trait KvBackend: Send + Sync + std::fmt::Debug {
138    /// Get a value by key. Returns `None` if the key is missing or expired.
139    ///
140    /// # Errors
141    ///
142    /// Returns a [`KvError`] when the backend rejects the key or fails to read.
143    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, KvError>;
144
145    /// Set a value.
146    ///
147    /// # Errors
148    ///
149    /// Returns a [`KvError`] when the backend rejects the key/value or fails to
150    /// write.
151    async fn set(&self, key: &str, value: &[u8]) -> Result<(), KvError>;
152
153    /// Set a value with a TTL in nanoseconds.
154    ///
155    /// # Errors
156    ///
157    /// Returns a [`KvError`] when the backend rejects the key/value or fails to
158    /// write.
159    async fn set_with_ttl(&self, key: &str, value: &[u8], ttl_ns: u64) -> Result<(), KvError>;
160
161    /// Delete a key. Returns `true` if the key existed and was deleted.
162    ///
163    /// # Errors
164    ///
165    /// Returns a [`KvError`] when the backend rejects the key or fails to write.
166    async fn delete(&self, key: &str) -> Result<bool, KvError>;
167
168    /// Check if a key exists (and has not expired).
169    ///
170    /// # Errors
171    ///
172    /// Returns a [`KvError`] when the backend rejects the key or fails to read.
173    async fn exists(&self, key: &str) -> Result<bool, KvError>;
174
175    /// List all non-expired keys with a given prefix.
176    ///
177    /// # Errors
178    ///
179    /// Returns a [`KvError`] when the backend fails to read.
180    async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, KvError>;
181
182    /// Increment a numeric value atomically, returning the new value.
183    ///
184    /// # Errors
185    ///
186    /// Returns a [`KvError`] when the backend rejects the key, the existing
187    /// value is not a valid integer, or the write fails.
188    async fn increment(&self, key: &str, delta: i64) -> Result<i64, KvError>;
189
190    /// Compare-and-swap: set `new` only if the current value equals `expected`.
191    /// Returns `true` if the swap succeeded.
192    ///
193    /// # Errors
194    ///
195    /// Returns a [`KvError`] when the backend rejects the key/value or the write
196    /// fails.
197    async fn compare_and_swap(
198        &self,
199        key: &str,
200        expected: Option<&[u8]>,
201        new: &[u8],
202    ) -> Result<bool, KvError>;
203}
204
205/// Thread-safe, cloneable, in-memory key-value store.
206///
207/// Cloning yields a handle to the same underlying state (see module docs).
208#[derive(Clone)]
209pub struct KvStore {
210    inner: Arc<RwLock<HashMap<String, KvEntry>>>,
211    max_value_size: usize,
212    max_keys: usize,
213    events: broadcast::Sender<KvEvent>,
214    /// Optional pluggable backend. When `Some`, the `*_async` methods delegate
215    /// data operations here instead of the local in-memory map.
216    backend: Option<Arc<dyn KvBackend>>,
217}
218
219impl std::fmt::Debug for KvStore {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("KvStore")
222            .field("len", &self.inner.read().len())
223            .field("max_value_size", &self.max_value_size)
224            .field("max_keys", &self.max_keys)
225            .field("clustered", &self.backend.is_some())
226            .finish_non_exhaustive()
227    }
228}
229
230impl Default for KvStore {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236impl KvStore {
237    /// Create a new store with default limits (1 MiB values, 10000 keys).
238    #[must_use]
239    pub fn new() -> Self {
240        let (events, _rx) = broadcast::channel(WATCH_CHANNEL_CAPACITY);
241        Self {
242            inner: Arc::new(RwLock::new(HashMap::new())),
243            max_value_size: DEFAULT_MAX_VALUE_SIZE,
244            max_keys: DEFAULT_MAX_KEYS,
245            events,
246            backend: None,
247        }
248    }
249
250    /// Builder-style setter that installs a pluggable async [`KvBackend`].
251    ///
252    /// With a backend installed, the store runs in "cluster mode": the
253    /// `*_async` methods delegate their data operations to the backend instead
254    /// of the local in-memory map. The synchronous methods are unaffected and
255    /// continue to operate on the local map.
256    #[must_use]
257    pub fn with_backend(mut self, backend: Arc<dyn KvBackend>) -> Self {
258        self.backend = Some(backend);
259        self
260    }
261
262    /// Whether this store is running in cluster mode (a backend is installed).
263    #[must_use]
264    pub fn is_clustered(&self) -> bool {
265        self.backend.is_some()
266    }
267
268    /// Builder-style setter for the maximum value size in bytes.
269    #[must_use]
270    pub fn with_max_value_size(mut self, size: usize) -> Self {
271        self.max_value_size = size;
272        self
273    }
274
275    /// Builder-style setter for the maximum number of keys.
276    #[must_use]
277    pub fn with_max_keys(mut self, count: usize) -> Self {
278        self.max_keys = count;
279        self
280    }
281
282    /// Set the maximum value size in bytes.
283    pub fn set_max_value_size(&mut self, size: usize) {
284        self.max_value_size = size;
285    }
286
287    /// Set the maximum number of keys.
288    pub fn set_max_keys(&mut self, count: usize) {
289        self.max_keys = count;
290    }
291
292    /// The configured maximum value size in bytes.
293    #[must_use]
294    pub fn max_value_size(&self) -> usize {
295        self.max_value_size
296    }
297
298    /// The configured maximum number of keys.
299    #[must_use]
300    pub fn max_keys(&self) -> usize {
301        self.max_keys
302    }
303
304    /// Validate key format.
305    ///
306    /// Keys must be non-empty, at most 1024 bytes, and contain only
307    /// alphanumeric characters or one of `-_./:`.
308    ///
309    /// # Errors
310    ///
311    /// Returns [`KvError::InvalidKey`] when the key fails validation.
312    pub fn validate_key(key: &str) -> Result<(), KvError> {
313        if key.is_empty() {
314            return Err(KvError::InvalidKey);
315        }
316        if key.len() > 1024 {
317            return Err(KvError::InvalidKey);
318        }
319        // Allow alphanumeric, dash, underscore, dot, slash, colon
320        if !key
321            .chars()
322            .all(|c| c.is_alphanumeric() || "-_./:".contains(c))
323        {
324            return Err(KvError::InvalidKey);
325        }
326        Ok(())
327    }
328
329    /// Remove all expired entries from the store.
330    pub fn clean_expired(&self) {
331        let mut kv = self.inner.write();
332        kv.retain(|_, entry| !entry.is_expired());
333    }
334
335    /// Emit a watch event, ignoring the error when there are no subscribers.
336    fn emit(&self, key: &str, kind: KvEventKind, value: Option<Vec<u8>>) {
337        let _ = self.events.send(KvEvent {
338            key: key.to_string(),
339            kind,
340            value,
341        });
342    }
343
344    /// Get a value by key.
345    ///
346    /// Returns `None` if the key is missing or expired.
347    ///
348    /// # Errors
349    ///
350    /// Returns [`KvError::InvalidKey`] when the key is invalid.
351    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>, KvError> {
352        Self::validate_key(key)?;
353        self.clean_expired();
354
355        let kv = self.inner.read();
356        match kv.get(key) {
357            Some(entry) if !entry.is_expired() => Ok(Some(entry.value.clone())),
358            _ => Ok(None),
359        }
360    }
361
362    /// Get a value as a UTF-8 string.
363    ///
364    /// # Errors
365    ///
366    /// Returns [`KvError::InvalidKey`] when the key is invalid, or
367    /// [`KvError::Storage`] when the stored bytes are not valid UTF-8.
368    pub fn get_string(&self, key: &str) -> Result<Option<String>, KvError> {
369        match self.get(key)? {
370            Some(bytes) => String::from_utf8(bytes)
371                .map(Some)
372                .map_err(|e| KvError::Storage(format!("invalid UTF-8: {e}"))),
373            None => Ok(None),
374        }
375    }
376
377    /// Set a value.
378    ///
379    /// # Errors
380    ///
381    /// Returns [`KvError::InvalidKey`] for an invalid key,
382    /// [`KvError::ValueTooLarge`] when the value exceeds the configured limit,
383    /// or [`KvError::QuotaExceeded`] when adding a new key would exceed the
384    /// configured key count.
385    pub fn set(&self, key: &str, value: &[u8]) -> Result<(), KvError> {
386        Self::validate_key(key)?;
387
388        if value.len() > self.max_value_size {
389            return Err(KvError::ValueTooLarge);
390        }
391
392        {
393            let mut kv = self.inner.write();
394
395            // Check quota (only if adding new key)
396            if !kv.contains_key(key) && kv.len() >= self.max_keys {
397                return Err(KvError::QuotaExceeded);
398            }
399
400            kv.insert(key.to_string(), KvEntry::new(value.to_vec()));
401        }
402
403        self.emit(key, KvEventKind::Set, Some(value.to_vec()));
404        Ok(())
405    }
406
407    /// Set a string value.
408    ///
409    /// # Errors
410    ///
411    /// See [`KvStore::set`].
412    pub fn set_string(&self, key: &str, value: &str) -> Result<(), KvError> {
413        self.set(key, value.as_bytes())
414    }
415
416    /// Set a value with a TTL in nanoseconds.
417    ///
418    /// # Errors
419    ///
420    /// See [`KvStore::set`].
421    pub fn set_with_ttl(&self, key: &str, value: &[u8], ttl_ns: u64) -> Result<(), KvError> {
422        Self::validate_key(key)?;
423
424        if value.len() > self.max_value_size {
425            return Err(KvError::ValueTooLarge);
426        }
427
428        {
429            let mut kv = self.inner.write();
430
431            // Check quota (only if adding new key)
432            if !kv.contains_key(key) && kv.len() >= self.max_keys {
433                return Err(KvError::QuotaExceeded);
434            }
435
436            kv.insert(key.to_string(), KvEntry::with_ttl(value.to_vec(), ttl_ns));
437        }
438
439        self.emit(key, KvEventKind::Set, Some(value.to_vec()));
440        Ok(())
441    }
442
443    /// Delete a key.
444    ///
445    /// Returns `true` if the key existed and was deleted.
446    ///
447    /// # Errors
448    ///
449    /// Returns [`KvError::InvalidKey`] when the key is invalid.
450    pub fn delete(&self, key: &str) -> Result<bool, KvError> {
451        Self::validate_key(key)?;
452
453        let removed = {
454            let mut kv = self.inner.write();
455            kv.remove(key).is_some()
456        };
457
458        if removed {
459            self.emit(key, KvEventKind::Delete, None);
460        }
461        Ok(removed)
462    }
463
464    /// Check if a key exists (and has not expired).
465    #[must_use]
466    pub fn exists(&self, key: &str) -> bool {
467        self.clean_expired();
468        let kv = self.inner.read();
469        kv.get(key).is_some_and(|e| !e.is_expired())
470    }
471
472    /// List all non-expired keys with a given prefix.
473    ///
474    /// # Errors
475    ///
476    /// This implementation never fails, but returns `Result` for parity with
477    /// the WASM host interface and future backends.
478    pub fn list_keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
479        self.clean_expired();
480        let kv = self.inner.read();
481        Ok(kv
482            .iter()
483            .filter(|(k, entry)| k.starts_with(prefix) && !entry.is_expired())
484            .map(|(k, _)| k.clone())
485            .collect())
486    }
487
488    /// Increment a numeric value atomically, returning the new value.
489    ///
490    /// A missing or expired key is treated as `0`. The arithmetic saturates.
491    ///
492    /// # Errors
493    ///
494    /// Returns [`KvError::InvalidKey`] for an invalid key,
495    /// [`KvError::Storage`] when the existing value is not a valid integer, or
496    /// [`KvError::QuotaExceeded`] when adding a new key would exceed the quota.
497    pub fn increment(&self, key: &str, delta: i64) -> Result<i64, KvError> {
498        Self::validate_key(key)?;
499
500        let (new_value, bytes) = {
501            let mut kv = self.inner.write();
502
503            let current: i64 = match kv.get(key) {
504                Some(entry) if !entry.is_expired() => {
505                    let s = String::from_utf8(entry.value.clone())
506                        .map_err(|e| KvError::Storage(format!("invalid number: {e}")))?;
507                    s.parse()
508                        .map_err(|e| KvError::Storage(format!("invalid number: {e}")))?
509                }
510                _ => 0,
511            };
512
513            let new_value = current.saturating_add(delta);
514            let value_str = new_value.to_string();
515
516            // Check quota (only if adding new key)
517            if !kv.contains_key(key) && kv.len() >= self.max_keys {
518                return Err(KvError::QuotaExceeded);
519            }
520
521            let bytes = value_str.into_bytes();
522            kv.insert(key.to_string(), KvEntry::new(bytes.clone()));
523            (new_value, bytes)
524        };
525
526        self.emit(key, KvEventKind::Set, Some(bytes));
527        Ok(new_value)
528    }
529
530    /// Compare-and-swap: set `new_value` only if the current value equals
531    /// `expected`.
532    ///
533    /// Returns `true` if the swap succeeded, `false` if the current value did
534    /// not match `expected`.
535    ///
536    /// # Errors
537    ///
538    /// Returns [`KvError::InvalidKey`] for an invalid key,
539    /// [`KvError::ValueTooLarge`] when `new_value` exceeds the configured limit,
540    /// or [`KvError::QuotaExceeded`] when adding a new key would exceed the
541    /// quota.
542    pub fn compare_and_swap(
543        &self,
544        key: &str,
545        expected: Option<&[u8]>,
546        new_value: &[u8],
547    ) -> Result<bool, KvError> {
548        Self::validate_key(key)?;
549
550        if new_value.len() > self.max_value_size {
551            return Err(KvError::ValueTooLarge);
552        }
553
554        let swapped = {
555            let mut kv = self.inner.write();
556
557            let current = kv.get(key).and_then(|e| {
558                if e.is_expired() {
559                    None
560                } else {
561                    Some(e.value.as_slice())
562                }
563            });
564
565            if current == expected {
566                // Check quota (only if adding new key)
567                if current.is_none() && kv.len() >= self.max_keys {
568                    return Err(KvError::QuotaExceeded);
569                }
570                kv.insert(key.to_string(), KvEntry::new(new_value.to_vec()));
571                true
572            } else {
573                false
574            }
575        };
576
577        if swapped {
578            self.emit(key, KvEventKind::Set, Some(new_value.to_vec()));
579        }
580        Ok(swapped)
581    }
582
583    /// Async [`KvStore::get`]: delegates to the backend in cluster mode,
584    /// otherwise runs the local sync logic.
585    ///
586    /// # Errors
587    ///
588    /// Propagates errors from the backend, or [`KvError::InvalidKey`] for an
589    /// invalid key on the local path.
590    pub async fn get_async(&self, key: &str) -> Result<Option<Vec<u8>>, KvError> {
591        match &self.backend {
592            Some(b) => b.get(key).await,
593            None => self.get(key),
594        }
595    }
596
597    /// Async [`KvStore::set`]: delegates to the backend in cluster mode,
598    /// otherwise runs the local sync logic.
599    ///
600    /// # Errors
601    ///
602    /// Propagates errors from the backend, or the local [`KvStore::set`] errors.
603    pub async fn set_async(&self, key: &str, value: &[u8]) -> Result<(), KvError> {
604        match &self.backend {
605            Some(b) => b.set(key, value).await,
606            None => self.set(key, value),
607        }
608    }
609
610    /// Async [`KvStore::set_with_ttl`]: delegates to the backend in cluster
611    /// mode, otherwise runs the local sync logic.
612    ///
613    /// # Errors
614    ///
615    /// Propagates errors from the backend, or the local
616    /// [`KvStore::set_with_ttl`] errors.
617    pub async fn set_with_ttl_async(
618        &self,
619        key: &str,
620        value: &[u8],
621        ttl_ns: u64,
622    ) -> Result<(), KvError> {
623        match &self.backend {
624            Some(b) => b.set_with_ttl(key, value, ttl_ns).await,
625            None => self.set_with_ttl(key, value, ttl_ns),
626        }
627    }
628
629    /// Async [`KvStore::delete`]: delegates to the backend in cluster mode,
630    /// otherwise runs the local sync logic.
631    ///
632    /// # Errors
633    ///
634    /// Propagates errors from the backend, or [`KvError::InvalidKey`] for an
635    /// invalid key on the local path.
636    pub async fn delete_async(&self, key: &str) -> Result<bool, KvError> {
637        match &self.backend {
638            Some(b) => b.delete(key).await,
639            None => self.delete(key),
640        }
641    }
642
643    /// Async existence check: delegates to the backend in cluster mode,
644    /// otherwise wraps the infallible local [`KvStore::exists`].
645    ///
646    /// # Errors
647    ///
648    /// Propagates errors from the backend. The local path never fails.
649    pub async fn exists_async(&self, key: &str) -> Result<bool, KvError> {
650        match &self.backend {
651            Some(b) => b.exists(key).await,
652            None => Ok(self.exists(key)),
653        }
654    }
655
656    /// Async [`KvStore::list_keys`]: delegates to the backend in cluster mode,
657    /// otherwise runs the local sync logic.
658    ///
659    /// # Errors
660    ///
661    /// Propagates errors from the backend. The local path never fails.
662    pub async fn list_keys_async(&self, prefix: &str) -> Result<Vec<String>, KvError> {
663        match &self.backend {
664            Some(b) => b.list_keys(prefix).await,
665            None => self.list_keys(prefix),
666        }
667    }
668
669    /// Async [`KvStore::increment`]: delegates to the backend in cluster mode,
670    /// otherwise runs the local sync logic.
671    ///
672    /// # Errors
673    ///
674    /// Propagates errors from the backend, or the local [`KvStore::increment`]
675    /// errors.
676    pub async fn increment_async(&self, key: &str, delta: i64) -> Result<i64, KvError> {
677        match &self.backend {
678            Some(b) => b.increment(key, delta).await,
679            None => self.increment(key, delta),
680        }
681    }
682
683    /// Async [`KvStore::compare_and_swap`]: delegates to the backend in cluster
684    /// mode, otherwise runs the local sync logic.
685    ///
686    /// # Errors
687    ///
688    /// Propagates errors from the backend, or the local
689    /// [`KvStore::compare_and_swap`] errors.
690    pub async fn compare_and_swap_async(
691        &self,
692        key: &str,
693        expected: Option<&[u8]>,
694        new: &[u8],
695    ) -> Result<bool, KvError> {
696        match &self.backend {
697            Some(b) => b.compare_and_swap(key, expected, new).await,
698            None => self.compare_and_swap(key, expected, new),
699        }
700    }
701
702    /// Remove all entries from the store.
703    pub fn clear(&self) {
704        self.inner.write().clear();
705    }
706
707    /// Subscribe to all change events.
708    ///
709    /// Returns a raw broadcast receiver. Slow consumers may observe
710    /// [`broadcast::error::RecvError::Lagged`].
711    #[must_use]
712    pub fn subscribe(&self) -> broadcast::Receiver<KvEvent> {
713        self.events.subscribe()
714    }
715
716    /// Watch for change events whose key starts with `prefix`.
717    ///
718    /// Returns a [`Stream`] that yields matching [`KvEvent`]s. Lagged events
719    /// (dropped because a slow consumer fell behind) are skipped.
720    pub fn watch_prefix(&self, prefix: impl Into<String>) -> impl Stream<Item = KvEvent> {
721        let prefix = prefix.into();
722        BroadcastStream::new(self.events.subscribe()).filter_map(move |res| {
723            let prefix = prefix.clone();
724            async move {
725                match res {
726                    Ok(event) if event.key.starts_with(&prefix) => Some(event),
727                    _ => None,
728                }
729            }
730        })
731    }
732}
733
734/// Process-global slot holding the daemon's shared [`KvStore`] handle.
735///
736/// Populated once by the daemon via [`set_global_kv`] so in-process native
737/// consumers can reach the *same* cluster-backed KV the daemon runs, instead of
738/// constructing their own (inert) store.
739static GLOBAL_KV: std::sync::OnceLock<KvStore> = std::sync::OnceLock::new();
740
741/// Publish the daemon's [`KvStore`] handle to the process-global slot.
742///
743/// Intended to be called exactly once during daemon startup. Because the slot is
744/// write-once, subsequent calls are ignored (a `tracing::warn!` is emitted) so
745/// this is idempotent and never panics. The slot retains the handle for the
746/// lifetime of the process; since `KvStore` is `Arc`-backed, the stored handle
747/// keeps the underlying backend alive.
748pub fn set_global_kv(store: KvStore) {
749    if GLOBAL_KV.set(store).is_err() {
750        tracing::warn!("global KvStore already set; ignoring duplicate set_global_kv call");
751    }
752}
753
754/// Fetch a clone of the daemon's process-global [`KvStore`], if one has been
755/// published via [`set_global_kv`].
756///
757/// This is the hook for in-process native KV consumers (for example a
758/// redis-protocol module, or the `Z3Fungi` brain linked into `zlayer-agent`) to
759/// obtain the same store the daemon uses. `KvStore` is `Clone`/`Arc`-backed, so
760/// the returned clone shares the same backend and data as the daemon's handle.
761///
762/// The returned store reflects the daemon's mode: it is cluster-backed when the
763/// daemon is running clustered, and a plain local in-memory store otherwise.
764/// Returns `None` if the daemon has not yet published its handle.
765#[must_use]
766pub fn global_kv() -> Option<KvStore> {
767    GLOBAL_KV.get().cloned()
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use futures_util::StreamExt;
774
775    #[test]
776    fn set_and_get() {
777        let store = KvStore::new();
778        store.set("foo", b"bar").unwrap();
779        assert_eq!(store.get("foo").unwrap(), Some(b"bar".to_vec()));
780        assert_eq!(store.get_string("foo").unwrap(), Some("bar".to_string()));
781    }
782
783    #[test]
784    fn get_missing_returns_none() {
785        let store = KvStore::new();
786        assert_eq!(store.get("missing").unwrap(), None);
787    }
788
789    #[test]
790    fn ttl_expiry() {
791        let store = KvStore::new();
792        // 1ms TTL.
793        store.set_with_ttl("temp", b"v", 1_000_000).unwrap();
794        std::thread::sleep(std::time::Duration::from_millis(5));
795        assert_eq!(store.get("temp").unwrap(), None);
796        assert!(!store.exists("temp"));
797    }
798
799    #[test]
800    fn delete_reports_existence() {
801        let store = KvStore::new();
802        store.set("k", b"v").unwrap();
803        assert!(store.delete("k").unwrap());
804        assert!(!store.delete("k").unwrap());
805        assert_eq!(store.get("k").unwrap(), None);
806    }
807
808    #[test]
809    fn list_keys_prefix() {
810        let store = KvStore::new();
811        store.set("a/1", b"1").unwrap();
812        store.set("a/2", b"2").unwrap();
813        store.set("b/1", b"3").unwrap();
814        let mut keys = store.list_keys("a/").unwrap();
815        keys.sort();
816        assert_eq!(keys, vec!["a/1".to_string(), "a/2".to_string()]);
817    }
818
819    #[test]
820    fn increment() {
821        let store = KvStore::new();
822        assert_eq!(store.increment("counter", 5).unwrap(), 5);
823        assert_eq!(store.increment("counter", 3).unwrap(), 8);
824        assert_eq!(store.increment("counter", -10).unwrap(), -2);
825    }
826
827    #[test]
828    fn increment_saturates() {
829        let store = KvStore::new();
830        store.set("c", i64::MAX.to_string().as_bytes()).unwrap();
831        assert_eq!(store.increment("c", 1).unwrap(), i64::MAX);
832    }
833
834    #[test]
835    fn compare_and_swap_hit_and_miss() {
836        let store = KvStore::new();
837        // Swap from absent (expected None).
838        assert!(store.compare_and_swap("k", None, b"v1").unwrap());
839        // Hit.
840        assert!(store.compare_and_swap("k", Some(b"v1"), b"v2").unwrap());
841        // Miss (wrong expected).
842        assert!(!store.compare_and_swap("k", Some(b"v1"), b"v3").unwrap());
843        assert_eq!(store.get("k").unwrap(), Some(b"v2".to_vec()));
844    }
845
846    #[test]
847    fn quota_exceeded() {
848        let store = KvStore::new().with_max_keys(2);
849        store.set("a", b"1").unwrap();
850        store.set("b", b"2").unwrap();
851        assert_eq!(store.set("c", b"3"), Err(KvError::QuotaExceeded));
852        // Overwriting an existing key is allowed even at quota.
853        assert!(store.set("a", b"x").is_ok());
854    }
855
856    #[test]
857    fn value_too_large() {
858        let store = KvStore::new().with_max_value_size(4);
859        assert_eq!(store.set("k", b"toolong"), Err(KvError::ValueTooLarge));
860    }
861
862    #[test]
863    fn invalid_key() {
864        let store = KvStore::new();
865        assert_eq!(store.set("", b"v"), Err(KvError::InvalidKey));
866        assert_eq!(store.set("bad key", b"v"), Err(KvError::InvalidKey));
867    }
868
869    #[test]
870    fn clone_shares_state() {
871        let a = KvStore::new();
872        let b = a.clone();
873        a.set("k", b"v").unwrap();
874        assert_eq!(b.get("k").unwrap(), Some(b"v".to_vec()));
875    }
876
877    #[tokio::test]
878    async fn watch_receives_set_event() {
879        let store = KvStore::new();
880        let mut rx = store.subscribe();
881        store.set("watched", b"hello").unwrap();
882        let event = rx.recv().await.unwrap();
883        assert_eq!(event.key, "watched");
884        assert_eq!(event.kind, KvEventKind::Set);
885        assert_eq!(event.value, Some(b"hello".to_vec()));
886    }
887
888    #[tokio::test]
889    async fn watch_prefix_filters() {
890        let store = KvStore::new();
891        let mut stream = Box::pin(store.watch_prefix("user/"));
892        store.set("other/1", b"x").unwrap();
893        store.set("user/1", b"y").unwrap();
894        let event = stream.next().await.unwrap();
895        assert_eq!(event.key, "user/1");
896        assert_eq!(event.value, Some(b"y".to_vec()));
897    }
898
899    #[tokio::test]
900    async fn watch_receives_delete_event() {
901        let store = KvStore::new();
902        store.set("k", b"v").unwrap();
903        let mut rx = store.subscribe();
904        store.delete("k").unwrap();
905        let event = rx.recv().await.unwrap();
906        assert_eq!(event.kind, KvEventKind::Delete);
907        assert_eq!(event.key, "k");
908        assert_eq!(event.value, None);
909    }
910
/// Minimal test double for a backend: it keeps values in a private map and
/// logs the name of every operation invoked on it. The tests use the log to
/// show that the `*_async` methods are routed to an installed backend rather
/// than to the store's local map.
#[derive(Debug, Default)]
struct MockBackend {
    /// Values held by the mock, keyed by string.
    map: std::sync::Mutex<HashMap<String, Vec<u8>>>,
    /// Operation names, in the order they were recorded.
    calls: std::sync::Mutex<Vec<String>>,
}

impl MockBackend {
    /// Appends `op` to the call log.
    fn record(&self, op: &str) {
        let mut log = self.calls.lock().unwrap();
        log.push(op.to_owned());
    }

    /// Returns a snapshot copy of the call log.
    fn calls(&self) -> Vec<String> {
        let log = self.calls.lock().unwrap();
        log.to_vec()
    }
}
929
930    #[async_trait::async_trait]
931    impl KvBackend for MockBackend {
932        async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, KvError> {
933            self.record("get");
934            Ok(self.map.lock().unwrap().get(key).cloned())
935        }
936
937        async fn set(&self, key: &str, value: &[u8]) -> Result<(), KvError> {
938            self.record("set");
939            self.map
940                .lock()
941                .unwrap()
942                .insert(key.to_string(), value.to_vec());
943            Ok(())
944        }
945
946        async fn set_with_ttl(&self, key: &str, value: &[u8], _ttl_ns: u64) -> Result<(), KvError> {
947            self.record("set_with_ttl");
948            self.map
949                .lock()
950                .unwrap()
951                .insert(key.to_string(), value.to_vec());
952            Ok(())
953        }
954
955        async fn delete(&self, key: &str) -> Result<bool, KvError> {
956            self.record("delete");
957            Ok(self.map.lock().unwrap().remove(key).is_some())
958        }
959
960        async fn exists(&self, key: &str) -> Result<bool, KvError> {
961            self.record("exists");
962            Ok(self.map.lock().unwrap().contains_key(key))
963        }
964
965        async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
966            self.record("list_keys");
967            Ok(self
968                .map
969                .lock()
970                .unwrap()
971                .keys()
972                .filter(|k| k.starts_with(prefix))
973                .cloned()
974                .collect())
975        }
976
977        async fn increment(&self, key: &str, delta: i64) -> Result<i64, KvError> {
978            self.record("increment");
979            let mut map = self.map.lock().unwrap();
980            let current: i64 = map
981                .get(key)
982                .map_or(0, |v| String::from_utf8_lossy(v).parse().unwrap_or(0));
983            let new = current + delta;
984            map.insert(key.to_string(), new.to_string().into_bytes());
985            Ok(new)
986        }
987
988        async fn compare_and_swap(
989            &self,
990            key: &str,
991            expected: Option<&[u8]>,
992            new: &[u8],
993        ) -> Result<bool, KvError> {
994            self.record("compare_and_swap");
995            let mut map = self.map.lock().unwrap();
996            let current = map.get(key).map(Vec::as_slice);
997            if current == expected {
998                map.insert(key.to_string(), new.to_vec());
999                Ok(true)
1000            } else {
1001                Ok(false)
1002            }
1003        }
1004    }
1005
/// `is_clustered` is false for a plain store and true once a backend is installed.
#[test]
fn is_clustered_reflects_backend() {
    let plain = KvStore::new();
    assert!(!plain.is_clustered());

    let backend = Arc::new(MockBackend::default());
    let with_backend = KvStore::new().with_backend(backend);
    assert!(with_backend.is_clustered());
}
1013
1014    #[tokio::test]
1015    async fn async_routes_to_backend_when_clustered() {
1016        let backend = Arc::new(MockBackend::default());
1017        let store = KvStore::new().with_backend(backend.clone());
1018
1019        // Writes/reads go to the backend, not the local map.
1020        store.set_async("foo", b"bar").await.unwrap();
1021        assert_eq!(store.get_async("foo").await.unwrap(), Some(b"bar".to_vec()));
1022        assert!(store.exists_async("foo").await.unwrap());
1023
1024        store.set_with_ttl_async("ttlk", b"v", 1_000).await.unwrap();
1025        assert_eq!(store.increment_async("counter", 5).await.unwrap(), 5);
1026        assert_eq!(store.increment_async("counter", 3).await.unwrap(), 8);
1027        assert!(store
1028            .compare_and_swap_async("cas", None, b"v1")
1029            .await
1030            .unwrap());
1031
1032        let mut keys = store.list_keys_async("").await.unwrap();
1033        keys.sort();
1034        assert_eq!(
1035            keys,
1036            vec![
1037                "cas".to_string(),
1038                "counter".to_string(),
1039                "foo".to_string(),
1040                "ttlk".to_string(),
1041            ]
1042        );
1043
1044        assert!(store.delete_async("foo").await.unwrap());
1045        assert!(!store.exists_async("foo").await.unwrap());
1046
1047        // The local in-memory map must remain completely untouched.
1048        assert_eq!(store.get("foo").unwrap(), None);
1049        assert!(!store.exists("counter"));
1050        assert_eq!(store.list_keys("").unwrap(), Vec::<String>::new());
1051
1052        // Every async op was recorded by the backend.
1053        let calls = backend.calls();
1054        for op in [
1055            "set",
1056            "get",
1057            "exists",
1058            "set_with_ttl",
1059            "increment",
1060            "compare_and_swap",
1061            "list_keys",
1062            "delete",
1063        ] {
1064            assert!(
1065                calls.contains(&op.to_string()),
1066                "missing backend call: {op}"
1067            );
1068        }
1069    }
1070
1071    #[tokio::test]
1072    async fn async_uses_local_when_not_clustered() {
1073        let store = KvStore::new();
1074
1075        store.set_async("foo", b"bar").await.unwrap();
1076        // Async write landed in the local map (visible via the sync getter).
1077        assert_eq!(store.get("foo").unwrap(), Some(b"bar".to_vec()));
1078        // And matches the sync result exactly.
1079        assert_eq!(
1080            store.get_async("foo").await.unwrap(),
1081            store.get("foo").unwrap()
1082        );
1083
1084        assert_eq!(
1085            store.exists_async("foo").await.unwrap(),
1086            store.exists("foo")
1087        );
1088
1089        store
1090            .set_with_ttl_async("ttlk", b"v", 1_000_000_000)
1091            .await
1092            .unwrap();
1093        assert!(store.exists("ttlk"));
1094
1095        assert_eq!(store.increment_async("c", 4).await.unwrap(), 4);
1096        assert_eq!(store.increment("c", 0).unwrap(), 4);
1097
1098        assert!(store
1099            .compare_and_swap_async("cas", None, b"v1")
1100            .await
1101            .unwrap());
1102        assert_eq!(store.get("cas").unwrap(), Some(b"v1".to_vec()));
1103
1104        let mut a = store.list_keys_async("").await.unwrap();
1105        let mut b = store.list_keys("").unwrap();
1106        a.sort();
1107        b.sort();
1108        assert_eq!(a, b);
1109
1110        assert!(store.delete_async("foo").await.unwrap());
1111        assert_eq!(store.get("foo").unwrap(), None);
1112    }
1113
/// Two handles fetched independently through `global_kv()` observe each
/// other's writes.
#[test]
fn global_kv_accessor_shares_state() {
    // `GLOBAL_KV` is a process-global `OnceLock`, so another test in this
    // binary may have published a store already. Asserting that it starts out
    // as `None` would therefore be order-dependent and flaky; instead we only
    // check share-semantics on whichever handle ends up published.
    set_global_kv(KvStore::new());

    let writer = global_kv().expect("global KvStore should be set after set_global_kv");
    let reader = global_kv().expect("global KvStore should still be set");

    // Write through one handle, then read through the other, independently
    // fetched one: the value must be visible.
    writer.set("global-kv-share-test", b"shared").unwrap();
    let observed = reader.get("global-kv-share-test").unwrap();
    assert_eq!(
        observed,
        Some(b"shared".to_vec()),
        "writes through one global_kv() clone must be visible through another"
    );
}
1134}