Skip to main content

dag_executor/storage/
cache.rs

1//! A write-through LRU cache wrapping any [`Storage`].
2
3use crate::storage::file_storage::{Storage, StorageResult};
4use async_trait::async_trait;
5use lru::LruCache;
6use parking_lot::Mutex;
7use std::num::NonZeroUsize;
8use std::sync::Arc;
9
/// A write-through cache in front of a backing [`Storage`].
///
/// Reads consult the in-memory LRU first and fall back to the backend on a
/// miss (populating the cache). Writes update both the cache and the backend so
/// the cache never serves stale data for keys it owns.
pub struct Cache {
    /// Backing store that every `save`, `delete` and `list` call is forwarded
    /// to, and that `load` falls back to on a cache miss.
    inner: Arc<dyn Storage>,
    /// In-memory LRU mapping keys to their JSON values. It sits behind a
    /// mutex so it can be read and updated through a shared `&self`.
    cache: Mutex<LruCache<String, serde_json::Value>>,
}
19
20impl Cache {
21    /// Wrap `inner` with an LRU cache of the given capacity.
22    ///
23    /// A `capacity` of 0 is treated as 1 (LRU requires non-zero capacity).
24    pub fn new(inner: Arc<dyn Storage>, capacity: usize) -> Self {
25        let cap = NonZeroUsize::new(capacity.max(1)).unwrap();
26        Cache {
27            inner,
28            cache: Mutex::new(LruCache::new(cap)),
29        }
30    }
31
32    /// Number of entries currently cached.
33    pub fn len(&self) -> usize {
34        self.cache.lock().len()
35    }
36
37    /// Whether the cache holds no entries.
38    pub fn is_empty(&self) -> bool {
39        self.cache.lock().is_empty()
40    }
41}
42
43#[async_trait]
44impl Storage for Cache {
45    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()> {
46        self.inner.save(key, value).await?;
47        self.cache.lock().put(key.to_string(), value.clone());
48        Ok(())
49    }
50
51    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>> {
52        if let Some(v) = self.cache.lock().get(key).cloned() {
53            return Ok(Some(v));
54        }
55        let loaded = self.inner.load(key).await?;
56        if let Some(ref v) = loaded {
57            self.cache.lock().put(key.to_string(), v.clone());
58        }
59        Ok(loaded)
60    }
61
62    async fn delete(&self, key: &str) -> StorageResult<()> {
63        self.inner.delete(key).await?;
64        self.cache.lock().pop(key);
65        Ok(())
66    }
67
68    async fn list(&self) -> StorageResult<Vec<String>> {
69        // The backend is authoritative for the full key set.
70        self.inner.list().await
71    }
72}