Skip to main content

heliosdb_proxy/distribcache/tiers/
l2_warm.rs

1//! L2 Warm Cache - SSD-backed cache with <1ms access time
2//!
3//! Features:
4//! - Compressed storage using LZ4 or Zstd
5//! - Bloom filter for fast negative lookups
6//! - TTL-based expiration
7
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::RwLock;
13
14use dashmap::DashMap;
15
16use super::{CacheEntry, CompressionType, TierStats};
17use crate::distribcache::QueryFingerprint;
18
/// Bloom filter for fast negative lookups.
///
/// Answers are probabilistic: `may_contain` returns `true` for every item
/// inserted since the last `clear`, but may also return `true` for items that
/// were never inserted.
struct BloomFilter {
    /// Bit array packed into 64-bit words; always holds at least one word.
    bits: Vec<u64>,
    /// Number of seeded hash probes performed per item.
    num_hashes: usize,
}

impl BloomFilter {
    /// Bits allocated per expected item (~1% false positive rate).
    const BITS_PER_ITEM: usize = 10;

    /// Hash probes per item (optimal for ~1% FPR at 10 bits per item).
    const NUM_HASHES: usize = 7;

    /// Create a filter sized for roughly `capacity` items.
    ///
    /// A `capacity` of zero still allocates a single 64-bit word so that the
    /// modulo in the probe computation never divides by zero.
    fn new(capacity: usize) -> Self {
        let num_bits = capacity.saturating_mul(Self::BITS_PER_ITEM);
        // Ceiling division without the `+ 63` that could overflow.
        let num_words = (num_bits / 64 + usize::from(num_bits % 64 != 0)).max(1);

        Self {
            bits: vec![0; num_words],
            num_hashes: Self::NUM_HASHES,
        }
    }

    /// Record `data` in the filter by setting one bit per hash probe.
    fn insert(&mut self, data: &[u8]) {
        for seed in 0..self.num_hashes {
            let (word, mask) = self.probe(data, seed);
            self.bits[word] |= mask;
        }
    }

    /// Return `false` if `data` was definitely not inserted, `true` if it may
    /// have been.
    fn may_contain(&self, data: &[u8]) -> bool {
        (0..self.num_hashes).all(|seed| {
            let (word, mask) = self.probe(data, seed);
            self.bits[word] & mask != 0
        })
    }

    /// Map `(data, seed)` to a word index and a single-bit mask within that word.
    fn probe(&self, data: &[u8], seed: usize) -> (usize, u64) {
        // Reduce in u64 so the full hash is used even where usize is 32 bits.
        // `bits` is never empty (see `new`), so `total_bits` is at least 64.
        let total_bits = self.bits.len() as u64 * 64;
        let idx = self.hash(data, seed) % total_bits;
        // `idx / 64 < self.bits.len()`, so the cast back to usize is lossless.
        ((idx / 64) as usize, 1u64 << (idx % 64))
    }

    /// Hash `data` with `DefaultHasher`, feeding `seed` first so that each
    /// probe index yields a different hash for the same data.
    fn hash(&self, data: &[u8], seed: usize) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Reset every bit to zero; the filter keeps its size.
    fn clear(&mut self) {
        self.bits.fill(0);
    }
}
75
76/// L2 Warm Cache - SSD-backed with compression
77pub struct WarmCache {
78    /// In-memory index (key -> metadata)
79    index: DashMap<u64, EntryMetadata>,
80
81    /// In-memory data store (simulating SSD storage)
82    /// In production, this would be RocksDB or similar
83    data: DashMap<u64, Vec<u8>>,
84
85    /// Bloom filter for fast negative lookups
86    bloom: RwLock<BloomFilter>,
87
88    /// Table to key index for invalidation
89    table_index: DashMap<String, HashSet<u64>>,
90
91    /// Compression type
92    compression: CompressionType,
93
94    /// Storage path (for future disk-based implementation)
95    _path: PathBuf,
96
97    /// Current size in bytes
98    current_size: AtomicU64,
99
100    /// Maximum size in bytes
101    max_size: u64,
102
103    /// Statistics
104    hits: AtomicU64,
105    misses: AtomicU64,
106    compressed_size: AtomicU64,
107    uncompressed_size: AtomicU64,
108}
109
/// Bookkeeping record kept in the index for each cached entry.
#[derive(Debug, Clone)]
struct EntryMetadata {
    /// Byte length of the stored payload, including its one-byte format marker.
    compressed_size: usize,
    /// Byte length of the serialized entry before compression.
    uncompressed_size: usize,
    /// Creation timestamp copied from the cache entry; lookups compare it
    /// against seconds since the Unix epoch.
    created_at: u64,
    /// Time-to-live in seconds, counted from `created_at`.
    ttl_secs: u64,
    /// Names of the tables whose invalidation should drop this entry.
    tables: Vec<String>,
}
124
125impl WarmCache {
126    /// Create a new warm cache
127    pub fn new(max_size: u64, path: PathBuf, compression: CompressionType) -> Self {
128        Self {
129            index: DashMap::new(),
130            data: DashMap::new(),
131            bloom: RwLock::new(BloomFilter::new(100_000)),
132            table_index: DashMap::new(),
133            compression,
134            _path: path,
135            current_size: AtomicU64::new(0),
136            max_size,
137            hits: AtomicU64::new(0),
138            misses: AtomicU64::new(0),
139            compressed_size: AtomicU64::new(0),
140            uncompressed_size: AtomicU64::new(0),
141        }
142    }
143
144    /// Get an entry from the cache
145    pub fn get(&self, fingerprint: &QueryFingerprint) -> Option<CacheEntry> {
146        let key = self.fingerprint_to_hash(fingerprint);
147        let key_bytes = key.to_le_bytes();
148
149        // Fast path: bloom filter check
150        {
151            let bloom = self.bloom.read().ok()?;
152            if !bloom.may_contain(&key_bytes) {
153                self.misses.fetch_add(1, Ordering::Relaxed);
154                return None;
155            }
156        }
157
158        // Check index
159        let metadata = self.index.get(&key)?;
160
161        // Check TTL
162        let now = std::time::SystemTime::now()
163            .duration_since(std::time::SystemTime::UNIX_EPOCH)
164            .unwrap_or_default()
165            .as_secs();
166
167        if now > metadata.created_at + metadata.ttl_secs {
168            drop(metadata);
169            self.remove_entry(key);
170            self.misses.fetch_add(1, Ordering::Relaxed);
171            return None;
172        }
173
174        // Get compressed data
175        let compressed = self.data.get(&key)?;
176
177        // Decompress
178        let decompressed = self.decompress(&compressed)?;
179
180        // Deserialize
181        let entry: CacheEntry = bincode::deserialize(&decompressed).ok()?;
182
183        self.hits.fetch_add(1, Ordering::Relaxed);
184        Some(entry)
185    }
186
187    /// Insert an entry into the cache
188    pub fn insert(&self, fingerprint: QueryFingerprint, entry: CacheEntry) {
189        let key = self.fingerprint_to_hash(&fingerprint);
190
191        // Serialize
192        let serialized = match bincode::serialize(&entry) {
193            Ok(s) => s,
194            Err(_) => return,
195        };
196
197        let uncompressed_size = serialized.len();
198
199        // Compress
200        let compressed = match self.compress(&serialized) {
201            Some(c) => c,
202            None => return,
203        };
204
205        let compressed_size = compressed.len();
206
207        // Evict if needed
208        while self.current_size.load(Ordering::Relaxed) + compressed_size as u64 > self.max_size {
209            if !self.evict_oldest() {
210                break;
211            }
212        }
213
214        // Remove old entry if exists
215        self.remove_entry(key);
216
217        // Create metadata
218        let metadata = EntryMetadata {
219            compressed_size,
220            uncompressed_size,
221            created_at: entry.created_at,
222            ttl_secs: entry.ttl_secs,
223            tables: entry.tables.clone(),
224        };
225
226        // Index by tables
227        for table in &entry.tables {
228            self.table_index
229                .entry(table.clone())
230                .or_default()
231                .insert(key);
232        }
233
234        // Insert into bloom filter
235        {
236            if let Ok(mut bloom) = self.bloom.write() {
237                bloom.insert(&key.to_le_bytes());
238            }
239        }
240
241        // Store
242        self.index.insert(key, metadata);
243        self.data.insert(key, compressed);
244        self.current_size.fetch_add(compressed_size as u64, Ordering::Relaxed);
245        self.compressed_size.fetch_add(compressed_size as u64, Ordering::Relaxed);
246        self.uncompressed_size.fetch_add(uncompressed_size as u64, Ordering::Relaxed);
247    }
248
249    /// Invalidate entries for a table
250    pub fn invalidate_by_table(&self, table: &str) {
251        if let Some((_, keys)) = self.table_index.remove(table) {
252            for key in keys {
253                self.remove_entry(key);
254            }
255        }
256    }
257
258    /// Invalidate a specific entry
259    pub fn invalidate(&self, fingerprint: &QueryFingerprint) {
260        let key = self.fingerprint_to_hash(fingerprint);
261        self.remove_entry(key);
262    }
263
264    /// Remove an entry
265    fn remove_entry(&self, key: u64) {
266        if let Some((_, metadata)) = self.index.remove(&key) {
267            self.data.remove(&key);
268            self.current_size.fetch_sub(metadata.compressed_size as u64, Ordering::Relaxed);
269
270            // Clean up table index
271            for table in &metadata.tables {
272                if let Some(mut keys) = self.table_index.get_mut(table) {
273                    keys.remove(&key);
274                }
275            }
276        }
277    }
278
279    /// Evict oldest entry
280    fn evict_oldest(&self) -> bool {
281        let mut oldest_key = None;
282        let mut oldest_time = u64::MAX;
283
284        for entry in self.index.iter() {
285            if entry.created_at < oldest_time {
286                oldest_time = entry.created_at;
287                oldest_key = Some(*entry.key());
288            }
289        }
290
291        if let Some(key) = oldest_key {
292            self.remove_entry(key);
293            return true;
294        }
295
296        false
297    }
298
299    /// Compress data
300    fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
301        match self.compression {
302            CompressionType::None => {
303                let mut output = Vec::with_capacity(data.len() + 1);
304                output.push(0x00); // No compression marker
305                output.extend_from_slice(data);
306                Some(output)
307            }
308            CompressionType::Lz4 => {
309                // LZ4 compression using zstd as fallback since lz4_flex not available
310                // In production, add lz4_flex crate for native LZ4
311                let mut output = Vec::with_capacity(data.len() + 1);
312                output.push(0x01); // LZ4 marker
313                // Use simple compression for now
314                output.extend_from_slice(data);
315                Some(output)
316            }
317            CompressionType::Zstd => {
318                // Real zstd compression
319                let compressed = zstd::stream::encode_all(data, 3).ok()?;
320                let mut output = Vec::with_capacity(compressed.len() + 1);
321                output.push(0x02); // Zstd marker
322                output.extend_from_slice(&compressed);
323                Some(output)
324            }
325        }
326    }
327
328    /// Decompress data
329    fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
330        if data.is_empty() {
331            return None;
332        }
333
334        let marker = data[0];
335        let payload = &data[1..];
336
337        match marker {
338            0x00 => Some(payload.to_vec()), // Uncompressed
339            0x01 => Some(payload.to_vec()), // LZ4 (not compressed in current impl)
340            0x02 => {
341                // Real zstd decompression
342                zstd::stream::decode_all(payload).ok()
343            }
344            _ => Some(data.to_vec()),       // Unknown, return as-is
345        }
346    }
347
348    /// Convert fingerprint to hash key
349    fn fingerprint_to_hash(&self, fingerprint: &QueryFingerprint) -> u64 {
350        use std::hash::{Hash, Hasher};
351        use std::collections::hash_map::DefaultHasher;
352
353        let mut hasher = DefaultHasher::new();
354        fingerprint.template.hash(&mut hasher);
355        if let Some(param) = fingerprint.param_hash {
356            param.hash(&mut hasher);
357        }
358        hasher.finish()
359    }
360
361    /// Get cache statistics
362    pub fn stats(&self) -> TierStats {
363        let compressed = self.compressed_size.load(Ordering::Relaxed);
364        let uncompressed = self.uncompressed_size.load(Ordering::Relaxed);
365
366        TierStats {
367            size_bytes: self.current_size.load(Ordering::Relaxed),
368            max_size_bytes: self.max_size,
369            entry_count: self.index.len() as u64,
370            hits: self.hits.load(Ordering::Relaxed),
371            misses: self.misses.load(Ordering::Relaxed),
372            evictions: 0,
373            compression_ratio: if compressed > 0 {
374                Some(uncompressed as f64 / compressed as f64)
375            } else {
376                None
377            },
378            peer_count: None,
379            healthy_peers: None,
380        }
381    }
382
383    /// Clear all entries
384    pub fn clear(&self) {
385        self.index.clear();
386        self.data.clear();
387        self.table_index.clear();
388        if let Ok(mut bloom) = self.bloom.write() {
389            bloom.clear();
390        }
391        self.current_size.store(0, Ordering::Relaxed);
392    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// One mebibyte, the size budget used by most fixtures.
    const MIB: u64 = 1024 * 1024;

    /// Build a cache with the given size budget and compression type.
    fn cache_with(max_size: u64, compression: CompressionType) -> WarmCache {
        WarmCache::new(max_size, PathBuf::from("/tmp/test-cache"), compression)
    }

    /// Build an entry with a five-minute TTL that references `tables`.
    fn entry_with(data: Vec<u8>, tables: &[&str]) -> CacheEntry {
        let tables: Vec<String> = tables.iter().map(|name| name.to_string()).collect();
        CacheEntry::new(data, tables, 1).with_ttl(Duration::from_secs(300))
    }

    /// An inserted entry can be read back with its data intact.
    #[test]
    fn test_warm_cache_insert_get() {
        let cache = cache_with(1024 * MIB, CompressionType::Lz4);
        let users = QueryFingerprint::from_query("SELECT * FROM users");

        cache.insert(users.clone(), entry_with(vec![1, 2, 3], &["users"]));

        let cached = cache.get(&users);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().data, vec![1, 2, 3]);
    }

    /// A stored fingerprint is found; one that was never stored is not.
    #[test]
    fn test_warm_cache_bloom_filter() {
        let cache = cache_with(MIB, CompressionType::None);
        let users = QueryFingerprint::from_query("SELECT * FROM users");
        let orders = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(users.clone(), entry_with(vec![1], &[]));

        // The stored fingerprint passes the bloom filter and is found.
        assert!(cache.get(&users).is_some());

        // The unknown fingerprint is expected to be rejected on the fast path.
        assert!(cache.get(&orders).is_none());
    }

    /// Invalidating one table drops only the entries that reference it.
    #[test]
    fn test_warm_cache_invalidate_by_table() {
        let cache = cache_with(MIB, CompressionType::None);
        let users = QueryFingerprint::from_query("SELECT * FROM users");
        let orders = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(users.clone(), entry_with(vec![1], &["users"]));
        cache.insert(orders.clone(), entry_with(vec![2], &["orders"]));

        cache.invalidate_by_table("users");

        assert!(cache.get(&users).is_none());
        assert!(cache.get(&orders).is_some());
    }

    /// One hit and one miss are reflected in the reported statistics.
    #[test]
    fn test_warm_cache_stats() {
        let cache = cache_with(MIB, CompressionType::Lz4);
        let users = QueryFingerprint::from_query("SELECT * FROM users");
        let orders = QueryFingerprint::from_query("SELECT * FROM orders");

        cache.insert(users.clone(), entry_with(vec![1], &[]));

        cache.get(&users); // Hit
        cache.get(&orders); // Miss

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert!(stats.compression_ratio.is_some());
    }
}
493}