1use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use async_trait::async_trait;
27use entelix_core::{ExecutionContext, Result};
28use parking_lot::Mutex;
29
30use crate::namespace::{Namespace, NamespacePrefix};
31
/// Per-write options accepted by [`Store::put_with_options`].
///
/// `Default` yields `ttl: None`. The struct is `#[non_exhaustive]`, so code
/// outside this crate cannot build it with a struct literal; start from
/// [`PutOptions::default`] and chain builder methods such as
/// [`PutOptions::with_ttl`] instead.
#[non_exhaustive]
#[derive(Debug, Default, Clone)]
pub struct PutOptions {
    /// Optional time-to-live for the written entry.
    ///
    /// `None` requests no expiry. [`InMemoryStore`] converts `Some(ttl)` into
    /// a deadline relative to the moment of the write.
    pub ttl: Option<Duration>,
}
43
44impl PutOptions {
45 #[must_use]
47 pub const fn with_ttl(mut self, ttl: Duration) -> Self {
48 self.ttl = Some(ttl);
49 self
50 }
51}
52
53#[async_trait]
60pub trait Store<V>: Send + Sync + 'static
61where
62 V: Clone + Send + Sync + 'static,
63{
64 async fn put_with_options(
69 &self,
70 ctx: &ExecutionContext,
71 ns: &Namespace,
72 key: &str,
73 value: V,
74 options: PutOptions,
75 ) -> Result<()>;
76
77 async fn put(&self, ctx: &ExecutionContext, ns: &Namespace, key: &str, value: V) -> Result<()> {
81 self.put_with_options(ctx, ns, key, value, PutOptions::default())
82 .await
83 }
84
85 async fn get(&self, ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<Option<V>>;
87
88 async fn delete(&self, ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<()>;
91
92 async fn list(
95 &self,
96 ctx: &ExecutionContext,
97 ns: &Namespace,
98 prefix: Option<&str>,
99 ) -> Result<Vec<String>>;
100
101 async fn list_namespaces(
109 &self,
110 _ctx: &ExecutionContext,
111 _prefix: &NamespacePrefix,
112 ) -> Result<Vec<Namespace>> {
113 Ok(Vec::new())
114 }
115
116 async fn evict_expired(&self, _ctx: &ExecutionContext) -> Result<usize> {
123 Ok(0)
124 }
125}
126
127pub struct InMemoryStore<V>
138where
139 V: Clone + Send + Sync + 'static,
140{
141 inner: Arc<Mutex<EntryMap<V>>>,
142}
143
/// One stored value together with its optional expiry deadline.
struct Entry<V> {
    /// The payload handed to the store on write.
    value: V,
    /// Instant at or after which the entry counts as expired; `None` means
    /// the entry never expires.
    expires_at: Option<Instant>,
}

/// Backing table of the in-memory store: `(rendered namespace, key)` mapped
/// to its [`Entry`].
type EntryMap<V> = HashMap<(String, String), Entry<V>>;
150
151impl<V> InMemoryStore<V>
152where
153 V: Clone + Send + Sync + 'static,
154{
155 #[must_use]
157 pub fn new() -> Self {
158 Self {
159 inner: Arc::new(Mutex::new(HashMap::new())),
160 }
161 }
162
163 #[must_use]
168 pub fn total_entries(&self) -> usize {
169 self.inner.lock().len()
170 }
171}
172
173impl<V> Default for InMemoryStore<V>
174where
175 V: Clone + Send + Sync + 'static,
176{
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182impl<V> Clone for InMemoryStore<V>
183where
184 V: Clone + Send + Sync + 'static,
185{
186 fn clone(&self) -> Self {
187 Self {
188 inner: Arc::clone(&self.inner),
189 }
190 }
191}
192
193#[async_trait]
194impl<V> Store<V> for InMemoryStore<V>
195where
196 V: Clone + Send + Sync + 'static,
197{
198 async fn put_with_options(
199 &self,
200 _ctx: &ExecutionContext,
201 ns: &Namespace,
202 key: &str,
203 value: V,
204 options: PutOptions,
205 ) -> Result<()> {
206 let composite = (ns.render(), key.to_owned());
207 let expires_at = options.ttl.map(|d| Instant::now() + d);
208 {
209 let mut guard = self.inner.lock();
210 guard.insert(composite, Entry { value, expires_at });
211 }
212 Ok(())
213 }
214
215 async fn get(&self, _ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<Option<V>> {
216 let composite = (ns.render(), key.to_owned());
217 let now = Instant::now();
218 let result = {
219 let guard = self.inner.lock();
220 guard
221 .get(&composite)
222 .filter(|entry| entry.expires_at.is_none_or(|exp| exp > now))
223 .map(|entry| entry.value.clone())
224 };
225 Ok(result)
226 }
227
228 async fn delete(&self, _ctx: &ExecutionContext, ns: &Namespace, key: &str) -> Result<()> {
229 let composite = (ns.render(), key.to_owned());
230 {
231 let mut guard = self.inner.lock();
232 guard.remove(&composite);
233 }
234 Ok(())
235 }
236
237 async fn list(
238 &self,
239 _ctx: &ExecutionContext,
240 ns: &Namespace,
241 prefix: Option<&str>,
242 ) -> Result<Vec<String>> {
243 let ns_key = ns.render();
244 let now = Instant::now();
245 let out = {
246 let guard = self.inner.lock();
247 guard
248 .iter()
249 .filter(|((n, _), entry)| {
250 n == &ns_key && entry.expires_at.is_none_or(|exp| exp > now)
251 })
252 .filter(|((_, k), _)| prefix.is_none_or(|p| k.starts_with(p)))
253 .map(|((_, k), _)| k.clone())
254 .collect::<Vec<_>>()
255 };
256 Ok(out)
257 }
258
259 async fn list_namespaces(
260 &self,
261 _ctx: &ExecutionContext,
262 prefix: &NamespacePrefix,
263 ) -> Result<Vec<Namespace>> {
264 let prefix_render = render_prefix(prefix);
265 let now = Instant::now();
266 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
267 {
268 let guard = self.inner.lock();
269 for ((rendered_ns, _), entry) in guard.iter() {
270 if entry.expires_at.is_some_and(|exp| exp <= now) {
271 continue;
272 }
273 if rendered_ns == &prefix_render
274 || rendered_ns.starts_with(&format!("{prefix_render}:"))
275 {
276 seen.insert(rendered_ns.clone());
277 }
278 }
279 }
280 seen.into_iter().map(|key| Namespace::parse(&key)).collect()
287 }
288
289 async fn evict_expired(&self, _ctx: &ExecutionContext) -> Result<usize> {
290 let now = Instant::now();
291 let removed = {
292 let mut guard = self.inner.lock();
293 let before = guard.len();
294 guard.retain(|_, entry| entry.expires_at.is_none_or(|exp| exp > now));
295 before - guard.len()
296 };
297 Ok(removed)
298 }
299}
300
301fn render_prefix(prefix: &NamespacePrefix) -> String {
302 let mut tmp = Namespace::new(prefix.tenant_id().clone());
305 for s in prefix.scope() {
306 tmp = tmp.with_scope(s.clone());
307 }
308 tmp.render()
309}
310
#[cfg(test)]
// Tests unwrap and index freely: a panic is the failure signal here.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use entelix_core::TenantId;

    /// Fresh execution context for store calls.
    fn ctx() -> ExecutionContext {
        ExecutionContext::new()
    }

    /// The `acme` / `agent-a` namespace most tests write into.
    fn ns() -> Namespace {
        let tenant = TenantId::new("acme");
        Namespace::new(tenant).with_scope("agent-a")
    }

    #[tokio::test]
    async fn put_then_get_round_trips() {
        let (cx, space) = (ctx(), ns());
        let store = InMemoryStore::<String>::new();

        store.put(&cx, &space, "k", "v".into()).await.unwrap();

        let fetched = store.get(&cx, &space, "k").await.unwrap();
        assert_eq!(fetched.as_deref(), Some("v"));
    }

    #[tokio::test]
    async fn ttl_expires_on_get() {
        let (cx, space) = (ctx(), ns());
        let store = InMemoryStore::<String>::new();
        let short_lived = PutOptions::default().with_ttl(Duration::from_millis(20));

        store
            .put_with_options(&cx, &space, "k", "v".into(), short_lived)
            .await
            .unwrap();

        // Still inside the TTL window: the value is visible.
        let before = store.get(&cx, &space, "k").await.unwrap();
        assert!(before.is_some());

        // Sleep past the deadline: the same key now reads as absent.
        tokio::time::sleep(Duration::from_millis(40)).await;
        let after = store.get(&cx, &space, "k").await.unwrap();
        assert!(after.is_none());
    }

    #[tokio::test]
    async fn evict_expired_returns_count_and_drops_rows() {
        let (cx, space) = (ctx(), ns());
        let store = InMemoryStore::<String>::new();
        let short_lived = PutOptions::default().with_ttl(Duration::from_millis(10));

        // One entry that will expire, one that never does.
        store
            .put_with_options(&cx, &space, "doomed", "v".into(), short_lived)
            .await
            .unwrap();
        store.put(&cx, &space, "alive", "v".into()).await.unwrap();

        tokio::time::sleep(Duration::from_millis(30)).await;

        let evicted = store.evict_expired(&cx).await.unwrap();
        assert_eq!(evicted, 1);
        assert_eq!(store.total_entries(), 1);
    }

    #[tokio::test]
    async fn list_namespaces_finds_subscopes_under_prefix() {
        let cx = ctx();
        let store = InMemoryStore::<String>::new();

        let parent = Namespace::new(TenantId::new("acme")).with_scope("agent-a");
        let child = Namespace::new(TenantId::new("acme"))
            .with_scope("agent-a")
            .with_scope("conv-1");
        let sibling = Namespace::new(TenantId::new("acme")).with_scope("agent-b");

        store.put(&cx, &parent, "k", "v".into()).await.unwrap();
        store.put(&cx, &child, "k", "v".into()).await.unwrap();
        store.put(&cx, &sibling, "k", "v".into()).await.unwrap();

        let prefix = NamespacePrefix::new(TenantId::new("acme")).with_scope("agent-a");
        let mut matches: Vec<Namespace> = store.list_namespaces(&cx, &prefix).await.unwrap();
        assert_eq!(matches.len(), 2);

        // Result order is unspecified, so order by scope depth before comparing.
        matches.sort_by_key(|namespace| namespace.scope().len());
        assert_eq!(matches[0], parent);
        assert_eq!(matches[1], child);
    }

    #[tokio::test]
    async fn list_namespaces_recovers_escaped_segments() {
        let cx = ctx();
        let store = InMemoryStore::<String>::new();

        // A scope segment that itself contains the `:` separator.
        let with_colons = Namespace::new(TenantId::new("acme"))
            .with_scope("agent-a")
            .with_scope("k8s:pod:foo");
        store.put(&cx, &with_colons, "k", "v".into()).await.unwrap();

        let prefix = NamespacePrefix::new(TenantId::new("acme")).with_scope("agent-a");
        let matches = store.list_namespaces(&cx, &prefix).await.unwrap();

        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0], with_colons);
    }

    #[tokio::test]
    async fn delete_then_get_returns_none() {
        let (cx, space) = (ctx(), ns());
        let store = InMemoryStore::<String>::new();

        store.put(&cx, &space, "k", "v".into()).await.unwrap();
        store.delete(&cx, &space, "k").await.unwrap();

        let fetched = store.get(&cx, &space, "k").await.unwrap();
        assert!(fetched.is_none());
    }
}