Skip to main content

heliosdb_proxy/distribcache/tiers/
mod.rs

1//! Multi-tier cache implementations
2//!
3//! L1: Hot cache (in-memory, <100μs)
4//! L2: Warm cache (SSD, <1ms)
5//! L3: Distributed cache (mesh, <10ms)
6
7mod l1_hot;
8mod l2_warm;
9mod l3_distributed;
10
11pub use l1_hot::HotCache;
12pub use l2_warm::WarmCache;
13pub use l3_distributed::DistributedCache;
14
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::AtomicU64;
17use std::time::{Duration, SystemTime};
18
/// Names one of the three cache levels exposed by this module.
///
/// The variants mirror the tiers described in the module documentation:
/// hot in-memory (L1), warm SSD-backed (L2) and distributed mesh (L3).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CacheTier {
    /// First level: hot cache held in memory.
    L1,
    /// Second level: warm cache stored on SSD.
    L2,
    /// Third level: cache distributed across the mesh.
    L3,
}
29
/// Strategy a cache tier uses to choose which entry to drop when it is full.
///
/// The default policy is [`EvictionPolicy::LFU`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionPolicy {
    /// Evict the least recently used entry.
    LRU,
    /// Evict the least frequently used entry (the default).
    #[default]
    LFU,
    /// Adaptive: switches between LRU and LFU based on access patterns.
    Adaptive,
    /// Time-based: the oldest entries are evicted first.
    FIFO,
}
48
/// Compression codec applied to data held in the L2 cache.
///
/// The default codec is [`CompressionType::Lz4`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionType {
    /// Store data uncompressed.
    None,
    /// LZ4: fast, moderate compression (the default).
    #[default]
    Lz4,
    /// Zstd: slower, better compression.
    Zstd,
}
65
/// Lookup key identifying one cached result.
///
/// Two keys are equal (and hash identically) only when all four fields match,
/// so the same query cached for different parameters, branches or points in
/// time occupies separate entries.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CacheKey {
    /// Hash identifying the query fingerprint.
    pub fingerprint_hash: u64,
    /// Hash of the bound parameters, when there are any.
    pub param_hash: Option<u64>,
    /// Branch the result belongs to (branch-aware caching); `None` when unset.
    pub branch: Option<String>,
    /// Time-travel timestamp for historical queries; `None` when unset.
    pub as_of: Option<u64>,
}
78
79impl CacheKey {
80    /// Create a new cache key from a query fingerprint
81    pub fn from_fingerprint(fingerprint: &super::QueryFingerprint) -> Self {
82        use std::hash::{Hash, Hasher};
83        use std::collections::hash_map::DefaultHasher;
84
85        let mut hasher = DefaultHasher::new();
86        fingerprint.template.hash(&mut hasher);
87        let fingerprint_hash = hasher.finish();
88
89        Self {
90            fingerprint_hash,
91            param_hash: fingerprint.param_hash,
92            branch: None,
93            as_of: None,
94        }
95    }
96
97    /// Create a cache key for a chunk
98    pub fn chunk(id: u64) -> Self {
99        Self {
100            fingerprint_hash: id,
101            param_hash: None,
102            branch: None,
103            as_of: None,
104        }
105    }
106
107    /// Set branch for branch-aware caching
108    pub fn with_branch(mut self, branch: impl Into<String>) -> Self {
109        self.branch = Some(branch.into());
110        self
111    }
112
113    /// Set timestamp for time-travel caching
114    pub fn with_as_of(mut self, timestamp: u64) -> Self {
115        self.as_of = Some(timestamp);
116        self
117    }
118
119    /// Get table name from the key (if available)
120    pub fn table(&self) -> &str {
121        "unknown"
122    }
123
124    /// Get the fingerprint for metrics
125    pub fn fingerprint(&self) -> super::QueryFingerprint {
126        super::QueryFingerprint {
127            template: format!("{:x}", self.fingerprint_hash),
128            tables: Vec::new(),
129            param_hash: self.param_hash,
130        }
131    }
132
133    /// Check if key matches a pattern
134    pub fn matches_pattern(&self, pattern: &str) -> bool {
135        // Simple pattern matching for invalidation
136        pattern.contains(&format!("{:x}", self.fingerprint_hash))
137    }
138
139    /// Convert to bytes for storage
140    pub fn to_bytes(&self) -> Vec<u8> {
141        let mut bytes = Vec::with_capacity(24);
142        bytes.extend_from_slice(&self.fingerprint_hash.to_le_bytes());
143        if let Some(param) = self.param_hash {
144            bytes.extend_from_slice(&param.to_le_bytes());
145        }
146        if let Some(ref branch) = self.branch {
147            bytes.extend_from_slice(branch.as_bytes());
148        }
149        if let Some(ts) = self.as_of {
150            bytes.extend_from_slice(&ts.to_le_bytes());
151        }
152        bytes
153    }
154}
155
156/// Cache entry containing query result
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct CacheEntry {
159    /// Serialized result data
160    pub data: Vec<u8>,
161    /// Entry creation time (Unix timestamp)
162    pub created_at: u64,
163    /// Time-to-live duration in seconds
164    pub ttl_secs: u64,
165    /// Number of rows in result
166    pub row_count: usize,
167    /// Tables involved (for invalidation)
168    pub tables: Vec<String>,
169    /// Access count (for LFU)
170    #[serde(skip)]
171    pub access_count: u64,
172}
173
174impl CacheEntry {
175    /// Create a new cache entry
176    pub fn new(data: Vec<u8>, tables: Vec<String>, row_count: usize) -> Self {
177        let now = SystemTime::now()
178            .duration_since(SystemTime::UNIX_EPOCH)
179            .unwrap_or_default()
180            .as_secs();
181
182        Self {
183            data,
184            created_at: now,
185            ttl_secs: 300, // Default 5 minutes
186            row_count,
187            tables,
188            access_count: 0,
189        }
190    }
191
192    /// Create entry with specific TTL
193    pub fn with_ttl(mut self, ttl: Duration) -> Self {
194        self.ttl_secs = ttl.as_secs();
195        self
196    }
197
198    /// Check if entry is expired
199    pub fn is_expired(&self) -> bool {
200        let now = SystemTime::now()
201            .duration_since(SystemTime::UNIX_EPOCH)
202            .unwrap_or_default()
203            .as_secs();
204
205        now > self.created_at + self.ttl_secs
206    }
207
208    /// Get entry size in bytes
209    pub fn size(&self) -> usize {
210        self.data.len() + self.tables.iter().map(|t| t.len()).sum::<usize>() + 32
211    }
212
213    /// Create entry from a RAG chunk
214    pub fn from_chunk(chunk: &super::ai::Chunk) -> Self {
215        Self::new(
216            chunk.content.as_bytes().to_vec(),
217            vec!["chunks".to_string()],
218            1,
219        )
220    }
221
222    /// Get remaining TTL
223    pub fn remaining_ttl(&self) -> Duration {
224        let now = SystemTime::now()
225            .duration_since(SystemTime::UNIX_EPOCH)
226            .unwrap_or_default()
227            .as_secs();
228
229        let elapsed = now.saturating_sub(self.created_at);
230        let remaining = self.ttl_secs.saturating_sub(elapsed);
231        Duration::from_secs(remaining)
232    }
233}
234
/// Point-in-time counters describing one cache tier.
///
/// The optional fields are only meaningful for specific tiers and are `None`
/// by default.
#[derive(Clone, Debug, Default)]
pub struct TierStats {
    /// Bytes currently stored.
    pub size_bytes: u64,
    /// Capacity in bytes.
    pub max_size_bytes: u64,
    /// Entries currently stored.
    pub entry_count: u64,
    /// Lookups that found an entry.
    pub hits: u64,
    /// Lookups that found nothing.
    pub misses: u64,
    /// Entries removed by eviction.
    pub evictions: u64,
    /// Compression ratio (for L2).
    pub compression_ratio: Option<f64>,
    /// Number of peers (for L3).
    pub peer_count: Option<u32>,
    /// Number of healthy peers (for L3).
    pub healthy_peers: Option<u32>,
}

impl TierStats {
    /// Fraction of lookups that were hits, in the range `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn hit_ratio(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }

    /// Used capacity as a percentage of `max_size_bytes`.
    ///
    /// Returns `0.0` when the maximum size is zero.
    pub fn utilization(&self) -> f64 {
        if self.max_size_bytes == 0 {
            return 0.0;
        }
        self.size_bytes as f64 / self.max_size_bytes as f64 * 100.0
    }
}
278
279/// LFU eviction tracker
280pub struct LFUEviction {
281    /// Access counts per key
282    counts: dashmap::DashMap<u64, u64>,
283    /// Minimum frequency for fast eviction
284    min_freq: AtomicU64,
285}
286
287impl LFUEviction {
288    pub fn new() -> Self {
289        Self {
290            counts: dashmap::DashMap::new(),
291            min_freq: AtomicU64::new(1),
292        }
293    }
294
295    pub fn touch(&self, key_hash: u64) {
296        self.counts
297            .entry(key_hash)
298            .and_modify(|c| *c += 1)
299            .or_insert(1);
300    }
301
302    pub fn insert(&self, key_hash: u64) {
303        self.counts.insert(key_hash, 1);
304    }
305
306    pub fn remove(&self, key_hash: u64) {
307        self.counts.remove(&key_hash);
308    }
309
310    pub fn evict_one(&self) -> Option<u64> {
311        // Find entry with minimum frequency
312        let mut min_key = None;
313        let mut min_count = u64::MAX;
314
315        for entry in self.counts.iter() {
316            if *entry.value() < min_count {
317                min_count = *entry.value();
318                min_key = Some(*entry.key());
319            }
320        }
321
322        if let Some(key) = min_key {
323            self.counts.remove(&key);
324        }
325
326        min_key
327    }
328}
329
330impl Default for LFUEviction {
331    fn default() -> Self {
332        Self::new()
333    }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// A key built from a fingerprint has a non-zero hash and no branch.
    #[test]
    fn test_cache_key_from_fingerprint() {
        let fingerprint = super::super::QueryFingerprint::from_query("SELECT * FROM users");
        let key = CacheKey::from_fingerprint(&fingerprint);

        assert!(key.fingerprint_hash > 0);
        assert!(key.branch.is_none());
    }

    /// A freshly created entry with a one-second TTL is not yet expired.
    #[test]
    fn test_cache_entry_expiration() {
        let one_second = Duration::from_secs(1);
        let fresh = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
            .with_ttl(one_second);

        // Observing the expired state would require waiting or a mocked clock.
        assert!(!fresh.is_expired());
    }

    /// The reported size exceeds the payload length alone.
    #[test]
    fn test_cache_entry_size() {
        let payload = vec![0; 1000];
        let tables = vec!["users".to_string(), "orders".to_string()];
        let entry = CacheEntry::new(payload, tables, 10);

        assert!(entry.size() > 1000);
    }

    /// 80 hits out of 100 lookups gives a hit ratio of 0.8.
    #[test]
    fn test_tier_stats_hit_ratio() {
        let stats = TierStats {
            hits: 80,
            misses: 20,
            ..TierStats::default()
        };

        assert!((stats.hit_ratio() - 0.8).abs() < 0.001);
    }

    /// The key that was never touched after insertion is evicted first.
    #[test]
    fn test_lfu_eviction() {
        let tracker = LFUEviction::new();
        for key in 1..=3u64 {
            tracker.insert(key);
        }

        // Key 1 ends at count 3, key 2 at count 2, key 3 stays at count 1.
        tracker.touch(1);
        tracker.touch(1);
        tracker.touch(2);

        assert_eq!(tracker.evict_one(), Some(3));
    }
}
393}