1use parking_lot::RwLock;
13use std::collections::{HashMap, VecDeque};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Instant;
17
18use serde::Serialize;
19
/// A cached response together with the metadata the cache uses to expire
/// and invalidate it.
#[derive(Clone, Debug)]
pub struct CacheEntry {
    /// Version stamp of this entry. `EdgeCache::invalidate` leaves entries
    /// alone when their version is greater than the cutoff it is given.
    pub version: u64,
    /// The stored response payload, handed back as-is on a cache hit.
    pub response_bytes: Vec<u8>,
    /// Table names associated with this entry; `EdgeCache::invalidate`
    /// compares them against the table list it receives.
    pub tables: Vec<String>,
    /// Point in time at or after which `EdgeCache::get` treats the entry as
    /// expired.
    pub expires_at: Instant,
}
35
36#[derive(Debug, Clone, Copy, Default, Serialize)]
37pub struct EdgeCacheStats {
38 pub hits: u64,
39 pub misses: u64,
40 pub inserts: u64,
41 pub invalidations_received: u64,
42 pub entries_evicted: u64,
43 pub current_entries: usize,
44}
45
46#[derive(Clone)]
48pub struct EdgeCache {
49 inner: Arc<EdgeCacheInner>,
50}
51
52struct EdgeCacheInner {
53 max_entries: usize,
54 map: RwLock<HashMap<CacheKey, CacheEntry>>,
55 lru: RwLock<VecDeque<CacheKey>>,
57 next_version: AtomicU64,
58 hits: AtomicU64,
59 misses: AtomicU64,
60 inserts: AtomicU64,
61 invalidations: AtomicU64,
62 evictions: AtomicU64,
63}
64
/// Lookup key for the cache: a pair of strings compared and hashed together.
///
/// Two keys are equal only when both components match.
// NOTE(review): presumably `fingerprint` identifies the query shape and
// `params_hash` its bound parameters — confirm against the callers.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CacheKey {
    /// First component of the key.
    pub fingerprint: String,
    /// Second component of the key.
    pub params_hash: String,
}

impl CacheKey {
    /// Builds a key from anything convertible into owned `String`s.
    pub fn new(fingerprint: impl Into<String>, params_hash: impl Into<String>) -> Self {
        let fingerprint = fingerprint.into();
        let params_hash = params_hash.into();
        CacheKey {
            fingerprint,
            params_hash,
        }
    }
}
82
83impl EdgeCache {
84 pub fn new(max_entries: usize) -> Self {
85 assert!(max_entries > 0, "max_entries must be > 0");
86 Self {
87 inner: Arc::new(EdgeCacheInner {
88 max_entries,
89 map: RwLock::new(HashMap::new()),
90 lru: RwLock::new(VecDeque::new()),
91 next_version: AtomicU64::new(1),
92 hits: AtomicU64::new(0),
93 misses: AtomicU64::new(0),
94 inserts: AtomicU64::new(0),
95 invalidations: AtomicU64::new(0),
96 evictions: AtomicU64::new(0),
97 }),
98 }
99 }
100
101 pub fn next_version(&self) -> u64 {
105 self.inner.next_version.fetch_add(1, Ordering::Relaxed)
106 }
107
108 pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
111 let now = Instant::now();
112 let map = self.inner.map.read();
113 let entry = match map.get(key) {
114 Some(e) => e.clone(),
115 None => {
116 self.inner.misses.fetch_add(1, Ordering::Relaxed);
117 return None;
118 }
119 };
120 drop(map);
121
122 if entry.expires_at <= now {
123 self.inner.map.write().remove(key);
126 self.inner.lru.write().retain(|k| k != key);
127 self.inner.misses.fetch_add(1, Ordering::Relaxed);
128 return None;
129 }
130
131 let mut lru = self.inner.lru.write();
133 lru.retain(|k| k != key);
134 lru.push_back(key.clone());
135
136 self.inner.hits.fetch_add(1, Ordering::Relaxed);
137 Some(entry)
138 }
139
140 pub fn insert(&self, key: CacheKey, entry: CacheEntry) {
143 {
144 let mut map = self.inner.map.write();
145 let mut lru = self.inner.lru.write();
146 if map.insert(key.clone(), entry).is_none() {
147 lru.push_back(key.clone());
148 } else {
149 lru.retain(|k| k != &key);
150 lru.push_back(key);
151 }
152 self.inner.inserts.fetch_add(1, Ordering::Relaxed);
153
154 while map.len() > self.inner.max_entries {
156 if let Some(victim) = lru.pop_front() {
157 map.remove(&victim);
158 self.inner.evictions.fetch_add(1, Ordering::Relaxed);
159 } else {
160 break;
161 }
162 }
163 }
164 }
165
166 pub fn invalidate(&self, up_to_version: u64, tables: &[String]) -> u64 {
171 self.inner.invalidations.fetch_add(1, Ordering::Relaxed);
172 let mut map = self.inner.map.write();
173 let mut lru = self.inner.lru.write();
174 let mut drop_keys = Vec::new();
175 for (k, e) in map.iter() {
176 if e.version > up_to_version {
177 continue;
178 }
179 if tables.is_empty() {
180 drop_keys.push(k.clone());
181 continue;
182 }
183 if e.tables.iter().any(|t| tables.contains(t)) {
184 drop_keys.push(k.clone());
185 }
186 }
187 for k in &drop_keys {
188 map.remove(k);
189 lru.retain(|x| x != k);
190 }
191 drop_keys.len() as u64
192 }
193
194 pub fn stats(&self) -> EdgeCacheStats {
195 EdgeCacheStats {
196 hits: self.inner.hits.load(Ordering::Relaxed),
197 misses: self.inner.misses.load(Ordering::Relaxed),
198 inserts: self.inner.inserts.load(Ordering::Relaxed),
199 invalidations_received: self.inner.invalidations.load(Ordering::Relaxed),
200 entries_evicted: self.inner.evictions.load(Ordering::Relaxed),
201 current_entries: self.inner.map.read().len(),
202 }
203 }
204
205 pub fn insert_with(&self, key: CacheKey, entry: CacheEntry) {
207 self.insert(key, entry);
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds an entry with the given version, body and tables that expires
    /// `ttl` from now.
    fn entry(version: u64, body: &[u8], tables: &[&str], ttl: Duration) -> CacheEntry {
        CacheEntry {
            version,
            response_bytes: body.to_vec(),
            tables: tables.iter().map(|s| s.to_string()).collect(),
            expires_at: Instant::now() + ttl,
        }
    }

    #[test]
    fn insert_then_get_returns_value() {
        let c = EdgeCache::new(10);
        let k = CacheKey::new("fp1", "p1");
        c.insert(
            k.clone(),
            entry(1, b"row", &["users"], Duration::from_secs(60)),
        );
        let got = c.get(&k).expect("hit");
        assert_eq!(got.response_bytes, b"row");
    }

    #[test]
    fn miss_returns_none() {
        let c = EdgeCache::new(10);
        assert!(c.get(&CacheKey::new("fp1", "p1")).is_none());
        assert_eq!(c.stats().misses, 1);
    }

    #[test]
    fn expired_entry_is_dropped_on_read() {
        let c = EdgeCache::new(10);
        let k = CacheKey::new("fp1", "p1");
        // A zero TTL already expires the entry, because expiry is inclusive.
        let mut e = entry(1, b"x", &[], Duration::from_secs(0));
        // Backdate it when the clock allows; `checked_sub` avoids the panic
        // that `Instant - Duration` raises on underflow.
        if let Some(past) = Instant::now().checked_sub(Duration::from_millis(1)) {
            e.expires_at = past;
        }
        c.insert(k.clone(), e);
        assert!(c.get(&k).is_none());
        assert_eq!(c.stats().current_entries, 0);
        assert_eq!(c.stats().misses, 1);
    }

    #[test]
    fn lru_evicts_oldest_when_over_capacity() {
        let c = EdgeCache::new(3);
        for i in 0..5 {
            let k = CacheKey::new(format!("fp{}", i), "p");
            c.insert(k, entry(i, b"x", &[], Duration::from_secs(60)));
        }
        assert_eq!(c.stats().entries_evicted, 2);
        assert_eq!(c.stats().current_entries, 3);
        assert!(c.get(&CacheKey::new("fp0", "p")).is_none());
        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
        assert!(c.get(&CacheKey::new("fp4", "p")).is_some());
    }

    #[test]
    fn lru_promotes_recently_read_entries() {
        let c = EdgeCache::new(3);
        for i in 0..3 {
            let k = CacheKey::new(format!("fp{}", i), "p");
            c.insert(k, entry(i, b"x", &[], Duration::from_secs(60)));
        }
        // Reading fp0 makes fp1 the least recently used entry.
        let _ = c.get(&CacheKey::new("fp0", "p"));
        c.insert(
            CacheKey::new("fp3", "p"),
            entry(3, b"x", &[], Duration::from_secs(60)),
        );
        assert!(c.get(&CacheKey::new("fp0", "p")).is_some());
        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
        assert!(c.get(&CacheKey::new("fp2", "p")).is_some());
        assert!(c.get(&CacheKey::new("fp3", "p")).is_some());
    }

    #[test]
    fn reinserting_existing_key_replaces_without_eviction() {
        let c = EdgeCache::new(2);
        let k = CacheKey::new("fp1", "p");
        c.insert(k.clone(), entry(1, b"old", &[], Duration::from_secs(60)));
        c.insert(k.clone(), entry(2, b"new", &[], Duration::from_secs(60)));
        let s = c.stats();
        assert_eq!(s.current_entries, 1);
        assert_eq!(s.entries_evicted, 0);
        assert_eq!(s.inserts, 2);
        assert_eq!(c.get(&k).expect("hit").response_bytes, b"new");
    }

    #[test]
    fn invalidate_drops_old_versions_only() {
        let c = EdgeCache::new(10);
        c.insert(
            CacheKey::new("fp1", "p"),
            entry(5, b"v5", &["users"], Duration::from_secs(60)),
        );
        c.insert(
            CacheKey::new("fp2", "p"),
            entry(10, b"v10", &["users"], Duration::from_secs(60)),
        );
        let dropped = c.invalidate(7, &["users".to_string()]);
        assert_eq!(dropped, 1);
        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
        assert!(c.get(&CacheKey::new("fp2", "p")).is_some());
    }

    #[test]
    fn invalidate_filters_by_tables() {
        let c = EdgeCache::new(10);
        c.insert(
            CacheKey::new("fp1", "p"),
            entry(5, b"x", &["users"], Duration::from_secs(60)),
        );
        c.insert(
            CacheKey::new("fp2", "p"),
            entry(5, b"y", &["orders"], Duration::from_secs(60)),
        );
        let dropped = c.invalidate(100, &["users".to_string()]);
        assert_eq!(dropped, 1);
        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
        assert!(c.get(&CacheKey::new("fp2", "p")).is_some());
    }

    #[test]
    fn invalidate_with_no_tables_drops_everything_within_version() {
        let c = EdgeCache::new(10);
        c.insert(
            CacheKey::new("fp1", "p"),
            entry(5, b"x", &["users"], Duration::from_secs(60)),
        );
        c.insert(
            CacheKey::new("fp2", "p"),
            entry(10, b"y", &["orders"], Duration::from_secs(60)),
        );
        let dropped = c.invalidate(7, &[]);
        assert_eq!(dropped, 1, "fp1 (v5) should be dropped, fp2 (v10) kept");
        // Check the survivors too, not just the count.
        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
        assert!(c.get(&CacheKey::new("fp2", "p")).is_some());
    }

    #[test]
    fn next_version_is_monotonic() {
        let c = EdgeCache::new(10);
        let v1 = c.next_version();
        let v2 = c.next_version();
        let v3 = c.next_version();
        assert!(v1 < v2 && v2 < v3);
    }

    #[test]
    fn stats_track_hits_and_misses() {
        let c = EdgeCache::new(10);
        let k = CacheKey::new("fp1", "p");
        c.insert(k.clone(), entry(1, b"x", &[], Duration::from_secs(60)));
        let _ = c.get(&k);
        let _ = c.get(&k);
        let _ = c.get(&CacheKey::new("missing", "p"));
        let s = c.stats();
        assert_eq!(s.hits, 2);
        assert_eq!(s.misses, 1);
        assert_eq!(s.inserts, 1);
    }

    #[test]
    #[should_panic(expected = "max_entries must be > 0")]
    fn panics_on_zero_capacity() {
        let _ = EdgeCache::new(0);
    }
}