heliosdb_proxy/distribcache/tiers/
l2_warm.rs1use std::collections::HashSet;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::RwLock;
12
13use dashmap::DashMap;
14
15use super::{CacheEntry, CompressionType, TierStats};
16use crate::distribcache::QueryFingerprint;
17
/// Fixed-size Bloom filter used as a cheap "definitely absent" pre-check.
///
/// A negative answer from [`BloomFilter::may_contain`] is exact; a positive
/// answer may be a false positive. Individual items cannot be removed — the
/// only way to forget anything is [`BloomFilter::clear`].
struct BloomFilter {
    /// Bit array packed into 64-bit words. `new` guarantees it is never empty.
    bits: Vec<u64>,
    /// Number of hash probes performed per item.
    num_hashes: usize,
}

impl BloomFilter {
    /// Bits allocated per expected item.
    const BITS_PER_ITEM: usize = 10;
    /// Hash probes per item; with 10 bits per item this is the textbook
    /// setting for roughly a 1% false-positive rate at design capacity.
    const NUM_HASHES: usize = 7;

    /// Creates an empty filter sized for about `capacity` items.
    ///
    /// At least one 64-bit word is always allocated, so a `capacity` of zero
    /// yields a tiny but usable filter instead of one whose first probe
    /// divides by zero.
    fn new(capacity: usize) -> Self {
        let num_bits = capacity * Self::BITS_PER_ITEM;
        let num_words = num_bits.div_ceil(64).max(1);

        Self {
            bits: vec![0; num_words],
            num_hashes: Self::NUM_HASHES,
        }
    }

    /// Records `data` in the filter.
    fn insert(&mut self, data: &[u8]) {
        for seed in 0..self.num_hashes {
            let (word, mask) = self.bit_position(data, seed);
            self.bits[word] |= mask;
        }
    }

    /// Returns `false` if `data` was definitely never inserted (since the
    /// last `clear`), `true` if it may have been.
    fn may_contain(&self, data: &[u8]) -> bool {
        (0..self.num_hashes).all(|seed| {
            let (word, mask) = self.bit_position(data, seed);
            self.bits[word] & mask != 0
        })
    }

    /// Maps `data` and a probe `seed` to a `(word index, single-bit mask)` pair.
    ///
    /// The modulo is taken on the full 64-bit hash before narrowing so that
    /// no hash bits are discarded on targets where `usize` is 32 bits wide.
    fn bit_position(&self, data: &[u8], seed: usize) -> (usize, u64) {
        // `max(1)` keeps the modulus non-zero even if the invariant from
        // `new` were ever broken by a future constructor.
        let total_bits = (self.bits.len() as u64 * 64).max(64);
        let idx = self.hash(data, seed) % total_bits;
        ((idx / 64) as usize, 1u64 << (idx % 64))
    }

    /// Hashes `data` with the probe index mixed in first, giving one
    /// distinct hash per probe.
    fn hash(&self, data: &[u8], seed: usize) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Resets every bit, forgetting all inserted items.
    fn clear(&mut self) {
        self.bits.fill(0);
    }
}
74
75pub struct WarmCache {
77 index: DashMap<u64, EntryMetadata>,
79
80 data: DashMap<u64, Vec<u8>>,
83
84 bloom: RwLock<BloomFilter>,
86
87 table_index: DashMap<String, HashSet<u64>>,
89
90 compression: CompressionType,
92
93 _path: PathBuf,
95
96 current_size: AtomicU64,
98
99 max_size: u64,
101
102 hits: AtomicU64,
104 misses: AtomicU64,
105 compressed_size: AtomicU64,
106 #[allow(dead_code)]
107 uncompressed_size: AtomicU64,
108}
109
/// Bookkeeping kept alongside each stored payload so that expiry, size
/// accounting and table invalidation never need to decode the payload.
#[derive(Clone, Debug)]
struct EntryMetadata {
    /// Size in bytes of the stored payload, marker byte included.
    compressed_size: usize,
    /// Size in bytes of the serialized entry before compression.
    // Recorded at insert time but not read back anywhere in this file,
    // which is why the lint is silenced.
    #[allow(dead_code)]
    uncompressed_size: usize,
    /// Creation time copied from the cache entry; compared against Unix
    /// seconds when checking expiry.
    created_at: u64,
    /// Lifetime of the entry in seconds, counted from `created_at`.
    ttl_secs: u64,
    /// Names of the tables this entry is registered under.
    tables: Vec<String>,
}
125
126impl WarmCache {
127 pub fn new(max_size: u64, path: PathBuf, compression: CompressionType) -> Self {
129 Self {
130 index: DashMap::new(),
131 data: DashMap::new(),
132 bloom: RwLock::new(BloomFilter::new(100_000)),
133 table_index: DashMap::new(),
134 compression,
135 _path: path,
136 current_size: AtomicU64::new(0),
137 max_size,
138 hits: AtomicU64::new(0),
139 misses: AtomicU64::new(0),
140 compressed_size: AtomicU64::new(0),
141 uncompressed_size: AtomicU64::new(0),
142 }
143 }
144
145 pub fn get(&self, fingerprint: &QueryFingerprint) -> Option<CacheEntry> {
147 let key = self.fingerprint_to_hash(fingerprint);
148 let key_bytes = key.to_le_bytes();
149
150 {
152 let bloom = self.bloom.read().ok()?;
153 if !bloom.may_contain(&key_bytes) {
154 self.misses.fetch_add(1, Ordering::Relaxed);
155 return None;
156 }
157 }
158
159 let metadata = self.index.get(&key)?;
161
162 let now = std::time::SystemTime::now()
164 .duration_since(std::time::SystemTime::UNIX_EPOCH)
165 .unwrap_or_default()
166 .as_secs();
167
168 if now > metadata.created_at + metadata.ttl_secs {
169 drop(metadata);
170 self.remove_entry(key);
171 self.misses.fetch_add(1, Ordering::Relaxed);
172 return None;
173 }
174
175 let compressed = self.data.get(&key)?;
177
178 let decompressed = self.decompress(&compressed)?;
180
181 let entry: CacheEntry = bincode::deserialize(&decompressed).ok()?;
183
184 self.hits.fetch_add(1, Ordering::Relaxed);
185 Some(entry)
186 }
187
188 pub fn insert(&self, fingerprint: QueryFingerprint, entry: CacheEntry) {
190 let key = self.fingerprint_to_hash(&fingerprint);
191
192 let serialized = match bincode::serialize(&entry) {
194 Ok(s) => s,
195 Err(_) => return,
196 };
197
198 let uncompressed_size = serialized.len();
199
200 let compressed = match self.compress(&serialized) {
202 Some(c) => c,
203 None => return,
204 };
205
206 let compressed_size = compressed.len();
207
208 while self.current_size.load(Ordering::Relaxed) + compressed_size as u64 > self.max_size {
210 if !self.evict_oldest() {
211 break;
212 }
213 }
214
215 self.remove_entry(key);
217
218 let metadata = EntryMetadata {
220 compressed_size,
221 uncompressed_size,
222 created_at: entry.created_at,
223 ttl_secs: entry.ttl_secs,
224 tables: entry.tables.clone(),
225 };
226
227 for table in &entry.tables {
229 self.table_index
230 .entry(table.clone())
231 .or_default()
232 .insert(key);
233 }
234
235 {
237 if let Ok(mut bloom) = self.bloom.write() {
238 bloom.insert(&key.to_le_bytes());
239 }
240 }
241
242 self.index.insert(key, metadata);
244 self.data.insert(key, compressed);
245 self.current_size
246 .fetch_add(compressed_size as u64, Ordering::Relaxed);
247 self.compressed_size
248 .fetch_add(compressed_size as u64, Ordering::Relaxed);
249 self.uncompressed_size
250 .fetch_add(uncompressed_size as u64, Ordering::Relaxed);
251 }
252
253 pub fn invalidate_by_table(&self, table: &str) {
255 if let Some((_, keys)) = self.table_index.remove(table) {
256 for key in keys {
257 self.remove_entry(key);
258 }
259 }
260 }
261
262 pub fn invalidate(&self, fingerprint: &QueryFingerprint) {
264 let key = self.fingerprint_to_hash(fingerprint);
265 self.remove_entry(key);
266 }
267
268 fn remove_entry(&self, key: u64) {
270 if let Some((_, metadata)) = self.index.remove(&key) {
271 self.data.remove(&key);
272 self.current_size
273 .fetch_sub(metadata.compressed_size as u64, Ordering::Relaxed);
274
275 for table in &metadata.tables {
277 if let Some(mut keys) = self.table_index.get_mut(table) {
278 keys.remove(&key);
279 }
280 }
281 }
282 }
283
284 fn evict_oldest(&self) -> bool {
286 let mut oldest_key = None;
287 let mut oldest_time = u64::MAX;
288
289 for entry in self.index.iter() {
290 if entry.created_at < oldest_time {
291 oldest_time = entry.created_at;
292 oldest_key = Some(*entry.key());
293 }
294 }
295
296 if let Some(key) = oldest_key {
297 self.remove_entry(key);
298 return true;
299 }
300
301 false
302 }
303
304 fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
306 match self.compression {
307 CompressionType::None => {
308 let mut output = Vec::with_capacity(data.len() + 1);
309 output.push(0x00); output.extend_from_slice(data);
311 Some(output)
312 }
313 CompressionType::Lz4 => {
314 let compressed = lz4_flex::block::compress_prepend_size(data);
318 let mut output = Vec::with_capacity(compressed.len() + 1);
319 output.push(0x01); output.extend_from_slice(&compressed);
321 Some(output)
322 }
323 CompressionType::Zstd => {
324 let compressed = zstd::stream::encode_all(data, 3).ok()?;
326 let mut output = Vec::with_capacity(compressed.len() + 1);
327 output.push(0x02); output.extend_from_slice(&compressed);
329 Some(output)
330 }
331 }
332 }
333
334 fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
336 if data.is_empty() {
337 return None;
338 }
339
340 let marker = data[0];
341 let payload = &data[1..];
342
343 match marker {
344 0x00 => Some(payload.to_vec()), 0x01 => {
346 lz4_flex::block::decompress_size_prepended(payload).ok()
349 }
350 0x02 => {
351 zstd::stream::decode_all(payload).ok()
353 }
354 _ => Some(data.to_vec()), }
356 }
357
358 fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
360 use std::collections::hash_map::DefaultHasher;
361 use std::hash::{Hash, Hasher};
362
363 let mut hasher = DefaultHasher::new();
364 fingerprint.template.hash(&mut hasher);
365 if let Some(param) = fingerprint.param_hash {
366 param.hash(&mut hasher);
367 }
368 hasher.finish()
369 }
370
371 pub fn stats(&self) -> TierStats {
373 let compressed = self.compressed_size.load(Ordering::Relaxed);
374 let uncompressed = self.uncompressed_size.load(Ordering::Relaxed);
375
376 TierStats {
377 size_bytes: self.current_size.load(Ordering::Relaxed),
378 max_size_bytes: self.max_size,
379 entry_count: self.index.len() as u64,
380 hits: self.hits.load(Ordering::Relaxed),
381 misses: self.misses.load(Ordering::Relaxed),
382 evictions: 0,
383 compression_ratio: if compressed > 0 {
384 Some(uncompressed as f64 / compressed as f64)
385 } else {
386 None
387 },
388 peer_count: None,
389 healthy_peers: None,
390 }
391 }
392
393 pub fn clear(&self) {
395 self.index.clear();
396 self.data.clear();
397 self.table_index.clear();
398 if let Ok(mut bloom) = self.bloom.write() {
399 bloom.clear();
400 }
401 self.current_size.store(0, Ordering::Relaxed);
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// One mebibyte; the size budget used by most tests.
    const MIB: u64 = 1024 * 1024;

    /// Builds a cache with the given budget, path and compression.
    fn new_cache(max_size: u64, dir: &str, compression: CompressionType) -> WarmCache {
        WarmCache::new(max_size, PathBuf::from(dir), compression)
    }

    /// Builds an entry holding `payload`, tagged with `tables`, that lives
    /// for five minutes.
    fn entry_for(payload: Vec<u8>, tables: &[&str]) -> CacheEntry {
        let tables: Vec<String> = tables.iter().map(|name| name.to_string()).collect();
        CacheEntry::new(payload, tables, 1).with_ttl(Duration::from_secs(300))
    }

    /// An inserted entry can be read back with its payload intact.
    #[test]
    fn test_warm_cache_insert_get() {
        let cache = new_cache(1024 * MIB, "/tmp/test-cache", CompressionType::Lz4);
        let fp = QueryFingerprint::from_query("SELECT * FROM users");

        cache.insert(fp.clone(), entry_for(vec![1, 2, 3], &["users"]));

        let fetched = cache.get(&fp);
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().data, vec![1, 2, 3]);
    }

    /// LZ4 and zstd both shrink repetitive input and round-trip it exactly.
    #[test]
    fn test_lz4_compression_is_real() {
        let original = b"helios-distribcache-".repeat(200);

        let lz4 = new_cache(MIB, "/tmp/test-cache-lz4", CompressionType::Lz4);
        let compressed = lz4.compress(&original).expect("compress");
        assert!(
            compressed.len() < original.len(),
            "LZ4 did not shrink data: {} -> {}",
            original.len(),
            compressed.len()
        );
        let restored = lz4.decompress(&compressed).expect("decompress");
        assert_eq!(restored, original, "LZ4 round-trip mismatch");

        let zcache = new_cache(MIB, "/tmp/test-cache-zstd", CompressionType::Zstd);
        let zc = zcache.compress(&original).expect("zstd compress");
        assert!(zc.len() < original.len());
        assert_eq!(zcache.decompress(&zc).expect("zstd decompress"), original);
    }

    /// A stored key is found; a key that was never stored is not.
    #[test]
    fn test_warm_cache_bloom_filter() {
        let cache = new_cache(MIB, "/tmp/test-cache", CompressionType::None);
        let stored = QueryFingerprint::from_query("SELECT * FROM users");
        let absent = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(stored.clone(), entry_for(vec![1], &[]));

        assert!(cache.get(&stored).is_some());
        assert!(cache.get(&absent).is_none());
    }

    /// Invalidating one table drops its entries and leaves others alone.
    #[test]
    fn test_warm_cache_invalidate_by_table() {
        let cache = new_cache(MIB, "/tmp/test-cache", CompressionType::None);
        let users_fp = QueryFingerprint::from_query("SELECT * FROM users");
        let orders_fp = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(users_fp.clone(), entry_for(vec![1], &["users"]));
        cache.insert(orders_fp.clone(), entry_for(vec![2], &["orders"]));

        cache.invalidate_by_table("users");

        assert!(cache.get(&users_fp).is_none());
        assert!(cache.get(&orders_fp).is_some());
    }

    /// One hit plus one miss are reflected in the stats, along with a
    /// compression ratio once something has been stored.
    #[test]
    fn test_warm_cache_stats() {
        let cache = new_cache(MIB, "/tmp/test-cache", CompressionType::Lz4);
        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        cache.insert(fp.clone(), entry_for(vec![1], &[]));

        // Lookup of the stored key: a hit.
        let _ = cache.get(&fp);

        // Lookup of a key that was never stored: a miss.
        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");
        let _ = cache.get(&fp2);

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert!(stats.compression_ratio.is_some());
    }
}
539}