heliosdb_proxy/distribcache/tiers/
mod.rs1mod l1_hot;
8mod l2_warm;
9mod l3_distributed;
10
11pub use l1_hot::HotCache;
12pub use l2_warm::WarmCache;
13pub use l3_distributed::DistributedCache;
14
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::AtomicU64;
17use std::time::{Duration, SystemTime};
18
/// Identifies one level of the cache hierarchy.
///
/// NOTE(review): the variants presumably map onto the `HotCache`, `WarmCache`
/// and `DistributedCache` types re-exported by this module — confirm against
/// the tier implementations.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum CacheTier {
    /// First tier.
    L1,
    /// Second tier.
    L2,
    /// Third tier.
    L3,
}
29
/// Names the strategy used to choose which entry to evict.
///
/// This enum only labels the policy; the selection logic itself is not part
/// of this type. [`EvictionPolicy::LFU`] is the `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EvictionPolicy {
    /// Least recently used.
    LRU,
    /// Least frequently used (the default policy).
    #[default]
    LFU,
    /// Adaptive policy.
    ///
    /// NOTE(review): what it adapts between is not visible here — confirm
    /// against the tier implementations.
    Adaptive,
    /// First in, first out.
    FIFO,
}
43
/// Names the compression codec applied to cached payloads.
///
/// [`CompressionType::Lz4`] is the `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionType {
    /// Store payloads uncompressed.
    None,
    /// LZ4 (the default codec).
    #[default]
    Lz4,
    /// Zstandard.
    Zstd,
}
55
/// Lookup key for a cached result.
///
/// Two keys are equal (and hash equally) only when all four fields match.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CacheKey {
    /// Hash of the query template, or a raw chunk id when the key was built
    /// with `CacheKey::chunk`.
    pub fingerprint_hash: u64,
    /// Hash of the bound parameters, if any.
    pub param_hash: Option<u64>,
    /// Branch the key is scoped to, if any.
    pub branch: Option<String>,
    /// Timestamp the key is pinned to, if any.
    ///
    /// NOTE(review): the unit (seconds, millis, LSN, ...) is not established
    /// in this file — confirm with callers.
    pub as_of: Option<u64>,
}
68
69impl CacheKey {
70 pub fn from_fingerprint(fingerprint: &super::QueryFingerprint) -> Self {
72 use std::collections::hash_map::DefaultHasher;
73 use std::hash::{Hash, Hasher};
74
75 let mut hasher = DefaultHasher::new();
76 fingerprint.template.hash(&mut hasher);
77 let fingerprint_hash = hasher.finish();
78
79 Self {
80 fingerprint_hash,
81 param_hash: fingerprint.param_hash,
82 branch: None,
83 as_of: None,
84 }
85 }
86
87 pub fn chunk(id: u64) -> Self {
89 Self {
90 fingerprint_hash: id,
91 param_hash: None,
92 branch: None,
93 as_of: None,
94 }
95 }
96
97 pub fn with_branch(mut self, branch: impl Into<String>) -> Self {
99 self.branch = Some(branch.into());
100 self
101 }
102
103 pub fn with_as_of(mut self, timestamp: u64) -> Self {
105 self.as_of = Some(timestamp);
106 self
107 }
108
109 pub fn table(&self) -> &str {
111 "unknown"
112 }
113
114 pub fn fingerprint(&self) -> super::QueryFingerprint {
116 super::QueryFingerprint {
117 template: format!("{:x}", self.fingerprint_hash),
118 tables: Vec::new(),
119 param_hash: self.param_hash,
120 }
121 }
122
123 pub fn matches_pattern(&self, pattern: &str) -> bool {
125 pattern.contains(&format!("{:x}", self.fingerprint_hash))
127 }
128
129 pub fn to_bytes(&self) -> Vec<u8> {
131 let mut bytes = Vec::with_capacity(24);
132 bytes.extend_from_slice(&self.fingerprint_hash.to_le_bytes());
133 if let Some(param) = self.param_hash {
134 bytes.extend_from_slice(¶m.to_le_bytes());
135 }
136 if let Some(ref branch) = self.branch {
137 bytes.extend_from_slice(branch.as_bytes());
138 }
139 if let Some(ts) = self.as_of {
140 bytes.extend_from_slice(&ts.to_le_bytes());
141 }
142 bytes
143 }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct CacheEntry {
149 pub data: Vec<u8>,
151 pub created_at: u64,
153 pub ttl_secs: u64,
155 pub row_count: usize,
157 pub tables: Vec<String>,
159 #[serde(skip)]
161 pub access_count: u64,
162}
163
164impl CacheEntry {
165 pub fn new(data: Vec<u8>, tables: Vec<String>, row_count: usize) -> Self {
167 let now = SystemTime::now()
168 .duration_since(SystemTime::UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_secs();
171
172 Self {
173 data,
174 created_at: now,
175 ttl_secs: 300, row_count,
177 tables,
178 access_count: 0,
179 }
180 }
181
182 pub fn with_ttl(mut self, ttl: Duration) -> Self {
184 self.ttl_secs = ttl.as_secs();
185 self
186 }
187
188 pub fn is_expired(&self) -> bool {
190 let now = SystemTime::now()
191 .duration_since(SystemTime::UNIX_EPOCH)
192 .unwrap_or_default()
193 .as_secs();
194
195 now > self.created_at + self.ttl_secs
196 }
197
198 pub fn size(&self) -> usize {
200 self.data.len() + self.tables.iter().map(|t| t.len()).sum::<usize>() + 32
201 }
202
203 pub fn from_chunk(chunk: &super::ai::Chunk) -> Self {
205 Self::new(
206 chunk.content.as_bytes().to_vec(),
207 vec!["chunks".to_string()],
208 1,
209 )
210 }
211
212 pub fn remaining_ttl(&self) -> Duration {
214 let now = SystemTime::now()
215 .duration_since(SystemTime::UNIX_EPOCH)
216 .unwrap_or_default()
217 .as_secs();
218
219 let elapsed = now.saturating_sub(self.created_at);
220 let remaining = self.ttl_secs.saturating_sub(elapsed);
221 Duration::from_secs(remaining)
222 }
223}
224
/// Point-in-time counters describing a single cache tier.
#[derive(Debug, Clone, Default)]
pub struct TierStats {
    /// Bytes currently stored.
    pub size_bytes: u64,
    /// Capacity in bytes; 0 makes [`TierStats::utilization`] report 0.
    pub max_size_bytes: u64,
    /// Number of entries currently stored.
    pub entry_count: u64,
    /// Number of lookups that found an entry.
    pub hits: u64,
    /// Number of lookups that found nothing.
    pub misses: u64,
    /// Number of entries evicted.
    pub evictions: u64,
    /// Compression ratio, when the tier reports one.
    pub compression_ratio: Option<f64>,
    /// Number of peers, when the tier reports one.
    pub peer_count: Option<u32>,
    /// Number of healthy peers, when the tier reports one.
    pub healthy_peers: Option<u32>,
}

impl TierStats {
    /// Fraction of lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns 0.0 when no lookups have been recorded.
    pub fn hit_ratio(&self) -> f64 {
        // Saturating: the fields are public, so `hits + misses` may exceed
        // `u64::MAX`; a plain `+` would panic in debug builds and wrap in
        // release builds.
        let total = self.hits.saturating_add(self.misses);
        if total == 0 {
            return 0.0;
        }
        self.hits as f64 / total as f64
    }

    /// Used capacity as a percentage (`size_bytes / max_size_bytes * 100`).
    ///
    /// Returns 0.0 when `max_size_bytes` is 0.
    pub fn utilization(&self) -> f64 {
        if self.max_size_bytes == 0 {
            return 0.0;
        }
        self.size_bytes as f64 / self.max_size_bytes as f64 * 100.0
    }
}
268
269pub struct LFUEviction {
271 counts: dashmap::DashMap<u64, u64>,
273 #[allow(dead_code)]
275 min_freq: AtomicU64,
276}
277
278impl LFUEviction {
279 pub fn new() -> Self {
280 Self {
281 counts: dashmap::DashMap::new(),
282 min_freq: AtomicU64::new(1),
283 }
284 }
285
286 pub fn touch(&self, key_hash: u64) {
287 self.counts
288 .entry(key_hash)
289 .and_modify(|c| *c += 1)
290 .or_insert(1);
291 }
292
293 pub fn insert(&self, key_hash: u64) {
294 self.counts.insert(key_hash, 1);
295 }
296
297 pub fn remove(&self, key_hash: u64) {
298 self.counts.remove(&key_hash);
299 }
300
301 pub fn evict_one(&self) -> Option<u64> {
302 let mut min_key = None;
304 let mut min_count = u64::MAX;
305
306 for entry in self.counts.iter() {
307 if *entry.value() < min_count {
308 min_count = *entry.value();
309 min_key = Some(*entry.key());
310 }
311 }
312
313 if let Some(key) = min_key {
314 self.counts.remove(&key);
315 }
316
317 min_key
318 }
319}
320
321impl Default for LFUEviction {
322 fn default() -> Self {
323 Self::new()
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// A key built from a fingerprint carries a template hash and no scoping.
    #[test]
    fn test_cache_key_from_fingerprint() {
        let fp = super::super::QueryFingerprint::from_query("SELECT * FROM users");
        let key = CacheKey::from_fingerprint(&fp);
        assert!(key.fingerprint_hash > 0);
        assert!(key.branch.is_none());
        assert!(key.as_of.is_none());
    }

    /// Fresh entries are live; entries older than their TTL are expired.
    #[test]
    fn test_cache_entry_expiration() {
        let entry = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
            .with_ttl(Duration::from_secs(1));
        assert!(!entry.is_expired());

        // Backdate the entry instead of sleeping so the expired path is
        // covered without slowing the test down.
        let mut stale = CacheEntry::new(vec![1, 2, 3], vec!["users".to_string()], 1)
            .with_ttl(Duration::from_secs(1));
        stale.created_at = stale.created_at.saturating_sub(60);
        assert!(stale.is_expired());
        assert_eq!(stale.remaining_ttl(), Duration::from_secs(0));
    }

    /// The size estimate always exceeds the raw payload length.
    #[test]
    fn test_cache_entry_size() {
        let entry = CacheEntry::new(
            vec![0; 1000],
            vec!["users".to_string(), "orders".to_string()],
            10,
        );
        assert!(entry.size() > 1000);
    }

    /// 80 hits out of 100 lookups is a ratio of 0.8; no traffic yields 0.
    #[test]
    fn test_tier_stats_hit_ratio() {
        let stats = TierStats {
            hits: 80,
            misses: 20,
            ..Default::default()
        };
        assert!((stats.hit_ratio() - 0.8).abs() < 0.001);
        assert_eq!(TierStats::default().hit_ratio(), 0.0);
    }

    /// Keys are evicted in ascending order of access count.
    #[test]
    fn test_lfu_eviction() {
        let lfu = LFUEviction::new();

        lfu.insert(1);
        lfu.insert(2);
        lfu.insert(3);

        // Key 1 ends at 3 accesses, key 2 at 2, key 3 stays at 1.
        lfu.touch(1);
        lfu.touch(1);
        lfu.touch(2);

        assert_eq!(lfu.evict_one(), Some(3));
        assert_eq!(lfu.evict_one(), Some(2));
        assert_eq!(lfu.evict_one(), Some(1));
        assert_eq!(lfu.evict_one(), None);
    }
}