Skip to main content

heliosdb_proxy/distribcache/tiers/
l1_hot.rs

1//! L1 Hot Cache - In-memory cache with <100μs access time
2//!
3//! Features:
4//! - LRU/LFU eviction with frequency aging
5//! - Per-session affinity for connection locality
6//! - Automatic size management
7
8use dashmap::DashMap;
9use std::collections::HashSet;
10use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
11use std::sync::Arc;
12
13use super::{CacheEntry, EvictionPolicy, LFUEviction, TierStats};
14use crate::distribcache::{QueryFingerprint, SessionId};
15
16/// L1 Hot Cache - In-memory with LRU/LFU eviction
17pub struct HotCache {
18    /// Main cache storage
19    cache: DashMap<u64, CacheEntry>,
20
21    /// LFU eviction tracker
22    eviction: Arc<LFUEviction>,
23
24    /// Per-session affinity tracking
25    session_affinity: DashMap<SessionId, HashSet<u64>>,
26
27    /// Table to key index for invalidation
28    table_index: DashMap<String, HashSet<u64>>,
29
30    /// Current size in bytes
31    current_size: AtomicUsize,
32
33    /// Maximum size in bytes
34    max_size: usize,
35
36    /// Maximum entry size
37    max_entry_size: usize,
38
39    /// Eviction policy
40    policy: EvictionPolicy,
41
42    /// Statistics
43    hits: AtomicU64,
44    misses: AtomicU64,
45    evictions: AtomicU64,
46}
47
48impl HotCache {
49    /// Create a new hot cache
50    pub fn new(max_size: usize, max_entry_size: usize, policy: EvictionPolicy) -> Self {
51        Self {
52            cache: DashMap::new(),
53            eviction: Arc::new(LFUEviction::new()),
54            session_affinity: DashMap::new(),
55            table_index: DashMap::new(),
56            current_size: AtomicUsize::new(0),
57            max_size,
58            max_entry_size,
59            policy,
60            hits: AtomicU64::new(0),
61            misses: AtomicU64::new(0),
62            evictions: AtomicU64::new(0),
63        }
64    }
65
66    /// Get an entry from the cache
67    pub fn get(&self, fingerprint: &QueryFingerprint, _session: SessionId) -> Option<CacheEntry> {
68        let key = self.fingerprint_to_hash(fingerprint);
69
70        if let Some(mut entry) = self.cache.get_mut(&key) {
71            // Check TTL
72            if entry.is_expired() {
73                drop(entry);
74                self.remove_entry(key);
75                self.misses.fetch_add(1, Ordering::Relaxed);
76                return None;
77            }
78
79            // Update access count for LFU
80            entry.access_count += 1;
81            self.eviction.touch(key);
82            self.hits.fetch_add(1, Ordering::Relaxed);
83
84            Some(entry.clone())
85        } else {
86            self.misses.fetch_add(1, Ordering::Relaxed);
87            None
88        }
89    }
90
91    /// Insert an entry into the cache
92    pub fn insert(
93        &self,
94        fingerprint: QueryFingerprint,
95        entry: CacheEntry,
96        session: Option<SessionId>,
97    ) {
98        let entry_size = entry.size();
99
100        // Skip if entry is too large
101        if entry_size > self.max_entry_size {
102            return;
103        }
104
105        let key = self.fingerprint_to_hash(&fingerprint);
106
107        // Evict entries if needed
108        while self.current_size.load(Ordering::Relaxed) + entry_size > self.max_size {
109            if !self.evict_one() {
110                break; // No more to evict
111            }
112        }
113
114        // Remove old entry if exists
115        if let Some((_, old_entry)) = self.cache.remove(&key) {
116            self.current_size.fetch_sub(old_entry.size(), Ordering::Relaxed);
117            self.eviction.remove(key);
118        }
119
120        // Index by tables for invalidation
121        for table in &entry.tables {
122            self.table_index
123                .entry(table.clone())
124                .or_default()
125                .insert(key);
126        }
127
128        // Track session affinity
129        if let Some(sid) = session {
130            self.session_affinity
131                .entry(sid)
132                .or_default()
133                .insert(key);
134        }
135
136        // Insert entry
137        self.cache.insert(key, entry);
138        self.current_size.fetch_add(entry_size, Ordering::Relaxed);
139        self.eviction.insert(key);
140    }
141
142    /// Invalidate entries for a table
143    pub fn invalidate_by_table(&self, table: &str) {
144        if let Some((_, keys)) = self.table_index.remove(table) {
145            for key in keys {
146                self.remove_entry(key);
147            }
148        }
149    }
150
151    /// Invalidate a specific entry
152    pub fn invalidate(&self, fingerprint: &QueryFingerprint) {
153        let key = self.fingerprint_to_hash(fingerprint);
154        self.remove_entry(key);
155    }
156
157    /// Remove an entry by key
158    fn remove_entry(&self, key: u64) {
159        if let Some((_, entry)) = self.cache.remove(&key) {
160            self.current_size.fetch_sub(entry.size(), Ordering::Relaxed);
161            self.eviction.remove(key);
162
163            // Clean up table index
164            for table in &entry.tables {
165                if let Some(mut keys) = self.table_index.get_mut(table) {
166                    keys.remove(&key);
167                }
168            }
169        }
170    }
171
172    /// Evict one entry based on policy
173    fn evict_one(&self) -> bool {
174        match self.policy {
175            EvictionPolicy::LFU | EvictionPolicy::Adaptive => {
176                if let Some(key) = self.eviction.evict_one() {
177                    self.remove_entry(key);
178                    self.evictions.fetch_add(1, Ordering::Relaxed);
179                    return true;
180                }
181            }
182            EvictionPolicy::LRU => {
183                // Find oldest entry (simplified - would need timestamp tracking)
184                if let Some(entry) = self.cache.iter().next() {
185                    let key = *entry.key();
186                    drop(entry);
187                    self.remove_entry(key);
188                    self.evictions.fetch_add(1, Ordering::Relaxed);
189                    return true;
190                }
191            }
192            EvictionPolicy::FIFO => {
193                // Same as LRU for simplicity
194                if let Some(entry) = self.cache.iter().next() {
195                    let key = *entry.key();
196                    drop(entry);
197                    self.remove_entry(key);
198                    self.evictions.fetch_add(1, Ordering::Relaxed);
199                    return true;
200                }
201            }
202        }
203        false
204    }
205
206    /// Convert fingerprint to hash key
207    fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
208        use std::hash::{Hash, Hasher};
209        use std::collections::hash_map::DefaultHasher;
210
211        let mut hasher = DefaultHasher::new();
212        fingerprint.template.hash(&mut hasher);
213        if let Some(param) = fingerprint.param_hash {
214            param.hash(&mut hasher);
215        }
216        hasher.finish()
217    }
218
219    /// Get cache statistics
220    pub fn stats(&self) -> TierStats {
221        TierStats {
222            size_bytes: self.current_size.load(Ordering::Relaxed) as u64,
223            max_size_bytes: self.max_size as u64,
224            entry_count: self.cache.len() as u64,
225            hits: self.hits.load(Ordering::Relaxed),
226            misses: self.misses.load(Ordering::Relaxed),
227            evictions: self.evictions.load(Ordering::Relaxed),
228            compression_ratio: None,
229            peer_count: None,
230            healthy_peers: None,
231        }
232    }
233
234    /// Clear all entries
235    pub fn clear(&self) {
236        self.cache.clear();
237        self.table_index.clear();
238        self.session_affinity.clear();
239        self.current_size.store(0, Ordering::Relaxed);
240    }
241
242    /// Get number of entries
243    pub fn len(&self) -> usize {
244        self.cache.len()
245    }
246
247    /// Check if cache is empty
248    pub fn is_empty(&self) -> bool {
249        self.cache.is_empty()
250    }
251
252    /// Iterate over all entries (for L3 invalidation broadcast)
253    pub fn iter(&self) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, u64, CacheEntry>> {
254        self.cache.iter()
255    }
256
257    /// Check if cache contains a fingerprint
258    pub fn contains(&self, fingerprint: &QueryFingerprint) -> bool {
259        let key = self.fingerprint_to_hash(fingerprint);
260        self.cache.contains_key(&key)
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267    use std::time::Duration;
268
269    #[test]
270    fn test_hot_cache_insert_get() {
271        let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
272        let fp = QueryFingerprint::from_query("SELECT * FROM users");
273        let session = SessionId::new("sess-1");
274
275        let entry = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1);
276        cache.insert(fp.clone(), entry, Some(session.clone()));
277
278        let result = cache.get(&fp, session);
279        assert!(result.is_some());
280        assert_eq!(result.unwrap().data, vec![1, 2, 3]);
281    }
282
283    #[test]
284    fn test_hot_cache_eviction() {
285        // Small cache to force eviction
286        let cache = HotCache::new(200, 100, EvictionPolicy::LFU);
287
288        let table_names = ["alpha", "bravo", "charlie", "delta", "echo",
289                           "foxtrot", "golf", "hotel", "india", "juliet"];
290        for name in &table_names {
291            let fp = QueryFingerprint::from_query(&format!("SELECT * FROM {}", name));
292            let entry = CacheEntry::new(vec![0; 50], vec![], 1);
293            cache.insert(fp, entry, None);
294        }
295
296        // Should have evicted some entries
297        assert!(cache.len() < 10);
298        assert!(cache.stats().evictions > 0);
299    }
300
301    #[test]
302    fn test_hot_cache_invalidate_by_table() {
303        let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
304
305        let fp1 = QueryFingerprint::from_query("SELECT * FROM users WHERE id = 1");
306        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders WHERE id = 1");
307
308        cache.insert(
309            fp1.clone(),
310            CacheEntry::new(vec![1], vec!["users".to_string()], 1),
311            None,
312        );
313        cache.insert(
314            fp2.clone(),
315            CacheEntry::new(vec![2], vec!["orders".to_string()], 1),
316            None,
317        );
318
319        assert_eq!(cache.len(), 2);
320
321        // Invalidate users table
322        cache.invalidate_by_table("users");
323
324        assert_eq!(cache.len(), 1);
325        assert!(cache.get(&fp2, SessionId::new("")).is_some());
326    }
327
328    #[test]
329    fn test_hot_cache_stats() {
330        let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
331        let fp = QueryFingerprint::from_query("SELECT * FROM users");
332        let session = SessionId::new("test");
333
334        cache.insert(fp.clone(), CacheEntry::new(vec![1], vec![], 1), None);
335
336        // Hit
337        cache.get(&fp, session.clone());
338        // Miss
339        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");
340        cache.get(&fp2, session);
341
342        let stats = cache.stats();
343        assert_eq!(stats.hits, 1);
344        assert_eq!(stats.misses, 1);
345        assert_eq!(stats.entry_count, 1);
346    }
347
348    #[test]
349    fn test_max_entry_size() {
350        let cache = HotCache::new(1024 * 1024, 100, EvictionPolicy::LFU);
351
352        // Entry larger than max should not be inserted
353        let fp = QueryFingerprint::from_query("SELECT *");
354        let large_entry = CacheEntry::new(vec![0; 200], vec![], 1);
355        cache.insert(fp.clone(), large_entry, None);
356
357        assert!(cache.is_empty());
358    }
359}