// rs_zero/cache/two_level.rs

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::cache::{CacheError, CacheKey, CacheResult, CacheStore};

/// Internal mutable counters; always accessed behind the stats mutex.
#[derive(Debug, Default)]
struct StatsInner {
    // Reads satisfied directly by L1.
    l1_hits: u64,
    // Reads that fell through to L2.
    l1_misses: u64,
    // L1 misses that were satisfied by L2.
    l2_hits: u64,
    // Reads that missed both levels.
    l2_misses: u64,
    // L2 hits whose value was written back into L1.
    backfills: u64,
}

/// Snapshot of two-level cache routing counters.
///
/// Produced by [`TwoLevelCacheStats::snapshot`]; the values are a
/// point-in-time copy and do not change after the call returns.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TwoLevelCacheSnapshot {
    /// Reads satisfied by L1.
    pub l1_hits: u64,
    /// Reads that missed L1.
    pub l1_misses: u64,
    /// Reads satisfied by L2 after an L1 miss.
    pub l2_hits: u64,
    /// Reads that missed both levels.
    pub l2_misses: u64,
    /// Number of L2 values written back into L1.
    pub backfills: u64,
}

/// Shared counters for [`TwoLevelCacheStore`].
///
/// Cloning is cheap and shares the same underlying counters: every clone
/// observes the totals behind the same `Arc<Mutex<_>>`.
#[derive(Debug, Clone, Default)]
pub struct TwoLevelCacheStats {
    // Async mutex so counters can be updated from async cache paths.
    inner: Arc<Mutex<StatsInner>>,
}

38impl TwoLevelCacheStats {
39    async fn record_l1_hit(&self) {
40        self.inner.lock().await.l1_hits += 1;
41    }
42
43    async fn record_l1_miss(&self) {
44        self.inner.lock().await.l1_misses += 1;
45    }
46
47    async fn record_l2_hit(&self) {
48        self.inner.lock().await.l2_hits += 1;
49    }
50
51    async fn record_l2_miss(&self) {
52        self.inner.lock().await.l2_misses += 1;
53    }
54
55    async fn record_backfill(&self) {
56        self.inner.lock().await.backfills += 1;
57    }
58
59    /// Returns the current two-level cache counters.
60    pub async fn snapshot(&self) -> TwoLevelCacheSnapshot {
61        let stats = self.inner.lock().await;
62        TwoLevelCacheSnapshot {
63            l1_hits: stats.l1_hits,
64            l1_misses: stats.l1_misses,
65            l2_hits: stats.l2_hits,
66            l2_misses: stats.l2_misses,
67            backfills: stats.backfills,
68        }
69    }
70}
71
/// Cache store that composes a fast L1 store with an authoritative L2 store.
#[derive(Debug, Clone)]
pub struct TwoLevelCacheStore<L1, L2> {
    // Fast front store consulted first on reads.
    l1: L1,
    // Authoritative backing store; writes always land here first.
    l2: L2,
    // TTL applied when an L2 hit is copied back into L1; None passes no TTL.
    l1_backfill_ttl: Option<Duration>,
    // Shared hit/miss/backfill counters.
    stats: TwoLevelCacheStats,
    // Optional metrics sink; only compiled with the `observability` feature.
    #[cfg(feature = "observability")]
    metrics: Option<crate::observability::MetricsRegistry>,
}

83impl<L1, L2> TwoLevelCacheStore<L1, L2> {
84    /// Creates a two-level cache without a custom L1 backfill TTL.
85    pub fn new(l1: L1, l2: L2) -> Self {
86        Self {
87            l1,
88            l2,
89            l1_backfill_ttl: None,
90            stats: TwoLevelCacheStats::default(),
91            #[cfg(feature = "observability")]
92            metrics: None,
93        }
94    }
95
96    /// Sets the TTL used when an L2 hit is backfilled into L1.
97    pub fn with_l1_backfill_ttl(mut self, ttl: Option<Duration>) -> Self {
98        self.l1_backfill_ttl = ttl;
99        self
100    }
101
102    /// Attaches a metrics registry to this two-level cache store.
103    #[cfg(feature = "observability")]
104    pub fn with_metrics(mut self, metrics: crate::observability::MetricsRegistry) -> Self {
105        self.metrics = Some(metrics);
106        self
107    }
108
109    /// Returns shared two-level cache counters.
110    pub fn stats(&self) -> TwoLevelCacheStats {
111        self.stats.clone()
112    }
113
114    /// Returns a reference to the L1 store.
115    pub fn l1(&self) -> &L1 {
116        &self.l1
117    }
118
119    /// Returns a reference to the L2 store.
120    pub fn l2(&self) -> &L2 {
121        &self.l2
122    }
123
124    fn record_event(&self, operation: &str, result: &str) {
125        #[cfg(feature = "observability")]
126        crate::observability::cache::record_cache_event(
127            self.metrics.as_ref(),
128            "two_level",
129            operation,
130            result,
131        );
132
133        #[cfg(not(feature = "observability"))]
134        {
135            let _ = (operation, result);
136        }
137    }
138}
139
140#[async_trait]
141impl<L1, L2> CacheStore for TwoLevelCacheStore<L1, L2>
142where
143    L1: CacheStore,
144    L2: CacheStore,
145{
146    async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
147        if let Some(value) = self.l1.get_raw(key).await? {
148            self.stats.record_l1_hit().await;
149            self.record_event("get", "l1_hit");
150            return Ok(Some(value));
151        }
152        self.stats.record_l1_miss().await;
153        self.record_event("get", "l1_miss");
154
155        let Some(value) = self.l2.get_raw(key).await? else {
156            self.stats.record_l2_miss().await;
157            self.record_event("get", "l2_miss");
158            return Ok(None);
159        };
160        self.stats.record_l2_hit().await;
161        self.record_event("get", "l2_hit");
162        self.l1
163            .set_raw(key, value.clone(), self.l1_backfill_ttl)
164            .await?;
165        self.stats.record_backfill().await;
166        self.record_event("set", "backfill");
167        Ok(Some(value))
168    }
169
170    async fn set_raw(
171        &self,
172        key: &CacheKey,
173        value: Vec<u8>,
174        ttl: Option<Duration>,
175    ) -> CacheResult<()> {
176        self.l2.set_raw(key, value.clone(), ttl).await?;
177        self.l1.set_raw(key, value, ttl).await?;
178        self.record_event("set", "success");
179        Ok(())
180    }
181
182    async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
183        let l1 = self.l1.delete(key).await;
184        let l2 = self.l2.delete(key).await;
185        match (l1, l2) {
186            (Ok(()), Ok(())) => {
187                self.record_event("delete", "success");
188                Ok(())
189            }
190            (Err(error), Ok(())) | (Ok(()), Err(error)) => {
191                self.record_event("delete", "error");
192                Err(error)
193            }
194            (Err(left), Err(right)) => {
195                self.record_event("delete", "error");
196                Err(CacheError::Backend(format!(
197                    "two-level cache delete failed: l1={left}; l2={right}"
198                )))
199            }
200        }
201    }
202}
203
#[cfg(test)]
mod tests {
    use crate::cache::{CacheKey, CacheStore, MemoryCacheStore, TwoLevelCacheStore};

    /// A value present only in L2 is served to the caller and copied into L1,
    /// and the routing counters reflect the miss/hit/backfill sequence.
    #[tokio::test]
    async fn two_level_cache_backfills_l1_from_l2() {
        let fast = MemoryCacheStore::new();
        let slow = MemoryCacheStore::new();
        let key = CacheKey::new("app", ["user", "1"]);
        slow.set_raw(&key, b"ada".to_vec(), None)
            .await
            .expect("set l2");

        let store = TwoLevelCacheStore::new(fast.clone(), slow);
        let fetched = store.get_raw(&key).await.expect("get");
        assert_eq!(fetched, Some(b"ada".to_vec()));
        // The L2 hit must now be readable directly from L1.
        assert_eq!(fast.get_raw(&key).await.expect("l1"), Some(b"ada".to_vec()));

        let snapshot = store.stats().snapshot().await;
        assert_eq!(snapshot.l1_misses, 1);
        assert_eq!(snapshot.l2_hits, 1);
        assert_eq!(snapshot.backfills, 1);
    }

    /// A write lands in both levels and a delete clears both levels.
    #[tokio::test]
    async fn two_level_cache_sets_and_deletes_both_levels() {
        let fast = MemoryCacheStore::new();
        let slow = MemoryCacheStore::new();
        let store = TwoLevelCacheStore::new(fast.clone(), slow.clone());
        let key = CacheKey::new("app", ["user", "2"]);

        store
            .set_raw(&key, b"grace".to_vec(), None)
            .await
            .expect("set");
        assert_eq!(fast.get_raw(&key).await.expect("l1"), Some(b"grace".to_vec()));
        assert_eq!(slow.get_raw(&key).await.expect("l2"), Some(b"grace".to_vec()));

        store.delete(&key).await.expect("delete");
        assert!(fast.get_raw(&key).await.expect("l1").is_none());
        assert!(slow.get_raw(&key).await.expect("l2").is_none());
    }
}