Skip to main content

reddb_server/storage/cache/
result.rs

1//! Query Result Cache
2//!
3//! Caches query results for expensive cross-modal queries.
4//! Supports TTL-based expiration and dependency-based invalidation.
5//!
6//! # Features
7//!
8//! - **Result Caching**: Cache expensive query results
9//! - **TTL Expiration**: Automatic expiry based on time
10//! - **Dependency Tracking**: Invalidate when underlying data changes
//! - **Size Management**: Evicts the lowest-scoring entries (by access frequency, priority and recency) once the memory budget is exceeded
12//! - **Statistics**: Cache hit/miss tracking
13//!
14//! # Example
15//!
16//! ```ignore
17//! use storage::cache::result::{ResultCache, CacheKey, CachePolicy};
18//!
19//! let mut cache = ResultCache::new(100_000_000); // 100MB max
20//!
21//! let key = CacheKey::new("attack_paths")
22//!     .param("from", "external")
23//!     .param("to", "database");
24//!
25//! if let Some(result) = cache.get(&key) {
26//!     return result;
27//! }
28//!
29//! let result = expensive_query();
30//! cache.insert(key, result.clone(), CachePolicy::default()
31//!     .ttl(Duration::from_secs(300))
32//!     .depends_on(&["hosts", "vulnerabilities"]));
33//! ```
34
35use std::collections::{HashMap, HashSet};
36use std::hash::{Hash, Hasher};
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::time::{Duration, Instant};
39
40// ============================================================================
41// Cache Key
42// ============================================================================
43
/// Cache key for query results.
///
/// A key is a query-type identifier plus a list of `(name, value)` parameters.
/// Parameters are kept sorted by the whole `(name, value)` pair, so keys built
/// from the same parameters in any order compare equal and hash identically,
/// even when the same parameter name is supplied more than once.
///
/// The hash is precomputed by [`CacheKey::new`], [`CacheKey::param`] and
/// [`CacheKey::params`]. Mutating the public fields directly does not refresh
/// it, which would make `Hash` disagree with `Eq`; build keys through the
/// builder methods instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheKey {
    /// Query type identifier
    pub query_type: String,
    /// Query parameters, sorted by `(name, value)` for order-independent equality
    pub params: Vec<(String, String)>,
    /// Precomputed hash of `query_type` and `params`
    hash: u64,
}

impl CacheKey {
    /// Create a new cache key with no parameters.
    pub fn new(query_type: impl Into<String>) -> Self {
        let mut key = Self {
            query_type: query_type.into(),
            params: Vec::new(),
            hash: 0,
        };
        key.rehash();
        key
    }

    /// Add a parameter to the key.
    pub fn param(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.params.push((name.into(), value.into()));
        self.normalize();
        self
    }

    /// Add multiple parameters at once.
    pub fn params(mut self, params: impl IntoIterator<Item = (String, String)>) -> Self {
        self.params.extend(params);
        self.normalize();
        self
    }

    /// Restore the canonical parameter order and refresh the cached hash.
    fn normalize(&mut self) {
        // Sort on the full (name, value) tuple rather than the name alone:
        // a name-only stable sort keeps duplicate names in insertion order,
        // making `param("x", "1").param("x", "2")` differ from the reverse.
        // With a total order on the tuple, an unstable sort is deterministic.
        self.params.sort_unstable();
        self.rehash();
    }

    /// Recompute `hash` from `query_type` and `params`.
    fn rehash(&mut self) {
        use std::collections::hash_map::DefaultHasher;
        let mut hasher = DefaultHasher::new();
        self.query_type.hash(&mut hasher);
        for (k, v) in &self.params {
            k.hash(&mut hasher);
            v.hash(&mut hasher);
        }
        self.hash = hasher.finish();
    }
}

impl Hash for CacheKey {
    /// Feed the precomputed hash. It is derived from exactly the fields that
    /// the derived `Eq` compares, so equal keys hash equally.
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_u64(self.hash);
    }
}
101
102// ============================================================================
103// Cache Policy
104// ============================================================================
105
/// Per-entry caching policy: lifetime, invalidation dependencies and
/// eviction priority.
///
/// Start from [`CachePolicy::default`] (5-minute TTL, no dependencies,
/// priority 50, expiration measured from creation) and chain the setters.
#[derive(Debug, Clone)]
pub struct CachePolicy {
    /// How long the entry stays valid
    pub ttl: Duration,
    /// Names of tables/collections whose changes invalidate the entry
    pub dependencies: HashSet<String>,
    /// Eviction priority; larger values make the entry harder to evict
    pub priority: u8,
    /// When `true`, the TTL is measured from the last access instead of creation
    pub sliding: bool,
}

impl Default for CachePolicy {
    fn default() -> Self {
        const DEFAULT_TTL_SECS: u64 = 5 * 60;
        CachePolicy {
            sliding: false,
            priority: 50,
            dependencies: HashSet::new(),
            ttl: Duration::from_secs(DEFAULT_TTL_SECS),
        }
    }
}

impl CachePolicy {
    /// Replace the time-to-live.
    pub fn ttl(self, ttl: Duration) -> Self {
        CachePolicy { ttl, ..self }
    }

    /// Register additional dependencies; names already present are kept once.
    pub fn depends_on(mut self, deps: &[&str]) -> Self {
        self.dependencies
            .extend(deps.iter().map(|name| (*name).to_owned()));
        self
    }

    /// Replace the eviction priority.
    pub fn priority(self, priority: u8) -> Self {
        CachePolicy { priority, ..self }
    }

    /// Switch to sliding expiration (the TTL restarts on each access).
    pub fn sliding(self) -> Self {
        CachePolicy {
            sliding: true,
            ..self
        }
    }
}
157
158// ============================================================================
159// Cache Entry
160// ============================================================================
161
162/// Cached query result
163struct CacheEntry {
164    /// Serialized result data
165    data: Vec<u8>,
166    /// Estimated size in bytes
167    size: usize,
168    /// When this entry was created
169    created_at: Instant,
170    /// Last access time
171    last_accessed: Instant,
172    /// Access count
173    access_count: AtomicU64,
174    /// Cache policy
175    policy: CachePolicy,
176}
177
178impl CacheEntry {
179    fn new(data: Vec<u8>, policy: CachePolicy) -> Self {
180        let size = data.len();
181        let now = Instant::now();
182        Self {
183            data,
184            size,
185            created_at: now,
186            last_accessed: now,
187            access_count: AtomicU64::new(0),
188            policy,
189        }
190    }
191
192    fn is_expired(&self) -> bool {
193        let elapsed = if self.policy.sliding {
194            self.last_accessed.elapsed()
195        } else {
196            self.created_at.elapsed()
197        };
198        elapsed > self.policy.ttl
199    }
200
201    fn touch(&mut self) {
202        self.access_count.fetch_add(1, Ordering::Relaxed);
203        self.last_accessed = Instant::now();
204    }
205
206    /// Calculate eviction score (lower = more likely to evict)
207    fn eviction_score(&self) -> u64 {
208        let frequency = self.access_count.load(Ordering::Relaxed);
209        let recency = self.last_accessed.elapsed().as_secs();
210        let priority = self.policy.priority as u64;
211
212        // Score: frequency * priority / recency
213        // Higher = keep longer
214        frequency
215            .saturating_mul(priority)
216            .checked_div(recency)
217            .unwrap_or_else(|| frequency.saturating_mul(priority).saturating_mul(1000))
218    }
219}
220
221// ============================================================================
222// Cache Statistics
223// ============================================================================
224
/// Counters and gauges describing a result cache.
#[derive(Debug, Clone, Default)]
pub struct ResultCacheStats {
    /// Lookups that found a live entry
    pub hits: u64,
    /// Lookups that found nothing (or only an expired entry)
    pub misses: u64,
    /// Entries removed to make room for new ones
    pub evictions: u64,
    /// Number of entries currently held
    pub entry_count: usize,
    /// Bytes currently accounted to cached entries
    pub memory_bytes: usize,
    /// Configured memory budget in bytes
    pub max_memory_bytes: usize,
    /// Entries dropped because their TTL elapsed
    pub expirations: u64,
    /// Entries dropped by explicit invalidation
    pub invalidations: u64,
}

impl ResultCacheStats {
    /// Fraction of lookups that were hits, in `[0, 1]`; `0.0` before any lookup.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }

    /// Ratio of used bytes to the memory budget; `0.0` when the budget is zero.
    pub fn memory_utilization(&self) -> f64 {
        if self.max_memory_bytes != 0 {
            self.memory_bytes as f64 / self.max_memory_bytes as f64
        } else {
            0.0
        }
    }
}
266
267// ============================================================================
268// Result Cache
269// ============================================================================
270
271/// LRU cache for query results with memory management
272pub struct ResultCache {
273    /// Cached entries
274    entries: HashMap<CacheKey, CacheEntry>,
275    /// Dependency index: table -> keys depending on it
276    dependency_index: HashMap<String, HashSet<CacheKey>>,
277    /// Maximum memory in bytes
278    max_memory: usize,
279    /// Current memory usage
280    current_memory: usize,
281    /// Statistics
282    stats: ResultCacheStats,
283}
284
285impl ResultCache {
286    /// Create a new result cache with max memory limit
287    pub fn new(max_memory_bytes: usize) -> Self {
288        Self {
289            entries: HashMap::new(),
290            dependency_index: HashMap::new(),
291            max_memory: max_memory_bytes,
292            current_memory: 0,
293            stats: ResultCacheStats {
294                max_memory_bytes,
295                ..Default::default()
296            },
297        }
298    }
299
300    /// Get a cached result
301    pub fn get(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
302        // Remove expired entry if present
303        if let Some(entry) = self.entries.get(key) {
304            if entry.is_expired() {
305                self.remove(key);
306                self.stats.expirations += 1;
307                self.stats.misses += 1;
308                return None;
309            }
310        }
311
312        if let Some(entry) = self.entries.get_mut(key) {
313            entry.touch();
314            self.stats.hits += 1;
315            Some(entry.data.clone())
316        } else {
317            self.stats.misses += 1;
318            None
319        }
320    }
321
322    /// Check if a key exists (without touching)
323    pub fn contains(&self, key: &CacheKey) -> bool {
324        if let Some(entry) = self.entries.get(key) {
325            !entry.is_expired()
326        } else {
327            false
328        }
329    }
330
331    /// Insert a result into the cache
332    pub fn insert(&mut self, key: CacheKey, data: Vec<u8>, policy: CachePolicy) {
333        let entry_size = data.len() + std::mem::size_of::<CacheEntry>();
334
335        // Remove existing entry if present
336        if self.entries.contains_key(&key) {
337            self.remove(&key);
338        }
339
340        // Evict until we have space
341        while self.current_memory + entry_size > self.max_memory && !self.entries.is_empty() {
342            self.evict_one();
343        }
344
345        // Index dependencies
346        for dep in &policy.dependencies {
347            self.dependency_index
348                .entry(dep.clone())
349                .or_default()
350                .insert(key.clone());
351        }
352
353        let entry = CacheEntry::new(data, policy);
354        self.current_memory += entry.size;
355        self.entries.insert(key, entry);
356        self.stats.entry_count = self.entries.len();
357        self.stats.memory_bytes = self.current_memory;
358    }
359
360    /// Remove an entry
361    pub fn remove(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
362        if let Some(entry) = self.entries.remove(key) {
363            self.current_memory = self.current_memory.saturating_sub(entry.size);
364
365            // Remove from dependency index
366            for dep in &entry.policy.dependencies {
367                if let Some(keys) = self.dependency_index.get_mut(dep) {
368                    keys.remove(key);
369                }
370            }
371
372            self.stats.entry_count = self.entries.len();
373            self.stats.memory_bytes = self.current_memory;
374            Some(entry.data)
375        } else {
376            None
377        }
378    }
379
380    /// Invalidate all entries depending on a table/collection
381    pub fn invalidate_by_dependency(&mut self, dependency: &str) {
382        if let Some(keys) = self.dependency_index.remove(dependency) {
383            for key in keys {
384                if self.entries.remove(&key).is_some() {
385                    self.stats.invalidations += 1;
386                }
387            }
388            self.stats.entry_count = self.entries.len();
389            // Recalculate memory
390            self.current_memory = self.entries.values().map(|e| e.size).sum();
391            self.stats.memory_bytes = self.current_memory;
392        }
393    }
394
395    /// Invalidate entries matching a predicate
396    pub fn invalidate_where<F>(&mut self, predicate: F)
397    where
398        F: Fn(&CacheKey) -> bool,
399    {
400        let keys_to_remove: Vec<CacheKey> = self
401            .entries
402            .keys()
403            .filter(|k| predicate(k))
404            .cloned()
405            .collect();
406
407        for key in keys_to_remove {
408            self.remove(&key);
409            self.stats.invalidations += 1;
410        }
411    }
412
413    /// Prune all expired entries
414    pub fn prune_expired(&mut self) {
415        let expired: Vec<CacheKey> = self
416            .entries
417            .iter()
418            .filter(|(_, v)| v.is_expired())
419            .map(|(k, _)| k.clone())
420            .collect();
421
422        for key in expired {
423            self.remove(&key);
424            self.stats.expirations += 1;
425        }
426    }
427
428    /// Clear all entries
429    pub fn clear(&mut self) {
430        self.entries.clear();
431        self.dependency_index.clear();
432        self.current_memory = 0;
433        self.stats.entry_count = 0;
434        self.stats.memory_bytes = 0;
435    }
436
437    /// Get cache statistics
438    pub fn stats(&self) -> &ResultCacheStats {
439        &self.stats
440    }
441
442    /// Evict one entry (lowest eviction score)
443    fn evict_one(&mut self) {
444        let victim = self
445            .entries
446            .iter()
447            .min_by_key(|(_, v)| v.eviction_score())
448            .map(|(k, _)| k.clone());
449
450        if let Some(key) = victim {
451            self.remove(&key);
452            self.stats.evictions += 1;
453        }
454    }
455}
456
457// ============================================================================
458// Materialized View Cache
459// ============================================================================
460
/// Definition of a materialized view
#[derive(Debug, Clone)]
pub struct MaterializedViewDef {
    /// View name
    pub name: String,
    /// Query that populates the view
    pub query: String,
    /// Tables this view depends on
    pub dependencies: Vec<String>,
    /// Refresh policy
    pub refresh: RefreshPolicy,
}

/// How to refresh a materialized view
#[derive(Debug, Clone)]
pub enum RefreshPolicy {
    /// Refresh on demand only; dependency writes never mark the view stale
    Manual,
    /// Mark the view stale on every write to one of its dependencies
    OnChange,
    /// Refresh periodically (reported by `MaterializedViewCache::due_for_refresh`)
    Periodic(Duration),
    /// Mark the view stale once this many dependency writes have accumulated
    AfterWrites(usize),
}

/// Materialized view cache entry
struct MaterializedView {
    /// The cached result
    data: Vec<u8>,
    /// View definition
    def: MaterializedViewDef,
    /// When last refreshed (registration time until the first refresh)
    last_refresh: Instant,
    /// Dependency writes seen since the last refresh
    writes_since_refresh: usize,
    /// Whether the view must be refreshed before its data is served
    stale: bool,
}

/// Cache for materialized views
pub struct MaterializedViewCache {
    /// Views by name
    views: HashMap<String, MaterializedView>,
    /// Dependency index: table -> view names. Empty sets are removed.
    dependency_index: HashMap<String, HashSet<String>>,
}

impl MaterializedViewCache {
    /// Create a new, empty materialized view cache.
    pub fn new() -> Self {
        Self {
            views: HashMap::new(),
            dependency_index: HashMap::new(),
        }
    }

    /// Register a view definition. The view starts stale, with no data.
    ///
    /// Re-registering an existing name replaces the previous definition and
    /// its data. The old definition's dependencies are unregistered first, so
    /// writes to tables only the old definition used no longer affect the view.
    pub fn register(&mut self, def: MaterializedViewDef) {
        self.remove(&def.name);

        for dep in &def.dependencies {
            self.dependency_index
                .entry(dep.clone())
                .or_default()
                .insert(def.name.clone());
        }

        let name = def.name.clone();
        self.views.insert(
            name,
            MaterializedView {
                data: Vec::new(),
                def,
                last_refresh: Instant::now(),
                writes_since_refresh: 0,
                stale: true,
            },
        );
    }

    /// Get view data, or `None` if the view is unknown or stale.
    ///
    /// Staleness alone decides freshness (new views start stale), so a view
    /// refreshed with an empty result is still served as an empty slice.
    pub fn get(&self, name: &str) -> Option<&[u8]> {
        self.views
            .get(name)
            .filter(|v| !v.stale)
            .map(|v| v.data.as_slice())
    }

    /// Check whether a view needs refresh (`false` for unknown views).
    pub fn needs_refresh(&self, name: &str) -> bool {
        self.views.get(name).map(|v| v.stale).unwrap_or(false)
    }

    /// Refresh a view with new data, clearing its stale flag and write count.
    pub fn refresh(&mut self, name: &str, data: Vec<u8>) {
        if let Some(view) = self.views.get_mut(name) {
            view.data = data;
            view.last_refresh = Instant::now();
            view.writes_since_refresh = 0;
            view.stale = false;
        }
    }

    /// Record a write to `table` on every view depending on it, marking views
    /// stale according to their refresh policy.
    pub fn mark_stale(&mut self, table: &str) {
        // `dependency_index` and `views` are disjoint fields, so the name set
        // can be borrowed while views are mutated; no clone is needed.
        if let Some(view_names) = self.dependency_index.get(table) {
            for name in view_names {
                if let Some(view) = self.views.get_mut(name) {
                    view.writes_since_refresh += 1;

                    match &view.def.refresh {
                        RefreshPolicy::OnChange => {
                            view.stale = true;
                        }
                        RefreshPolicy::AfterWrites(threshold)
                            if view.writes_since_refresh >= *threshold =>
                        {
                            view.stale = true;
                        }
                        RefreshPolicy::AfterWrites(_)
                        | RefreshPolicy::Manual
                        | RefreshPolicy::Periodic(_) => {}
                    }
                }
            }
        }
    }

    /// Names of `Periodic` views that are due for a refresh.
    ///
    /// A periodic view is due when it has never been populated (it is still
    /// stale), or when its interval has elapsed since the last refresh.
    pub fn due_for_refresh(&self) -> Vec<String> {
        self.views
            .values()
            .filter(|v| {
                if let RefreshPolicy::Periodic(interval) = &v.def.refresh {
                    v.stale || v.last_refresh.elapsed() >= *interval
                } else {
                    false
                }
            })
            .map(|v| v.def.name.clone())
            .collect()
    }

    /// Remove a view and its dependency registrations.
    pub fn remove(&mut self, name: &str) {
        if let Some(view) = self.views.remove(name) {
            for dep in &view.def.dependencies {
                let now_empty = match self.dependency_index.get_mut(dep) {
                    Some(names) => {
                        names.remove(name);
                        names.is_empty()
                    }
                    None => false,
                };
                if now_empty {
                    self.dependency_index.remove(dep);
                }
            }
        }
    }

    /// List all view names (in unspecified order).
    pub fn list(&self) -> Vec<&str> {
        self.views.keys().map(|s| s.as_str()).collect()
    }
}

impl Default for MaterializedViewCache {
    fn default() -> Self {
        Self::new()
    }
}
622
623// ============================================================================
624// Tests
625// ============================================================================
626
#[cfg(test)]
mod tests {
    use super::*;

    /// A 1 MiB cache: far larger than any payload used below.
    fn roomy_cache() -> ResultCache {
        ResultCache::new(1 << 20)
    }

    #[test]
    fn test_cache_key_hashing() {
        // The same parameters supplied in opposite orders must give the same key.
        let forward = CacheKey::new("attack_paths")
            .param("from", "host1")
            .param("to", "host2");
        let reversed = CacheKey::new("attack_paths")
            .param("to", "host2")
            .param("from", "host1");

        assert_eq!(forward, reversed);
        assert_eq!(forward.hash, reversed.hash);
    }

    #[test]
    fn test_result_cache_basic() {
        let mut cache = roomy_cache();
        let key = CacheKey::new("test_query").param("id", "123");
        let payload: Vec<u8> = vec![1, 2, 3, 4, 5];

        cache.insert(key.clone(), payload.clone(), CachePolicy::default());

        assert_eq!(cache.get(&key), Some(payload));
        assert_eq!(cache.stats().hits, 1);
    }

    #[test]
    fn test_cache_expiration() {
        let mut cache = roomy_cache();
        let key = CacheKey::new("test");
        let short_lived = CachePolicy::default().ttl(Duration::from_millis(1));

        cache.insert(key.clone(), vec![1, 2, 3], short_lived);

        // Sleep well past the 1 ms TTL.
        std::thread::sleep(Duration::from_millis(10));

        assert!(cache.get(&key).is_none());
        assert_eq!(cache.stats().expirations, 1);
    }

    #[test]
    fn test_dependency_invalidation() {
        let mut cache = roomy_cache();
        let key = CacheKey::new("host_query");
        let on_hosts = CachePolicy::default().depends_on(&["hosts"]);

        cache.insert(key.clone(), vec![1, 2, 3], on_hosts);
        assert!(cache.contains(&key));

        // A write to the "hosts" table must drop the dependent result.
        cache.invalidate_by_dependency("hosts");

        assert!(!cache.contains(&key));
        assert_eq!(cache.stats().invalidations, 1);
    }

    #[test]
    fn test_memory_eviction() {
        const BUDGET: usize = 100;
        let mut cache = ResultCache::new(BUDGET);

        // Ten 20-byte payloads cannot all fit in a 100-byte budget.
        (0..10).for_each(|i| {
            let key = CacheKey::new("query").param("i", i.to_string());
            cache.insert(key, vec![0u8; 20], CachePolicy::default());
        });

        let stats = cache.stats();
        assert!(stats.evictions > 0);
        assert!(stats.memory_bytes <= BUDGET);
    }

    #[test]
    fn test_materialized_view() {
        let mut views = MaterializedViewCache::new();
        let name = "active_hosts";

        views.register(MaterializedViewDef {
            name: "active_hosts".to_string(),
            query: "SELECT * FROM hosts WHERE status = 'active'".to_string(),
            dependencies: vec!["hosts".to_string()],
            refresh: RefreshPolicy::OnChange,
        });

        // A newly registered view starts out stale.
        assert!(views.needs_refresh(name));

        views.refresh(name, vec![1, 2, 3]);
        assert!(!views.needs_refresh(name));
        assert_eq!(views.get(name), Some(&[1, 2, 3][..]));

        // A write to a dependency marks an OnChange view stale again.
        views.mark_stale("hosts");
        assert!(views.needs_refresh(name));
    }
}