1use parking_lot::RwLock;
13use std::collections::{HashMap, VecDeque};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use serde::Serialize;
19
/// A single cached response together with the metadata the cache needs to
/// expire and invalidate it.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Version stamp of the entry. `EdgeCache::invalidate` only considers
    /// entries whose version is less than or equal to its `up_to_version`.
    pub version: u64,
    /// The cached payload, returned to callers as-is (cloned) on a hit.
    pub response_bytes: Vec<u8>,
    /// Table names associated with this entry. `EdgeCache::invalidate`
    /// drops the entry when any of these names appears in its `tables` list.
    pub tables: Vec<String>,
    /// Expiry instant. `EdgeCache::get` treats the entry as expired once
    /// `expires_at <= Instant::now()`.
    pub expires_at: Instant,
}
35
/// Point-in-time snapshot of the cache counters, as returned by
/// `EdgeCache::stats`.
///
/// The counters are read one by one from independent atomics, so a snapshot
/// taken while other threads are active is not guaranteed to be mutually
/// consistent.
#[derive(Debug, Clone, Copy, Default, Serialize)]
pub struct EdgeCacheStats {
    /// Number of `get` calls that returned an entry.
    pub hits: u64,
    /// Number of `get` calls that returned `None` (key absent or entry expired).
    pub misses: u64,
    /// Number of `insert` calls, including ones that replaced an existing key.
    pub inserts: u64,
    /// Number of `invalidate` calls, regardless of how many entries each dropped.
    pub invalidations_received: u64,
    /// Number of entries removed by `insert` to stay within capacity.
    pub entries_evicted: u64,
    /// Number of entries in the map at the moment the snapshot was taken.
    pub current_entries: usize,
}
45
/// Handle to a bounded cache of `CacheEntry` values keyed by `CacheKey`.
///
/// All state lives behind an `Arc`, so cloning the handle yields another
/// reference to the same underlying cache rather than a copy of its contents.
#[derive(Clone)]
pub struct EdgeCache {
    /// Shared state; every clone of the handle points at the same instance.
    inner: Arc<EdgeCacheInner>,
}
51
/// Shared state behind an `EdgeCache` handle.
struct EdgeCacheInner {
    /// Capacity limit; `insert` evicts until the map holds at most this many entries.
    max_entries: usize,
    /// Key -> entry storage.
    map: RwLock<HashMap<CacheKey, CacheEntry>>,
    /// Recency queue: the front is the next eviction victim (`pop_front`),
    /// the back is the most recently inserted or read key (`push_back`).
    /// Methods that take both locks acquire `map` first, then `lru`.
    lru: RwLock<VecDeque<CacheKey>>,
    /// Source of the values handed out by `EdgeCache::next_version`.
    next_version: AtomicU64,
    /// Count of successful lookups.
    hits: AtomicU64,
    /// Count of lookups that found nothing usable.
    misses: AtomicU64,
    /// Count of `insert` calls.
    inserts: AtomicU64,
    /// Count of `invalidate` calls.
    invalidations: AtomicU64,
    /// Count of capacity evictions.
    evictions: AtomicU64,
}
64
/// Lookup key for the cache: two strings that are compared and hashed
/// together, so two keys are equal only when both components match.
// NOTE(review): the names suggest a normalized-query fingerprint plus a hash
// of the bound parameters — confirm against the callers that build keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheKey {
    /// First key component.
    pub fingerprint: String,
    /// Second key component.
    pub params_hash: String,
}
73
74impl CacheKey {
75 pub fn new(fingerprint: impl Into<String>, params_hash: impl Into<String>) -> Self {
76 Self {
77 fingerprint: fingerprint.into(),
78 params_hash: params_hash.into(),
79 }
80 }
81}
82
83impl EdgeCache {
84 pub fn new(max_entries: usize) -> Self {
85 assert!(max_entries > 0, "max_entries must be > 0");
86 Self {
87 inner: Arc::new(EdgeCacheInner {
88 max_entries,
89 map: RwLock::new(HashMap::new()),
90 lru: RwLock::new(VecDeque::new()),
91 next_version: AtomicU64::new(1),
92 hits: AtomicU64::new(0),
93 misses: AtomicU64::new(0),
94 inserts: AtomicU64::new(0),
95 invalidations: AtomicU64::new(0),
96 evictions: AtomicU64::new(0),
97 }),
98 }
99 }
100
101 pub fn next_version(&self) -> u64 {
105 self.inner.next_version.fetch_add(1, Ordering::Relaxed)
106 }
107
108 pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
111 let now = Instant::now();
112 let map = self.inner.map.read();
113 let entry = match map.get(key) {
114 Some(e) => e.clone(),
115 None => {
116 self.inner.misses.fetch_add(1, Ordering::Relaxed);
117 return None;
118 }
119 };
120 drop(map);
121
122 if entry.expires_at <= now {
123 self.inner.map.write().remove(key);
126 self.inner.lru.write().retain(|k| k != key);
127 self.inner.misses.fetch_add(1, Ordering::Relaxed);
128 return None;
129 }
130
131 let mut lru = self.inner.lru.write();
133 lru.retain(|k| k != key);
134 lru.push_back(key.clone());
135
136 self.inner.hits.fetch_add(1, Ordering::Relaxed);
137 Some(entry)
138 }
139
140 pub fn insert(&self, key: CacheKey, entry: CacheEntry) {
143 {
144 let mut map = self.inner.map.write();
145 let mut lru = self.inner.lru.write();
146 if map.insert(key.clone(), entry).is_none() {
147 lru.push_back(key.clone());
148 } else {
149 lru.retain(|k| k != &key);
150 lru.push_back(key);
151 }
152 self.inner.inserts.fetch_add(1, Ordering::Relaxed);
153
154 while map.len() > self.inner.max_entries {
156 if let Some(victim) = lru.pop_front() {
157 map.remove(&victim);
158 self.inner.evictions.fetch_add(1, Ordering::Relaxed);
159 } else {
160 break;
161 }
162 }
163 }
164 }
165
166 pub fn invalidate(&self, up_to_version: u64, tables: &[String]) -> u64 {
171 self.inner.invalidations.fetch_add(1, Ordering::Relaxed);
172 let mut map = self.inner.map.write();
173 let mut lru = self.inner.lru.write();
174 let mut drop_keys = Vec::new();
175 for (k, e) in map.iter() {
176 if e.version > up_to_version {
177 continue;
178 }
179 if tables.is_empty() {
180 drop_keys.push(k.clone());
181 continue;
182 }
183 if e.tables.iter().any(|t| tables.contains(t)) {
184 drop_keys.push(k.clone());
185 }
186 }
187 for k in &drop_keys {
188 map.remove(k);
189 lru.retain(|x| x != k);
190 }
191 drop_keys.len() as u64
192 }
193
194 pub fn stats(&self) -> EdgeCacheStats {
195 EdgeCacheStats {
196 hits: self.inner.hits.load(Ordering::Relaxed),
197 misses: self.inner.misses.load(Ordering::Relaxed),
198 inserts: self.inner.inserts.load(Ordering::Relaxed),
199 invalidations_received: self
200 .inner
201 .invalidations
202 .load(Ordering::Relaxed),
203 entries_evicted: self.inner.evictions.load(Ordering::Relaxed),
204 current_entries: self.inner.map.read().len(),
205 }
206 }
207
208 pub fn insert_with(&self, key: CacheKey, entry: CacheEntry) {
210 self.insert(key, entry);
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;

    /// TTL long enough that an entry cannot expire during a test run.
    const LONG_TTL: Duration = Duration::from_secs(60);

    /// Builds an entry that expires `ttl` from now.
    fn entry(version: u64, body: &[u8], tables: &[&str], ttl: Duration) -> CacheEntry {
        let tables: Vec<String> = tables.iter().map(|s| s.to_string()).collect();
        CacheEntry {
            version,
            response_bytes: body.to_vec(),
            tables,
            expires_at: Instant::now() + ttl,
        }
    }

    /// Key with the given fingerprint and the shared `"p"` params hash.
    fn p_key(fingerprint: &str) -> CacheKey {
        CacheKey::new(fingerprint, "p")
    }

    /// Inserts entries `fp0..fp{count-1}` whose version equals their index.
    fn fill(cache: &EdgeCache, count: u64) {
        for i in 0..count {
            let k = CacheKey::new(format!("fp{}", i), "p");
            cache.insert(k, entry(i, b"x", &[], LONG_TTL));
        }
    }

    #[test]
    fn insert_then_get_returns_value() {
        let cache = EdgeCache::new(10);
        let key = CacheKey::new("fp1", "p1");
        cache.insert(key.clone(), entry(1, b"row", &["users"], LONG_TTL));
        let hit = cache.get(&key).expect("hit");
        assert_eq!(hit.response_bytes, b"row");
    }

    #[test]
    fn miss_returns_none() {
        let cache = EdgeCache::new(10);
        let absent = CacheKey::new("fp1", "p1");
        assert!(cache.get(&absent).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn expired_entry_is_dropped_on_read() {
        let cache = EdgeCache::new(10);
        let key = CacheKey::new("fp1", "p1");
        // Back-date the expiry so the entry is already stale when read.
        let base = entry(1, b"x", &[], Duration::from_secs(0));
        let stale = CacheEntry {
            expires_at: Instant::now() - Duration::from_millis(1),
            ..base
        };
        cache.insert(key.clone(), stale);
        assert!(cache.get(&key).is_none());
        assert_eq!(cache.stats().current_entries, 0);
    }

    #[test]
    fn lru_evicts_oldest_when_over_capacity() {
        let cache = EdgeCache::new(3);
        fill(&cache, 5);
        // Five inserts into three slots: the two oldest keys are gone.
        assert_eq!(cache.stats().entries_evicted, 2);
        assert_eq!(cache.stats().current_entries, 3);
        assert!(cache.get(&p_key("fp0")).is_none());
        assert!(cache.get(&p_key("fp1")).is_none());
        assert!(cache.get(&p_key("fp4")).is_some());
    }

    #[test]
    fn lru_promotes_recently_read_entries() {
        let cache = EdgeCache::new(3);
        fill(&cache, 3);
        // Reading fp0 makes fp1 the least recently used key.
        let _ = cache.get(&p_key("fp0"));
        cache.insert(p_key("fp3"), entry(3, b"x", &[], LONG_TTL));
        assert!(cache.get(&p_key("fp0")).is_some());
        assert!(cache.get(&p_key("fp1")).is_none());
        assert!(cache.get(&p_key("fp2")).is_some());
        assert!(cache.get(&p_key("fp3")).is_some());
    }

    #[test]
    fn invalidate_drops_old_versions_only() {
        let cache = EdgeCache::new(10);
        cache.insert(p_key("fp1"), entry(5, b"v5", &["users"], LONG_TTL));
        cache.insert(p_key("fp2"), entry(10, b"v10", &["users"], LONG_TTL));
        let touched = ["users".to_string()];
        let dropped = cache.invalidate(7, &touched);
        assert_eq!(dropped, 1);
        assert!(cache.get(&p_key("fp1")).is_none());
        assert!(cache.get(&p_key("fp2")).is_some());
    }

    #[test]
    fn invalidate_filters_by_tables() {
        let cache = EdgeCache::new(10);
        cache.insert(p_key("fp1"), entry(5, b"x", &["users"], LONG_TTL));
        cache.insert(p_key("fp2"), entry(5, b"y", &["orders"], LONG_TTL));
        let touched = ["users".to_string()];
        let dropped = cache.invalidate(100, &touched);
        assert_eq!(dropped, 1);
        assert!(cache.get(&p_key("fp1")).is_none());
        assert!(cache.get(&p_key("fp2")).is_some());
    }

    #[test]
    fn invalidate_with_no_tables_drops_everything_within_version() {
        let cache = EdgeCache::new(10);
        cache.insert(p_key("fp1"), entry(5, b"x", &["users"], LONG_TTL));
        cache.insert(p_key("fp2"), entry(10, b"y", &["orders"], LONG_TTL));
        let dropped = cache.invalidate(7, &[]);
        assert_eq!(dropped, 1, "fp1 (v5) should be dropped, fp2 (v10) kept");
    }

    #[test]
    fn next_version_is_monotonic() {
        let cache = EdgeCache::new(10);
        let first = cache.next_version();
        let second = cache.next_version();
        let third = cache.next_version();
        assert!(first < second && second < third);
    }

    #[test]
    fn stats_track_hits_and_misses() {
        let cache = EdgeCache::new(10);
        let key = p_key("fp1");
        cache.insert(key.clone(), entry(1, b"x", &[], LONG_TTL));
        let _ = cache.get(&key);
        let _ = cache.get(&key);
        let _ = cache.get(&p_key("missing"));
        let snapshot = cache.stats();
        assert_eq!(snapshot.hits, 2);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.inserts, 1);
    }

    #[test]
    fn panics_on_zero_capacity() {
        let outcome = std::panic::catch_unwind(|| EdgeCache::new(0));
        assert!(outcome.is_err());
    }
}