dag_executor/storage/
cache.rs1use crate::storage::file_storage::{Storage, StorageResult};
4use async_trait::async_trait;
5use lru::LruCache;
6use parking_lot::Mutex;
7use std::num::NonZeroUsize;
8use std::sync::Arc;
9
10pub struct Cache {
16 inner: Arc<dyn Storage>,
17 cache: Mutex<LruCache<String, serde_json::Value>>,
18}
19
20impl Cache {
21 pub fn new(inner: Arc<dyn Storage>, capacity: usize) -> Self {
25 let cap = NonZeroUsize::new(capacity.max(1)).unwrap();
26 Cache {
27 inner,
28 cache: Mutex::new(LruCache::new(cap)),
29 }
30 }
31
32 pub fn len(&self) -> usize {
34 self.cache.lock().len()
35 }
36
37 pub fn is_empty(&self) -> bool {
39 self.cache.lock().is_empty()
40 }
41}
42
43#[async_trait]
44impl Storage for Cache {
45 async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()> {
46 self.inner.save(key, value).await?;
47 self.cache.lock().put(key.to_string(), value.clone());
48 Ok(())
49 }
50
51 async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>> {
52 if let Some(v) = self.cache.lock().get(key).cloned() {
53 return Ok(Some(v));
54 }
55 let loaded = self.inner.load(key).await?;
56 if let Some(ref v) = loaded {
57 self.cache.lock().put(key.to_string(), v.clone());
58 }
59 Ok(loaded)
60 }
61
62 async fn delete(&self, key: &str) -> StorageResult<()> {
63 self.inner.delete(key).await?;
64 self.cache.lock().pop(key);
65 Ok(())
66 }
67
68 async fn list(&self) -> StorageResult<Vec<String>> {
69 self.inner.list().await
71 }
72}