Skip to main content

heliosdb_proxy/cache/
l1_hot.rs

//! L1 Hot Cache
//!
//! Per-connection, exact-match cache with LRU eviction.
//! Provides sub-microsecond latency for repeated queries.
//!
//! Hits take only a read lock on the entries map; `L1Entry::access_count`
//! is an `AtomicU64`, so its increment doesn't require `&mut` access to
//! the entry. After the read lock is released, a hit refreshes the LRU
//! order under a separate write lock on `lru_order`.
//! `parking_lot::RwLock` is used throughout — no poisoning, no `.unwrap()`
//! on every lock acquire.
10
11use std::collections::HashMap;
12use std::time::Instant;
13
14use parking_lot::RwLock;
15
16use super::config::L1Config;
17use super::result::{CachedResult, L1Entry};
18
19/// L1 hot cache (per-connection)
20///
21/// This cache stores exact query matches for a single connection.
22/// It uses LRU eviction when the cache is full.
23#[derive(Debug)]
24pub struct L1HotCache {
25    /// Cache configuration
26    config: L1Config,
27
28    /// Cache entries indexed by exact query string
29    entries: RwLock<HashMap<String, L1Entry>>,
30
31    /// LRU order tracking (query string -> last access time)
32    lru_order: RwLock<Vec<(String, Instant)>>,
33}
34
35impl L1HotCache {
36    /// Create a new L1 hot cache with the given configuration
37    pub fn new(config: L1Config) -> Self {
38        let size = config.size;
39        Self {
40            config,
41            entries: RwLock::new(HashMap::with_capacity(size)),
42            lru_order: RwLock::new(Vec::with_capacity(size)),
43        }
44    }
45
46    /// Look up a query in the cache.
47    ///
48    /// Hits take only a read lock on the entries map; the `touch()` call
49    /// uses atomic `fetch_add` on `access_count` rather than exclusive
50    /// access. Expired entries still need a write-lock to evict, but that
51    /// is only the slow path.
52    pub fn get(&self, query: &str) -> Option<CachedResult> {
53        if !self.config.enabled {
54            return None;
55        }
56
57        // Fast path: read lock.
58        let (result, expired) = {
59            let entries = self.entries.read();
60            match entries.get(query) {
61                None => return None,
62                Some(entry) if entry.is_expired() => (None, true),
63                Some(entry) => {
64                    entry.touch();
65                    (Some(entry.result.clone()), false)
66                }
67            }
68        };
69
70        if expired {
71            // Slow path: escalate to a write lock to evict the dead entry.
72            let mut entries = self.entries.write();
73            entries.remove(query);
74            drop(entries);
75            self.remove_from_lru(query);
76            return None;
77        }
78
79        // Hit: update LRU ordering after releasing the entries read lock,
80        // so it contends only with other LRU updates, not with reads.
81        self.update_lru(query);
82        result
83    }
84
85    /// Store a query result in the cache
86    pub fn put(&self, query: String, result: CachedResult) {
87        if !self.config.enabled {
88            return;
89        }
90
91        let mut entries = self.entries.write();
92
93        // Check if we need to evict
94        if entries.len() >= self.config.size && !entries.contains_key(&query) {
95            self.evict_lru(&mut entries);
96        }
97
98        // Create TTL-adjusted result
99        let mut adjusted_result = result;
100        if adjusted_result.ttl > self.config.ttl {
101            adjusted_result.ttl = self.config.ttl;
102        }
103
104        // Insert or update entry
105        let entry = L1Entry::new(query.clone(), adjusted_result);
106        entries.insert(query.clone(), entry);
107        drop(entries);
108        self.update_lru(&query);
109    }
110
111    /// Remove an entry from the cache
112    pub fn remove(&self, query: &str) {
113        self.entries.write().remove(query);
114        self.remove_from_lru(query);
115    }
116
117    /// Clear all entries
118    pub fn clear(&self) {
119        self.entries.write().clear();
120        self.lru_order.write().clear();
121    }
122
123    /// Get current entry count
124    pub fn len(&self) -> usize {
125        self.entries.read().len()
126    }
127
128    /// Check if cache is empty
129    pub fn is_empty(&self) -> bool {
130        self.len() == 0
131    }
132
133    /// Get cache capacity
134    pub fn capacity(&self) -> usize {
135        self.config.size
136    }
137
138    /// Get hit statistics
139    pub fn stats(&self) -> L1CacheStats {
140        let entries = self.entries.read();
141        let total_size: usize = entries.values().map(|e| e.result.size()).sum();
142        let total_access: u64 = entries.values().map(|e| e.access_count()).sum();
143
144        L1CacheStats {
145            entry_count: entries.len(),
146            capacity: self.config.size,
147            total_size_bytes: total_size,
148            total_accesses: total_access,
149        }
150    }
151
152    /// Evict expired entries
153    pub fn evict_expired(&self) {
154        let mut entries = self.entries.write();
155        let expired: Vec<String> = entries
156            .iter()
157            .filter(|(_, entry)| entry.is_expired())
158            .map(|(key, _)| key.clone())
159            .collect();
160
161        for key in &expired {
162            entries.remove(key);
163        }
164        drop(entries);
165
166        for key in &expired {
167            self.remove_from_lru(key);
168        }
169    }
170
171    /// Update LRU tracking for a query
172    fn update_lru(&self, query: &str) {
173        let mut lru = self.lru_order.write();
174        lru.retain(|(q, _)| q != query);
175        lru.push((query.to_string(), Instant::now()));
176    }
177
178    /// Remove from LRU tracking
179    fn remove_from_lru(&self, query: &str) {
180        self.lru_order.write().retain(|(q, _)| q != query);
181    }
182
183    /// Evict least recently used entry
184    fn evict_lru(&self, entries: &mut HashMap<String, L1Entry>) {
185        let mut lru = self.lru_order.write();
186
187        // First, try to evict expired entries
188        let expired: Vec<String> = lru
189            .iter()
190            .filter(|(q, _)| {
191                entries
192                    .get(q)
193                    .map(|e| e.is_expired())
194                    .unwrap_or(true)
195            })
196            .map(|(q, _)| q.clone())
197            .collect();
198
199        for key in expired {
200            entries.remove(&key);
201            lru.retain(|(q, _)| q != &key);
202        }
203
204        // If still full, evict LRU entry
205        if entries.len() >= self.config.size {
206            if let Some((key, _)) = lru.first().cloned() {
207                entries.remove(&key);
208                lru.remove(0);
209            }
210        }
211    }
212}
213
/// Point-in-time statistics for an L1 cache.
///
/// Every field is a plain integer, so the snapshot can be compared with
/// `==` / `assert_eq!` and built with `Default` (all zeros).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct L1CacheStats {
    /// Number of entries in cache
    pub entry_count: usize,

    /// Maximum capacity
    pub capacity: usize,

    /// Total size of cached data in bytes
    pub total_size_bytes: usize,

    /// Total number of accesses
    pub total_accesses: u64,
}
229
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::time::Duration;

    /// Builds a one-row result carrying `data`, with a 60 s TTL.
    fn create_result(data: &str) -> CachedResult {
        CachedResult::new(
            Bytes::from(data.to_string()),
            1,
            Duration::from_secs(60),
            vec!["test".to_string()],
            Duration::from_millis(5),
        )
    }

    /// Builds an enabled cache with the given capacity and TTL cap.
    fn enabled_cache(size: usize, ttl: Duration) -> L1HotCache {
        L1HotCache::new(L1Config {
            enabled: true,
            size,
            ttl,
        })
    }

    #[test]
    fn test_basic_get_put() {
        let cache = enabled_cache(100, Duration::from_secs(60));
        let sql = "SELECT * FROM users WHERE id = 1";
        let expected = create_result("user data");

        // Nothing stored yet.
        assert!(cache.get(sql).is_none());

        // Store, then read back.
        cache.put(sql.to_string(), expected.clone());
        let hit = cache.get(sql);
        assert!(hit.is_some());
        assert_eq!(hit.unwrap().data, expected.data);
    }

    #[test]
    fn test_exact_match() {
        let cache = enabled_cache(100, Duration::from_secs(60));
        let stored = "SELECT * FROM users WHERE id = 1";
        let other = "SELECT * FROM users WHERE id = 2";

        cache.put(stored.to_string(), create_result("user data"));

        // Identical text hits; any other text misses.
        assert!(cache.get(stored).is_some());
        assert!(cache.get(other).is_none());
    }

    #[test]
    fn test_expiration() {
        let cache = enabled_cache(100, Duration::from_millis(10));
        let sql = "SELECT 1";

        cache.put(sql.to_string(), create_result("1"));
        assert!(cache.get(sql).is_some());

        // Sleep past the 10 ms TTL cap.
        std::thread::sleep(Duration::from_millis(15));
        assert!(cache.get(sql).is_none());
    }

    #[test]
    fn test_lru_eviction() {
        let cache = enabled_cache(3, Duration::from_secs(60));

        // Fill to capacity.
        for (key, payload) in [("query1", "1"), ("query2", "2"), ("query3", "3")] {
            cache.put(key.to_string(), create_result(payload));
        }

        // Touch query1 so query2 becomes the oldest.
        cache.get("query1");

        // A fourth key must push out query2.
        cache.put("query4".to_string(), create_result("4"));

        assert!(cache.get("query1").is_some()); // Recently accessed
        assert!(cache.get("query2").is_none()); // Evicted
        assert!(cache.get("query3").is_some()); // Still present
        assert!(cache.get("query4").is_some()); // Newly added
    }

    #[test]
    fn test_clear() {
        let cache = enabled_cache(100, Duration::from_secs(60));

        cache.put("query1".to_string(), create_result("1"));
        cache.put("query2".to_string(), create_result("2"));
        assert_eq!(cache.len(), 2);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
    }

    #[test]
    fn test_remove() {
        let cache = enabled_cache(100, Duration::from_secs(60));

        cache.put("query1".to_string(), create_result("1"));
        cache.put("query2".to_string(), create_result("2"));

        cache.remove("query1");

        assert!(cache.get("query1").is_none());
        assert!(cache.get("query2").is_some());
    }

    #[test]
    fn test_disabled_cache() {
        let cache = L1HotCache::new(L1Config {
            enabled: false,
            size: 100,
            ttl: Duration::from_secs(60),
        });

        // A disabled cache neither stores nor serves.
        cache.put("query".to_string(), create_result("data"));
        assert!(cache.get("query").is_none());
    }

    #[test]
    fn test_stats() {
        let cache = enabled_cache(100, Duration::from_secs(60));

        cache.put("query1".to_string(), create_result("1"));
        cache.put("query2".to_string(), create_result("2"));

        // Three reads in total.
        cache.get("query1");
        cache.get("query1");
        cache.get("query2");

        let snapshot = cache.stats();
        assert_eq!(snapshot.entry_count, 2);
        assert_eq!(snapshot.capacity, 100);
        assert!(snapshot.total_size_bytes > 0);
        assert_eq!(snapshot.total_accesses, 5); // 2 puts + 3 gets
    }

    #[test]
    fn test_evict_expired() {
        let cache = enabled_cache(100, Duration::from_millis(10));

        cache.put("query1".to_string(), create_result("1"));
        cache.put("query2".to_string(), create_result("2"));

        // Let both entries outlive the 10 ms TTL cap.
        std::thread::sleep(Duration::from_millis(15));

        cache.evict_expired();

        assert!(cache.is_empty());
    }

    #[test]
    fn test_update_existing() {
        let cache = enabled_cache(100, Duration::from_secs(60));

        cache.put("query".to_string(), create_result("old"));
        cache.put("query".to_string(), create_result("new"));

        // The second put wins.
        let hit = cache.get("query").unwrap();
        assert_eq!(hit.data, Bytes::from("new"));
    }

    /// Many threads hitting the same key must all observe the cached result,
    /// and every hit must be counted exactly once in `access_count`.
    #[test]
    fn test_concurrent_hits_read_lock_only() {
        use std::sync::Arc;
        use std::thread;

        const THREADS: usize = 16;
        const ITERS_PER_THREAD: usize = 500;

        let cache = Arc::new(enabled_cache(100, Duration::from_secs(60)));
        cache.put("hot-query".to_string(), create_result("hot data"));

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for _ in 0..ITERS_PER_THREAD {
                        let r = cache.get("hot-query").expect("hit expected");
                        assert_eq!(r.data, Bytes::from("hot data"));
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        // access_count starts at 1 (from put) and is bumped once per get.
        let expected_accesses = 1 + (THREADS * ITERS_PER_THREAD) as u64;
        assert_eq!(cache.stats().total_accesses, expected_accesses);
    }
}