stygian_graph/adapters/
cache.rs1use crate::domain::error::Result;
12use crate::ports::CachePort;
13use async_trait::async_trait;
14use dashmap::DashMap;
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20pub struct MemoryCache {
39 store: Arc<RwLock<HashMap<String, String>>>,
40}
41
42impl MemoryCache {
43 pub fn new() -> Self {
45 Self {
46 store: Arc::new(RwLock::new(HashMap::new())),
47 }
48 }
49}
50
51impl Default for MemoryCache {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57#[async_trait]
58impl CachePort for MemoryCache {
59 async fn get(&self, key: &str) -> Result<Option<String>> {
60 let value = {
61 let store = self.store.read();
62 store.get(key).cloned()
63 };
64 Ok(value)
65 }
66
67 async fn set(&self, key: &str, value: String, _ttl: Option<Duration>) -> Result<()> {
68 {
69 let mut store = self.store.write();
70 store.insert(key.to_string(), value);
71 }
72 Ok(())
73 }
74
75 async fn invalidate(&self, key: &str) -> Result<()> {
76 {
77 let mut store = self.store.write();
78 store.remove(key);
79 }
80 Ok(())
81 }
82
83 async fn exists(&self, key: &str) -> Result<bool> {
84 let exists = {
85 let store = self.store.read();
86 store.contains_key(key)
87 };
88 Ok(exists)
89 }
90}
91
/// A cached value paired with an optional expiry deadline.
#[derive(Clone)]
struct TtlEntry {
    /// The cached payload.
    value: String,
    /// Instant after which the entry is stale; `None` means it never expires.
    expires_at: Option<Instant>,
}

impl TtlEntry {
    /// Wraps `value`, stamping a deadline `ttl` from now when a TTL is given.
    ///
    /// A `ttl` so large that the deadline cannot be represented as an `Instant`
    /// (for example `Duration::MAX`) is treated as "never expires" instead of
    /// panicking.
    fn new(value: String, ttl: Option<Duration>) -> Self {
        Self {
            value,
            // `Instant + Duration` panics on overflow. `checked_add` returns
            // `None` in that case, which this type already reads as "no deadline".
            expires_at: ttl.and_then(|d| Instant::now().checked_add(d)),
        }
    }

    /// Returns `true` once the current time is strictly past the deadline.
    /// Entries without a deadline are never expired.
    fn is_expired(&self) -> bool {
        self.expires_at
            .is_some_and(|deadline| Instant::now() > deadline)
    }
}
112
113pub struct DashMapCache {
134 store: Arc<DashMap<String, TtlEntry>>,
135}
136
137impl DashMapCache {
138 pub fn new(cleanup_interval: Duration) -> Self {
152 let store: Arc<DashMap<String, TtlEntry>> = Arc::new(DashMap::new());
153 let weak = Arc::downgrade(&store);
154 tokio::spawn(async move {
155 let mut ticker = tokio::time::interval(cleanup_interval);
156 ticker.tick().await; loop {
158 ticker.tick().await;
159 let Some(map) = weak.upgrade() else { break };
160 map.retain(|_, v| !v.is_expired());
161 }
162 });
163 Self { store }
164 }
165
166 pub fn len(&self) -> usize {
168 self.store.iter().filter(|e| !e.is_expired()).count()
169 }
170
171 pub fn is_empty(&self) -> bool {
173 self.len() == 0
174 }
175}
176
177#[async_trait]
178impl CachePort for DashMapCache {
179 async fn get(&self, key: &str) -> Result<Option<String>> {
180 match self.store.get(key) {
181 None => Ok(None),
182 Some(entry) if entry.is_expired() => {
183 drop(entry);
184 self.store.remove(key);
185 Ok(None)
186 }
187 Some(entry) => Ok(Some(entry.value.clone())),
188 }
189 }
190
191 async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
192 self.store
193 .insert(key.to_string(), TtlEntry::new(value, ttl));
194 Ok(())
195 }
196
197 async fn invalidate(&self, key: &str) -> Result<()> {
198 self.store.remove(key);
199 Ok(())
200 }
201
202 async fn exists(&self, key: &str) -> Result<bool> {
203 match self.store.get(key) {
204 None => Ok(false),
205 Some(entry) if entry.is_expired() => {
206 drop(entry);
207 self.store.remove(key);
208 Ok(false)
209 }
210 Some(_) => Ok(true),
211 }
212 }
213}
214
215pub struct BoundedLruCache {
238 inner: tokio::sync::Mutex<lru::LruCache<String, TtlEntry>>,
239}
240
241impl BoundedLruCache {
242 pub fn new(capacity: std::num::NonZeroUsize) -> Self {
253 Self {
254 inner: tokio::sync::Mutex::new(lru::LruCache::new(capacity)),
255 }
256 }
257}
258
259#[async_trait]
260impl CachePort for BoundedLruCache {
261 async fn get(&self, key: &str) -> Result<Option<String>> {
262 let result = {
263 let mut cache = self.inner.lock().await;
264 match cache.get(key) {
265 None => None,
266 Some(entry) if entry.is_expired() => {
267 cache.pop(key);
268 None
269 }
270 Some(entry) => Some(entry.value.clone()),
271 }
272 };
273 Ok(result)
274 }
275
276 async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
277 {
278 let mut cache = self.inner.lock().await;
279 cache.put(key.to_string(), TtlEntry::new(value, ttl));
280 }
281 Ok(())
282 }
283
284 async fn invalidate(&self, key: &str) -> Result<()> {
285 {
286 let mut cache = self.inner.lock().await;
287 cache.pop(key);
288 }
289 Ok(())
290 }
291
292 async fn exists(&self, key: &str) -> Result<bool> {
293 let result = {
294 let mut cache = self.inner.lock().await;
295 match cache.get(key) {
296 None => false,
297 Some(entry) if entry.is_expired() => {
298 cache.pop(key);
299 false
300 }
301 Some(_) => true,
302 }
303 };
304 Ok(result)
305 }
306}
307
308pub fn global_cache() -> &'static DashMapCache {
328 static INSTANCE: LazyLock<DashMapCache> =
329 LazyLock::new(|| DashMapCache::new(Duration::from_secs(300)));
330 &INSTANCE
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Cleanup interval for test caches; far longer than any test runs.
    const SWEEP: Duration = Duration::from_secs(60);
    /// TTL that has certainly elapsed after waiting for `SETTLE`.
    const BLINK: Duration = Duration::from_nanos(1);
    /// Pause used to let a `BLINK` TTL lapse.
    const SETTLE: Duration = Duration::from_millis(10);

    /// Fresh `DashMapCache` with the test cleanup interval.
    fn dash() -> DashMapCache {
        DashMapCache::new(SWEEP)
    }

    /// Fresh `BoundedLruCache` with capacity `1 + extra`.
    fn lru(extra: usize) -> BoundedLruCache {
        BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(extra))
    }

    // --- DashMapCache ---

    #[tokio::test]
    async fn dashmap_set_get() -> Result<()> {
        let cache = dash();
        cache.set("a", String::from("1"), None).await?;
        let got = cache.get("a").await?;
        assert_eq!(got, Some(String::from("1")));
        Ok(())
    }

    #[tokio::test]
    async fn dashmap_miss_returns_none() -> Result<()> {
        let cache = dash();
        let got = cache.get("missing").await?;
        assert_eq!(got, None);
        Ok(())
    }

    #[tokio::test]
    async fn dashmap_invalidate() -> Result<()> {
        let cache = dash();
        cache.set("b", String::from("2"), None).await?;
        cache.invalidate("b").await?;
        let got = cache.get("b").await?;
        assert_eq!(got, None);
        Ok(())
    }

    #[tokio::test]
    async fn dashmap_ttl_expires() -> Result<()> {
        let cache = dash();
        cache.set("x", String::from("y"), Some(BLINK)).await?;
        tokio::time::sleep(SETTLE).await;
        let got = cache.get("x").await?;
        assert_eq!(got, None);
        Ok(())
    }

    #[tokio::test]
    async fn dashmap_exists() -> Result<()> {
        let cache = dash();
        cache.set("e", String::from("z"), None).await?;
        let present = cache.exists("e").await?;
        let absent = cache.exists("nope").await?;
        assert!(present);
        assert!(!absent);
        Ok(())
    }

    // --- BoundedLruCache ---

    #[tokio::test]
    async fn lru_set_get() -> Result<()> {
        let cache = lru(3);
        cache.set("a", String::from("1"), None).await?;
        let got = cache.get("a").await?;
        assert_eq!(got, Some(String::from("1")));
        Ok(())
    }

    #[tokio::test]
    async fn lru_evicts_on_capacity() -> Result<()> {
        // Capacity two: the third insert must push one entry out.
        let cache = lru(1);
        cache.set("k1", String::from("v1"), None).await?;
        cache.set("k2", String::from("v2"), None).await?;
        // Reading k1 makes k2 the least recently used entry.
        cache.get("k1").await?;
        cache.set("k3", String::from("v3"), None).await?;

        assert_eq!(cache.get("k2").await?, None);
        assert_eq!(cache.get("k1").await?, Some(String::from("v1")));
        assert_eq!(cache.get("k3").await?, Some(String::from("v3")));
        Ok(())
    }

    #[tokio::test]
    async fn lru_ttl_expires() -> Result<()> {
        let cache = lru(7);
        cache.set("t", String::from("val"), Some(BLINK)).await?;
        tokio::time::sleep(SETTLE).await;
        let got = cache.get("t").await?;
        assert_eq!(got, None);
        Ok(())
    }

    #[tokio::test]
    async fn lru_invalidate() -> Result<()> {
        let cache = lru(3);
        cache.set("x", String::from("y"), None).await?;
        cache.invalidate("x").await?;
        let present = cache.exists("x").await?;
        assert!(!present);
        Ok(())
    }

    // --- global_cache ---

    #[tokio::test]
    async fn global_cache_roundtrip() -> Result<()> {
        let shared = global_cache();
        shared.set("gc_test", String::from("hello"), None).await?;
        let got = global_cache().get("gc_test").await?;
        assert_eq!(got, Some(String::from("hello")));
        // Leave the shared instance clean for any other user of the key.
        shared.invalidate("gc_test").await?;
        Ok(())
    }
}