1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use tokio::sync::Mutex;
5
6use crate::cache::{CacheError, CacheKey, CacheResult, CacheStore};
7
/// Raw counters kept behind the lock owned by [`TwoLevelCacheStats`].
///
/// Every field starts at zero (via `Default`) and is only ever incremented.
#[derive(Debug, Default)]
struct StatsInner {
    /// Number of reads answered by the first-level store.
    l1_hits: u64,
    /// Number of reads for which the first-level store had no value.
    l1_misses: u64,
    /// Number of reads answered by the second-level store.
    l2_hits: u64,
    /// Number of reads for which the second-level store had no value either.
    l2_misses: u64,
    /// Number of values copied from the second-level store into the first.
    backfills: u64,
}
16
/// Point-in-time copy of the two-level cache counters.
///
/// The value is plain data (`Copy`), so it can be compared and passed around
/// without touching the live counters again.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TwoLevelCacheSnapshot {
    /// Reads answered by the first-level store.
    pub l1_hits: u64,
    /// Reads for which the first-level store had no value.
    pub l1_misses: u64,
    /// Reads answered by the second-level store.
    pub l2_hits: u64,
    /// Reads for which the second-level store had no value either.
    pub l2_misses: u64,
    /// Values copied from the second-level store into the first.
    pub backfills: u64,
}
31
32#[derive(Debug, Clone, Default)]
34pub struct TwoLevelCacheStats {
35 inner: Arc<Mutex<StatsInner>>,
36}
37
38impl TwoLevelCacheStats {
39 async fn record_l1_hit(&self) {
40 self.inner.lock().await.l1_hits += 1;
41 }
42
43 async fn record_l1_miss(&self) {
44 self.inner.lock().await.l1_misses += 1;
45 }
46
47 async fn record_l2_hit(&self) {
48 self.inner.lock().await.l2_hits += 1;
49 }
50
51 async fn record_l2_miss(&self) {
52 self.inner.lock().await.l2_misses += 1;
53 }
54
55 async fn record_backfill(&self) {
56 self.inner.lock().await.backfills += 1;
57 }
58
59 pub async fn snapshot(&self) -> TwoLevelCacheSnapshot {
61 let stats = self.inner.lock().await;
62 TwoLevelCacheSnapshot {
63 l1_hits: stats.l1_hits,
64 l1_misses: stats.l1_misses,
65 l2_hits: stats.l2_hits,
66 l2_misses: stats.l2_misses,
67 backfills: stats.backfills,
68 }
69 }
70}
71
72#[derive(Debug, Clone)]
74pub struct TwoLevelCacheStore<L1, L2> {
75 l1: L1,
76 l2: L2,
77 l1_backfill_ttl: Option<Duration>,
78 stats: TwoLevelCacheStats,
79 #[cfg(feature = "observability")]
80 metrics: Option<crate::observability::MetricsRegistry>,
81}
82
83impl<L1, L2> TwoLevelCacheStore<L1, L2> {
84 pub fn new(l1: L1, l2: L2) -> Self {
86 Self {
87 l1,
88 l2,
89 l1_backfill_ttl: None,
90 stats: TwoLevelCacheStats::default(),
91 #[cfg(feature = "observability")]
92 metrics: None,
93 }
94 }
95
96 pub fn with_l1_backfill_ttl(mut self, ttl: Option<Duration>) -> Self {
98 self.l1_backfill_ttl = ttl;
99 self
100 }
101
102 #[cfg(feature = "observability")]
104 pub fn with_metrics(mut self, metrics: crate::observability::MetricsRegistry) -> Self {
105 self.metrics = Some(metrics);
106 self
107 }
108
109 pub fn stats(&self) -> TwoLevelCacheStats {
111 self.stats.clone()
112 }
113
114 pub fn l1(&self) -> &L1 {
116 &self.l1
117 }
118
119 pub fn l2(&self) -> &L2 {
121 &self.l2
122 }
123
124 fn record_event(&self, operation: &str, result: &str) {
125 #[cfg(feature = "observability")]
126 crate::observability::cache::record_cache_event(
127 self.metrics.as_ref(),
128 "two_level",
129 operation,
130 result,
131 );
132
133 #[cfg(not(feature = "observability"))]
134 {
135 let _ = (operation, result);
136 }
137 }
138}
139
140#[async_trait]
141impl<L1, L2> CacheStore for TwoLevelCacheStore<L1, L2>
142where
143 L1: CacheStore,
144 L2: CacheStore,
145{
146 async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
147 if let Some(value) = self.l1.get_raw(key).await? {
148 self.stats.record_l1_hit().await;
149 self.record_event("get", "l1_hit");
150 return Ok(Some(value));
151 }
152 self.stats.record_l1_miss().await;
153 self.record_event("get", "l1_miss");
154
155 let Some(value) = self.l2.get_raw(key).await? else {
156 self.stats.record_l2_miss().await;
157 self.record_event("get", "l2_miss");
158 return Ok(None);
159 };
160 self.stats.record_l2_hit().await;
161 self.record_event("get", "l2_hit");
162 self.l1
163 .set_raw(key, value.clone(), self.l1_backfill_ttl)
164 .await?;
165 self.stats.record_backfill().await;
166 self.record_event("set", "backfill");
167 Ok(Some(value))
168 }
169
170 async fn set_raw(
171 &self,
172 key: &CacheKey,
173 value: Vec<u8>,
174 ttl: Option<Duration>,
175 ) -> CacheResult<()> {
176 self.l2.set_raw(key, value.clone(), ttl).await?;
177 self.l1.set_raw(key, value, ttl).await?;
178 self.record_event("set", "success");
179 Ok(())
180 }
181
182 async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
183 let l1 = self.l1.delete(key).await;
184 let l2 = self.l2.delete(key).await;
185 match (l1, l2) {
186 (Ok(()), Ok(())) => {
187 self.record_event("delete", "success");
188 Ok(())
189 }
190 (Err(error), Ok(())) | (Ok(()), Err(error)) => {
191 self.record_event("delete", "error");
192 Err(error)
193 }
194 (Err(left), Err(right)) => {
195 self.record_event("delete", "error");
196 Err(CacheError::Backend(format!(
197 "two-level cache delete failed: l1={left}; l2={right}"
198 )))
199 }
200 }
201 }
202}
203
#[cfg(test)]
mod tests {
    use crate::cache::{CacheKey, CacheStore, MemoryCacheStore, TwoLevelCacheStore};

    /// A value present only in L2 is returned, copied into L1, and counted
    /// as one L1 miss, one L2 hit and one backfill.
    #[tokio::test]
    async fn two_level_cache_backfills_l1_from_l2() {
        let key = CacheKey::new("app", ["user", "1"]);
        let front = MemoryCacheStore::new();
        let back = MemoryCacheStore::new();
        back.set_raw(&key, b"ada".to_vec(), None)
            .await
            .expect("set l2");

        let store = TwoLevelCacheStore::new(front.clone(), back);

        let fetched = store.get_raw(&key).await.expect("get");
        assert_eq!(fetched, Some(b"ada".to_vec()));

        let cached = front.get_raw(&key).await.expect("l1");
        assert_eq!(cached, Some(b"ada".to_vec()));

        let counters = store.stats().snapshot().await;
        assert_eq!(counters.l1_misses, 1);
        assert_eq!(counters.l2_hits, 1);
        assert_eq!(counters.backfills, 1);
    }

    /// Writes and deletes issued through the store reach both levels.
    #[tokio::test]
    async fn two_level_cache_sets_and_deletes_both_levels() {
        let key = CacheKey::new("app", ["user", "2"]);
        let front = MemoryCacheStore::new();
        let back = MemoryCacheStore::new();
        let store = TwoLevelCacheStore::new(front.clone(), back.clone());

        store
            .set_raw(&key, b"grace".to_vec(), None)
            .await
            .expect("set");
        let in_front = front.get_raw(&key).await.expect("l1");
        assert_eq!(in_front, Some(b"grace".to_vec()));
        let in_back = back.get_raw(&key).await.expect("l2");
        assert_eq!(in_back, Some(b"grace".to_vec()));

        store.delete(&key).await.expect("delete");
        let front_after = front.get_raw(&key).await.expect("l1");
        assert!(front_after.is_none());
        let back_after = back.get_raw(&key).await.expect("l2");
        assert!(back_after.is_none());
    }
}