Skip to main content

heliosdb_proxy/distribcache/tiers/
l2_warm.rs

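//! L2 Warm Cache - compressed second-tier cache designed for SSD-backed
//! storage with <1ms access time (entries are currently simulated in memory).
//!
//! Features:
//! - Compressed storage using LZ4 or Zstd
//! - Bloom filter for fast negative lookups
//! - TTL-based expiration
//! - Per-table invalidation
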
8use std::collections::HashSet;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::RwLock;
12
13use dashmap::DashMap;
14
15use super::{CacheEntry, CompressionType, TierStats};
16use crate::distribcache::QueryFingerprint;
17
/// Bloom filter for fast negative lookups.
///
/// Sized at 10 bits per expected item with 7 probes per item (roughly a 1%
/// false-positive rate at the configured capacity). Items can only be added or
/// wiped all at once via [`BloomFilter::clear`]; individual removal is not
/// supported, so a removed key may still be reported as present.
struct BloomFilter {
    /// Bit array packed 64 bits per word; never empty.
    bits: Vec<u64>,
    /// Number of probe positions set/checked per item.
    num_hashes: usize,
}

impl BloomFilter {
    /// Target bits per expected item (~1% false-positive rate).
    const BITS_PER_ITEM: usize = 10;
    /// Probe count; about `BITS_PER_ITEM * ln 2`, the optimum for that density.
    const NUM_HASHES: usize = 7;

    /// Create a filter sized for `capacity` expected items.
    ///
    /// A `capacity` of 0 still allocates one 64-bit word. Otherwise probe
    /// indexing would take a remainder by zero and panic.
    fn new(capacity: usize) -> Self {
        let num_bits = capacity.saturating_mul(Self::BITS_PER_ITEM);
        let num_words = num_bits.div_ceil(64).max(1);

        Self {
            bits: vec![0; num_words],
            num_hashes: Self::NUM_HASHES,
        }
    }

    /// Record `data` as present by setting all of its probe bits.
    fn insert(&mut self, data: &[u8]) {
        for seed in 0..self.num_hashes {
            let (word, mask) = self.probe(data, seed);
            self.bits[word] |= mask;
        }
    }

    /// Return `false` only if `data` was definitely never inserted since the
    /// last `clear`; `true` means "possibly present".
    fn may_contain(&self, data: &[u8]) -> bool {
        (0..self.num_hashes).all(|seed| {
            let (word, mask) = self.probe(data, seed);
            self.bits[word] & mask != 0
        })
    }

    /// Map the `seed`-th probe of `data` to a `(word index, bit mask)` pair.
    fn probe(&self, data: &[u8], seed: usize) -> (usize, u64) {
        let total_bits = self.bits.len() * 64;
        let idx = (Self::hash(data, seed) % total_bits as u64) as usize;
        (idx / 64, 1u64 << (idx % 64))
    }

    /// Seeded hash of `data`; each seed yields an independent probe.
    fn hash(data: &[u8], seed: usize) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Reset every bit, forgetting all inserted items.
    fn clear(&mut self) {
        self.bits.fill(0);
    }
}

75/// L2 Warm Cache - SSD-backed with compression
76pub struct WarmCache {
77    /// In-memory index (key -> metadata)
78    index: DashMap<u64, EntryMetadata>,
79
80    /// In-memory data store (simulating SSD storage)
81    /// In production, this would be RocksDB or similar
82    data: DashMap<u64, Vec<u8>>,
83
84    /// Bloom filter for fast negative lookups
85    bloom: RwLock<BloomFilter>,
86
87    /// Table to key index for invalidation
88    table_index: DashMap<String, HashSet<u64>>,
89
90    /// Compression type
91    compression: CompressionType,
92
93    /// Storage path (for future disk-based implementation)
94    _path: PathBuf,
95
96    /// Current size in bytes
97    current_size: AtomicU64,
98
99    /// Maximum size in bytes
100    max_size: u64,
101
102    /// Statistics
103    hits: AtomicU64,
104    misses: AtomicU64,
105    compressed_size: AtomicU64,
106    #[allow(dead_code)]
107    uncompressed_size: AtomicU64,
108}
109
110/// Entry metadata stored in index
111#[derive(Debug, Clone)]
112struct EntryMetadata {
113    /// Size of compressed data
114    compressed_size: usize,
115    /// Size of uncompressed data
116    #[allow(dead_code)]
117    uncompressed_size: usize,
118    /// Creation timestamp
119    created_at: u64,
120    /// TTL in seconds
121    ttl_secs: u64,
122    /// Tables for invalidation
123    tables: Vec<String>,
124}
125
126impl WarmCache {
127    /// Create a new warm cache
128    pub fn new(max_size: u64, path: PathBuf, compression: CompressionType) -> Self {
129        Self {
130            index: DashMap::new(),
131            data: DashMap::new(),
132            bloom: RwLock::new(BloomFilter::new(100_000)),
133            table_index: DashMap::new(),
134            compression,
135            _path: path,
136            current_size: AtomicU64::new(0),
137            max_size,
138            hits: AtomicU64::new(0),
139            misses: AtomicU64::new(0),
140            compressed_size: AtomicU64::new(0),
141            uncompressed_size: AtomicU64::new(0),
142        }
143    }
144
145    /// Get an entry from the cache
146    pub fn get(&self, fingerprint: &QueryFingerprint) -> Option<CacheEntry> {
147        let key = self.fingerprint_to_hash(fingerprint);
148        let key_bytes = key.to_le_bytes();
149
150        // Fast path: bloom filter check
151        {
152            let bloom = self.bloom.read().ok()?;
153            if !bloom.may_contain(&key_bytes) {
154                self.misses.fetch_add(1, Ordering::Relaxed);
155                return None;
156            }
157        }
158
159        // Check index
160        let metadata = self.index.get(&key)?;
161
162        // Check TTL
163        let now = std::time::SystemTime::now()
164            .duration_since(std::time::SystemTime::UNIX_EPOCH)
165            .unwrap_or_default()
166            .as_secs();
167
168        if now > metadata.created_at + metadata.ttl_secs {
169            drop(metadata);
170            self.remove_entry(key);
171            self.misses.fetch_add(1, Ordering::Relaxed);
172            return None;
173        }
174
175        // Get compressed data
176        let compressed = self.data.get(&key)?;
177
178        // Decompress
179        let decompressed = self.decompress(&compressed)?;
180
181        // Deserialize
182        let entry: CacheEntry = bincode::deserialize(&decompressed).ok()?;
183
184        self.hits.fetch_add(1, Ordering::Relaxed);
185        Some(entry)
186    }
187
188    /// Insert an entry into the cache
189    pub fn insert(&self, fingerprint: QueryFingerprint, entry: CacheEntry) {
190        let key = self.fingerprint_to_hash(&fingerprint);
191
192        // Serialize
193        let serialized = match bincode::serialize(&entry) {
194            Ok(s) => s,
195            Err(_) => return,
196        };
197
198        let uncompressed_size = serialized.len();
199
200        // Compress
201        let compressed = match self.compress(&serialized) {
202            Some(c) => c,
203            None => return,
204        };
205
206        let compressed_size = compressed.len();
207
208        // Evict if needed
209        while self.current_size.load(Ordering::Relaxed) + compressed_size as u64 > self.max_size {
210            if !self.evict_oldest() {
211                break;
212            }
213        }
214
215        // Remove old entry if exists
216        self.remove_entry(key);
217
218        // Create metadata
219        let metadata = EntryMetadata {
220            compressed_size,
221            uncompressed_size,
222            created_at: entry.created_at,
223            ttl_secs: entry.ttl_secs,
224            tables: entry.tables.clone(),
225        };
226
227        // Index by tables
228        for table in &entry.tables {
229            self.table_index
230                .entry(table.clone())
231                .or_default()
232                .insert(key);
233        }
234
235        // Insert into bloom filter
236        {
237            if let Ok(mut bloom) = self.bloom.write() {
238                bloom.insert(&key.to_le_bytes());
239            }
240        }
241
242        // Store
243        self.index.insert(key, metadata);
244        self.data.insert(key, compressed);
245        self.current_size
246            .fetch_add(compressed_size as u64, Ordering::Relaxed);
247        self.compressed_size
248            .fetch_add(compressed_size as u64, Ordering::Relaxed);
249        self.uncompressed_size
250            .fetch_add(uncompressed_size as u64, Ordering::Relaxed);
251    }
252
253    /// Invalidate entries for a table
254    pub fn invalidate_by_table(&self, table: &str) {
255        if let Some((_, keys)) = self.table_index.remove(table) {
256            for key in keys {
257                self.remove_entry(key);
258            }
259        }
260    }
261
262    /// Invalidate a specific entry
263    pub fn invalidate(&self, fingerprint: &QueryFingerprint) {
264        let key = self.fingerprint_to_hash(fingerprint);
265        self.remove_entry(key);
266    }
267
268    /// Remove an entry
269    fn remove_entry(&self, key: u64) {
270        if let Some((_, metadata)) = self.index.remove(&key) {
271            self.data.remove(&key);
272            self.current_size
273                .fetch_sub(metadata.compressed_size as u64, Ordering::Relaxed);
274
275            // Clean up table index
276            for table in &metadata.tables {
277                if let Some(mut keys) = self.table_index.get_mut(table) {
278                    keys.remove(&key);
279                }
280            }
281        }
282    }
283
284    /// Evict oldest entry
285    fn evict_oldest(&self) -> bool {
286        let mut oldest_key = None;
287        let mut oldest_time = u64::MAX;
288
289        for entry in self.index.iter() {
290            if entry.created_at < oldest_time {
291                oldest_time = entry.created_at;
292                oldest_key = Some(*entry.key());
293            }
294        }
295
296        if let Some(key) = oldest_key {
297            self.remove_entry(key);
298            return true;
299        }
300
301        false
302    }
303
304    /// Compress data
305    fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
306        match self.compression {
307            CompressionType::None => {
308                let mut output = Vec::with_capacity(data.len() + 1);
309                output.push(0x00); // No compression marker
310                output.extend_from_slice(data);
311                Some(output)
312            }
313            CompressionType::Lz4 => {
314                // Real LZ4 block compression. compress_prepend_size writes the
315                // uncompressed length as a little-endian u32 prefix so the
316                // decoder can size its output buffer exactly.
317                let compressed = lz4_flex::block::compress_prepend_size(data);
318                let mut output = Vec::with_capacity(compressed.len() + 1);
319                output.push(0x01); // LZ4 marker
320                output.extend_from_slice(&compressed);
321                Some(output)
322            }
323            CompressionType::Zstd => {
324                // Real zstd compression
325                let compressed = zstd::stream::encode_all(data, 3).ok()?;
326                let mut output = Vec::with_capacity(compressed.len() + 1);
327                output.push(0x02); // Zstd marker
328                output.extend_from_slice(&compressed);
329                Some(output)
330            }
331        }
332    }
333
334    /// Decompress data
335    fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
336        if data.is_empty() {
337            return None;
338        }
339
340        let marker = data[0];
341        let payload = &data[1..];
342
343        match marker {
344            0x00 => Some(payload.to_vec()), // Uncompressed
345            0x01 => {
346                // Real LZ4 block decompression — reads the u32 size prefix
347                // written by compress_prepend_size.
348                lz4_flex::block::decompress_size_prepended(payload).ok()
349            }
350            0x02 => {
351                // Real zstd decompression
352                zstd::stream::decode_all(payload).ok()
353            }
354            _ => Some(data.to_vec()), // Unknown, return as-is
355        }
356    }
357
358    /// Convert fingerprint to hash key
359    fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
360        use std::collections::hash_map::DefaultHasher;
361        use std::hash::{Hash, Hasher};
362
363        let mut hasher = DefaultHasher::new();
364        fingerprint.template.hash(&mut hasher);
365        if let Some(param) = fingerprint.param_hash {
366            param.hash(&mut hasher);
367        }
368        hasher.finish()
369    }
370
371    /// Get cache statistics
372    pub fn stats(&self) -> TierStats {
373        let compressed = self.compressed_size.load(Ordering::Relaxed);
374        let uncompressed = self.uncompressed_size.load(Ordering::Relaxed);
375
376        TierStats {
377            size_bytes: self.current_size.load(Ordering::Relaxed),
378            max_size_bytes: self.max_size,
379            entry_count: self.index.len() as u64,
380            hits: self.hits.load(Ordering::Relaxed),
381            misses: self.misses.load(Ordering::Relaxed),
382            evictions: 0,
383            compression_ratio: if compressed > 0 {
384                Some(uncompressed as f64 / compressed as f64)
385            } else {
386                None
387            },
388            peer_count: None,
389            healthy_peers: None,
390        }
391    }
392
393    /// Clear all entries
394    pub fn clear(&self) {
395        self.index.clear();
396        self.data.clear();
397        self.table_index.clear();
398        if let Ok(mut bloom) = self.bloom.write() {
399            bloom.clear();
400        }
401        self.current_size.store(0, Ordering::Relaxed);
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Lifetime given to every test entry; far longer than any test runs.
    const FIVE_MINUTES: Duration = Duration::from_secs(300);

    /// Build a warm cache with the given byte budget, path and codec.
    fn cache_at(max_size: u64, path: &str, compression: CompressionType) -> WarmCache {
        WarmCache::new(max_size, PathBuf::from(path), compression)
    }

    /// An entry survives an insert/get round trip with its payload intact.
    #[test]
    fn test_warm_cache_insert_get() {
        let one_gib = 1024 * 1024 * 1024;
        let cache = cache_at(one_gib, "/tmp/test-cache", CompressionType::Lz4);
        let users = QueryFingerprint::from_query("SELECT * FROM users");

        cache.insert(
            users.clone(),
            CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
                .with_ttl(FIVE_MINUTES),
        );

        let fetched = cache.get(&users);
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().data, vec![1, 2, 3]);
    }

    /// LZ4 must shrink a highly compressible payload and restore it exactly.
    /// A codec that merely copied bytes behind the marker would fail the size
    /// check. The zstd codec is held to the same standard.
    #[test]
    fn test_lz4_compression_is_real() {
        // About 4 KB of a repeating 20-byte pattern.
        let original = b"helios-distribcache-".repeat(200);

        let lz4_cache = cache_at(1024 * 1024, "/tmp/test-cache-lz4", CompressionType::Lz4);
        let packed = lz4_cache.compress(&original).expect("compress");
        assert!(
            packed.len() < original.len(),
            "LZ4 did not shrink data: {} -> {}",
            original.len(),
            packed.len()
        );
        let unpacked = lz4_cache.decompress(&packed).expect("decompress");
        assert_eq!(unpacked, original, "LZ4 round-trip mismatch");

        let zstd_cache = cache_at(1024 * 1024, "/tmp/test-cache-zstd", CompressionType::Zstd);
        let zstd_packed = zstd_cache.compress(&original).expect("zstd compress");
        assert!(zstd_packed.len() < original.len());
        assert_eq!(
            zstd_cache.decompress(&zstd_packed).expect("zstd decompress"),
            original
        );
    }

    /// An inserted key is found; a key that was never inserted is rejected.
    #[test]
    fn test_warm_cache_bloom_filter() {
        let cache = cache_at(1024 * 1024, "/tmp/test-cache", CompressionType::None);
        let present = QueryFingerprint::from_query("SELECT * FROM users");
        let absent = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(
            present.clone(),
            CacheEntry::new(vec![1], vec![], 1).with_ttl(FIVE_MINUTES),
        );

        assert!(cache.get(&present).is_some());
        assert!(cache.get(&absent).is_none());
    }

    /// Invalidating one table drops only the entries that depend on it.
    #[test]
    fn test_warm_cache_invalidate_by_table() {
        let cache = cache_at(1024 * 1024, "/tmp/test-cache", CompressionType::None);
        let users = QueryFingerprint::from_query("SELECT * FROM users");
        let orders = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(
            users.clone(),
            CacheEntry::new(vec![1], vec!["users".to_string()], 1).with_ttl(FIVE_MINUTES),
        );
        cache.insert(
            orders.clone(),
            CacheEntry::new(vec![2], vec!["orders".to_string()], 1).with_ttl(FIVE_MINUTES),
        );

        cache.invalidate_by_table("users");

        assert!(cache.get(&users).is_none());
        assert!(cache.get(&orders).is_some());
    }

    /// One hit plus one miss is reflected in the stats, with a compression ratio.
    #[test]
    fn test_warm_cache_stats() {
        let cache = cache_at(1024 * 1024, "/tmp/test-cache", CompressionType::Lz4);
        let cached = QueryFingerprint::from_query("SELECT * FROM users");
        cache.insert(
            cached.clone(),
            CacheEntry::new(vec![1], vec![], 1).with_ttl(FIVE_MINUTES),
        );

        let _ = cache.get(&cached); // hit
        let uncached = QueryFingerprint::from_query("SELECT * FROM orders");
        let _ = cache.get(&uncached); // miss

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert!(stats.compression_ratio.is_some());
    }
}