heliosdb_proxy/distribcache/tiers/
l1_hot.rs1use dashmap::DashMap;
9use std::collections::HashSet;
10use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
11use std::sync::Arc;
12
13use super::{CacheEntry, EvictionPolicy, LFUEviction, TierStats};
14use crate::distribcache::{QueryFingerprint, SessionId};
15
16pub struct HotCache {
18 cache: DashMap<u64, CacheEntry>,
20
21 eviction: Arc<LFUEviction>,
23
24 session_affinity: DashMap<SessionId, HashSet<u64>>,
26
27 table_index: DashMap<String, HashSet<u64>>,
29
30 current_size: AtomicUsize,
32
33 max_size: usize,
35
36 max_entry_size: usize,
38
39 policy: EvictionPolicy,
41
42 hits: AtomicU64,
44 misses: AtomicU64,
45 evictions: AtomicU64,
46}
47
48impl HotCache {
49 pub fn new(max_size: usize, max_entry_size: usize, policy: EvictionPolicy) -> Self {
51 Self {
52 cache: DashMap::new(),
53 eviction: Arc::new(LFUEviction::new()),
54 session_affinity: DashMap::new(),
55 table_index: DashMap::new(),
56 current_size: AtomicUsize::new(0),
57 max_size,
58 max_entry_size,
59 policy,
60 hits: AtomicU64::new(0),
61 misses: AtomicU64::new(0),
62 evictions: AtomicU64::new(0),
63 }
64 }
65
66 pub fn get(&self, fingerprint: &QueryFingerprint, _session: SessionId) -> Option<CacheEntry> {
68 let key = self.fingerprint_to_hash(fingerprint);
69
70 if let Some(mut entry) = self.cache.get_mut(&key) {
71 if entry.is_expired() {
73 drop(entry);
74 self.remove_entry(key);
75 self.misses.fetch_add(1, Ordering::Relaxed);
76 return None;
77 }
78
79 entry.access_count += 1;
81 self.eviction.touch(key);
82 self.hits.fetch_add(1, Ordering::Relaxed);
83
84 Some(entry.clone())
85 } else {
86 self.misses.fetch_add(1, Ordering::Relaxed);
87 None
88 }
89 }
90
91 pub fn insert(
93 &self,
94 fingerprint: QueryFingerprint,
95 entry: CacheEntry,
96 session: Option<SessionId>,
97 ) {
98 let entry_size = entry.size();
99
100 if entry_size > self.max_entry_size {
102 return;
103 }
104
105 let key = self.fingerprint_to_hash(&fingerprint);
106
107 while self.current_size.load(Ordering::Relaxed) + entry_size > self.max_size {
109 if !self.evict_one() {
110 break; }
112 }
113
114 if let Some((_, old_entry)) = self.cache.remove(&key) {
116 self.current_size.fetch_sub(old_entry.size(), Ordering::Relaxed);
117 self.eviction.remove(key);
118 }
119
120 for table in &entry.tables {
122 self.table_index
123 .entry(table.clone())
124 .or_default()
125 .insert(key);
126 }
127
128 if let Some(sid) = session {
130 self.session_affinity
131 .entry(sid)
132 .or_default()
133 .insert(key);
134 }
135
136 self.cache.insert(key, entry);
138 self.current_size.fetch_add(entry_size, Ordering::Relaxed);
139 self.eviction.insert(key);
140 }
141
142 pub fn invalidate_by_table(&self, table: &str) {
144 if let Some((_, keys)) = self.table_index.remove(table) {
145 for key in keys {
146 self.remove_entry(key);
147 }
148 }
149 }
150
151 pub fn invalidate(&self, fingerprint: &QueryFingerprint) {
153 let key = self.fingerprint_to_hash(fingerprint);
154 self.remove_entry(key);
155 }
156
157 fn remove_entry(&self, key: u64) {
159 if let Some((_, entry)) = self.cache.remove(&key) {
160 self.current_size.fetch_sub(entry.size(), Ordering::Relaxed);
161 self.eviction.remove(key);
162
163 for table in &entry.tables {
165 if let Some(mut keys) = self.table_index.get_mut(table) {
166 keys.remove(&key);
167 }
168 }
169 }
170 }
171
172 fn evict_one(&self) -> bool {
174 match self.policy {
175 EvictionPolicy::LFU | EvictionPolicy::Adaptive => {
176 if let Some(key) = self.eviction.evict_one() {
177 self.remove_entry(key);
178 self.evictions.fetch_add(1, Ordering::Relaxed);
179 return true;
180 }
181 }
182 EvictionPolicy::LRU => {
183 if let Some(entry) = self.cache.iter().next() {
185 let key = *entry.key();
186 drop(entry);
187 self.remove_entry(key);
188 self.evictions.fetch_add(1, Ordering::Relaxed);
189 return true;
190 }
191 }
192 EvictionPolicy::FIFO => {
193 if let Some(entry) = self.cache.iter().next() {
195 let key = *entry.key();
196 drop(entry);
197 self.remove_entry(key);
198 self.evictions.fetch_add(1, Ordering::Relaxed);
199 return true;
200 }
201 }
202 }
203 false
204 }
205
206 fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
208 use std::hash::{Hash, Hasher};
209 use std::collections::hash_map::DefaultHasher;
210
211 let mut hasher = DefaultHasher::new();
212 fingerprint.template.hash(&mut hasher);
213 if let Some(param) = fingerprint.param_hash {
214 param.hash(&mut hasher);
215 }
216 hasher.finish()
217 }
218
219 pub fn stats(&self) -> TierStats {
221 TierStats {
222 size_bytes: self.current_size.load(Ordering::Relaxed) as u64,
223 max_size_bytes: self.max_size as u64,
224 entry_count: self.cache.len() as u64,
225 hits: self.hits.load(Ordering::Relaxed),
226 misses: self.misses.load(Ordering::Relaxed),
227 evictions: self.evictions.load(Ordering::Relaxed),
228 compression_ratio: None,
229 peer_count: None,
230 healthy_peers: None,
231 }
232 }
233
234 pub fn clear(&self) {
236 self.cache.clear();
237 self.table_index.clear();
238 self.session_affinity.clear();
239 self.current_size.store(0, Ordering::Relaxed);
240 }
241
242 pub fn len(&self) -> usize {
244 self.cache.len()
245 }
246
247 pub fn is_empty(&self) -> bool {
249 self.cache.is_empty()
250 }
251
252 pub fn iter(&self) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, u64, CacheEntry>> {
254 self.cache.iter()
255 }
256
257 pub fn contains(&self, fingerprint: &QueryFingerprint) -> bool {
259 let key = self.fingerprint_to_hash(fingerprint);
260 self.cache.contains_key(&key)
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267 use std::time::Duration;
268
269 #[test]
270 fn test_hot_cache_insert_get() {
271 let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
272 let fp = QueryFingerprint::from_query("SELECT * FROM users");
273 let session = SessionId::new("sess-1");
274
275 let entry = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1);
276 cache.insert(fp.clone(), entry, Some(session.clone()));
277
278 let result = cache.get(&fp, session);
279 assert!(result.is_some());
280 assert_eq!(result.unwrap().data, vec![1, 2, 3]);
281 }
282
283 #[test]
284 fn test_hot_cache_eviction() {
285 let cache = HotCache::new(200, 100, EvictionPolicy::LFU);
287
288 let table_names = ["alpha", "bravo", "charlie", "delta", "echo",
289 "foxtrot", "golf", "hotel", "india", "juliet"];
290 for name in &table_names {
291 let fp = QueryFingerprint::from_query(&format!("SELECT * FROM {}", name));
292 let entry = CacheEntry::new(vec![0; 50], vec![], 1);
293 cache.insert(fp, entry, None);
294 }
295
296 assert!(cache.len() < 10);
298 assert!(cache.stats().evictions > 0);
299 }
300
301 #[test]
302 fn test_hot_cache_invalidate_by_table() {
303 let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
304
305 let fp1 = QueryFingerprint::from_query("SELECT * FROM users WHERE id = 1");
306 let fp2 = QueryFingerprint::from_query("SELECT * FROM orders WHERE id = 1");
307
308 cache.insert(
309 fp1.clone(),
310 CacheEntry::new(vec![1], vec!["users".to_string()], 1),
311 None,
312 );
313 cache.insert(
314 fp2.clone(),
315 CacheEntry::new(vec![2], vec!["orders".to_string()], 1),
316 None,
317 );
318
319 assert_eq!(cache.len(), 2);
320
321 cache.invalidate_by_table("users");
323
324 assert_eq!(cache.len(), 1);
325 assert!(cache.get(&fp2, SessionId::new("")).is_some());
326 }
327
328 #[test]
329 fn test_hot_cache_stats() {
330 let cache = HotCache::new(1024 * 1024, 1024, EvictionPolicy::LFU);
331 let fp = QueryFingerprint::from_query("SELECT * FROM users");
332 let session = SessionId::new("test");
333
334 cache.insert(fp.clone(), CacheEntry::new(vec![1], vec![], 1), None);
335
336 cache.get(&fp, session.clone());
338 let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");
340 cache.get(&fp2, session);
341
342 let stats = cache.stats();
343 assert_eq!(stats.hits, 1);
344 assert_eq!(stats.misses, 1);
345 assert_eq!(stats.entry_count, 1);
346 }
347
348 #[test]
349 fn test_max_entry_size() {
350 let cache = HotCache::new(1024 * 1024, 100, EvictionPolicy::LFU);
351
352 let fp = QueryFingerprint::from_query("SELECT *");
354 let large_entry = CacheEntry::new(vec![0; 200], vec![], 1);
355 cache.insert(fp.clone(), large_entry, None);
356
357 assert!(cache.is_empty());
358 }
359}