stygian_graph/adapters/
cache.rs1use crate::domain::error::Result;
12use crate::ports::CachePort;
13use async_trait::async_trait;
14use dashmap::DashMap;
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20pub struct MemoryCache {
39 store: Arc<RwLock<HashMap<String, String>>>,
40}
41
42impl MemoryCache {
43 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 store: Arc::new(RwLock::new(HashMap::new())),
48 }
49 }
50}
51
52impl Default for MemoryCache {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58#[async_trait]
59impl CachePort for MemoryCache {
60 async fn get(&self, key: &str) -> Result<Option<String>> {
61 let value = {
62 let store = self.store.read();
63 store.get(key).cloned()
64 };
65 Ok(value)
66 }
67
68 async fn set(&self, key: &str, value: String, _ttl: Option<Duration>) -> Result<()> {
69 {
70 let mut store = self.store.write();
71 store.insert(key.to_string(), value);
72 }
73 Ok(())
74 }
75
76 async fn invalidate(&self, key: &str) -> Result<()> {
77 {
78 let mut store = self.store.write();
79 store.remove(key);
80 }
81 Ok(())
82 }
83
84 async fn exists(&self, key: &str) -> Result<bool> {
85 let exists = {
86 let store = self.store.read();
87 store.contains_key(key)
88 };
89 Ok(exists)
90 }
91}
92
93#[derive(Clone)]
96struct TtlEntry {
97 value: String,
98 expires_at: Option<Instant>,
99}
100
101impl TtlEntry {
102 fn new(value: String, ttl: Option<Duration>) -> Self {
103 Self {
104 value,
105 expires_at: ttl.map(|d| Instant::now() + d),
106 }
107 }
108
109 fn is_expired(&self) -> bool {
110 self.expires_at.is_some_and(|exp| Instant::now() > exp)
111 }
112}
113
114pub struct DashMapCache {
135 store: Arc<DashMap<String, TtlEntry>>,
136}
137
138impl DashMapCache {
139 #[must_use]
153 pub fn new(cleanup_interval: Duration) -> Self {
154 let store: Arc<DashMap<String, TtlEntry>> = Arc::new(DashMap::new());
155 let weak = Arc::downgrade(&store);
156 tokio::spawn(async move {
157 let mut ticker = tokio::time::interval(cleanup_interval);
158 ticker.tick().await; loop {
160 ticker.tick().await;
161 let Some(map) = weak.upgrade() else { break };
162 map.retain(|_, v| !v.is_expired());
163 }
164 });
165 Self { store }
166 }
167
168 #[must_use]
170 pub fn len(&self) -> usize {
171 self.store.iter().filter(|e| !e.is_expired()).count()
172 }
173
174 #[must_use]
176 pub fn is_empty(&self) -> bool {
177 self.len() == 0
178 }
179}
180
181#[async_trait]
182impl CachePort for DashMapCache {
183 async fn get(&self, key: &str) -> Result<Option<String>> {
184 match self.store.get(key) {
185 None => Ok(None),
186 Some(entry) if entry.is_expired() => {
187 drop(entry);
188 self.store.remove(key);
189 Ok(None)
190 }
191 Some(entry) => Ok(Some(entry.value.clone())),
192 }
193 }
194
195 async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
196 self.store
197 .insert(key.to_string(), TtlEntry::new(value, ttl));
198 Ok(())
199 }
200
201 async fn invalidate(&self, key: &str) -> Result<()> {
202 self.store.remove(key);
203 Ok(())
204 }
205
206 async fn exists(&self, key: &str) -> Result<bool> {
207 match self.store.get(key) {
208 None => Ok(false),
209 Some(entry) if entry.is_expired() => {
210 drop(entry);
211 self.store.remove(key);
212 Ok(false)
213 }
214 Some(_) => Ok(true),
215 }
216 }
217}
218
219pub struct BoundedLruCache {
242 inner: tokio::sync::Mutex<lru::LruCache<String, TtlEntry>>,
243}
244
245impl BoundedLruCache {
246 #[must_use]
257 pub fn new(capacity: std::num::NonZeroUsize) -> Self {
258 Self {
259 inner: tokio::sync::Mutex::new(lru::LruCache::new(capacity)),
260 }
261 }
262}
263
264#[async_trait]
265impl CachePort for BoundedLruCache {
266 async fn get(&self, key: &str) -> Result<Option<String>> {
267 let result = {
268 let mut cache = self.inner.lock().await;
269 match cache.get(key) {
270 None => None,
271 Some(entry) if entry.is_expired() => {
272 cache.pop(key);
273 None
274 }
275 Some(entry) => Some(entry.value.clone()),
276 }
277 };
278 Ok(result)
279 }
280
281 async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
282 {
283 let mut cache = self.inner.lock().await;
284 cache.put(key.to_string(), TtlEntry::new(value, ttl));
285 }
286 Ok(())
287 }
288
289 async fn invalidate(&self, key: &str) -> Result<()> {
290 {
291 let mut cache = self.inner.lock().await;
292 cache.pop(key);
293 }
294 Ok(())
295 }
296
297 async fn exists(&self, key: &str) -> Result<bool> {
298 let result = {
299 let mut cache = self.inner.lock().await;
300 match cache.get(key) {
301 None => false,
302 Some(entry) if entry.is_expired() => {
303 cache.pop(key);
304 false
305 }
306 Some(_) => true,
307 }
308 };
309 Ok(result)
310 }
311}
312
313#[must_use]
333pub fn global_cache() -> &'static DashMapCache {
334 static INSTANCE: LazyLock<DashMapCache> =
335 LazyLock::new(|| DashMapCache::new(Duration::from_mins(5)));
336 &INSTANCE
337}
338
339#[cfg(test)]
342mod tests {
343 use super::*;
344
345 #[tokio::test]
348 async fn dashmap_set_get() -> Result<()> {
349 let c = DashMapCache::new(Duration::from_mins(1));
350 c.set("a", "1".to_string(), None).await?;
351 assert_eq!(c.get("a").await?, Some("1".to_string()));
352 Ok(())
353 }
354
355 #[tokio::test]
356 async fn dashmap_miss_returns_none() -> Result<()> {
357 let c = DashMapCache::new(Duration::from_mins(1));
358 assert_eq!(c.get("missing").await?, None);
359 Ok(())
360 }
361
362 #[tokio::test]
363 async fn dashmap_invalidate() -> Result<()> {
364 let c = DashMapCache::new(Duration::from_mins(1));
365 c.set("b", "2".to_string(), None).await?;
366 c.invalidate("b").await?;
367 assert_eq!(c.get("b").await?, None);
368 Ok(())
369 }
370
371 #[tokio::test]
372 async fn dashmap_ttl_expires() -> Result<()> {
373 let c = DashMapCache::new(Duration::from_mins(1));
374 c.set("x", "y".to_string(), Some(Duration::from_nanos(1)))
376 .await?;
377 tokio::time::sleep(Duration::from_millis(10)).await;
378 assert_eq!(c.get("x").await?, None);
379 Ok(())
380 }
381
382 #[tokio::test]
383 async fn dashmap_exists() -> Result<()> {
384 let c = DashMapCache::new(Duration::from_mins(1));
385 c.set("e", "z".to_string(), None).await?;
386 assert!(c.exists("e").await?);
387 assert!(!c.exists("nope").await?);
388 Ok(())
389 }
390
391 #[tokio::test]
394 async fn lru_set_get() -> Result<()> {
395 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(3));
396 c.set("a", "1".to_string(), None).await?;
397 assert_eq!(c.get("a").await?, Some("1".to_string()));
398 Ok(())
399 }
400
401 #[tokio::test]
402 async fn lru_evicts_on_capacity() -> Result<()> {
403 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(1));
404 c.set("k1", "v1".to_string(), None).await?;
405 c.set("k2", "v2".to_string(), None).await?;
406 c.get("k1").await?;
408 c.set("k3", "v3".to_string(), None).await?;
410 assert_eq!(c.get("k2").await?, None);
411 assert_eq!(c.get("k1").await?, Some("v1".to_string()));
412 assert_eq!(c.get("k3").await?, Some("v3".to_string()));
413 Ok(())
414 }
415
416 #[tokio::test]
417 async fn lru_ttl_expires() -> Result<()> {
418 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(7));
419 c.set("t", "val".to_string(), Some(Duration::from_nanos(1)))
420 .await?;
421 tokio::time::sleep(Duration::from_millis(10)).await;
422 assert_eq!(c.get("t").await?, None);
423 Ok(())
424 }
425
426 #[tokio::test]
427 async fn lru_invalidate() -> Result<()> {
428 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(3));
429 c.set("x", "y".to_string(), None).await?;
430 c.invalidate("x").await?;
431 assert!(!c.exists("x").await?);
432 Ok(())
433 }
434
435 #[tokio::test]
438 async fn global_cache_roundtrip() -> Result<()> {
439 global_cache()
440 .set("gc_test", "hello".to_string(), None)
441 .await?;
442 let v = global_cache().get("gc_test").await?;
443 assert_eq!(v, Some("hello".to_string()));
444 global_cache().invalidate("gc_test").await?;
445 Ok(())
446 }
447}