Skip to main content

heliosdb_proxy/edge/
cache.rs

1//! Per-edge query result cache.
2//!
3//! Hash-keyed by `(query_fingerprint, params_hash)`. Each entry
4//! carries a monotonic `version` so an invalidation can sweep
5//! everything older than a known commit point in one pass.
6//!
//! LRU eviction kicks in at `max_entries`. The entry map and the LRU
//! order each sit behind their own parking_lot RwLock. Lookups share
//! the map's read lock, but every hit also takes the LRU write lock to
//! bump recency, so hits are serialised on that lock. Map writes are
//! rare on the edge (only on pull-on-miss + invalidation sweeps).
11
12use parking_lot::RwLock;
13use std::collections::{HashMap, VecDeque};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Instant;
17
18use serde::Serialize;
19
/// A single cached query result together with its invalidation metadata.
#[derive(Clone, Debug)]
pub struct CacheEntry {
    /// Logical version stamp. An invalidation at version `v` considers
    /// this entry only when `version <= v`.
    pub version: u64,
    /// Response bytes, already encoded in the PostgreSQL wire protocol,
    /// so a hit can be written back to the client as-is.
    pub response_bytes: Vec<u8>,
    /// Names of the tables the query touched; matched against the table
    /// list of an invalidation. Empty when the home did not supply them.
    pub tables: Vec<String>,
    /// Deadline after which the entry counts as expired. This is an
    /// `Instant`, i.e. a monotonic-clock reading rather than a
    /// wall-clock timestamp.
    pub expires_at: Instant,
}
35
36#[derive(Debug, Clone, Copy, Default, Serialize)]
37pub struct EdgeCacheStats {
38    pub hits: u64,
39    pub misses: u64,
40    pub inserts: u64,
41    pub invalidations_received: u64,
42    pub entries_evicted: u64,
43    pub current_entries: usize,
44}
45
46/// LRU + version + TTL cache. Cheap to clone via Arc.
47#[derive(Clone)]
48pub struct EdgeCache {
49    inner: Arc<EdgeCacheInner>,
50}
51
52struct EdgeCacheInner {
53    max_entries: usize,
54    map: RwLock<HashMap<CacheKey, CacheEntry>>,
55    /// LRU ordering — newest at the back, oldest at the front.
56    lru: RwLock<VecDeque<CacheKey>>,
57    next_version: AtomicU64,
58    hits: AtomicU64,
59    misses: AtomicU64,
60    inserts: AtomicU64,
61    invalidations: AtomicU64,
62    evictions: AtomicU64,
63}
64
/// Identity of a cached result: a query fingerprint paired with a hash
/// (or plain-text rendering) of its parameters.
///
/// Both halves are plain `String`s, so the key does not care whether the
/// caller hashed the parameters upstream or passed them through as text.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CacheKey {
    /// Fingerprint identifying the query.
    pub fingerprint: String,
    /// Hash or textual form of the query parameters.
    pub params_hash: String,
}
73
74impl CacheKey {
75    pub fn new(fingerprint: impl Into<String>, params_hash: impl Into<String>) -> Self {
76        Self {
77            fingerprint: fingerprint.into(),
78            params_hash: params_hash.into(),
79        }
80    }
81}
82
83impl EdgeCache {
84    pub fn new(max_entries: usize) -> Self {
85        assert!(max_entries > 0, "max_entries must be > 0");
86        Self {
87            inner: Arc::new(EdgeCacheInner {
88                max_entries,
89                map: RwLock::new(HashMap::new()),
90                lru: RwLock::new(VecDeque::new()),
91                next_version: AtomicU64::new(1),
92                hits: AtomicU64::new(0),
93                misses: AtomicU64::new(0),
94                inserts: AtomicU64::new(0),
95                invalidations: AtomicU64::new(0),
96                evictions: AtomicU64::new(0),
97            }),
98        }
99    }
100
101    /// Mint a fresh logical version. Used by the home when assigning
102    /// version stamps to writes; also used by the cache itself when
103    /// inserting locally-cached entries.
104    pub fn next_version(&self) -> u64 {
105        self.inner.next_version.fetch_add(1, Ordering::Relaxed)
106    }
107
108    /// Look up a cache entry. Returns None on miss or expired TTL.
109    /// Bumps the LRU on hit; increments hit/miss counters either way.
110    pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
111        let now = Instant::now();
112        let map = self.inner.map.read();
113        let entry = match map.get(key) {
114            Some(e) => e.clone(),
115            None => {
116                self.inner.misses.fetch_add(1, Ordering::Relaxed);
117                return None;
118            }
119        };
120        drop(map);
121
122        if entry.expires_at <= now {
123            // Lazy expiry: evict on read so we don't bloat memory
124            // with stale entries even when nothing else touches them.
125            self.inner.map.write().remove(key);
126            self.inner.lru.write().retain(|k| k != key);
127            self.inner.misses.fetch_add(1, Ordering::Relaxed);
128            return None;
129        }
130
131        // Bump LRU.
132        let mut lru = self.inner.lru.write();
133        lru.retain(|k| k != key);
134        lru.push_back(key.clone());
135
136        self.inner.hits.fetch_add(1, Ordering::Relaxed);
137        Some(entry)
138    }
139
140    /// Insert / overwrite an entry. Triggers LRU eviction if
141    /// over capacity.
142    pub fn insert(&self, key: CacheKey, entry: CacheEntry) {
143        {
144            let mut map = self.inner.map.write();
145            let mut lru = self.inner.lru.write();
146            if map.insert(key.clone(), entry).is_none() {
147                lru.push_back(key.clone());
148            } else {
149                lru.retain(|k| k != &key);
150                lru.push_back(key);
151            }
152            self.inner.inserts.fetch_add(1, Ordering::Relaxed);
153
154            // LRU eviction.
155            while map.len() > self.inner.max_entries {
156                if let Some(victim) = lru.pop_front() {
157                    map.remove(&victim);
158                    self.inner.evictions.fetch_add(1, Ordering::Relaxed);
159                } else {
160                    break;
161                }
162            }
163        }
164    }
165
166    /// Drop every entry whose version <= `up_to_version` AND whose
167    /// `tables` overlaps with `tables` (empty `tables` invalidates
168    /// every entry meeting the version bound). Returns the count
169    /// dropped.
170    pub fn invalidate(&self, up_to_version: u64, tables: &[String]) -> u64 {
171        self.inner.invalidations.fetch_add(1, Ordering::Relaxed);
172        let mut map = self.inner.map.write();
173        let mut lru = self.inner.lru.write();
174        let mut drop_keys = Vec::new();
175        for (k, e) in map.iter() {
176            if e.version > up_to_version {
177                continue;
178            }
179            if tables.is_empty() {
180                drop_keys.push(k.clone());
181                continue;
182            }
183            if e.tables.iter().any(|t| tables.contains(t)) {
184                drop_keys.push(k.clone());
185            }
186        }
187        for k in &drop_keys {
188            map.remove(k);
189            lru.retain(|x| x != k);
190        }
191        drop_keys.len() as u64
192    }
193
194    pub fn stats(&self) -> EdgeCacheStats {
195        EdgeCacheStats {
196            hits: self.inner.hits.load(Ordering::Relaxed),
197            misses: self.inner.misses.load(Ordering::Relaxed),
198            inserts: self.inner.inserts.load(Ordering::Relaxed),
199            invalidations_received: self.inner.invalidations.load(Ordering::Relaxed),
200            entries_evicted: self.inner.evictions.load(Ordering::Relaxed),
201            current_entries: self.inner.map.read().len(),
202        }
203    }
204
205    /// Test-only: deterministic insert with explicit version + TTL.
206    pub fn insert_with(&self, key: CacheKey, entry: CacheEntry) {
207        self.insert(key, entry);
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// TTL comfortably longer than any test run.
    const LONG_TTL: Duration = Duration::from_secs(60);

    /// Build an entry that expires `ttl` from now.
    fn entry(version: u64, body: &[u8], tables: &[&str], ttl: Duration) -> CacheEntry {
        let expires_at = Instant::now() + ttl;
        let tables = tables.iter().map(|name| name.to_string()).collect();
        CacheEntry {
            version,
            response_bytes: body.to_vec(),
            tables,
            expires_at,
        }
    }

    /// Key with the given fingerprint and the fixed params hash "p".
    fn fp(fingerprint: &str) -> CacheKey {
        CacheKey::new(fingerprint, "p")
    }

    #[test]
    fn insert_then_get_returns_value() {
        let cache = EdgeCache::new(10);
        let key = CacheKey::new("fp1", "p1");
        cache.insert(key.clone(), entry(1, b"row", &["users"], LONG_TTL));
        let hit = cache.get(&key).expect("hit");
        assert_eq!(hit.response_bytes, b"row");
    }

    #[test]
    fn miss_returns_none() {
        let cache = EdgeCache::new(10);
        let absent = CacheKey::new("fp1", "p1");
        assert!(cache.get(&absent).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn expired_entry_is_dropped_on_read() {
        let cache = EdgeCache::new(10);
        let key = CacheKey::new("fp1", "p1");
        // Start from a zero TTL, then push the deadline into the past so
        // the entry is expired regardless of clock granularity.
        let mut stale = entry(1, b"x", &[], Duration::from_secs(0));
        stale.expires_at = Instant::now() - Duration::from_millis(1);
        cache.insert(key.clone(), stale);
        assert!(cache.get(&key).is_none());
        assert_eq!(cache.stats().current_entries, 0);
    }

    #[test]
    fn lru_evicts_oldest_when_over_capacity() {
        let cache = EdgeCache::new(3);
        for version in 0..5 {
            let key = fp(&format!("fp{}", version));
            cache.insert(key, entry(version, b"x", &[], LONG_TTL));
        }
        // Five inserts into a capacity of three leave two evictions.
        let stats = cache.stats();
        assert_eq!(stats.entries_evicted, 2);
        assert_eq!(stats.current_entries, 3);
        // fp0 and fp1 were the oldest, so they are the ones gone.
        assert!(cache.get(&fp("fp0")).is_none());
        assert!(cache.get(&fp("fp1")).is_none());
        assert!(cache.get(&fp("fp4")).is_some());
    }

    #[test]
    fn lru_promotes_recently_read_entries() {
        let cache = EdgeCache::new(3);
        for version in 0..3 {
            let key = fp(&format!("fp{}", version));
            cache.insert(key, entry(version, b"x", &[], LONG_TTL));
        }
        // Reading fp0 moves it to the back of the LRU queue...
        let _ = cache.get(&fp("fp0"));
        // ...so the next insert evicts fp1, which is now the oldest.
        cache.insert(fp("fp3"), entry(3, b"x", &[], LONG_TTL));
        assert!(cache.get(&fp("fp0")).is_some());
        assert!(cache.get(&fp("fp1")).is_none());
        assert!(cache.get(&fp("fp2")).is_some());
        assert!(cache.get(&fp("fp3")).is_some());
    }

    #[test]
    fn invalidate_drops_old_versions_only() {
        let cache = EdgeCache::new(10);
        cache.insert(fp("fp1"), entry(5, b"v5", &["users"], LONG_TTL));
        cache.insert(fp("fp2"), entry(10, b"v10", &["users"], LONG_TTL));
        let dropped = cache.invalidate(7, &["users".to_string()]);
        assert_eq!(dropped, 1);
        assert!(cache.get(&fp("fp1")).is_none());
        assert!(cache.get(&fp("fp2")).is_some());
    }

    #[test]
    fn invalidate_filters_by_tables() {
        let cache = EdgeCache::new(10);
        cache.insert(fp("fp1"), entry(5, b"x", &["users"], LONG_TTL));
        cache.insert(fp("fp2"), entry(5, b"y", &["orders"], LONG_TTL));
        let dropped = cache.invalidate(100, &["users".to_string()]);
        assert_eq!(dropped, 1);
        assert!(cache.get(&fp("fp1")).is_none());
        assert!(cache.get(&fp("fp2")).is_some());
    }

    #[test]
    fn invalidate_with_no_tables_drops_everything_within_version() {
        let cache = EdgeCache::new(10);
        cache.insert(fp("fp1"), entry(5, b"x", &["users"], LONG_TTL));
        cache.insert(fp("fp2"), entry(10, b"y", &["orders"], LONG_TTL));
        let dropped = cache.invalidate(7, &[]);
        assert_eq!(dropped, 1, "fp1 (v5) should be dropped, fp2 (v10) kept");
    }

    #[test]
    fn next_version_is_monotonic() {
        let cache = EdgeCache::new(10);
        let first = cache.next_version();
        let second = cache.next_version();
        let third = cache.next_version();
        assert!(first < second && second < third);
    }

    #[test]
    fn stats_track_hits_and_misses() {
        let cache = EdgeCache::new(10);
        let key = fp("fp1");
        cache.insert(key.clone(), entry(1, b"x", &[], LONG_TTL));
        let _ = cache.get(&key);
        let _ = cache.get(&key);
        let _ = cache.get(&fp("missing"));
        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.inserts, 1);
    }

    #[test]
    fn panics_on_zero_capacity() {
        let outcome = std::panic::catch_unwind(|| EdgeCache::new(0));
        assert!(outcome.is_err());
    }
}