Skip to main content

reddb_server/storage/cache/
result.rs

1//! Query Result Cache
2//!
3//! Caches query results for expensive cross-modal queries.
4//! Supports TTL-based expiration and dependency-based invalidation.
5//!
6//! # Features
7//!
8//! - **Result Caching**: Cache expensive query results
9//! - **TTL Expiration**: Automatic expiry based on time
10//! - **Dependency Tracking**: Invalidate when underlying data changes
11//! - **Size Management**: LRU eviction based on memory usage
12//! - **Statistics**: Cache hit/miss tracking
13//!
14//! # Example
15//!
16//! ```ignore
17//! use storage::cache::result::{ResultCache, CacheKey, CachePolicy};
18//!
19//! let mut cache = ResultCache::new(100_000_000); // 100MB max
20//!
21//! let key = CacheKey::new("attack_paths")
22//!     .param("from", "external")
23//!     .param("to", "database");
24//!
25//! if let Some(result) = cache.get(&key) {
26//!     return result;
27//! }
28//!
29//! let result = expensive_query();
30//! cache.insert(key, result.clone(), CachePolicy::default()
31//!     .ttl(Duration::from_secs(300))
32//!     .depends_on(&["hosts", "vulnerabilities"]));
33//! ```
34
35use std::collections::{HashMap, HashSet};
36use std::hash::{Hash, Hasher};
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::time::{Duration, Instant};
39
40// ============================================================================
41// Cache Key
42// ============================================================================
43
44/// Cache key for query results
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct CacheKey {
47    /// Query type identifier
48    pub query_type: String,
49    /// Query parameters (sorted for consistent hashing)
50    pub params: Vec<(String, String)>,
51    /// Hash for fast comparison
52    hash: u64,
53}
54
55impl CacheKey {
56    /// Create a new cache key
57    pub fn new(query_type: impl Into<String>) -> Self {
58        let query_type = query_type.into();
59        let mut key = Self {
60            query_type,
61            params: Vec::new(),
62            hash: 0,
63        };
64        key.rehash();
65        key
66    }
67
68    /// Add a parameter to the key
69    pub fn param(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
70        self.params.push((name.into(), value.into()));
71        self.params.sort_by(|a, b| a.0.cmp(&b.0));
72        self.rehash();
73        self
74    }
75
76    /// Add multiple parameters
77    pub fn params(mut self, params: impl IntoIterator<Item = (String, String)>) -> Self {
78        self.params.extend(params);
79        self.params.sort_by(|a, b| a.0.cmp(&b.0));
80        self.rehash();
81        self
82    }
83
84    fn rehash(&mut self) {
85        use std::collections::hash_map::DefaultHasher;
86        let mut hasher = DefaultHasher::new();
87        self.query_type.hash(&mut hasher);
88        for (k, v) in &self.params {
89            k.hash(&mut hasher);
90            v.hash(&mut hasher);
91        }
92        self.hash = hasher.finish();
93    }
94}
95
96impl Hash for CacheKey {
97    fn hash<H: Hasher>(&self, state: &mut H) {
98        state.write_u64(self.hash);
99    }
100}
101
102// ============================================================================
103// Cache Policy
104// ============================================================================
105
106/// Policy for cache entry behavior
107#[derive(Debug, Clone)]
108pub struct CachePolicy {
109    /// Time to live
110    pub ttl: Duration,
111    /// Tables/collections this result depends on
112    pub dependencies: HashSet<String>,
113    /// Priority for eviction (higher = keep longer)
114    pub priority: u8,
115    /// Whether to refresh on access (sliding expiration)
116    pub sliding: bool,
117}
118
119impl Default for CachePolicy {
120    fn default() -> Self {
121        Self {
122            ttl: Duration::from_secs(300), // 5 minutes
123            dependencies: HashSet::new(),
124            priority: 50,
125            sliding: false,
126        }
127    }
128}
129
130impl CachePolicy {
131    /// Set TTL
132    pub fn ttl(mut self, ttl: Duration) -> Self {
133        self.ttl = ttl;
134        self
135    }
136
137    /// Add dependencies
138    pub fn depends_on(mut self, deps: &[&str]) -> Self {
139        for dep in deps {
140            self.dependencies.insert(dep.to_string());
141        }
142        self
143    }
144
145    /// Set priority
146    pub fn priority(mut self, priority: u8) -> Self {
147        self.priority = priority;
148        self
149    }
150
151    /// Enable sliding expiration
152    pub fn sliding(mut self) -> Self {
153        self.sliding = true;
154        self
155    }
156}
157
158// ============================================================================
159// Cache Entry
160// ============================================================================
161
162/// Cached query result
163struct CacheEntry {
164    /// Serialized result data
165    data: Vec<u8>,
166    /// Estimated size in bytes
167    size: usize,
168    /// When this entry was created
169    created_at: Instant,
170    /// Last access time
171    last_accessed: Instant,
172    /// Access count
173    access_count: AtomicU64,
174    /// Cache policy
175    policy: CachePolicy,
176}
177
178impl CacheEntry {
179    fn new(data: Vec<u8>, policy: CachePolicy) -> Self {
180        let size = data.len();
181        let now = Instant::now();
182        Self {
183            data,
184            size,
185            created_at: now,
186            last_accessed: now,
187            access_count: AtomicU64::new(0),
188            policy,
189        }
190    }
191
192    fn is_expired(&self) -> bool {
193        let elapsed = if self.policy.sliding {
194            self.last_accessed.elapsed()
195        } else {
196            self.created_at.elapsed()
197        };
198        elapsed > self.policy.ttl
199    }
200
201    fn touch(&mut self) {
202        self.access_count.fetch_add(1, Ordering::Relaxed);
203        self.last_accessed = Instant::now();
204    }
205
206    /// Calculate eviction score (lower = more likely to evict)
207    fn eviction_score(&self) -> u64 {
208        let frequency = self.access_count.load(Ordering::Relaxed);
209        let recency = self.last_accessed.elapsed().as_secs();
210        let priority = self.policy.priority as u64;
211
212        // Score: frequency * priority / recency
213        // Higher = keep longer
214        frequency
215            .saturating_mul(priority)
216            .checked_div(recency)
217            .unwrap_or_else(|| frequency.saturating_mul(priority).saturating_mul(1000))
218    }
219}
220
221// ============================================================================
222// Cache Statistics
223// ============================================================================
224
225/// Cache statistics
226#[derive(Debug, Clone, Default)]
227pub struct ResultCacheStats {
228    /// Cache hits
229    pub hits: u64,
230    /// Cache misses
231    pub misses: u64,
232    /// Entries evicted
233    pub evictions: u64,
234    /// Current entry count
235    pub entry_count: usize,
236    /// Current memory usage in bytes
237    pub memory_bytes: usize,
238    /// Maximum memory limit
239    pub max_memory_bytes: usize,
240    /// Entries expired
241    pub expirations: u64,
242    /// Invalidations by dependency
243    pub invalidations: u64,
244}
245
246impl ResultCacheStats {
247    /// Calculate hit rate
248    pub fn hit_rate(&self) -> f64 {
249        let total = self.hits + self.misses;
250        if total == 0 {
251            0.0
252        } else {
253            self.hits as f64 / total as f64
254        }
255    }
256
257    /// Calculate memory utilization
258    pub fn memory_utilization(&self) -> f64 {
259        if self.max_memory_bytes == 0 {
260            0.0
261        } else {
262            self.memory_bytes as f64 / self.max_memory_bytes as f64
263        }
264    }
265}
266
267// ============================================================================
268// Result Cache
269// ============================================================================
270
271/// LRU cache for query results with memory management
272pub struct ResultCache {
273    /// Cached entries
274    entries: HashMap<CacheKey, CacheEntry>,
275    /// Dependency index: table -> keys depending on it
276    dependency_index: HashMap<String, HashSet<CacheKey>>,
277    /// Maximum memory in bytes
278    max_memory: usize,
279    /// Current memory usage
280    current_memory: usize,
281    /// Statistics
282    stats: ResultCacheStats,
283}
284
285impl ResultCache {
286    /// Create a new result cache with max memory limit
287    pub fn new(max_memory_bytes: usize) -> Self {
288        Self {
289            entries: HashMap::new(),
290            dependency_index: HashMap::new(),
291            max_memory: max_memory_bytes,
292            current_memory: 0,
293            stats: ResultCacheStats {
294                max_memory_bytes,
295                ..Default::default()
296            },
297        }
298    }
299
300    /// Get a cached result
301    pub fn get(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
302        // Remove expired entry if present
303        if let Some(entry) = self.entries.get(key) {
304            if entry.is_expired() {
305                self.remove(key);
306                self.stats.expirations += 1;
307                self.stats.misses += 1;
308                return None;
309            }
310        }
311
312        if let Some(entry) = self.entries.get_mut(key) {
313            entry.touch();
314            self.stats.hits += 1;
315            Some(entry.data.clone())
316        } else {
317            self.stats.misses += 1;
318            None
319        }
320    }
321
322    /// Check if a key exists (without touching)
323    pub fn contains(&self, key: &CacheKey) -> bool {
324        if let Some(entry) = self.entries.get(key) {
325            !entry.is_expired()
326        } else {
327            false
328        }
329    }
330
331    /// Insert a result into the cache
332    pub fn insert(&mut self, key: CacheKey, data: Vec<u8>, policy: CachePolicy) {
333        let entry_size = data.len() + std::mem::size_of::<CacheEntry>();
334
335        // Remove existing entry if present
336        if self.entries.contains_key(&key) {
337            self.remove(&key);
338        }
339
340        // Evict until we have space
341        while self.current_memory + entry_size > self.max_memory && !self.entries.is_empty() {
342            self.evict_one();
343        }
344
345        // Index dependencies
346        for dep in &policy.dependencies {
347            self.dependency_index
348                .entry(dep.clone())
349                .or_default()
350                .insert(key.clone());
351        }
352
353        let entry = CacheEntry::new(data, policy);
354        self.current_memory += entry.size;
355        self.entries.insert(key, entry);
356        self.stats.entry_count = self.entries.len();
357        self.stats.memory_bytes = self.current_memory;
358    }
359
360    /// Remove an entry
361    pub fn remove(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
362        if let Some(entry) = self.entries.remove(key) {
363            self.current_memory = self.current_memory.saturating_sub(entry.size);
364
365            // Remove from dependency index
366            for dep in &entry.policy.dependencies {
367                if let Some(keys) = self.dependency_index.get_mut(dep) {
368                    keys.remove(key);
369                }
370            }
371
372            self.stats.entry_count = self.entries.len();
373            self.stats.memory_bytes = self.current_memory;
374            Some(entry.data)
375        } else {
376            None
377        }
378    }
379
380    /// Invalidate all entries depending on a table/collection
381    pub fn invalidate_by_dependency(&mut self, dependency: &str) {
382        if let Some(keys) = self.dependency_index.remove(dependency) {
383            for key in keys {
384                if self.entries.remove(&key).is_some() {
385                    self.stats.invalidations += 1;
386                }
387            }
388            self.stats.entry_count = self.entries.len();
389            // Recalculate memory
390            self.current_memory = self.entries.values().map(|e| e.size).sum();
391            self.stats.memory_bytes = self.current_memory;
392        }
393    }
394
395    /// Invalidate entries matching a predicate
396    pub fn invalidate_where<F>(&mut self, predicate: F)
397    where
398        F: Fn(&CacheKey) -> bool,
399    {
400        let keys_to_remove: Vec<CacheKey> = self
401            .entries
402            .keys()
403            .filter(|k| predicate(k))
404            .cloned()
405            .collect();
406
407        for key in keys_to_remove {
408            self.remove(&key);
409            self.stats.invalidations += 1;
410        }
411    }
412
413    /// Prune all expired entries
414    pub fn prune_expired(&mut self) {
415        let expired: Vec<CacheKey> = self
416            .entries
417            .iter()
418            .filter(|(_, v)| v.is_expired())
419            .map(|(k, _)| k.clone())
420            .collect();
421
422        for key in expired {
423            self.remove(&key);
424            self.stats.expirations += 1;
425        }
426    }
427
428    /// Clear all entries
429    pub fn clear(&mut self) {
430        self.entries.clear();
431        self.dependency_index.clear();
432        self.current_memory = 0;
433        self.stats.entry_count = 0;
434        self.stats.memory_bytes = 0;
435    }
436
437    /// Get cache statistics
438    pub fn stats(&self) -> &ResultCacheStats {
439        &self.stats
440    }
441
442    /// Evict one entry (lowest eviction score)
443    fn evict_one(&mut self) {
444        let victim = self
445            .entries
446            .iter()
447            .min_by_key(|(_, v)| v.eviction_score())
448            .map(|(k, _)| k.clone());
449
450        if let Some(key) = victim {
451            self.remove(&key);
452            self.stats.evictions += 1;
453        }
454    }
455}
456
457// ============================================================================
458// Materialized View Cache
459// ============================================================================
460
461/// Definition of a materialized view
462#[derive(Debug, Clone)]
463pub struct MaterializedViewDef {
464    /// View name
465    pub name: String,
466    /// Query that populates the view
467    pub query: String,
468    /// Tables this view depends on
469    pub dependencies: Vec<String>,
470    /// Refresh policy
471    pub refresh: RefreshPolicy,
472    /// `WITH RETENTION <duration>` clause on CREATE MATERIALIZED VIEW
473    /// (issue #584 slice 12). Persisted on the view definition; the
474    /// physical sweep against view-backing rows activates once the
475    /// slice-9 row-storage follow-up lands.
476    pub retention_duration_ms: Option<u64>,
477}
478
479/// How to refresh a materialized view
480#[derive(Debug, Clone)]
481pub enum RefreshPolicy {
482    /// Refresh on demand only
483    Manual,
484    /// Refresh when dependencies change
485    OnChange,
486    /// Refresh periodically
487    Periodic(Duration),
488    /// Refresh after N invalidations
489    AfterWrites(usize),
490}
491
492/// Materialized view cache entry
493struct MaterializedView {
494    /// The cached result
495    data: Vec<u8>,
496    /// View definition
497    def: MaterializedViewDef,
498    /// When last refreshed (logical clock — `Instant::now()` in
499    /// production, fed by callers for fake-clock unit tests).
500    last_refresh: Instant,
501    /// Wall-clock timestamp of the last successful or attempted
502    /// refresh — surfaced via `red.materialized_views.last_refresh_at`.
503    last_refresh_at_ms: u64,
504    /// Duration of the last refresh attempt (success or failure).
505    last_refresh_duration_ms: u64,
506    /// Last refresh error, cleared on next successful refresh.
507    last_error: Option<String>,
508    /// Row count of the most recent successful refresh.
509    current_row_count: u64,
510    /// Scheduled refresh cadence in milliseconds (mirrors
511    /// `RefreshPolicy::Periodic`). `None` means refresh-on-demand.
512    refresh_every_ms: Option<u64>,
513    /// `WITH RETENTION <duration>` from CREATE MATERIALIZED VIEW
514    /// (issue #584 slice 12).
515    view_retention_ms: Option<u64>,
516    /// Write count since last refresh
517    writes_since_refresh: usize,
518    /// Whether the view is stale
519    stale: bool,
520}
521
522/// Public snapshot of a materialized view's runtime state. Returned
523/// by `MaterializedViewCache::metadata` for the `red.materialized_views`
524/// virtual table.
525#[derive(Debug, Clone)]
526pub struct MaterializedViewMetadata {
527    pub name: String,
528    pub query_text: String,
529    pub refresh_every_ms: Option<u64>,
530    pub last_refresh_at_ms: u64,
531    pub last_refresh_duration_ms: u64,
532    pub last_error: Option<String>,
533    pub current_row_count: u64,
534    /// `WITH RETENTION <duration>` clause from CREATE MATERIALIZED VIEW
535    /// (issue #584 slice 12).
536    pub retention_duration_ms: Option<u64>,
537}
538
539/// Cache for materialized views
540pub struct MaterializedViewCache {
541    /// Views by name
542    views: HashMap<String, MaterializedView>,
543    /// Dependency index: table -> view names
544    dependency_index: HashMap<String, HashSet<String>>,
545}
546
547impl MaterializedViewCache {
548    /// Create a new materialized view cache
549    pub fn new() -> Self {
550        Self {
551            views: HashMap::new(),
552            dependency_index: HashMap::new(),
553        }
554    }
555
556    /// Register a view definition
557    pub fn register(&mut self, def: MaterializedViewDef) {
558        // Index dependencies
559        for dep in &def.dependencies {
560            self.dependency_index
561                .entry(dep.clone())
562                .or_default()
563                .insert(def.name.clone());
564        }
565
566        let refresh_every_ms = match &def.refresh {
567            RefreshPolicy::Periodic(d) => Some(d.as_millis() as u64),
568            _ => None,
569        };
570        let view_retention_ms = def.retention_duration_ms;
571
572        let view = MaterializedView {
573            data: Vec::new(),
574            def,
575            last_refresh: Instant::now(),
576            last_refresh_at_ms: 0,
577            last_refresh_duration_ms: 0,
578            last_error: None,
579            current_row_count: 0,
580            refresh_every_ms,
581            view_retention_ms,
582            writes_since_refresh: 0,
583            stale: true,
584        };
585
586        self.views.insert(view.def.name.clone(), view);
587    }
588
589    /// Atomically claim the set of views due for periodic refresh at
590    /// the given logical instant. Returned views have their
591    /// `last_refresh` pre-advanced to `now` so a concurrent caller on
592    /// the same tick sees them as not-due (the contract requested by
593    /// the "exactly once per tick, no duplicates" acceptance criterion
594    /// in issue #583 slice 10).
595    pub fn claim_due_at(&mut self, now: Instant) -> Vec<String> {
596        let mut due = Vec::new();
597        for view in self.views.values_mut() {
598            if let RefreshPolicy::Periodic(interval) = &view.def.refresh {
599                let elapsed = now.saturating_duration_since(view.last_refresh);
600                if elapsed >= *interval {
601                    due.push(view.def.name.clone());
602                    view.last_refresh = now;
603                }
604            }
605        }
606        due
607    }
608
609    /// Record a successful refresh — updates cached data, row count,
610    /// duration, wall-clock timestamp, and clears any prior error.
611    pub fn record_refresh_success(
612        &mut self,
613        name: &str,
614        data: Vec<u8>,
615        row_count: u64,
616        duration_ms: u64,
617        at_unix_ms: u64,
618    ) {
619        if let Some(view) = self.views.get_mut(name) {
620            view.data = data;
621            view.last_refresh = Instant::now();
622            view.last_refresh_at_ms = at_unix_ms;
623            view.last_refresh_duration_ms = duration_ms;
624            view.last_error = None;
625            view.current_row_count = row_count;
626            view.writes_since_refresh = 0;
627            view.stale = false;
628        }
629    }
630
631    /// Record a refresh failure — captures the error string and
632    /// duration but leaves `data` / `current_row_count` intact so
633    /// the prior content remains readable (per acceptance criterion).
634    pub fn record_refresh_failure(
635        &mut self,
636        name: &str,
637        error: String,
638        duration_ms: u64,
639        at_unix_ms: u64,
640    ) {
641        if let Some(view) = self.views.get_mut(name) {
642            view.last_refresh = Instant::now();
643            view.last_refresh_at_ms = at_unix_ms;
644            view.last_refresh_duration_ms = duration_ms;
645            view.last_error = Some(error);
646        }
647    }
648
649    /// Per-view runtime state snapshot — feeds `red.materialized_views`.
650    pub fn metadata(&self) -> Vec<MaterializedViewMetadata> {
651        self.views
652            .values()
653            .map(|v| MaterializedViewMetadata {
654                name: v.def.name.clone(),
655                query_text: v.def.query.clone(),
656                refresh_every_ms: v.refresh_every_ms,
657                last_refresh_at_ms: v.last_refresh_at_ms,
658                last_refresh_duration_ms: v.last_refresh_duration_ms,
659                last_error: v.last_error.clone(),
660                current_row_count: v.current_row_count,
661                retention_duration_ms: v.view_retention_ms,
662            })
663            .collect()
664    }
665
666    /// Get view data (if not stale)
667    pub fn get(&self, name: &str) -> Option<&[u8]> {
668        self.views
669            .get(name)
670            .filter(|v| !v.stale && !v.data.is_empty())
671            .map(|v| v.data.as_slice())
672    }
673
674    /// Check if view needs refresh
675    pub fn needs_refresh(&self, name: &str) -> bool {
676        self.views.get(name).map(|v| v.stale).unwrap_or(false)
677    }
678
679    /// Refresh a view with new data
680    pub fn refresh(&mut self, name: &str, data: Vec<u8>) {
681        if let Some(view) = self.views.get_mut(name) {
682            view.data = data;
683            view.last_refresh = Instant::now();
684            view.writes_since_refresh = 0;
685            view.stale = false;
686        }
687    }
688
689    /// Mark views depending on a table as stale
690    pub fn mark_stale(&mut self, table: &str) {
691        if let Some(view_names) = self.dependency_index.get(table) {
692            for name in view_names.clone() {
693                if let Some(view) = self.views.get_mut(&name) {
694                    view.writes_since_refresh += 1;
695
696                    match &view.def.refresh {
697                        RefreshPolicy::OnChange => {
698                            view.stale = true;
699                        }
700                        RefreshPolicy::AfterWrites(threshold)
701                            if view.writes_since_refresh >= *threshold =>
702                        {
703                            view.stale = true;
704                        }
705                        _ => {}
706                    }
707                }
708            }
709        }
710    }
711
712    /// Get views needing periodic refresh
713    pub fn due_for_refresh(&self) -> Vec<String> {
714        self.views
715            .values()
716            .filter(|v| {
717                if let RefreshPolicy::Periodic(interval) = &v.def.refresh {
718                    v.last_refresh.elapsed() >= *interval
719                } else {
720                    false
721                }
722            })
723            .map(|v| v.def.name.clone())
724            .collect()
725    }
726
727    /// Remove a view
728    pub fn remove(&mut self, name: &str) {
729        if let Some(view) = self.views.remove(name) {
730            for dep in &view.def.dependencies {
731                if let Some(names) = self.dependency_index.get_mut(dep) {
732                    names.remove(name);
733                }
734            }
735        }
736    }
737
738    /// List all view names
739    pub fn list(&self) -> Vec<&str> {
740        self.views.keys().map(|s| s.as_str()).collect()
741    }
742}
743
744impl Default for MaterializedViewCache {
745    fn default() -> Self {
746        Self::new()
747    }
748}
749
750// ============================================================================
751// Tests
752// ============================================================================
753
754#[cfg(test)]
755mod tests {
756    use super::*;
757
758    fn mk_periodic_view(name: &str, ms: u64) -> MaterializedViewDef {
759        MaterializedViewDef {
760            name: name.into(),
761            query: "<test>".into(),
762            dependencies: vec![],
763            refresh: RefreshPolicy::Periodic(Duration::from_millis(ms)),
764            retention_duration_ms: None,
765        }
766    }
767
768    /// Issue #583 slice 10 — `claim_due_at` returns due views and
769    /// pre-advances their `last_refresh`, so a second tick at the
770    /// same instant returns an empty set.
771    #[test]
772    fn test_materialized_view_claim_due_exactly_once_per_tick() {
773        let mut cache = MaterializedViewCache::new();
774        cache.register(mk_periodic_view("v1", 100));
775
776        let t0 = Instant::now();
777        // No time has passed — not due yet.
778        assert!(cache.claim_due_at(t0).is_empty());
779
780        // Advance past the interval.
781        let t1 = t0 + Duration::from_millis(150);
782        let due = cache.claim_due_at(t1);
783        assert_eq!(due, vec!["v1".to_string()]);
784
785        // Same tick → already claimed, must not duplicate.
786        assert!(cache.claim_due_at(t1).is_empty());
787
788        // Advance another full interval → due again exactly once.
789        let t2 = t1 + Duration::from_millis(150);
790        assert_eq!(cache.claim_due_at(t2), vec!["v1".to_string()]);
791    }
792
793    /// `Manual` views never appear in `claim_due_at`'s output.
794    #[test]
795    fn test_materialized_view_claim_due_skips_manual_views() {
796        let mut cache = MaterializedViewCache::new();
797        cache.register(MaterializedViewDef {
798            name: "m".into(),
799            query: "<test>".into(),
800            dependencies: vec![],
801            refresh: RefreshPolicy::Manual,
802            retention_duration_ms: None,
803        });
804        let t = Instant::now() + Duration::from_secs(60);
805        assert!(cache.claim_due_at(t).is_empty());
806    }
807
808    /// `record_refresh_failure` keeps prior data + row count intact
809    /// and stores the error string; the next success clears it.
810    #[test]
811    fn test_materialized_view_failure_preserves_prior_content() {
812        let mut cache = MaterializedViewCache::new();
813        cache.register(mk_periodic_view("v", 100));
814
815        cache.record_refresh_success("v", b"first-payload".to_vec(), 42, 7, 1_000);
816        {
817            let md = cache.metadata();
818            let entry = md.iter().find(|m| m.name == "v").unwrap();
819            assert_eq!(entry.current_row_count, 42);
820            assert!(entry.last_error.is_none());
821        }
822
823        cache.record_refresh_failure("v", "boom".into(), 3, 2_000);
824        {
825            let md = cache.metadata();
826            let entry = md.iter().find(|m| m.name == "v").unwrap();
827            // Prior content intact.
828            assert_eq!(entry.current_row_count, 42);
829            assert_eq!(entry.last_error.as_deref(), Some("boom"));
830            assert_eq!(entry.last_refresh_at_ms, 2_000);
831        }
832
833        cache.record_refresh_success("v", b"second".to_vec(), 7, 4, 3_000);
834        {
835            let md = cache.metadata();
836            let entry = md.iter().find(|m| m.name == "v").unwrap();
837            assert_eq!(entry.current_row_count, 7);
838            assert!(entry.last_error.is_none());
839        }
840    }
841
842    /// DROP cleanup: `remove` drops the view from both indices and
843    /// future `claim_due_at` ticks see nothing — proves no leaked
844    /// scheduled state remains after the view is dropped.
845    #[test]
846    fn test_materialized_view_drop_cleans_scheduled_work() {
847        let mut cache = MaterializedViewCache::new();
848        cache.register(mk_periodic_view("v", 50));
849        cache.remove("v");
850        let t = Instant::now() + Duration::from_secs(10);
851        assert!(cache.claim_due_at(t).is_empty());
852        assert!(cache.metadata().is_empty());
853    }
854
855    /// `metadata()` reflects the seven columns surfaced by
856    /// `red.materialized_views`.
857    #[test]
858    fn test_materialized_view_metadata_exposes_seven_fields() {
859        let mut cache = MaterializedViewCache::new();
860        cache.register(mk_periodic_view("v", 500));
861        let md = cache.metadata();
862        assert_eq!(md.len(), 1);
863        let m = &md[0];
864        assert_eq!(m.name, "v");
865        assert_eq!(m.refresh_every_ms, Some(500));
866        assert_eq!(m.last_refresh_at_ms, 0);
867        assert_eq!(m.last_refresh_duration_ms, 0);
868        assert!(m.last_error.is_none());
869        assert_eq!(m.current_row_count, 0);
870    }
871
872    #[test]
873    fn test_cache_key_hashing() {
874        let key1 = CacheKey::new("attack_paths")
875            .param("from", "host1")
876            .param("to", "host2");
877
878        let key2 = CacheKey::new("attack_paths")
879            .param("to", "host2")
880            .param("from", "host1"); // Different order
881
882        assert_eq!(key1, key2);
883        assert_eq!(key1.hash, key2.hash);
884    }
885
886    #[test]
887    fn test_result_cache_basic() {
888        let mut cache = ResultCache::new(1024 * 1024); // 1MB
889
890        let key = CacheKey::new("test_query").param("id", "123");
891        let data = vec![1, 2, 3, 4, 5];
892
893        cache.insert(key.clone(), data.clone(), CachePolicy::default());
894
895        let result = cache.get(&key);
896        assert_eq!(result, Some(data));
897        assert_eq!(cache.stats().hits, 1);
898    }
899
900    #[test]
901    fn test_cache_expiration() {
902        let mut cache = ResultCache::new(1024 * 1024);
903
904        let key = CacheKey::new("test");
905        let data = vec![1, 2, 3];
906
907        // Very short TTL
908        cache.insert(
909            key.clone(),
910            data,
911            CachePolicy::default().ttl(Duration::from_millis(1)),
912        );
913
914        // Wait for expiration
915        std::thread::sleep(Duration::from_millis(10));
916
917        assert!(cache.get(&key).is_none());
918        assert_eq!(cache.stats().expirations, 1);
919    }
920
921    #[test]
922    fn test_dependency_invalidation() {
923        let mut cache = ResultCache::new(1024 * 1024);
924
925        let key = CacheKey::new("host_query");
926        cache.insert(
927            key.clone(),
928            vec![1, 2, 3],
929            CachePolicy::default().depends_on(&["hosts"]),
930        );
931
932        assert!(cache.contains(&key));
933
934        // Invalidate hosts table
935        cache.invalidate_by_dependency("hosts");
936
937        assert!(!cache.contains(&key));
938        assert_eq!(cache.stats().invalidations, 1);
939    }
940
941    #[test]
942    fn test_memory_eviction() {
943        let mut cache = ResultCache::new(100); // Very small
944
945        // Insert enough to trigger eviction
946        for i in 0..10 {
947            let key = CacheKey::new("query").param("i", i.to_string());
948            cache.insert(key, vec![0u8; 20], CachePolicy::default());
949        }
950
951        // Should have evicted some entries
952        assert!(cache.stats().evictions > 0);
953        assert!(cache.stats().memory_bytes <= 100);
954    }
955
956    #[test]
957    fn test_materialized_view() {
958        let mut cache = MaterializedViewCache::new();
959
960        cache.register(MaterializedViewDef {
961            name: "active_hosts".to_string(),
962            query: "SELECT * FROM hosts WHERE status = 'active'".to_string(),
963            dependencies: vec!["hosts".to_string()],
964            refresh: RefreshPolicy::OnChange,
965            retention_duration_ms: None,
966        });
967
968        // Initially stale
969        assert!(cache.needs_refresh("active_hosts"));
970
971        // Refresh it
972        cache.refresh("active_hosts", vec![1, 2, 3]);
973        assert!(!cache.needs_refresh("active_hosts"));
974        assert_eq!(cache.get("active_hosts"), Some(&[1, 2, 3][..]));
975
976        // Mark stale due to dependency
977        cache.mark_stale("hosts");
978        assert!(cache.needs_refresh("active_hosts"));
979    }
980}