Skip to main content

entelix_memory/
store.rs

1//! `Store<V>` trait — namespace-scoped key/value storage that
2//! survives across threads (Tier 3 in the 3-tier state model).
3//!
4//! In-process default: [`InMemoryStore<V>`]. Postgres / Redis backed
5//! `Store` impls live in `entelix-persistence`.
6//!
7//! ## Production primitives
8//!
9//! - [`PutOptions`] — declarative per-write knobs. The only field
10//!   today is `ttl`; future additions ride on `#[non_exhaustive]`
11//!   without touching call sites.
12//! - [`Store::put`] is the simple hot path; [`Store::put_with_options`]
13//!   is the configurable form. `put` has a default impl that
14//!   delegates to `put_with_options(PutOptions::default())`.
15//! - [`Store::list_namespaces`] returns every [`Namespace`] under a
16//!   [`NamespacePrefix`] — the F2 / Invariant-11 boundary stays
17//!   structural for hierarchical traversal as well as point lookups.
18//! - [`Store::evict_expired`] is a default-`Ok(0)` hook that backends
19//!   override when they own a TTL sweeper. Operators run it on a
20//!   timer to bound store growth.
21
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use async_trait::async_trait;
27use entelix_core::{ExecutionContext, Result};
28use parking_lot::Mutex;
29
30use crate::namespace::{Namespace, NamespacePrefix};
31
32/// Per-write knobs the operator may attach when calling
33/// [`Store::put_with_options`]. `Default::default()` corresponds to
34/// the simple [`Store::put`] path: no TTL, no extra metadata.
35#[derive(Clone, Debug, Default)]
36#[non_exhaustive]
37pub struct PutOptions {
38    /// Time-to-live for the entry. `None` = no expiry. Backends
39    /// without native TTL support emit the value with no expiry and
40    /// surface the request through [`Store::evict_expired`] sweeps.
41    pub ttl: Option<Duration>,
42}
43
44impl PutOptions {
45    /// Attach a TTL to this put. Builder-style.
46    #[must_use]
47    pub const fn with_ttl(mut self, ttl: Duration) -> Self {
48        self.ttl = Some(ttl);
49        self
50    }
51}
52
53/// Persistent (or in-memory) key/value store, scoped by [`Namespace`].
54///
55/// Every method takes [`ExecutionContext`] so remote backends can
56/// honour caller-side cancellation and deadlines (invariant
57/// "cancellation propagation"). In-memory impls accept the parameter
58/// for trait uniformity and otherwise ignore it.
59#[async_trait]
60pub trait Store<V>: Send + Sync + 'static
61where
62    V: Clone + Send + Sync + 'static,
63{
64    /// Insert or replace `value` at `(ns, key)` with the supplied
65    /// per-write options (TTL, future fields). This is the only
66    /// required write — [`Self::put`] is a thin convenience that
67    /// delegates here.
68    async fn put_with_options(
69        &self,
70        ctx: &ExecutionContext,
71        ns: &Namespace,
72        key: &str,
73        value: V,
74        options: PutOptions,
75    ) -> Result<()>;
76
77    /// Insert or replace `value` at `(ns, key)` with default options
78    /// (no TTL). The default impl delegates to
79    /// [`Self::put_with_options`] — backends only need to provide one.
80    async fn put(&self, ctx: &ExecutionContext, ns: &Namespace, key: &str, value: V) -> Result<()> {
81        self.put_with_options(ctx, ns, key, value, PutOptions::default())
82            .await
83    }
84
85    /// Look up `(ns, key)`. Returns `None` if absent or expired.
86    async fn get(&self, ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<Option<V>>;
87
88    /// Delete `(ns, key)`. Idempotent — deleting an absent key
89    /// succeeds.
90    async fn delete(&self, ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<()>;
91
92    /// List keys under `ns` whose names start with `prefix` (or all
93    /// keys if `prefix` is `None`). Order is unspecified.
94    async fn list(
95        &self,
96        ctx: &ExecutionContext,
97        ns: &Namespace,
98        prefix: Option<&str>,
99    ) -> Result<Vec<String>>;
100
101    /// List every [`Namespace`] under `prefix` that holds at least
102    /// one entry. The default impl returns an empty list — backends
103    /// override when they can enumerate cheaply (Postgres index
104    /// scan, Redis `SCAN`). Order is unspecified.
105    ///
106    /// Useful for "list all conversations under agent-X" or
107    /// admin tooling that audits per-tenant storage.
108    async fn list_namespaces(
109        &self,
110        _ctx: &ExecutionContext,
111        _prefix: &NamespacePrefix,
112    ) -> Result<Vec<Namespace>> {
113        Ok(Vec::new())
114    }
115
116    /// Sweep expired entries. Returns the number of rows removed.
117    /// Default impl returns `Ok(0)` — only backends that natively
118    /// track TTL implement this. Operators schedule it on a timer
119    /// (or trigger from cron / periodic graph) to bound store
120    /// growth in deployments where the store does not auto-expire
121    /// (e.g. plain `put` into Postgres without a TTL trigger).
122    async fn evict_expired(&self, _ctx: &ExecutionContext) -> Result<usize> {
123        Ok(0)
124    }
125}
126
127/// In-process `Store<V>` backed by a `HashMap` keyed by
128/// `(rendered_namespace, key)`. Cheap to clone — internal state is
129/// `Arc<Mutex<...>>`-shared.
130///
131/// TTL is honoured: entries written via
132/// [`Store::put_with_options`] with a non-`None` `ttl` are dropped
133/// from `get` / `list` results once their absolute expiry passes.
134/// The sweep ([`Store::evict_expired`]) cleans the map structure;
135/// callers may run it from a periodic graph if memory pressure
136/// matters.
137pub struct InMemoryStore<V>
138where
139    V: Clone + Send + Sync + 'static,
140{
141    inner: Arc<Mutex<EntryMap<V>>>,
142}
143
144type EntryMap<V> = HashMap<(String, String), Entry<V>>;
145
146struct Entry<V> {
147    value: V,
148    expires_at: Option<Instant>,
149}
150
151impl<V> InMemoryStore<V>
152where
153    V: Clone + Send + Sync + 'static,
154{
155    /// Empty store.
156    #[must_use]
157    pub fn new() -> Self {
158        Self {
159            inner: Arc::new(Mutex::new(HashMap::new())),
160        }
161    }
162
163    /// Total entry count across all namespaces, including not-yet-
164    /// swept-but-expired ones. Useful for tests; production callers
165    /// should run [`Store::evict_expired`] first if they care about
166    /// the live count.
167    #[must_use]
168    pub fn total_entries(&self) -> usize {
169        self.inner.lock().len()
170    }
171}
172
173impl<V> Default for InMemoryStore<V>
174where
175    V: Clone + Send + Sync + 'static,
176{
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182impl<V> Clone for InMemoryStore<V>
183where
184    V: Clone + Send + Sync + 'static,
185{
186    fn clone(&self) -> Self {
187        Self {
188            inner: Arc::clone(&self.inner),
189        }
190    }
191}
192
193#[async_trait]
194impl<V> Store<V> for InMemoryStore<V>
195where
196    V: Clone + Send + Sync + 'static,
197{
198    async fn put_with_options(
199        &self,
200        _ctx: &ExecutionContext,
201        ns: &Namespace,
202        key: &str,
203        value: V,
204        options: PutOptions,
205    ) -> Result<()> {
206        let composite = (ns.render(), key.to_owned());
207        let expires_at = options.ttl.map(|d| Instant::now() + d);
208        {
209            let mut guard = self.inner.lock();
210            guard.insert(composite, Entry { value, expires_at });
211        }
212        Ok(())
213    }
214
215    async fn get(&self, _ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<Option<V>> {
216        let composite = (ns.render(), key.to_owned());
217        let now = Instant::now();
218        let result = {
219            let guard = self.inner.lock();
220            guard
221                .get(&composite)
222                .filter(|entry| entry.expires_at.is_none_or(|exp| exp > now))
223                .map(|entry| entry.value.clone())
224        };
225        Ok(result)
226    }
227
228    async fn delete(&self, _ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<()> {
229        let composite = (ns.render(), key.to_owned());
230        {
231            let mut guard = self.inner.lock();
232            guard.remove(&composite);
233        }
234        Ok(())
235    }
236
237    async fn list(
238        &self,
239        _ctx: &ExecutionContext,
240        ns: &Namespace,
241        prefix: Option<&str>,
242    ) -> Result<Vec<String>> {
243        let ns_key = ns.render();
244        let now = Instant::now();
245        let out = {
246            let guard = self.inner.lock();
247            guard
248                .iter()
249                .filter(|((n, _), entry)| {
250                    n == &ns_key && entry.expires_at.is_none_or(|exp| exp > now)
251                })
252                .filter(|((_, k), _)| prefix.is_none_or(|p| k.starts_with(p)))
253                .map(|((_, k), _)| k.clone())
254                .collect::<Vec<_>>()
255        };
256        Ok(out)
257    }
258
259    async fn list_namespaces(
260        &self,
261        _ctx: &ExecutionContext,
262        prefix: &NamespacePrefix,
263    ) -> Result<Vec<Namespace>> {
264        let prefix_render = render_prefix(prefix);
265        let now = Instant::now();
266        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
267        {
268            let guard = self.inner.lock();
269            for ((rendered_ns, _), entry) in guard.iter() {
270                if entry.expires_at.is_some_and(|exp| exp <= now) {
271                    continue;
272                }
273                if rendered_ns == &prefix_render
274                    || rendered_ns.starts_with(&format!("{prefix_render}:"))
275                {
276                    seen.insert(rendered_ns.clone());
277                }
278            }
279        }
280        // `Namespace::parse` recovers the typed `(tenant_id, scope)`
281        // tuple from the rendered key — the structural identity is
282        // preserved through the round-trip render → store → list →
283        // parse. The trait contract ("every distinct Namespace
284        // under prefix") is honoured as written rather than
285        // approximated with a synthetic clone of the prefix.
286        seen.into_iter().map(|key| Namespace::parse(&key)).collect()
287    }
288
289    async fn evict_expired(&self, _ctx: &ExecutionContext) -> Result<usize> {
290        let now = Instant::now();
291        let removed = {
292            let mut guard = self.inner.lock();
293            let before = guard.len();
294            guard.retain(|_, entry| entry.expires_at.is_none_or(|exp| exp > now));
295            before - guard.len()
296        };
297        Ok(removed)
298    }
299}
300
301fn render_prefix(prefix: &NamespacePrefix) -> String {
302    // Mirror Namespace::render layout so InMemoryStore prefix matches
303    // are textually consistent with stored namespace keys.
304    let mut tmp = Namespace::new(prefix.tenant_id().clone());
305    for s in prefix.scope() {
306        tmp = tmp.with_scope(s.clone());
307    }
308    tmp.render()
309}
310
311#[cfg(test)]
312#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
313mod tests {
314    use super::*;
315    use entelix_core::TenantId;
316
317    fn ctx() -> ExecutionContext {
318        ExecutionContext::new()
319    }
320
321    fn ns() -> Namespace {
322        Namespace::new(TenantId::new("acme")).with_scope("agent-a")
323    }
324
325    #[tokio::test]
326    async fn put_then_get_round_trips() {
327        let store: InMemoryStore<String> = InMemoryStore::new();
328        store.put(&ctx(), &ns(), "k", "v".into()).await.unwrap();
329        let got = store.get(&ctx(), &ns(), "k").await.unwrap();
330        assert_eq!(got.as_deref(), Some("v"));
331    }
332
333    #[tokio::test]
334    async fn ttl_expires_on_get() {
335        let store: InMemoryStore<String> = InMemoryStore::new();
336        store
337            .put_with_options(
338                &ctx(),
339                &ns(),
340                "k",
341                "v".into(),
342                PutOptions::default().with_ttl(Duration::from_millis(20)),
343            )
344            .await
345            .unwrap();
346        // Live before expiry.
347        assert!(store.get(&ctx(), &ns(), "k").await.unwrap().is_some());
348        tokio::time::sleep(Duration::from_millis(40)).await;
349        // Expired — get returns None even though sweep has not run.
350        assert!(store.get(&ctx(), &ns(), "k").await.unwrap().is_none());
351    }
352
353    #[tokio::test]
354    async fn evict_expired_returns_count_and_drops_rows() {
355        let store: InMemoryStore<String> = InMemoryStore::new();
356        // One TTL row (will expire) + one no-TTL row (survives).
357        store
358            .put_with_options(
359                &ctx(),
360                &ns(),
361                "doomed",
362                "v".into(),
363                PutOptions::default().with_ttl(Duration::from_millis(10)),
364            )
365            .await
366            .unwrap();
367        store.put(&ctx(), &ns(), "alive", "v".into()).await.unwrap();
368        tokio::time::sleep(Duration::from_millis(30)).await;
369        let removed = store.evict_expired(&ctx()).await.unwrap();
370        assert_eq!(removed, 1);
371        assert_eq!(store.total_entries(), 1);
372    }
373
374    #[tokio::test]
375    async fn list_namespaces_finds_subscopes_under_prefix() {
376        let store: InMemoryStore<String> = InMemoryStore::new();
377        let ns_a = Namespace::new(TenantId::new("acme")).with_scope("agent-a");
378        let ns_b = Namespace::new(TenantId::new("acme"))
379            .with_scope("agent-a")
380            .with_scope("conv-1");
381        let ns_other = Namespace::new(TenantId::new("acme")).with_scope("agent-b");
382        store.put(&ctx(), &ns_a, "k", "v".into()).await.unwrap();
383        store.put(&ctx(), &ns_b, "k", "v".into()).await.unwrap();
384        store.put(&ctx(), &ns_other, "k", "v".into()).await.unwrap();
385        let prefix = NamespacePrefix::new(TenantId::new("acme")).with_scope("agent-a");
386        let found = store.list_namespaces(&ctx(), &prefix).await.unwrap();
387        // ns_a + ns_b match; ns_other does not.
388        assert_eq!(found.len(), 2);
389        // Returned namespaces structurally match the originals, not
390        // a prefix-shape clone — the round-trip render → parse
391        // recovers the typed scope.
392        let mut got: Vec<Namespace> = found;
393        got.sort_by_key(|x| x.scope().len());
394        assert_eq!(got[0], ns_a);
395        assert_eq!(got[1], ns_b);
396    }
397
398    #[tokio::test]
399    async fn list_namespaces_recovers_escaped_segments() {
400        let store: InMemoryStore<String> = InMemoryStore::new();
401        let ns_colon = Namespace::new(TenantId::new("acme"))
402            .with_scope("agent-a")
403            .with_scope("k8s:pod:foo");
404        store.put(&ctx(), &ns_colon, "k", "v".into()).await.unwrap();
405        let prefix = NamespacePrefix::new(TenantId::new("acme")).with_scope("agent-a");
406        let found = store.list_namespaces(&ctx(), &prefix).await.unwrap();
407        assert_eq!(found.len(), 1);
408        // The `:`-bearing scope segment survives the render → store
409        // → list → parse round-trip — escapes are not silently
410        // chopped at substring boundaries.
411        assert_eq!(found[0], ns_colon);
412    }
413
414    #[tokio::test]
415    async fn delete_then_get_returns_none() {
416        let store: InMemoryStore<String> = InMemoryStore::new();
417        store.put(&ctx(), &ns(), "k", "v".into()).await.unwrap();
418        store.delete(&ctx(), &ns(), "k").await.unwrap();
419        assert!(store.get(&ctx(), &ns(), "k").await.unwrap().is_none());
420    }
421}