// heliosdb_proxy/distribcache/tiers/l2_warm.rs

use std::collections::HashSet;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

use dashmap::DashMap;

use super::{CacheEntry, CompressionType, TierStats};
use crate::distribcache::QueryFingerprint;

/// Fixed-size Bloom filter used as a cheap "definitely absent" pre-check.
///
/// `may_contain` can report false positives, but never a false negative for
/// data inserted since the last call to `clear`.
struct BloomFilter {
    /// Bit array packed into 64-bit words; always holds at least one word.
    bits: Vec<u64>,
    /// Number of hash probes performed per item.
    num_hashes: usize,
}

impl BloomFilter {
    /// Filter bits allocated per expected item.
    const BITS_PER_ITEM: usize = 10;
    /// Hash probes performed per item.
    const NUM_HASHES: usize = 7;

    /// Creates an empty filter sized for roughly `capacity` items.
    ///
    /// A `capacity` of zero still allocates one 64-bit word so that the
    /// modulo in the probe computation can never divide by zero.
    fn new(capacity: usize) -> Self {
        let num_bits = capacity.saturating_mul(Self::BITS_PER_ITEM);
        // Ceiling division written so that it cannot overflow.
        let num_words = (num_bits / 64 + usize::from(num_bits % 64 != 0)).max(1);

        Self {
            bits: vec![0; num_words],
            num_hashes: Self::NUM_HASHES,
        }
    }

    /// Records `data` in the filter.
    fn insert(&mut self, data: &[u8]) {
        for seed in 0..self.num_hashes {
            let (word, mask) = self.probe(data, seed);
            self.bits[word] |= mask;
        }
    }

    /// Returns `false` if `data` was definitely not inserted, `true` if it
    /// may have been.
    fn may_contain(&self, data: &[u8]) -> bool {
        (0..self.num_hashes).all(|seed| {
            let (word, mask) = self.probe(data, seed);
            self.bits[word] & mask != 0
        })
    }

    /// Maps `data` under probe number `seed` to a `(word index, bit mask)`
    /// pair inside `bits`.
    fn probe(&self, data: &[u8], seed: usize) -> (usize, u64) {
        // `bits` is never empty (see `new`), so `total_bits` is non-zero.
        let total_bits = self.bits.len() as u64 * 64;
        let idx = self.hash(data, seed) % total_bits;
        // `idx / 64` is below `bits.len()`, so the narrowing cast is lossless.
        ((idx / 64) as usize, 1u64 << (idx % 64))
    }

    /// Hashes `data` with `seed` mixed in first, so each seed gives a
    /// different hash of the same bytes.
    fn hash(&self, data: &[u8], seed: usize) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Resets every bit, forgetting all previously inserted items.
    fn clear(&mut self) {
        self.bits.fill(0);
    }
}
75
76pub struct WarmCache {
78 index: DashMap<u64, EntryMetadata>,
80
81 data: DashMap<u64, Vec<u8>>,
84
85 bloom: RwLock<BloomFilter>,
87
88 table_index: DashMap<String, HashSet<u64>>,
90
91 compression: CompressionType,
93
94 _path: PathBuf,
96
97 current_size: AtomicU64,
99
100 max_size: u64,
102
103 hits: AtomicU64,
105 misses: AtomicU64,
106 compressed_size: AtomicU64,
107 uncompressed_size: AtomicU64,
108}
109
110#[derive(Debug, Clone)]
112struct EntryMetadata {
113 compressed_size: usize,
115 uncompressed_size: usize,
117 created_at: u64,
119 ttl_secs: u64,
121 tables: Vec<String>,
123}
124
125impl WarmCache {
126 pub fn new(max_size: u64, path: PathBuf, compression: CompressionType) -> Self {
128 Self {
129 index: DashMap::new(),
130 data: DashMap::new(),
131 bloom: RwLock::new(BloomFilter::new(100_000)),
132 table_index: DashMap::new(),
133 compression,
134 _path: path,
135 current_size: AtomicU64::new(0),
136 max_size,
137 hits: AtomicU64::new(0),
138 misses: AtomicU64::new(0),
139 compressed_size: AtomicU64::new(0),
140 uncompressed_size: AtomicU64::new(0),
141 }
142 }
143
144 pub fn get(&self, fingerprint: &QueryFingerprint) -> Option<CacheEntry> {
146 let key = self.fingerprint_to_hash(fingerprint);
147 let key_bytes = key.to_le_bytes();
148
149 {
151 let bloom = self.bloom.read().ok()?;
152 if !bloom.may_contain(&key_bytes) {
153 self.misses.fetch_add(1, Ordering::Relaxed);
154 return None;
155 }
156 }
157
158 let metadata = self.index.get(&key)?;
160
161 let now = std::time::SystemTime::now()
163 .duration_since(std::time::SystemTime::UNIX_EPOCH)
164 .unwrap_or_default()
165 .as_secs();
166
167 if now > metadata.created_at + metadata.ttl_secs {
168 drop(metadata);
169 self.remove_entry(key);
170 self.misses.fetch_add(1, Ordering::Relaxed);
171 return None;
172 }
173
174 let compressed = self.data.get(&key)?;
176
177 let decompressed = self.decompress(&compressed)?;
179
180 let entry: CacheEntry = bincode::deserialize(&decompressed).ok()?;
182
183 self.hits.fetch_add(1, Ordering::Relaxed);
184 Some(entry)
185 }
186
187 pub fn insert(&self, fingerprint: QueryFingerprint, entry: CacheEntry) {
189 let key = self.fingerprint_to_hash(&fingerprint);
190
191 let serialized = match bincode::serialize(&entry) {
193 Ok(s) => s,
194 Err(_) => return,
195 };
196
197 let uncompressed_size = serialized.len();
198
199 let compressed = match self.compress(&serialized) {
201 Some(c) => c,
202 None => return,
203 };
204
205 let compressed_size = compressed.len();
206
207 while self.current_size.load(Ordering::Relaxed) + compressed_size as u64 > self.max_size {
209 if !self.evict_oldest() {
210 break;
211 }
212 }
213
214 self.remove_entry(key);
216
217 let metadata = EntryMetadata {
219 compressed_size,
220 uncompressed_size,
221 created_at: entry.created_at,
222 ttl_secs: entry.ttl_secs,
223 tables: entry.tables.clone(),
224 };
225
226 for table in &entry.tables {
228 self.table_index
229 .entry(table.clone())
230 .or_default()
231 .insert(key);
232 }
233
234 {
236 if let Ok(mut bloom) = self.bloom.write() {
237 bloom.insert(&key.to_le_bytes());
238 }
239 }
240
241 self.index.insert(key, metadata);
243 self.data.insert(key, compressed);
244 self.current_size.fetch_add(compressed_size as u64, Ordering::Relaxed);
245 self.compressed_size.fetch_add(compressed_size as u64, Ordering::Relaxed);
246 self.uncompressed_size.fetch_add(uncompressed_size as u64, Ordering::Relaxed);
247 }
248
249 pub fn invalidate_by_table(&self, table: &str) {
251 if let Some((_, keys)) = self.table_index.remove(table) {
252 for key in keys {
253 self.remove_entry(key);
254 }
255 }
256 }
257
258 pub fn invalidate(&self, fingerprint: &QueryFingerprint) {
260 let key = self.fingerprint_to_hash(fingerprint);
261 self.remove_entry(key);
262 }
263
264 fn remove_entry(&self, key: u64) {
266 if let Some((_, metadata)) = self.index.remove(&key) {
267 self.data.remove(&key);
268 self.current_size.fetch_sub(metadata.compressed_size as u64, Ordering::Relaxed);
269
270 for table in &metadata.tables {
272 if let Some(mut keys) = self.table_index.get_mut(table) {
273 keys.remove(&key);
274 }
275 }
276 }
277 }
278
279 fn evict_oldest(&self) -> bool {
281 let mut oldest_key = None;
282 let mut oldest_time = u64::MAX;
283
284 for entry in self.index.iter() {
285 if entry.created_at < oldest_time {
286 oldest_time = entry.created_at;
287 oldest_key = Some(*entry.key());
288 }
289 }
290
291 if let Some(key) = oldest_key {
292 self.remove_entry(key);
293 return true;
294 }
295
296 false
297 }
298
299 fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
301 match self.compression {
302 CompressionType::None => {
303 let mut output = Vec::with_capacity(data.len() + 1);
304 output.push(0x00); output.extend_from_slice(data);
306 Some(output)
307 }
308 CompressionType::Lz4 => {
309 let mut output = Vec::with_capacity(data.len() + 1);
312 output.push(0x01); output.extend_from_slice(data);
315 Some(output)
316 }
317 CompressionType::Zstd => {
318 let compressed = zstd::stream::encode_all(data, 3).ok()?;
320 let mut output = Vec::with_capacity(compressed.len() + 1);
321 output.push(0x02); output.extend_from_slice(&compressed);
323 Some(output)
324 }
325 }
326 }
327
328 fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
330 if data.is_empty() {
331 return None;
332 }
333
334 let marker = data[0];
335 let payload = &data[1..];
336
337 match marker {
338 0x00 => Some(payload.to_vec()), 0x01 => Some(payload.to_vec()), 0x02 => {
341 zstd::stream::decode_all(payload).ok()
343 }
344 _ => Some(data.to_vec()), }
346 }
347
348 fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
350 use std::hash::{Hash, Hasher};
351 use std::collections::hash_map::DefaultHasher;
352
353 let mut hasher = DefaultHasher::new();
354 fingerprint.template.hash(&mut hasher);
355 if let Some(param) = fingerprint.param_hash {
356 param.hash(&mut hasher);
357 }
358 hasher.finish()
359 }
360
361 pub fn stats(&self) -> TierStats {
363 let compressed = self.compressed_size.load(Ordering::Relaxed);
364 let uncompressed = self.uncompressed_size.load(Ordering::Relaxed);
365
366 TierStats {
367 size_bytes: self.current_size.load(Ordering::Relaxed),
368 max_size_bytes: self.max_size,
369 entry_count: self.index.len() as u64,
370 hits: self.hits.load(Ordering::Relaxed),
371 misses: self.misses.load(Ordering::Relaxed),
372 evictions: 0,
373 compression_ratio: if compressed > 0 {
374 Some(uncompressed as f64 / compressed as f64)
375 } else {
376 None
377 },
378 peer_count: None,
379 healthy_peers: None,
380 }
381 }
382
383 pub fn clear(&self) {
385 self.index.clear();
386 self.data.clear();
387 self.table_index.clear();
388 if let Ok(mut bloom) = self.bloom.write() {
389 bloom.clear();
390 }
391 self.current_size.store(0, Ordering::Relaxed);
392 }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a cache with the given byte budget and compression setting.
    fn new_cache(max_size: u64, compression: CompressionType) -> WarmCache {
        WarmCache::new(max_size, PathBuf::from("/tmp/test-cache"), compression)
    }

    /// Builds an entry with a five-minute TTL.
    fn entry(data: Vec<u8>, tables: Vec<String>) -> CacheEntry {
        CacheEntry::new(data, tables, 1).with_ttl(Duration::from_secs(300))
    }

    #[test]
    fn test_warm_cache_insert_get() {
        let cache = new_cache(1024 * 1024 * 1024, CompressionType::Lz4);

        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        cache.insert(fp.clone(), entry(vec![1, 2, 3], vec!["users".to_string()]));

        let result = cache.get(&fp);
        assert!(result.is_some());
        assert_eq!(result.unwrap().data, vec![1, 2, 3]);
    }

    #[test]
    fn test_warm_cache_bloom_filter() {
        let cache = new_cache(1024 * 1024, CompressionType::None);

        let fp1 = QueryFingerprint::from_query("SELECT * FROM users");
        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(fp1.clone(), entry(vec![1], vec![]));

        // A stored key must be found.
        assert!(cache.get(&fp1).is_some());

        // A key that was never stored must not be found.
        assert!(cache.get(&fp2).is_none());
    }

    #[test]
    fn test_warm_cache_invalidate_by_table() {
        let cache = new_cache(1024 * 1024, CompressionType::None);

        let fp1 = QueryFingerprint::from_query("SELECT * FROM users");
        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(fp1.clone(), entry(vec![1], vec!["users".to_string()]));
        cache.insert(fp2.clone(), entry(vec![2], vec!["orders".to_string()]));

        cache.invalidate_by_table("users");

        assert!(cache.get(&fp1).is_none());
        assert!(cache.get(&fp2).is_some());
    }

    #[test]
    fn test_warm_cache_invalidate() {
        let cache = new_cache(1024 * 1024, CompressionType::None);

        let fp1 = QueryFingerprint::from_query("SELECT * FROM users");
        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(fp1.clone(), entry(vec![1], vec!["users".to_string()]));
        cache.insert(fp2.clone(), entry(vec![2], vec!["orders".to_string()]));

        cache.invalidate(&fp1);

        assert!(cache.get(&fp1).is_none());
        assert!(cache.get(&fp2).is_some());
    }

    #[test]
    fn test_warm_cache_clear() {
        let cache = new_cache(1024 * 1024, CompressionType::None);

        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        cache.insert(fp.clone(), entry(vec![1], vec!["users".to_string()]));

        cache.clear();

        assert!(cache.get(&fp).is_none());
        let stats = cache.stats();
        assert_eq!(stats.entry_count, 0);
        assert_eq!(stats.size_bytes, 0);
    }

    #[test]
    fn test_warm_cache_zstd_roundtrip() {
        let cache = new_cache(1024 * 1024, CompressionType::Zstd);

        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        let payload = vec![7u8; 4096];
        cache.insert(fp.clone(), entry(payload.clone(), vec!["users".to_string()]));

        let result = cache.get(&fp);
        assert!(result.is_some());
        assert_eq!(result.unwrap().data, payload);
    }

    #[test]
    fn test_warm_cache_stats() {
        let cache = new_cache(1024 * 1024, CompressionType::Lz4);

        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        cache.insert(fp.clone(), entry(vec![1], vec![]));

        // One hit.
        cache.get(&fp);

        // One miss.
        let fp2 = QueryFingerprint::from_query("SELECT * FROM orders");
        cache.get(&fp2);

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert!(stats.compression_ratio.is_some());
    }
}