heliosdb_proxy/distribcache/tiers/
mod.rs1mod l1_hot;
8mod l2_warm;
9mod l3_distributed;
10
11pub use l1_hot::HotCache;
12pub use l2_warm::WarmCache;
13pub use l3_distributed::DistributedCache;
14
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::AtomicU64;
17use std::time::{Duration, SystemTime};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub enum CacheTier {
22 L1,
24 L2,
26 L3,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub enum EvictionPolicy {
33 LRU,
35 LFU,
37 Adaptive,
39 FIFO,
41}
42
43impl Default for EvictionPolicy {
44 fn default() -> Self {
45 Self::LFU
46 }
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum CompressionType {
52 None,
54 Lz4,
56 Zstd,
58}
59
60impl Default for CompressionType {
61 fn default() -> Self {
62 Self::Lz4
63 }
64}
65
66#[derive(Debug, Clone, Hash, PartialEq, Eq)]
68pub struct CacheKey {
69 pub fingerprint_hash: u64,
71 pub param_hash: Option<u64>,
73 pub branch: Option<String>,
75 pub as_of: Option<u64>,
77}
78
79impl CacheKey {
80 pub fn from_fingerprint(fingerprint: &super::QueryFingerprint) -> Self {
82 use std::hash::{Hash, Hasher};
83 use std::collections::hash_map::DefaultHasher;
84
85 let mut hasher = DefaultHasher::new();
86 fingerprint.template.hash(&mut hasher);
87 let fingerprint_hash = hasher.finish();
88
89 Self {
90 fingerprint_hash,
91 param_hash: fingerprint.param_hash,
92 branch: None,
93 as_of: None,
94 }
95 }
96
97 pub fn chunk(id: u64) -> Self {
99 Self {
100 fingerprint_hash: id,
101 param_hash: None,
102 branch: None,
103 as_of: None,
104 }
105 }
106
107 pub fn with_branch(mut self, branch: impl Into<String>) -> Self {
109 self.branch = Some(branch.into());
110 self
111 }
112
113 pub fn with_as_of(mut self, timestamp: u64) -> Self {
115 self.as_of = Some(timestamp);
116 self
117 }
118
119 pub fn table(&self) -> &str {
121 "unknown"
122 }
123
124 pub fn fingerprint(&self) -> super::QueryFingerprint {
126 super::QueryFingerprint {
127 template: format!("{:x}", self.fingerprint_hash),
128 tables: Vec::new(),
129 param_hash: self.param_hash,
130 }
131 }
132
133 pub fn matches_pattern(&self, pattern: &str) -> bool {
135 pattern.contains(&format!("{:x}", self.fingerprint_hash))
137 }
138
139 pub fn to_bytes(&self) -> Vec<u8> {
141 let mut bytes = Vec::with_capacity(24);
142 bytes.extend_from_slice(&self.fingerprint_hash.to_le_bytes());
143 if let Some(param) = self.param_hash {
144 bytes.extend_from_slice(¶m.to_le_bytes());
145 }
146 if let Some(ref branch) = self.branch {
147 bytes.extend_from_slice(branch.as_bytes());
148 }
149 if let Some(ts) = self.as_of {
150 bytes.extend_from_slice(&ts.to_le_bytes());
151 }
152 bytes
153 }
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct CacheEntry {
159 pub data: Vec<u8>,
161 pub created_at: u64,
163 pub ttl_secs: u64,
165 pub row_count: usize,
167 pub tables: Vec<String>,
169 #[serde(skip)]
171 pub access_count: u64,
172}
173
174impl CacheEntry {
175 pub fn new(data: Vec<u8>, tables: Vec<String>, row_count: usize) -> Self {
177 let now = SystemTime::now()
178 .duration_since(SystemTime::UNIX_EPOCH)
179 .unwrap_or_default()
180 .as_secs();
181
182 Self {
183 data,
184 created_at: now,
185 ttl_secs: 300, row_count,
187 tables,
188 access_count: 0,
189 }
190 }
191
192 pub fn with_ttl(mut self, ttl: Duration) -> Self {
194 self.ttl_secs = ttl.as_secs();
195 self
196 }
197
198 pub fn is_expired(&self) -> bool {
200 let now = SystemTime::now()
201 .duration_since(SystemTime::UNIX_EPOCH)
202 .unwrap_or_default()
203 .as_secs();
204
205 now > self.created_at + self.ttl_secs
206 }
207
208 pub fn size(&self) -> usize {
210 self.data.len() + self.tables.iter().map(|t| t.len()).sum::<usize>() + 32
211 }
212
213 pub fn from_chunk(chunk: &super::ai::Chunk) -> Self {
215 Self::new(
216 chunk.content.as_bytes().to_vec(),
217 vec!["chunks".to_string()],
218 1,
219 )
220 }
221
222 pub fn remaining_ttl(&self) -> Duration {
224 let now = SystemTime::now()
225 .duration_since(SystemTime::UNIX_EPOCH)
226 .unwrap_or_default()
227 .as_secs();
228
229 let elapsed = now.saturating_sub(self.created_at);
230 let remaining = self.ttl_secs.saturating_sub(elapsed);
231 Duration::from_secs(remaining)
232 }
233}
234
235#[derive(Debug, Clone, Default)]
237pub struct TierStats {
238 pub size_bytes: u64,
240 pub max_size_bytes: u64,
242 pub entry_count: u64,
244 pub hits: u64,
246 pub misses: u64,
248 pub evictions: u64,
250 pub compression_ratio: Option<f64>,
252 pub peer_count: Option<u32>,
254 pub healthy_peers: Option<u32>,
256}
257
258impl TierStats {
259 pub fn hit_ratio(&self) -> f64 {
261 let total = self.hits + self.misses;
262 if total > 0 {
263 self.hits as f64 / total as f64
264 } else {
265 0.0
266 }
267 }
268
269 pub fn utilization(&self) -> f64 {
271 if self.max_size_bytes > 0 {
272 self.size_bytes as f64 / self.max_size_bytes as f64 * 100.0
273 } else {
274 0.0
275 }
276 }
277}
278
279pub struct LFUEviction {
281 counts: dashmap::DashMap<u64, u64>,
283 min_freq: AtomicU64,
285}
286
287impl LFUEviction {
288 pub fn new() -> Self {
289 Self {
290 counts: dashmap::DashMap::new(),
291 min_freq: AtomicU64::new(1),
292 }
293 }
294
295 pub fn touch(&self, key_hash: u64) {
296 self.counts
297 .entry(key_hash)
298 .and_modify(|c| *c += 1)
299 .or_insert(1);
300 }
301
302 pub fn insert(&self, key_hash: u64) {
303 self.counts.insert(key_hash, 1);
304 }
305
306 pub fn remove(&self, key_hash: u64) {
307 self.counts.remove(&key_hash);
308 }
309
310 pub fn evict_one(&self) -> Option<u64> {
311 let mut min_key = None;
313 let mut min_count = u64::MAX;
314
315 for entry in self.counts.iter() {
316 if *entry.value() < min_count {
317 min_count = *entry.value();
318 min_key = Some(*entry.key());
319 }
320 }
321
322 if let Some(key) = min_key {
323 self.counts.remove(&key);
324 }
325
326 min_key
327 }
328}
329
330impl Default for LFUEviction {
331 fn default() -> Self {
332 Self::new()
333 }
334}
335
336#[cfg(test)]
337mod tests {
338 use super::*;
339
340 #[test]
341 fn test_cache_key_from_fingerprint() {
342 let fp = super::super::QueryFingerprint::from_query("SELECT * FROM users");
343 let key = CacheKey::from_fingerprint(&fp);
344 assert!(key.fingerprint_hash > 0);
345 assert!(key.branch.is_none());
346 }
347
348 #[test]
349 fn test_cache_entry_expiration() {
350 let entry = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
351 .with_ttl(Duration::from_secs(1));
352
353 assert!(!entry.is_expired());
354
355 }
357
358 #[test]
359 fn test_cache_entry_size() {
360 let entry = CacheEntry::new(
361 vec![0; 1000],
362 vec!["users".to_string(), "orders".to_string()],
363 10,
364 );
365 assert!(entry.size() > 1000);
366 }
367
368 #[test]
369 fn test_tier_stats_hit_ratio() {
370 let mut stats = TierStats::default();
371 stats.hits = 80;
372 stats.misses = 20;
373 assert!((stats.hit_ratio() - 0.8).abs() < 0.001);
374 }
375
376 #[test]
377 fn test_lfu_eviction() {
378 let lfu = LFUEviction::new();
379
380 lfu.insert(1);
381 lfu.insert(2);
382 lfu.insert(3);
383
384 lfu.touch(1);
386 lfu.touch(1);
387 lfu.touch(2);
388
389 let evicted = lfu.evict_one();
391 assert_eq!(evicted, Some(3));
392 }
393}