heliosdb_proxy/distribcache/tiers/
l1_hot.rs1use dashmap::DashMap;
9use std::collections::HashSet;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12
13use super::{CacheEntry, EvictionPolicy, LFUEviction, TierStats};
14use crate::distribcache::{QueryFingerprint, SessionId};
15
16pub struct HotCache {
18 cache: DashMap<u64, CacheEntry>,
20
21 eviction: Arc<LFUEviction>,
23
24 session_affinity: DashMap<SessionId, HashSet<u64>>,
26
27 table_index: DashMap<String, HashSet<u64>>,
29
30 current_size: AtomicUsize,
32
33 max_size: usize,
35
36 max_entry_size: usize,
38
39 policy: EvictionPolicy,
41
42 hits: AtomicU64,
44 misses: AtomicU64,
45 evictions: AtomicU64,
46}
47
48impl HotCache {
49 pub fn new(max_size: usize, max_entry_size: usize, policy: EvictionPolicy) -> Self {
51 Self {
52 cache: DashMap::new(),
53 eviction: Arc::new(LFUEviction::new()),
54 session_affinity: DashMap::new(),
55 table_index: DashMap::new(),
56 current_size: AtomicUsize::new(0),
57 max_size,
58 max_entry_size,
59 policy,
60 hits: AtomicU64::new(0),
61 misses: AtomicU64::new(0),
62 evictions: AtomicU64::new(0),
63 }
64 }
65
66 pub fn get(&self, fingerprint: &QueryFingerprint, _session: SessionId) -> Option<CacheEntry> {
68 let key = self.fingerprint_to_hash(fingerprint);
69
70 if let Some(mut entry) = self.cache.get_mut(&key) {
71 if entry.is_expired() {
73 drop(entry);
74 self.remove_entry(key);
75 self.misses.fetch_add(1, Ordering::Relaxed);
76 return None;
77 }
78
79 entry.access_count += 1;
81 self.eviction.touch(key);
82 self.hits.fetch_add(1, Ordering::Relaxed);
83
84 Some(entry.clone())
85 } else {
86 self.misses.fetch_add(1, Ordering::Relaxed);
87 None
88 }
89 }
90
91 pub fn insert(
93 &self,
94 fingerprint: QueryFingerprint,
95 entry: CacheEntry,
96 session: Option<SessionId>,
97 ) {
98 let entry_size = entry.size();
99
100 if entry_size > self.max_entry_size {
102 return;
103 }
104
105 let key = self.fingerprint_to_hash(&fingerprint);
106
107 while self.current_size.load(Ordering::Relaxed) + entry_size > self.max_size {
109 if !self.evict_one() {
110 break; }
112 }
113
114 if let Some((_, old_entry)) = self.cache.remove(&key) {
116 self.current_size
117 .fetch_sub(old_entry.size(), Ordering::Relaxed);
118 self.eviction.remove(key);
119 }
120
121 for table in &entry.tables {
123 self.table_index
124 .entry(table.clone())
125 .or_default()
126 .insert(key);
127 }
128
129 if let Some(sid) = session {
131 self.session_affinity.entry(sid).or_default().insert(key);
132 }
133
134 self.cache.insert(key, entry);
136 self.current_size.fetch_add(entry_size, Ordering::Relaxed);
137 self.eviction.insert(key);
138 }
139
140 pub fn invalidate_by_table(&self, table: &str) {
142 if let Some((_, keys)) = self.table_index.remove(table) {
143 for key in keys {
144 self.remove_entry(key);
145 }
146 }
147 }
148
149 pub fn invalidate(&self, fingerprint: &QueryFingerprint) {
151 let key = self.fingerprint_to_hash(fingerprint);
152 self.remove_entry(key);
153 }
154
155 fn remove_entry(&self, key: u64) {
157 if let Some((_, entry)) = self.cache.remove(&key) {
158 self.current_size.fetch_sub(entry.size(), Ordering::Relaxed);
159 self.eviction.remove(key);
160
161 for table in &entry.tables {
163 if let Some(mut keys) = self.table_index.get_mut(table) {
164 keys.remove(&key);
165 }
166 }
167 }
168 }
169
170 fn evict_one(&self) -> bool {
172 match self.policy {
173 EvictionPolicy::LFU | EvictionPolicy::Adaptive => {
174 if let Some(key) = self.eviction.evict_one() {
175 self.remove_entry(key);
176 self.evictions.fetch_add(1, Ordering::Relaxed);
177 return true;
178 }
179 }
180 EvictionPolicy::LRU => {
181 if let Some(entry) = self.cache.iter().next() {
183 let key = *entry.key();
184 drop(entry);
185 self.remove_entry(key);
186 self.evictions.fetch_add(1, Ordering::Relaxed);
187 return true;
188 }
189 }
190 EvictionPolicy::FIFO => {
191 if let Some(entry) = self.cache.iter().next() {
193 let key = *entry.key();
194 drop(entry);
195 self.remove_entry(key);
196 self.evictions.fetch_add(1, Ordering::Relaxed);
197 return true;
198 }
199 }
200 }
201 false
202 }
203
204 fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
206 use std::collections::hash_map::DefaultHasher;
207 use std::hash::{Hash, Hasher};
208
209 let mut hasher = DefaultHasher::new();
210 fingerprint.template.hash(&mut hasher);
211 if let Some(param) = fingerprint.param_hash {
212 param.hash(&mut hasher);
213 }
214 hasher.finish()
215 }
216
217 pub fn stats(&self) -> TierStats {
219 TierStats {
220 size_bytes: self.current_size.load(Ordering::Relaxed) as u64,
221 max_size_bytes: self.max_size as u64,
222 entry_count: self.cache.len() as u64,
223 hits: self.hits.load(Ordering::Relaxed),
224 misses: self.misses.load(Ordering::Relaxed),
225 evictions: self.evictions.load(Ordering::Relaxed),
226 compression_ratio: None,
227 peer_count: None,
228 healthy_peers: None,
229 }
230 }
231
232 pub fn clear(&self) {
234 self.cache.clear();
235 self.table_index.clear();
236 self.session_affinity.clear();
237 self.current_size.store(0, Ordering::Relaxed);
238 }
239
240 pub fn len(&self) -> usize {
242 self.cache.len()
243 }
244
245 pub fn is_empty(&self) -> bool {
247 self.cache.is_empty()
248 }
249
250 pub fn iter(
252 &self,
253 ) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, u64, CacheEntry>> {
254 self.cache.iter()
255 }
256
257 pub fn contains(&self, fingerprint: &QueryFingerprint) -> bool {
259 let key = self.fingerprint_to_hash(fingerprint);
260 self.cache.contains_key(&key)
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267 use std::time::Duration;
268
269 #[test]
270 fn test_hot_cache_insert_get() {
271 let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
272 let fp = QueryFingerprint::from_query("SELECT * FROM users");
273 let session = SessionId::new("sess-1");
274
275 let entry = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1);
276 cache.insert(fp.clone(), entry, Some(session.clone()));
277
278 let result = cache.get(&fp, session);
279 assert!(result.is_some());
280 assert_eq!(result.unwrap().data, vec![1, 2, 3]);
281 }
282
283 #[test]
284 fn test_hot_cache_eviction() {
285 let cache = HotCache::new(200, 100, EvictionPolicy::LFU);
287
288 let table_names = [
289 "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", "india",
290 "juliet",
291 ];
292 for name in &table_names {
293 let fp = QueryFingerprint::from_query(&format!("SELECT * FROM {}", name));
294 let entry = CacheEntry::new(vec![0; 50], vec![], 1);
295 cache.insert(fp, entry, None);
296 }
297
298 assert!(cache.len() < 10);
300 assert!(cache.stats().evictions > 0);
301 }
302
303 #[test]
304 fn test_hot_cache_invalidate_by_table() {
305 let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
306
307 let fp1 = QueryFingerprint::from_query("SELECT * FROM users WHERE id = 1");
308 let fp2 = QueryFingerprint::from_query("SELECT * FROM orders WHERE id = 1");
309
310 cache.insert(
311 fp1.clone(),
312 CacheEntry::new(vec![1], vec!["users".to_string()], 1),
313 None,
314 );
315 cache.insert(
316 fp2.clone(),
317 CacheEntry::new(vec![2], vec!["orders".to_string()], 1),
318 None,
319 );
320
321 assert_eq!(cache.len(), 2);
322
323 cache.invalidate_by_table("users");
325
326 assert_eq!(cache.len(), 1);
327 assert!(cache.get(&fp2, SessionId::new("")).is_some());
328 }
329
330 #[test]
331 fn test_hot_cache_stats() {
332 let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
333 let fp = QueryFingerprint::from_query("SELECT * FROM users");
334 let session = SessionId::new("test");
335
336 cache.insert(fp.clone(), CacheEntry::new(vec![1], vec![], 1), None);
337
338 cache.get(&fp, session.clone());
340 let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");
342 cache.get(&fp2, session);
343
344 let stats = cache.stats();
345 assert_eq!(stats.hits, 1);
346 assert_eq!(stats.misses, 1);
347 assert_eq!(stats.entry_count, 1);
348 }
349
350 #[test]
351 fn test_max_entry_size() {
352 let cache = HotCache::new(1024 * 1024, 100, EvictionPolicy::LFU);
353
354 let fp = QueryFingerprint::from_query("SELECT *");
356 let large_entry = CacheEntry::new(vec![0; 200], vec![], 1);
357 cache.insert(fp.clone(), large_entry, None);
358
359 assert!(cache.is_empty());
360 }
361}