reddb_server/storage/cache/
result.rs1use std::collections::{HashMap, HashSet};
36use std::hash::{Hash, Hasher};
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::time::{Duration, Instant};
39
40#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct CacheKey {
47 pub query_type: String,
49 pub params: Vec<(String, String)>,
51 hash: u64,
53}
54
55impl CacheKey {
56 pub fn new(query_type: impl Into<String>) -> Self {
58 let query_type = query_type.into();
59 let mut key = Self {
60 query_type,
61 params: Vec::new(),
62 hash: 0,
63 };
64 key.rehash();
65 key
66 }
67
68 pub fn param(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
70 self.params.push((name.into(), value.into()));
71 self.params.sort_by(|a, b| a.0.cmp(&b.0));
72 self.rehash();
73 self
74 }
75
76 pub fn params(mut self, params: impl IntoIterator<Item = (String, String)>) -> Self {
78 self.params.extend(params);
79 self.params.sort_by(|a, b| a.0.cmp(&b.0));
80 self.rehash();
81 self
82 }
83
84 fn rehash(&mut self) {
85 use std::collections::hash_map::DefaultHasher;
86 let mut hasher = DefaultHasher::new();
87 self.query_type.hash(&mut hasher);
88 for (k, v) in &self.params {
89 k.hash(&mut hasher);
90 v.hash(&mut hasher);
91 }
92 self.hash = hasher.finish();
93 }
94}
95
96impl Hash for CacheKey {
97 fn hash<H: Hasher>(&self, state: &mut H) {
98 state.write_u64(self.hash);
99 }
100}
101
102#[derive(Debug, Clone)]
108pub struct CachePolicy {
109 pub ttl: Duration,
111 pub dependencies: HashSet<String>,
113 pub priority: u8,
115 pub sliding: bool,
117}
118
119impl Default for CachePolicy {
120 fn default() -> Self {
121 Self {
122 ttl: Duration::from_secs(300), dependencies: HashSet::new(),
124 priority: 50,
125 sliding: false,
126 }
127 }
128}
129
130impl CachePolicy {
131 pub fn ttl(mut self, ttl: Duration) -> Self {
133 self.ttl = ttl;
134 self
135 }
136
137 pub fn depends_on(mut self, deps: &[&str]) -> Self {
139 for dep in deps {
140 self.dependencies.insert(dep.to_string());
141 }
142 self
143 }
144
145 pub fn priority(mut self, priority: u8) -> Self {
147 self.priority = priority;
148 self
149 }
150
151 pub fn sliding(mut self) -> Self {
153 self.sliding = true;
154 self
155 }
156}
157
158struct CacheEntry {
164 data: Vec<u8>,
166 size: usize,
168 created_at: Instant,
170 last_accessed: Instant,
172 access_count: AtomicU64,
174 policy: CachePolicy,
176}
177
178impl CacheEntry {
179 fn new(data: Vec<u8>, policy: CachePolicy) -> Self {
180 let size = data.len();
181 let now = Instant::now();
182 Self {
183 data,
184 size,
185 created_at: now,
186 last_accessed: now,
187 access_count: AtomicU64::new(0),
188 policy,
189 }
190 }
191
192 fn is_expired(&self) -> bool {
193 let elapsed = if self.policy.sliding {
194 self.last_accessed.elapsed()
195 } else {
196 self.created_at.elapsed()
197 };
198 elapsed > self.policy.ttl
199 }
200
201 fn touch(&mut self) {
202 self.access_count.fetch_add(1, Ordering::Relaxed);
203 self.last_accessed = Instant::now();
204 }
205
206 fn eviction_score(&self) -> u64 {
208 let frequency = self.access_count.load(Ordering::Relaxed);
209 let recency = self.last_accessed.elapsed().as_secs();
210 let priority = self.policy.priority as u64;
211
212 frequency
215 .saturating_mul(priority)
216 .checked_div(recency)
217 .unwrap_or_else(|| frequency.saturating_mul(priority).saturating_mul(1000))
218 }
219}
220
221#[derive(Debug, Clone, Default)]
227pub struct ResultCacheStats {
228 pub hits: u64,
230 pub misses: u64,
232 pub evictions: u64,
234 pub entry_count: usize,
236 pub memory_bytes: usize,
238 pub max_memory_bytes: usize,
240 pub expirations: u64,
242 pub invalidations: u64,
244}
245
246impl ResultCacheStats {
247 pub fn hit_rate(&self) -> f64 {
249 let total = self.hits + self.misses;
250 if total == 0 {
251 0.0
252 } else {
253 self.hits as f64 / total as f64
254 }
255 }
256
257 pub fn memory_utilization(&self) -> f64 {
259 if self.max_memory_bytes == 0 {
260 0.0
261 } else {
262 self.memory_bytes as f64 / self.max_memory_bytes as f64
263 }
264 }
265}
266
267pub struct ResultCache {
273 entries: HashMap<CacheKey, CacheEntry>,
275 dependency_index: HashMap<String, HashSet<CacheKey>>,
277 max_memory: usize,
279 current_memory: usize,
281 stats: ResultCacheStats,
283}
284
285impl ResultCache {
286 pub fn new(max_memory_bytes: usize) -> Self {
288 Self {
289 entries: HashMap::new(),
290 dependency_index: HashMap::new(),
291 max_memory: max_memory_bytes,
292 current_memory: 0,
293 stats: ResultCacheStats {
294 max_memory_bytes,
295 ..Default::default()
296 },
297 }
298 }
299
300 pub fn get(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
302 if let Some(entry) = self.entries.get(key) {
304 if entry.is_expired() {
305 self.remove(key);
306 self.stats.expirations += 1;
307 self.stats.misses += 1;
308 return None;
309 }
310 }
311
312 if let Some(entry) = self.entries.get_mut(key) {
313 entry.touch();
314 self.stats.hits += 1;
315 Some(entry.data.clone())
316 } else {
317 self.stats.misses += 1;
318 None
319 }
320 }
321
322 pub fn contains(&self, key: &CacheKey) -> bool {
324 if let Some(entry) = self.entries.get(key) {
325 !entry.is_expired()
326 } else {
327 false
328 }
329 }
330
331 pub fn insert(&mut self, key: CacheKey, data: Vec<u8>, policy: CachePolicy) {
333 let entry_size = data.len() + std::mem::size_of::<CacheEntry>();
334
335 if self.entries.contains_key(&key) {
337 self.remove(&key);
338 }
339
340 while self.current_memory + entry_size > self.max_memory && !self.entries.is_empty() {
342 self.evict_one();
343 }
344
345 for dep in &policy.dependencies {
347 self.dependency_index
348 .entry(dep.clone())
349 .or_default()
350 .insert(key.clone());
351 }
352
353 let entry = CacheEntry::new(data, policy);
354 self.current_memory += entry.size;
355 self.entries.insert(key, entry);
356 self.stats.entry_count = self.entries.len();
357 self.stats.memory_bytes = self.current_memory;
358 }
359
360 pub fn remove(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
362 if let Some(entry) = self.entries.remove(key) {
363 self.current_memory = self.current_memory.saturating_sub(entry.size);
364
365 for dep in &entry.policy.dependencies {
367 if let Some(keys) = self.dependency_index.get_mut(dep) {
368 keys.remove(key);
369 }
370 }
371
372 self.stats.entry_count = self.entries.len();
373 self.stats.memory_bytes = self.current_memory;
374 Some(entry.data)
375 } else {
376 None
377 }
378 }
379
380 pub fn invalidate_by_dependency(&mut self, dependency: &str) {
382 if let Some(keys) = self.dependency_index.remove(dependency) {
383 for key in keys {
384 if self.entries.remove(&key).is_some() {
385 self.stats.invalidations += 1;
386 }
387 }
388 self.stats.entry_count = self.entries.len();
389 self.current_memory = self.entries.values().map(|e| e.size).sum();
391 self.stats.memory_bytes = self.current_memory;
392 }
393 }
394
395 pub fn invalidate_where<F>(&mut self, predicate: F)
397 where
398 F: Fn(&CacheKey) -> bool,
399 {
400 let keys_to_remove: Vec<CacheKey> = self
401 .entries
402 .keys()
403 .filter(|k| predicate(k))
404 .cloned()
405 .collect();
406
407 for key in keys_to_remove {
408 self.remove(&key);
409 self.stats.invalidations += 1;
410 }
411 }
412
413 pub fn prune_expired(&mut self) {
415 let expired: Vec<CacheKey> = self
416 .entries
417 .iter()
418 .filter(|(_, v)| v.is_expired())
419 .map(|(k, _)| k.clone())
420 .collect();
421
422 for key in expired {
423 self.remove(&key);
424 self.stats.expirations += 1;
425 }
426 }
427
428 pub fn clear(&mut self) {
430 self.entries.clear();
431 self.dependency_index.clear();
432 self.current_memory = 0;
433 self.stats.entry_count = 0;
434 self.stats.memory_bytes = 0;
435 }
436
437 pub fn stats(&self) -> &ResultCacheStats {
439 &self.stats
440 }
441
442 fn evict_one(&mut self) {
444 let victim = self
445 .entries
446 .iter()
447 .min_by_key(|(_, v)| v.eviction_score())
448 .map(|(k, _)| k.clone());
449
450 if let Some(key) = victim {
451 self.remove(&key);
452 self.stats.evictions += 1;
453 }
454 }
455}
456
457#[derive(Debug, Clone)]
463pub struct MaterializedViewDef {
464 pub name: String,
466 pub query: String,
468 pub dependencies: Vec<String>,
470 pub refresh: RefreshPolicy,
472}
473
474#[derive(Debug, Clone)]
476pub enum RefreshPolicy {
477 Manual,
479 OnChange,
481 Periodic(Duration),
483 AfterWrites(usize),
485}
486
487struct MaterializedView {
489 data: Vec<u8>,
491 def: MaterializedViewDef,
493 last_refresh: Instant,
495 writes_since_refresh: usize,
497 stale: bool,
499}
500
501pub struct MaterializedViewCache {
503 views: HashMap<String, MaterializedView>,
505 dependency_index: HashMap<String, HashSet<String>>,
507}
508
509impl MaterializedViewCache {
510 pub fn new() -> Self {
512 Self {
513 views: HashMap::new(),
514 dependency_index: HashMap::new(),
515 }
516 }
517
518 pub fn register(&mut self, def: MaterializedViewDef) {
520 for dep in &def.dependencies {
522 self.dependency_index
523 .entry(dep.clone())
524 .or_default()
525 .insert(def.name.clone());
526 }
527
528 let view = MaterializedView {
529 data: Vec::new(),
530 def,
531 last_refresh: Instant::now(),
532 writes_since_refresh: 0,
533 stale: true,
534 };
535
536 self.views.insert(view.def.name.clone(), view);
537 }
538
539 pub fn get(&self, name: &str) -> Option<&[u8]> {
541 self.views
542 .get(name)
543 .filter(|v| !v.stale && !v.data.is_empty())
544 .map(|v| v.data.as_slice())
545 }
546
547 pub fn needs_refresh(&self, name: &str) -> bool {
549 self.views.get(name).map(|v| v.stale).unwrap_or(false)
550 }
551
552 pub fn refresh(&mut self, name: &str, data: Vec<u8>) {
554 if let Some(view) = self.views.get_mut(name) {
555 view.data = data;
556 view.last_refresh = Instant::now();
557 view.writes_since_refresh = 0;
558 view.stale = false;
559 }
560 }
561
562 pub fn mark_stale(&mut self, table: &str) {
564 if let Some(view_names) = self.dependency_index.get(table) {
565 for name in view_names.clone() {
566 if let Some(view) = self.views.get_mut(&name) {
567 view.writes_since_refresh += 1;
568
569 match &view.def.refresh {
570 RefreshPolicy::OnChange => {
571 view.stale = true;
572 }
573 RefreshPolicy::AfterWrites(threshold)
574 if view.writes_since_refresh >= *threshold =>
575 {
576 view.stale = true;
577 }
578 _ => {}
579 }
580 }
581 }
582 }
583 }
584
585 pub fn due_for_refresh(&self) -> Vec<String> {
587 self.views
588 .values()
589 .filter(|v| {
590 if let RefreshPolicy::Periodic(interval) = &v.def.refresh {
591 v.last_refresh.elapsed() >= *interval
592 } else {
593 false
594 }
595 })
596 .map(|v| v.def.name.clone())
597 .collect()
598 }
599
600 pub fn remove(&mut self, name: &str) {
602 if let Some(view) = self.views.remove(name) {
603 for dep in &view.def.dependencies {
604 if let Some(names) = self.dependency_index.get_mut(dep) {
605 names.remove(name);
606 }
607 }
608 }
609 }
610
611 pub fn list(&self) -> Vec<&str> {
613 self.views.keys().map(|s| s.as_str()).collect()
614 }
615}
616
617impl Default for MaterializedViewCache {
618 fn default() -> Self {
619 Self::new()
620 }
621}
622
623#[cfg(test)]
628mod tests {
629 use super::*;
630
631 #[test]
632 fn test_cache_key_hashing() {
633 let key1 = CacheKey::new("attack_paths")
634 .param("from", "host1")
635 .param("to", "host2");
636
637 let key2 = CacheKey::new("attack_paths")
638 .param("to", "host2")
639 .param("from", "host1"); assert_eq!(key1, key2);
642 assert_eq!(key1.hash, key2.hash);
643 }
644
645 #[test]
646 fn test_result_cache_basic() {
647 let mut cache = ResultCache::new(1024 * 1024); let key = CacheKey::new("test_query").param("id", "123");
650 let data = vec![1, 2, 3, 4, 5];
651
652 cache.insert(key.clone(), data.clone(), CachePolicy::default());
653
654 let result = cache.get(&key);
655 assert_eq!(result, Some(data));
656 assert_eq!(cache.stats().hits, 1);
657 }
658
659 #[test]
660 fn test_cache_expiration() {
661 let mut cache = ResultCache::new(1024 * 1024);
662
663 let key = CacheKey::new("test");
664 let data = vec![1, 2, 3];
665
666 cache.insert(
668 key.clone(),
669 data,
670 CachePolicy::default().ttl(Duration::from_millis(1)),
671 );
672
673 std::thread::sleep(Duration::from_millis(10));
675
676 assert!(cache.get(&key).is_none());
677 assert_eq!(cache.stats().expirations, 1);
678 }
679
680 #[test]
681 fn test_dependency_invalidation() {
682 let mut cache = ResultCache::new(1024 * 1024);
683
684 let key = CacheKey::new("host_query");
685 cache.insert(
686 key.clone(),
687 vec![1, 2, 3],
688 CachePolicy::default().depends_on(&["hosts"]),
689 );
690
691 assert!(cache.contains(&key));
692
693 cache.invalidate_by_dependency("hosts");
695
696 assert!(!cache.contains(&key));
697 assert_eq!(cache.stats().invalidations, 1);
698 }
699
700 #[test]
701 fn test_memory_eviction() {
702 let mut cache = ResultCache::new(100); for i in 0..10 {
706 let key = CacheKey::new("query").param("i", i.to_string());
707 cache.insert(key, vec![0u8; 20], CachePolicy::default());
708 }
709
710 assert!(cache.stats().evictions > 0);
712 assert!(cache.stats().memory_bytes <= 100);
713 }
714
715 #[test]
716 fn test_materialized_view() {
717 let mut cache = MaterializedViewCache::new();
718
719 cache.register(MaterializedViewDef {
720 name: "active_hosts".to_string(),
721 query: "SELECT * FROM hosts WHERE status = 'active'".to_string(),
722 dependencies: vec!["hosts".to_string()],
723 refresh: RefreshPolicy::OnChange,
724 });
725
726 assert!(cache.needs_refresh("active_hosts"));
728
729 cache.refresh("active_hosts", vec![1, 2, 3]);
731 assert!(!cache.needs_refresh("active_hosts"));
732 assert_eq!(cache.get("active_hosts"), Some(&[1, 2, 3][..]));
733
734 cache.mark_stale("hosts");
736 assert!(cache.needs_refresh("active_hosts"));
737 }
738}