Skip to main content

heliosdb_proxy/edge/
cache.rs

1//! Per-edge query result cache.
2//!
3//! Hash-keyed by `(query_fingerprint, params_hash)`. Each entry
4//! carries a monotonic `version` so an invalidation can sweep
5//! everything older than a known commit point in one pass.
6//!
7//! LRU eviction kicks in at `max_entries`. Concurrent reads share a
8//! parking_lot RwLock; concurrent writes are serialised by the same
9//! lock — fine because writes are rare on the edge (only on
10//! pull-on-miss + invalidation sweeps).
11
12use parking_lot::RwLock;
13use std::collections::{HashMap, VecDeque};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use serde::Serialize;
19
20/// One cached query result.
21#[derive(Debug, Clone)]
22pub struct CacheEntry {
23    /// Monotonic logical version. An invalidation drops every entry
24    /// whose version <= invalidation.version.
25    pub version: u64,
26    /// Pre-encoded PostgreSQL wire-protocol response bytes ready
27    /// to write back to the client.
28    pub response_bytes: Vec<u8>,
29    /// Tables the query touched, used by the home for invalidation
30    /// fan-out — empty when the home didn't supply them.
31    pub tables: Vec<String>,
32    /// Wall-clock entry expiry.
33    pub expires_at: Instant,
34}
35
36#[derive(Debug, Clone, Copy, Default, Serialize)]
37pub struct EdgeCacheStats {
38    pub hits: u64,
39    pub misses: u64,
40    pub inserts: u64,
41    pub invalidations_received: u64,
42    pub entries_evicted: u64,
43    pub current_entries: usize,
44}
45
46/// LRU + version + TTL cache. Cheap to clone via Arc.
47#[derive(Clone)]
48pub struct EdgeCache {
49    inner: Arc<EdgeCacheInner>,
50}
51
52struct EdgeCacheInner {
53    max_entries: usize,
54    map: RwLock<HashMap<CacheKey, CacheEntry>>,
55    /// LRU ordering — newest at the back, oldest at the front.
56    lru: RwLock<VecDeque<CacheKey>>,
57    next_version: AtomicU64,
58    hits: AtomicU64,
59    misses: AtomicU64,
60    inserts: AtomicU64,
61    invalidations: AtomicU64,
62    evictions: AtomicU64,
63}
64
65/// Cache key — the (fingerprint, params_hash) pair. Both sides are
66/// strings so the same key works whether params are plain text or
67/// hashed by an upstream caller.
68#[derive(Debug, Clone, PartialEq, Eq, Hash)]
69pub struct CacheKey {
70    pub fingerprint: String,
71    pub params_hash: String,
72}
73
74impl CacheKey {
75    pub fn new(fingerprint: impl Into<String>, params_hash: impl Into<String>) -> Self {
76        Self {
77            fingerprint: fingerprint.into(),
78            params_hash: params_hash.into(),
79        }
80    }
81}
82
83impl EdgeCache {
84    pub fn new(max_entries: usize) -> Self {
85        assert!(max_entries > 0, "max_entries must be > 0");
86        Self {
87            inner: Arc::new(EdgeCacheInner {
88                max_entries,
89                map: RwLock::new(HashMap::new()),
90                lru: RwLock::new(VecDeque::new()),
91                next_version: AtomicU64::new(1),
92                hits: AtomicU64::new(0),
93                misses: AtomicU64::new(0),
94                inserts: AtomicU64::new(0),
95                invalidations: AtomicU64::new(0),
96                evictions: AtomicU64::new(0),
97            }),
98        }
99    }
100
101    /// Mint a fresh logical version. Used by the home when assigning
102    /// version stamps to writes; also used by the cache itself when
103    /// inserting locally-cached entries.
104    pub fn next_version(&self) -> u64 {
105        self.inner.next_version.fetch_add(1, Ordering::Relaxed)
106    }
107
108    /// Look up a cache entry. Returns None on miss or expired TTL.
109    /// Bumps the LRU on hit; increments hit/miss counters either way.
110    pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
111        let now = Instant::now();
112        let map = self.inner.map.read();
113        let entry = match map.get(key) {
114            Some(e) => e.clone(),
115            None => {
116                self.inner.misses.fetch_add(1, Ordering::Relaxed);
117                return None;
118            }
119        };
120        drop(map);
121
122        if entry.expires_at <= now {
123            // Lazy expiry: evict on read so we don't bloat memory
124            // with stale entries even when nothing else touches them.
125            self.inner.map.write().remove(key);
126            self.inner.lru.write().retain(|k| k != key);
127            self.inner.misses.fetch_add(1, Ordering::Relaxed);
128            return None;
129        }
130
131        // Bump LRU.
132        let mut lru = self.inner.lru.write();
133        lru.retain(|k| k != key);
134        lru.push_back(key.clone());
135
136        self.inner.hits.fetch_add(1, Ordering::Relaxed);
137        Some(entry)
138    }
139
140    /// Insert / overwrite an entry. Triggers LRU eviction if
141    /// over capacity.
142    pub fn insert(&self, key: CacheKey, entry: CacheEntry) {
143        {
144            let mut map = self.inner.map.write();
145            let mut lru = self.inner.lru.write();
146            if map.insert(key.clone(), entry).is_none() {
147                lru.push_back(key.clone());
148            } else {
149                lru.retain(|k| k != &key);
150                lru.push_back(key);
151            }
152            self.inner.inserts.fetch_add(1, Ordering::Relaxed);
153
154            // LRU eviction.
155            while map.len() > self.inner.max_entries {
156                if let Some(victim) = lru.pop_front() {
157                    map.remove(&victim);
158                    self.inner.evictions.fetch_add(1, Ordering::Relaxed);
159                } else {
160                    break;
161                }
162            }
163        }
164    }
165
166    /// Drop every entry whose version <= `up_to_version` AND whose
167    /// `tables` overlaps with `tables` (empty `tables` invalidates
168    /// every entry meeting the version bound). Returns the count
169    /// dropped.
170    pub fn invalidate(&self, up_to_version: u64, tables: &[String]) -> u64 {
171        self.inner.invalidations.fetch_add(1, Ordering::Relaxed);
172        let mut map = self.inner.map.write();
173        let mut lru = self.inner.lru.write();
174        let mut drop_keys = Vec::new();
175        for (k, e) in map.iter() {
176            if e.version > up_to_version {
177                continue;
178            }
179            if tables.is_empty() {
180                drop_keys.push(k.clone());
181                continue;
182            }
183            if e.tables.iter().any(|t| tables.contains(t)) {
184                drop_keys.push(k.clone());
185            }
186        }
187        for k in &drop_keys {
188            map.remove(k);
189            lru.retain(|x| x != k);
190        }
191        drop_keys.len() as u64
192    }
193
194    pub fn stats(&self) -> EdgeCacheStats {
195        EdgeCacheStats {
196            hits: self.inner.hits.load(Ordering::Relaxed),
197            misses: self.inner.misses.load(Ordering::Relaxed),
198            inserts: self.inner.inserts.load(Ordering::Relaxed),
199            invalidations_received: self
200                .inner
201                .invalidations
202                .load(Ordering::Relaxed),
203            entries_evicted: self.inner.evictions.load(Ordering::Relaxed),
204            current_entries: self.inner.map.read().len(),
205        }
206    }
207
208    /// Test-only: deterministic insert with explicit version + TTL.
209    pub fn insert_with(&self, key: CacheKey, entry: CacheEntry) {
210        self.insert(key, entry);
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217
218    fn entry(version: u64, body: &[u8], tables: &[&str], ttl: Duration) -> CacheEntry {
219        CacheEntry {
220            version,
221            response_bytes: body.to_vec(),
222            tables: tables.iter().map(|s| s.to_string()).collect(),
223            expires_at: Instant::now() + ttl,
224        }
225    }
226
227    #[test]
228    fn insert_then_get_returns_value() {
229        let c = EdgeCache::new(10);
230        let k = CacheKey::new("fp1", "p1");
231        c.insert(k.clone(), entry(1, b"row", &["users"], Duration::from_secs(60)));
232        let got = c.get(&k).expect("hit");
233        assert_eq!(got.response_bytes, b"row");
234    }
235
236    #[test]
237    fn miss_returns_none() {
238        let c = EdgeCache::new(10);
239        assert!(c.get(&CacheKey::new("fp1", "p1")).is_none());
240        assert_eq!(c.stats().misses, 1);
241    }
242
243    #[test]
244    fn expired_entry_is_dropped_on_read() {
245        let c = EdgeCache::new(10);
246        let k = CacheKey::new("fp1", "p1");
247        // Insert with a 0-duration TTL — already expired.
248        let mut e = entry(1, b"x", &[], Duration::from_secs(0));
249        e.expires_at = Instant::now() - Duration::from_millis(1);
250        c.insert(k.clone(), e);
251        assert!(c.get(&k).is_none());
252        assert_eq!(c.stats().current_entries, 0);
253    }
254
255    #[test]
256    fn lru_evicts_oldest_when_over_capacity() {
257        let c = EdgeCache::new(3);
258        for i in 0..5 {
259            let k = CacheKey::new(format!("fp{}", i), "p");
260            c.insert(k, entry(i, b"x", &[], Duration::from_secs(60)));
261        }
262        // Capacity 3, inserted 5 → 2 evictions.
263        assert_eq!(c.stats().entries_evicted, 2);
264        assert_eq!(c.stats().current_entries, 3);
265        // The two oldest (fp0, fp1) should be gone.
266        assert!(c.get(&CacheKey::new("fp0", "p")).is_none());
267        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
268        assert!(c.get(&CacheKey::new("fp4", "p")).is_some());
269    }
270
271    #[test]
272    fn lru_promotes_recently_read_entries() {
273        let c = EdgeCache::new(3);
274        for i in 0..3 {
275            let k = CacheKey::new(format!("fp{}", i), "p");
276            c.insert(k, entry(i, b"x", &[], Duration::from_secs(60)));
277        }
278        // Read fp0 — promotes it to the back of the LRU.
279        let _ = c.get(&CacheKey::new("fp0", "p"));
280        // Insert one more, should evict fp1 (now the oldest).
281        c.insert(
282            CacheKey::new("fp3", "p"),
283            entry(3, b"x", &[], Duration::from_secs(60)),
284        );
285        assert!(c.get(&CacheKey::new("fp0", "p")).is_some());
286        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
287        assert!(c.get(&CacheKey::new("fp2", "p")).is_some());
288        assert!(c.get(&CacheKey::new("fp3", "p")).is_some());
289    }
290
291    #[test]
292    fn invalidate_drops_old_versions_only() {
293        let c = EdgeCache::new(10);
294        c.insert(
295            CacheKey::new("fp1", "p"),
296            entry(5, b"v5", &["users"], Duration::from_secs(60)),
297        );
298        c.insert(
299            CacheKey::new("fp2", "p"),
300            entry(10, b"v10", &["users"], Duration::from_secs(60)),
301        );
302        let dropped = c.invalidate(7, &["users".to_string()]);
303        assert_eq!(dropped, 1);
304        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
305        assert!(c.get(&CacheKey::new("fp2", "p")).is_some());
306    }
307
308    #[test]
309    fn invalidate_filters_by_tables() {
310        let c = EdgeCache::new(10);
311        c.insert(
312            CacheKey::new("fp1", "p"),
313            entry(5, b"x", &["users"], Duration::from_secs(60)),
314        );
315        c.insert(
316            CacheKey::new("fp2", "p"),
317            entry(5, b"y", &["orders"], Duration::from_secs(60)),
318        );
319        let dropped = c.invalidate(100, &["users".to_string()]);
320        assert_eq!(dropped, 1);
321        assert!(c.get(&CacheKey::new("fp1", "p")).is_none());
322        assert!(c.get(&CacheKey::new("fp2", "p")).is_some());
323    }
324
325    #[test]
326    fn invalidate_with_no_tables_drops_everything_within_version() {
327        let c = EdgeCache::new(10);
328        c.insert(
329            CacheKey::new("fp1", "p"),
330            entry(5, b"x", &["users"], Duration::from_secs(60)),
331        );
332        c.insert(
333            CacheKey::new("fp2", "p"),
334            entry(10, b"y", &["orders"], Duration::from_secs(60)),
335        );
336        let dropped = c.invalidate(7, &[]);
337        assert_eq!(dropped, 1, "fp1 (v5) should be dropped, fp2 (v10) kept");
338    }
339
340    #[test]
341    fn next_version_is_monotonic() {
342        let c = EdgeCache::new(10);
343        let v1 = c.next_version();
344        let v2 = c.next_version();
345        let v3 = c.next_version();
346        assert!(v1 < v2 && v2 < v3);
347    }
348
349    #[test]
350    fn stats_track_hits_and_misses() {
351        let c = EdgeCache::new(10);
352        let k = CacheKey::new("fp1", "p");
353        c.insert(k.clone(), entry(1, b"x", &[], Duration::from_secs(60)));
354        let _ = c.get(&k);
355        let _ = c.get(&k);
356        let _ = c.get(&CacheKey::new("missing", "p"));
357        let s = c.stats();
358        assert_eq!(s.hits, 2);
359        assert_eq!(s.misses, 1);
360        assert_eq!(s.inserts, 1);
361    }
362
363    #[test]
364    fn panics_on_zero_capacity() {
365        let res = std::panic::catch_unwind(|| EdgeCache::new(0));
366        assert!(res.is_err());
367    }
368}