Skip to main content

heliosdb_proxy/distribcache/tiers/
mod.rs

1//! Multi-tier cache implementations
2//!
3//! L1: Hot cache (in-memory, <100μs)
4//! L2: Warm cache (SSD, <1ms)
5//! L3: Distributed cache (mesh, <10ms)
6
7mod l1_hot;
8mod l2_warm;
9mod l3_distributed;
10
11pub use l1_hot::HotCache;
12pub use l2_warm::WarmCache;
13pub use l3_distributed::DistributedCache;
14
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::AtomicU64;
17use std::time::{Duration, SystemTime};
18
/// Names one of the three tiers that make up the cache hierarchy.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CacheTier {
    /// First tier: the hot cache held in memory.
    L1,
    /// Second tier: the warm cache backed by SSD.
    L2,
    /// Third tier: the distributed cache shared across the mesh.
    L3,
}
29
/// Strategy a cache tier uses to pick which entries to discard.
///
/// The default policy is [`EvictionPolicy::LFU`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum EvictionPolicy {
    /// Discard the least recently used entry.
    LRU,
    /// Discard the least frequently used entry.
    #[default]
    LFU,
    /// Switch between LRU and LFU depending on observed access patterns.
    Adaptive,
    /// Discard by age: the oldest entries go first.
    FIFO,
}
43
/// Compression codec applied to data stored in the L2 cache.
///
/// The default codec is [`CompressionType::Lz4`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CompressionType {
    /// Store data uncompressed.
    None,
    /// LZ4: fast, with a moderate compression ratio.
    #[default]
    Lz4,
    /// Zstd: slower than LZ4 but compresses better.
    Zstd,
}
55
/// Key under which a cache entry is stored and looked up.
///
/// Two keys are equal (and hash identically) only when all four fields match.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CacheKey {
    /// Hash of the query fingerprint.
    pub fingerprint_hash: u64,
    /// Hash of the query parameters, when there is one.
    pub param_hash: Option<u64>,
    /// Branch name, used for branch-aware caching.
    pub branch: Option<String>,
    /// Time-travel timestamp, used for historical queries.
    pub as_of: Option<u64>,
}
68
69impl CacheKey {
70    /// Create a new cache key from a query fingerprint
71    pub fn from_fingerprint(fingerprint: &super::QueryFingerprint) -> Self {
72        use std::collections::hash_map::DefaultHasher;
73        use std::hash::{Hash, Hasher};
74
75        let mut hasher = DefaultHasher::new();
76        fingerprint.template.hash(&mut hasher);
77        let fingerprint_hash = hasher.finish();
78
79        Self {
80            fingerprint_hash,
81            param_hash: fingerprint.param_hash,
82            branch: None,
83            as_of: None,
84        }
85    }
86
87    /// Create a cache key for a chunk
88    pub fn chunk(id: u64) -> Self {
89        Self {
90            fingerprint_hash: id,
91            param_hash: None,
92            branch: None,
93            as_of: None,
94        }
95    }
96
97    /// Set branch for branch-aware caching
98    pub fn with_branch(mut self, branch: impl Into<String>) -> Self {
99        self.branch = Some(branch.into());
100        self
101    }
102
103    /// Set timestamp for time-travel caching
104    pub fn with_as_of(mut self, timestamp: u64) -> Self {
105        self.as_of = Some(timestamp);
106        self
107    }
108
109    /// Get table name from the key (if available)
110    pub fn table(&self) -> &str {
111        "unknown"
112    }
113
114    /// Get the fingerprint for metrics
115    pub fn fingerprint(&self) -> super::QueryFingerprint {
116        super::QueryFingerprint {
117            template: format!("{:x}", self.fingerprint_hash),
118            tables: Vec::new(),
119            param_hash: self.param_hash,
120        }
121    }
122
123    /// Check if key matches a pattern
124    pub fn matches_pattern(&self, pattern: &str) -> bool {
125        // Simple pattern matching for invalidation
126        pattern.contains(&format!("{:x}", self.fingerprint_hash))
127    }
128
129    /// Convert to bytes for storage
130    pub fn to_bytes(&self) -> Vec<u8> {
131        let mut bytes = Vec::with_capacity(24);
132        bytes.extend_from_slice(&self.fingerprint_hash.to_le_bytes());
133        if let Some(param) = self.param_hash {
134            bytes.extend_from_slice(&param.to_le_bytes());
135        }
136        if let Some(ref branch) = self.branch {
137            bytes.extend_from_slice(branch.as_bytes());
138        }
139        if let Some(ts) = self.as_of {
140            bytes.extend_from_slice(&ts.to_le_bytes());
141        }
142        bytes
143    }
144}
145
146/// Cache entry containing query result
147#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct CacheEntry {
149    /// Serialized result data
150    pub data: Vec<u8>,
151    /// Entry creation time (Unix timestamp)
152    pub created_at: u64,
153    /// Time-to-live duration in seconds
154    pub ttl_secs: u64,
155    /// Number of rows in result
156    pub row_count: usize,
157    /// Tables involved (for invalidation)
158    pub tables: Vec<String>,
159    /// Access count (for LFU)
160    #[serde(skip)]
161    pub access_count: u64,
162}
163
164impl CacheEntry {
165    /// Create a new cache entry
166    pub fn new(data: Vec<u8>, tables: Vec<String>, row_count: usize) -> Self {
167        let now = SystemTime::now()
168            .duration_since(SystemTime::UNIX_EPOCH)
169            .unwrap_or_default()
170            .as_secs();
171
172        Self {
173            data,
174            created_at: now,
175            ttl_secs: 300, // Default 5 minutes
176            row_count,
177            tables,
178            access_count: 0,
179        }
180    }
181
182    /// Create entry with specific TTL
183    pub fn with_ttl(mut self, ttl: Duration) -> Self {
184        self.ttl_secs = ttl.as_secs();
185        self
186    }
187
188    /// Check if entry is expired
189    pub fn is_expired(&self) -> bool {
190        let now = SystemTime::now()
191            .duration_since(SystemTime::UNIX_EPOCH)
192            .unwrap_or_default()
193            .as_secs();
194
195        now > self.created_at + self.ttl_secs
196    }
197
198    /// Get entry size in bytes
199    pub fn size(&self) -> usize {
200        self.data.len() + self.tables.iter().map(|t| t.len()).sum::<usize>() + 32
201    }
202
203    /// Create entry from a RAG chunk
204    pub fn from_chunk(chunk: &super::ai::Chunk) -> Self {
205        Self::new(
206            chunk.content.as_bytes().to_vec(),
207            vec!["chunks".to_string()],
208            1,
209        )
210    }
211
212    /// Get remaining TTL
213    pub fn remaining_ttl(&self) -> Duration {
214        let now = SystemTime::now()
215            .duration_since(SystemTime::UNIX_EPOCH)
216            .unwrap_or_default()
217            .as_secs();
218
219        let elapsed = now.saturating_sub(self.created_at);
220        let remaining = self.ttl_secs.saturating_sub(elapsed);
221        Duration::from_secs(remaining)
222    }
223}
224
/// Snapshot of the counters and gauges reported by a single cache tier.
#[derive(Clone, Debug, Default)]
pub struct TierStats {
    /// Bytes currently stored.
    pub size_bytes: u64,
    /// Capacity limit in bytes.
    pub max_size_bytes: u64,
    /// How many entries are stored.
    pub entry_count: u64,
    /// Lookups that found an entry.
    pub hits: u64,
    /// Lookups that found nothing.
    pub misses: u64,
    /// Entries removed by eviction.
    pub evictions: u64,
    /// Compression ratio (reported by L2).
    pub compression_ratio: Option<f64>,
    /// Number of peers (reported by L3).
    pub peer_count: Option<u32>,
    /// Number of healthy peers (reported by L3).
    pub healthy_peers: Option<u32>,
}

impl TierStats {
    /// Fraction of lookups that were hits, in the range `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn hit_ratio(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }

    /// Share of capacity in use, expressed as a percentage.
    ///
    /// Returns `0.0` when the capacity limit is zero.
    pub fn utilization(&self) -> f64 {
        if self.max_size_bytes == 0 {
            return 0.0;
        }
        let used = self.size_bytes as f64;
        let capacity = self.max_size_bytes as f64;
        used / capacity * 100.0
    }
}
268
269/// LFU eviction tracker
270pub struct LFUEviction {
271    /// Access counts per key
272    counts: dashmap::DashMap<u64, u64>,
273    /// Minimum frequency for fast eviction
274    #[allow(dead_code)]
275    min_freq: AtomicU64,
276}
277
278impl LFUEviction {
279    pub fn new() -> Self {
280        Self {
281            counts: dashmap::DashMap::new(),
282            min_freq: AtomicU64::new(1),
283        }
284    }
285
286    pub fn touch(&self, key_hash: u64) {
287        self.counts
288            .entry(key_hash)
289            .and_modify(|c| *c += 1)
290            .or_insert(1);
291    }
292
293    pub fn insert(&self, key_hash: u64) {
294        self.counts.insert(key_hash, 1);
295    }
296
297    pub fn remove(&self, key_hash: u64) {
298        self.counts.remove(&key_hash);
299    }
300
301    pub fn evict_one(&self) -> Option<u64> {
302        // Find entry with minimum frequency
303        let mut min_key = None;
304        let mut min_count = u64::MAX;
305
306        for entry in self.counts.iter() {
307            if *entry.value() < min_count {
308                min_count = *entry.value();
309                min_key = Some(*entry.key());
310            }
311        }
312
313        if let Some(key) = min_key {
314            self.counts.remove(&key);
315        }
316
317        min_key
318    }
319}
320
321impl Default for LFUEviction {
322    fn default() -> Self {
323        Self::new()
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// A key built from a fingerprint carries a hash and no branch.
    #[test]
    fn test_cache_key_from_fingerprint() {
        let fp = super::super::QueryFingerprint::from_query("SELECT * FROM users");
        let key = CacheKey::from_fingerprint(&fp);
        assert!(key.fingerprint_hash > 0);
        assert!(key.branch.is_none());
    }

    /// The builder methods set exactly the field they are named after.
    #[test]
    fn test_cache_key_builders() {
        let key = CacheKey::chunk(7).with_branch("main").with_as_of(42);
        assert_eq!(key.fingerprint_hash, 7);
        assert_eq!(key.param_hash, None);
        assert_eq!(key.branch.as_deref(), Some("main"));
        assert_eq!(key.as_of, Some(42));
        assert!(key.matches_pattern("invalidate:7"));
    }

    /// Fresh entries are live; backdated entries are expired.
    #[test]
    fn test_cache_entry_expiration() {
        // A generous TTL keeps this half of the test immune to clock ticks.
        let fresh = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
            .with_ttl(Duration::from_secs(60));
        assert!(!fresh.is_expired());
        assert!(fresh.remaining_ttl() <= Duration::from_secs(60));

        // Backdate the entry to the epoch instead of sleeping or mocking time.
        let mut stale = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
            .with_ttl(Duration::from_secs(1));
        stale.created_at = 0;
        assert!(stale.is_expired());
        assert_eq!(stale.remaining_ttl(), Duration::from_secs(0));
    }

    /// The reported size includes the payload plus metadata overhead.
    #[test]
    fn test_cache_entry_size() {
        let entry = CacheEntry::new(
            vec![0; 1000],
            vec!["users".to_string(), "orders".to_string()],
            10,
        );
        assert!(entry.size() > 1000);
    }

    /// Hit ratio is hits over total lookups, and zero with no lookups.
    #[test]
    fn test_tier_stats_hit_ratio() {
        let stats = TierStats {
            hits: 80,
            misses: 20,
            ..TierStats::default()
        };
        assert!((stats.hit_ratio() - 0.8).abs() < 0.001);
        assert_eq!(TierStats::default().hit_ratio(), 0.0);
    }

    /// Utilization is a percentage of capacity, and zero with no capacity.
    #[test]
    fn test_tier_stats_utilization() {
        let stats = TierStats {
            size_bytes: 50,
            max_size_bytes: 200,
            ..TierStats::default()
        };
        assert!((stats.utilization() - 25.0).abs() < 0.001);
        assert_eq!(TierStats::default().utilization(), 0.0);
    }

    /// The least frequently touched key is the one evicted.
    #[test]
    fn test_lfu_eviction() {
        let lfu = LFUEviction::new();

        lfu.insert(1);
        lfu.insert(2);
        lfu.insert(3);

        // Touch some keys
        lfu.touch(1);
        lfu.touch(1);
        lfu.touch(2);

        // Key 3 has lowest frequency, should be evicted
        let evicted = lfu.evict_one();
        assert_eq!(evicted, Some(3));
    }

    /// Evicting from an empty tracker yields nothing.
    #[test]
    fn test_lfu_eviction_empty() {
        let lfu = LFUEviction::default();
        assert_eq!(lfu.evict_one(), None);
    }
}