Skip to main content

oxirs_arq/cache/
invalidation_engine.rs

//! Cache Invalidation Engine
//!
//! Core system that tracks which cache entries depend on which triple patterns and
//! automatically invalidates stale entries when RDF updates occur. Several invalidation
//! strategies are provided; the design target is less than 1% overhead.
5
6use crate::algebra::TriplePattern;
7use anyhow::Result;
8use dashmap::DashMap;
9use scirs2_core::metrics::{Counter, Histogram, Timer};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashSet, VecDeque};
12use std::hash::{Hash, Hasher};
13use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant};
16
17/// Cache invalidation strategy
18#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
19pub enum InvalidationStrategy {
20    /// Invalidate immediately when updates occur (safest, highest overhead)
21    Immediate,
22    /// Buffer invalidations and flush periodically (balanced)
23    Batched {
24        /// Batch size before flush
25        batch_size: usize,
26        /// Maximum time before flush (milliseconds)
27        max_delay_ms: u64,
28    },
29    /// Use Bloom filter for fast "may be affected" check
30    BloomFilter {
31        /// Expected number of elements
32        expected_elements: usize,
33        /// False positive rate (0.0 to 1.0)
34        false_positive_rate: f64,
35    },
36    /// Invalidate only if re-execution cost > invalidation cost
37    CostBased {
38        /// Threshold ratio: invalidate if cost_ratio > threshold
39        threshold: f64,
40    },
41}
42
43impl Default for InvalidationStrategy {
44    fn default() -> Self {
45        Self::Batched {
46            batch_size: 100,
47            max_delay_ms: 50,
48        }
49    }
50}
51
52/// Triple pattern hash for efficient lookup
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
54pub struct TriplePatternHash(u64);
55
56impl TriplePatternHash {
57    /// Create from triple pattern
58    pub fn from_pattern(pattern: &TriplePattern) -> Self {
59        use std::collections::hash_map::DefaultHasher;
60        let mut hasher = DefaultHasher::new();
61        pattern.hash(&mut hasher);
62        Self(hasher.finish())
63    }
64
65    /// Get hash value
66    pub fn value(&self) -> u64 {
67        self.0
68    }
69}
70
/// Identifier of a cache entry; a plain owned string.
pub type CacheKey = String;
73
74/// Cache entry metadata with TTL support
75#[derive(Debug, Clone)]
76pub struct CacheEntryMetadata {
77    /// Timestamp when entry was created
78    pub created_at: Instant,
79    /// Time-to-live duration (None = no expiration)
80    pub ttl: Option<Duration>,
81    /// Triple pattern dependencies
82    pub dependencies: HashSet<TriplePatternHash>,
83}
84
85impl CacheEntryMetadata {
86    /// Check if entry has expired based on TTL
87    pub fn is_expired(&self) -> bool {
88        if let Some(ttl) = self.ttl {
89            self.created_at.elapsed() >= ttl
90        } else {
91            false
92        }
93    }
94
95    /// Get remaining time to live
96    pub fn remaining_ttl(&self) -> Option<Duration> {
97        self.ttl.and_then(|ttl| {
98            let elapsed = self.created_at.elapsed();
99            if elapsed < ttl {
100                Some(ttl - elapsed)
101            } else {
102                None
103            }
104        })
105    }
106}
107
108/// Dependency graph tracking which cache entries depend on which triple patterns
109#[derive(Debug, Clone)]
110pub struct DependencyGraph {
111    /// Map: TriplePattern → Set of cache entries that depend on it
112    pattern_to_entries: Arc<DashMap<TriplePatternHash, HashSet<CacheKey>>>,
113    /// Map: CacheEntry → Metadata (dependencies + TTL)
114    entry_metadata: Arc<DashMap<CacheKey, CacheEntryMetadata>>,
115    /// Statistics
116    stats: Arc<DependencyGraphStats>,
117}
118
/// Lock-free counters backing [`DependencyGraph::statistics`].
#[derive(Debug, Default)]
struct DependencyGraphStats {
    /// Number of distinct patterns tracked.
    pattern_count: AtomicUsize,
    /// Number of cache entries tracked.
    entry_count: AtomicUsize,
    /// Number of pattern ↔ entry edges in the bipartite graph.
    edge_count: AtomicUsize,
    /// Average dependencies per entry, stored as the bit pattern of an `f64`.
    avg_deps_per_entry: AtomicU64,
}
130
131impl DependencyGraph {
132    /// Create new dependency graph
133    pub fn new() -> Self {
134        Self {
135            pattern_to_entries: Arc::new(DashMap::new()),
136            entry_metadata: Arc::new(DashMap::new()),
137            stats: Arc::new(DependencyGraphStats::default()),
138        }
139    }
140
141    /// Register dependencies for a cache entry with optional TTL
142    pub fn register_dependencies(
143        &self,
144        cache_key: CacheKey,
145        patterns: Vec<TriplePattern>,
146    ) -> Result<()> {
147        self.register_dependencies_with_ttl(cache_key, patterns, None)
148    }
149
150    /// Register dependencies for a cache entry with TTL
151    pub fn register_dependencies_with_ttl(
152        &self,
153        cache_key: CacheKey,
154        patterns: Vec<TriplePattern>,
155        ttl: Option<Duration>,
156    ) -> Result<()> {
157        if patterns.is_empty() {
158            return Ok(());
159        }
160
161        let pattern_hashes: HashSet<TriplePatternHash> = patterns
162            .iter()
163            .map(TriplePatternHash::from_pattern)
164            .collect();
165
166        // Create metadata
167        let metadata = CacheEntryMetadata {
168            created_at: Instant::now(),
169            ttl,
170            dependencies: pattern_hashes.clone(),
171        };
172
173        // Update entry metadata
174        let is_new_entry = !self.entry_metadata.contains_key(&cache_key);
175        self.entry_metadata.insert(cache_key.clone(), metadata);
176
177        // Update pattern → entries mapping
178        for pattern_hash in &pattern_hashes {
179            self.pattern_to_entries
180                .entry(*pattern_hash)
181                .or_default()
182                .insert(cache_key.clone());
183        }
184
185        // Update statistics
186        if is_new_entry {
187            self.stats.entry_count.fetch_add(1, Ordering::Relaxed);
188        }
189        self.stats
190            .edge_count
191            .fetch_add(pattern_hashes.len(), Ordering::Relaxed);
192        self.update_avg_deps();
193
194        Ok(())
195    }
196
197    /// Remove a cache entry and its dependencies
198    pub fn remove_entry(&self, cache_key: &CacheKey) -> Result<()> {
199        // Get metadata for this entry
200        if let Some((_, metadata)) = self.entry_metadata.remove(cache_key) {
201            // Remove entry from all pattern mappings
202            for pattern_hash in &metadata.dependencies {
203                if let Some(mut entries) = self.pattern_to_entries.get_mut(pattern_hash) {
204                    entries.remove(cache_key);
205                    if entries.is_empty() {
206                        drop(entries);
207                        self.pattern_to_entries.remove(pattern_hash);
208                        // Use fetch_update to prevent underflow
209                        let _ = self.stats.pattern_count.fetch_update(
210                            Ordering::Relaxed,
211                            Ordering::Relaxed,
212                            |val| Some(val.saturating_sub(1)),
213                        );
214                    }
215                }
216            }
217
218            // Use fetch_update to prevent underflow
219            let _ =
220                self.stats
221                    .entry_count
222                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
223                        Some(val.saturating_sub(1))
224                    });
225            let _ =
226                self.stats
227                    .edge_count
228                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
229                        Some(val.saturating_sub(metadata.dependencies.len()))
230                    });
231            self.update_avg_deps();
232        }
233
234        Ok(())
235    }
236
237    /// Find all expired cache entries based on TTL
238    pub fn find_expired_entries(&self) -> Vec<CacheKey> {
239        self.entry_metadata
240            .iter()
241            .filter_map(|entry| {
242                if entry.value().is_expired() {
243                    Some(entry.key().clone())
244                } else {
245                    None
246                }
247            })
248            .collect()
249    }
250
251    /// Get TTL information for a cache entry
252    pub fn get_ttl_info(&self, cache_key: &CacheKey) -> Option<(Duration, Option<Duration>)> {
253        self.entry_metadata.get(cache_key).and_then(|metadata| {
254            let elapsed = metadata.created_at.elapsed();
255            let remaining = metadata.remaining_ttl();
256            metadata.ttl.map(|_| (elapsed, remaining))
257        })
258    }
259
260    /// Find all cache entries affected by a triple pattern
261    pub fn find_affected_entries(&self, pattern: &TriplePattern) -> HashSet<CacheKey> {
262        let pattern_hash = TriplePatternHash::from_pattern(pattern);
263
264        // Check exact match
265        let mut affected = self
266            .pattern_to_entries
267            .get(&pattern_hash)
268            .map(|entries| entries.clone())
269            .unwrap_or_default();
270
271        // Check for pattern subsumption (pattern with variables can match multiple)
272        // For now, use simple matching; can be enhanced with more sophisticated logic
273        for entry in self.pattern_to_entries.iter() {
274            if self.pattern_matches(*entry.key(), pattern) {
275                affected.extend(entry.value().iter().cloned());
276            }
277        }
278
279        affected
280    }
281
282    /// Check if a pattern matches (considering variables)
283    fn pattern_matches(
284        &self,
285        stored_hash: TriplePatternHash,
286        query_pattern: &TriplePattern,
287    ) -> bool {
288        // This is a simplified version
289        // In practice, you'd need to reconstruct the pattern or store metadata
290        // For now, we use exact hash matching
291        stored_hash == TriplePatternHash::from_pattern(query_pattern)
292    }
293
294    /// Get statistics
295    pub fn statistics(&self) -> DependencyGraphStatistics {
296        DependencyGraphStatistics {
297            pattern_count: self.stats.pattern_count.load(Ordering::Relaxed),
298            entry_count: self.stats.entry_count.load(Ordering::Relaxed),
299            edge_count: self.stats.edge_count.load(Ordering::Relaxed),
300            avg_deps_per_entry: f64::from_bits(
301                self.stats.avg_deps_per_entry.load(Ordering::Relaxed),
302            ),
303        }
304    }
305
306    /// Update average dependencies per entry
307    fn update_avg_deps(&self) {
308        let entries = self.stats.entry_count.load(Ordering::Relaxed);
309        if entries > 0 {
310            let edges = self.stats.edge_count.load(Ordering::Relaxed);
311            let avg = edges as f64 / entries as f64;
312            self.stats
313                .avg_deps_per_entry
314                .store(avg.to_bits(), Ordering::Relaxed);
315        }
316    }
317
318    /// Clear all dependencies
319    pub fn clear(&self) {
320        self.pattern_to_entries.clear();
321        self.entry_metadata.clear();
322        self.stats.pattern_count.store(0, Ordering::Relaxed);
323        self.stats.entry_count.store(0, Ordering::Relaxed);
324        self.stats.edge_count.store(0, Ordering::Relaxed);
325        self.stats.avg_deps_per_entry.store(0, Ordering::Relaxed);
326    }
327
328    /// Get memory usage estimate (bytes)
329    pub fn memory_usage(&self) -> usize {
330        let pattern_count = self.stats.pattern_count.load(Ordering::Relaxed);
331        let entry_count = self.stats.entry_count.load(Ordering::Relaxed);
332        let edge_count = self.stats.edge_count.load(Ordering::Relaxed);
333
334        // Rough estimate:
335        // - 24 bytes per HashMap entry (key + value pointer)
336        // - 8 bytes per hash
337        // - 40 bytes per String (average cache key)
338        // Use saturating arithmetic to prevent overflow
339        pattern_count
340            .saturating_mul(24)
341            .saturating_add(entry_count.saturating_mul(24))
342            .saturating_add(edge_count.saturating_mul(48))
343    }
344}
345
346impl Default for DependencyGraph {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352/// Statistics snapshot
353#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
354pub struct DependencyGraphStatistics {
355    pub pattern_count: usize,
356    pub entry_count: usize,
357    pub edge_count: usize,
358    pub avg_deps_per_entry: f64,
359}
360
361/// Bloom filter for efficient pattern matching
362struct BloomFilter {
363    bits: Vec<AtomicU64>,
364    num_hash_functions: usize,
365    bit_count: usize,
366}
367
368impl BloomFilter {
369    /// Create new Bloom filter
370    fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
371        // Calculate optimal bit count and hash functions
372        let m = Self::optimal_bit_count(expected_elements, false_positive_rate);
373        let k = Self::optimal_hash_count(expected_elements, m);
374
375        let num_u64s = (m + 63) / 64;
376        let bits = (0..num_u64s).map(|_| AtomicU64::new(0)).collect();
377
378        Self {
379            bits,
380            num_hash_functions: k,
381            bit_count: m,
382        }
383    }
384
385    /// Calculate optimal bit count
386    fn optimal_bit_count(n: usize, p: f64) -> usize {
387        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
388        (-(n as f64 * p.ln()) / ln2_squared).ceil() as usize
389    }
390
391    /// Calculate optimal hash function count
392    fn optimal_hash_count(n: usize, m: usize) -> usize {
393        ((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as usize
394    }
395
396    /// Add pattern to filter
397    fn add(&self, pattern_hash: TriplePatternHash) {
398        for i in 0..self.num_hash_functions {
399            let bit_index = self.hash_i(pattern_hash, i) % self.bit_count;
400            let word_index = bit_index / 64;
401            let bit_offset = bit_index % 64;
402            self.bits[word_index].fetch_or(1u64 << bit_offset, Ordering::Relaxed);
403        }
404    }
405
406    /// Check if pattern might be in set
407    fn might_contain(&self, pattern_hash: TriplePatternHash) -> bool {
408        for i in 0..self.num_hash_functions {
409            let bit_index = self.hash_i(pattern_hash, i) % self.bit_count;
410            let word_index = bit_index / 64;
411            let bit_offset = bit_index % 64;
412            let word = self.bits[word_index].load(Ordering::Relaxed);
413            if (word & (1u64 << bit_offset)) == 0 {
414                return false;
415            }
416        }
417        true
418    }
419
420    /// Hash function with index
421    fn hash_i(&self, pattern_hash: TriplePatternHash, i: usize) -> usize {
422        // Simple double hashing
423        let h1 = pattern_hash.value() as usize;
424        let h2 = (pattern_hash.value().wrapping_mul(2654435761)) as usize;
425        h1.wrapping_add(i.wrapping_mul(h2))
426    }
427
428    /// Clear filter
429    fn clear(&self) {
430        for word in &self.bits {
431            word.store(0, Ordering::Relaxed);
432        }
433    }
434}
435
436/// Batch of pending invalidations
437#[derive(Debug)]
438struct InvalidationBatch {
439    entries: Vec<CacheKey>,
440    timestamp: Instant,
441}
442
443/// Core invalidation engine
444pub struct InvalidationEngine {
445    /// Dependency graph
446    dependency_graph: DependencyGraph,
447    /// Invalidation strategy
448    strategy: InvalidationStrategy,
449    /// Bloom filter (if using BloomFilter strategy)
450    bloom_filter: Option<Arc<BloomFilter>>,
451    /// Pending invalidations (for batched strategy)
452    pending_invalidations: Arc<RwLock<VecDeque<InvalidationBatch>>>,
453    /// Metrics
454    metrics: InvalidationMetrics,
455    /// Configuration
456    config: InvalidationConfig,
457}
458
459#[derive(Clone)]
460struct InvalidationMetrics {
461    /// Total invalidations triggered
462    total_invalidations: Arc<Counter>,
463    /// Time spent in invalidation
464    invalidation_time: Arc<Timer>,
465    /// Invalidation overhead ratio
466    overhead_ratio: Arc<Histogram>,
467    /// Cache entries invalidated per update
468    entries_per_update: Arc<Histogram>,
469    /// TTL-based evictions
470    ttl_evictions: Arc<Counter>,
471    /// Time spent in TTL cleanup
472    ttl_cleanup_time: Arc<Timer>,
473}
474
475impl InvalidationMetrics {
476    fn new() -> Self {
477        Self {
478            total_invalidations: Arc::new(Counter::new("invalidation_total".to_string())),
479            invalidation_time: Arc::new(Timer::new("invalidation_time".to_string())),
480            overhead_ratio: Arc::new(Histogram::new("invalidation_overhead".to_string())),
481            entries_per_update: Arc::new(Histogram::new(
482                "invalidation_entries_per_update".to_string(),
483            )),
484            ttl_evictions: Arc::new(Counter::new("invalidation_ttl_evictions".to_string())),
485            ttl_cleanup_time: Arc::new(Timer::new("invalidation_ttl_cleanup_time".to_string())),
486        }
487    }
488}
489
490#[derive(Debug, Clone, Serialize, Deserialize)]
491pub struct InvalidationConfig {
492    /// Enable metrics tracking
493    pub enable_metrics: bool,
494    /// Maximum pending batch size
495    pub max_pending_batches: usize,
496    /// Enable aggressive pattern matching
497    pub aggressive_matching: bool,
498    /// Default TTL for cache entries (None = no expiration)
499    pub default_ttl: Option<Duration>,
500    /// Enable automatic TTL-based cleanup
501    pub enable_ttl_cleanup: bool,
502    /// TTL cleanup interval in seconds
503    pub ttl_cleanup_interval_secs: u64,
504}
505
506impl Default for InvalidationConfig {
507    fn default() -> Self {
508        Self {
509            enable_metrics: true,
510            max_pending_batches: 100,
511            aggressive_matching: false,
512            default_ttl: Some(Duration::from_secs(3600)), // 1 hour default
513            enable_ttl_cleanup: true,
514            ttl_cleanup_interval_secs: 300, // 5 minutes
515        }
516    }
517}
518
519impl InvalidationEngine {
520    /// Create new invalidation engine
521    pub fn new(strategy: InvalidationStrategy) -> Self {
522        Self::with_config(strategy, InvalidationConfig::default())
523    }
524
525    /// Create with configuration
526    pub fn with_config(strategy: InvalidationStrategy, config: InvalidationConfig) -> Self {
527        let bloom_filter = match strategy {
528            InvalidationStrategy::BloomFilter {
529                expected_elements,
530                false_positive_rate,
531            } => Some(Arc::new(BloomFilter::new(
532                expected_elements,
533                false_positive_rate,
534            ))),
535            _ => None,
536        };
537
538        Self {
539            dependency_graph: DependencyGraph::new(),
540            strategy,
541            bloom_filter,
542            pending_invalidations: Arc::new(RwLock::new(VecDeque::new())),
543            metrics: InvalidationMetrics::new(),
544            config,
545        }
546    }
547
548    /// Register dependencies for a cache entry with default TTL
549    pub fn register_dependencies(
550        &self,
551        cache_key: CacheKey,
552        patterns: Vec<TriplePattern>,
553    ) -> Result<()> {
554        let ttl = self.config.default_ttl;
555        self.register_dependencies_with_ttl(cache_key, patterns, ttl)
556    }
557
558    /// Register dependencies for a cache entry with custom TTL
559    pub fn register_dependencies_with_ttl(
560        &self,
561        cache_key: CacheKey,
562        patterns: Vec<TriplePattern>,
563        ttl: Option<Duration>,
564    ) -> Result<()> {
565        // Add to dependency graph with TTL
566        self.dependency_graph
567            .register_dependencies_with_ttl(cache_key, patterns.clone(), ttl)?;
568
569        // Add to bloom filter if using that strategy
570        if let Some(bloom) = &self.bloom_filter {
571            for pattern in &patterns {
572                bloom.add(TriplePatternHash::from_pattern(pattern));
573            }
574        }
575
576        Ok(())
577    }
578
579    /// Clean up expired cache entries based on TTL
580    pub fn cleanup_expired<F>(&self, mut invalidate_fn: F) -> Result<usize>
581    where
582        F: FnMut(&CacheKey) -> Result<()>,
583    {
584        if !self.config.enable_ttl_cleanup {
585            return Ok(0);
586        }
587
588        let start_time = Instant::now();
589        let expired_entries = self.dependency_graph.find_expired_entries();
590        let expired_count = expired_entries.len();
591
592        // Invalidate expired entries
593        for cache_key in &expired_entries {
594            invalidate_fn(cache_key)?;
595            self.dependency_graph.remove_entry(cache_key)?;
596        }
597
598        // Track metrics
599        if self.config.enable_metrics {
600            let elapsed = start_time.elapsed();
601            self.metrics.ttl_cleanup_time.observe(elapsed);
602            self.metrics.ttl_evictions.add(expired_count as u64);
603        }
604
605        Ok(expired_count)
606    }
607
608    /// Start background TTL cleanup task
609    pub fn start_ttl_cleanup_task<F>(&self, invalidate_fn: F)
610    where
611        F: Fn(&CacheKey) -> Result<()> + Send + Sync + 'static,
612    {
613        if !self.config.enable_ttl_cleanup {
614            return;
615        }
616
617        let engine_clone = self.clone();
618        let interval_secs = self.config.ttl_cleanup_interval_secs;
619        let invalidate_fn = Arc::new(invalidate_fn);
620
621        std::thread::spawn(move || loop {
622            std::thread::sleep(Duration::from_secs(interval_secs));
623
624            let fn_clone = Arc::clone(&invalidate_fn);
625            if let Ok(count) = engine_clone.cleanup_expired(|key| fn_clone(key)) {
626                if count > 0 {
627                    tracing::debug!("TTL cleanup removed {} expired cache entries", count);
628                }
629            }
630        });
631    }
632
633    /// Remove cache entry and its dependencies
634    pub fn remove_entry(&self, cache_key: &CacheKey) -> Result<()> {
635        self.dependency_graph.remove_entry(cache_key)
636    }
637
638    /// Find entries that should be invalidated due to a triple update
639    pub fn find_affected_entries(&self, triple: &TriplePattern) -> Result<HashSet<CacheKey>> {
640        let start_time = Instant::now();
641
642        let affected = match self.strategy {
643            InvalidationStrategy::BloomFilter { .. } => {
644                // Use Bloom filter for fast check
645                if let Some(bloom) = &self.bloom_filter {
646                    let pattern_hash = TriplePatternHash::from_pattern(triple);
647                    if bloom.might_contain(pattern_hash) {
648                        self.dependency_graph.find_affected_entries(triple)
649                    } else {
650                        HashSet::new()
651                    }
652                } else {
653                    self.dependency_graph.find_affected_entries(triple)
654                }
655            }
656            _ => self.dependency_graph.find_affected_entries(triple),
657        };
658
659        // Track metrics
660        if self.config.enable_metrics {
661            let elapsed = start_time.elapsed();
662            self.metrics.invalidation_time.observe(elapsed);
663            self.metrics
664                .entries_per_update
665                .observe(affected.len() as f64);
666        }
667
668        Ok(affected)
669    }
670
671    /// Invalidate cache entries (strategy-dependent)
672    pub fn invalidate<F>(&self, triple: &TriplePattern, mut invalidate_fn: F) -> Result<()>
673    where
674        F: FnMut(&CacheKey) -> Result<()>,
675    {
676        let affected = self.find_affected_entries(triple)?;
677        let affected_count = affected.len();
678
679        match self.strategy {
680            InvalidationStrategy::Immediate => {
681                // Invalidate immediately
682                for cache_key in &affected {
683                    invalidate_fn(cache_key)?;
684                    self.dependency_graph.remove_entry(cache_key)?;
685                }
686            }
687            InvalidationStrategy::Batched {
688                batch_size,
689                max_delay_ms,
690            } => {
691                // Add to batch
692                self.add_to_batch(affected, batch_size, max_delay_ms, &mut invalidate_fn)?;
693            }
694            InvalidationStrategy::BloomFilter { .. } => {
695                // Same as immediate for actual invalidation
696                for cache_key in &affected {
697                    invalidate_fn(cache_key)?;
698                    self.dependency_graph.remove_entry(cache_key)?;
699                }
700            }
701            InvalidationStrategy::CostBased { threshold } => {
702                // Only invalidate if beneficial
703                for cache_key in &affected {
704                    if self.should_invalidate_cost_based(cache_key, threshold)? {
705                        invalidate_fn(cache_key)?;
706                        self.dependency_graph.remove_entry(cache_key)?;
707                    }
708                }
709            }
710        }
711
712        // Update metrics
713        if self.config.enable_metrics {
714            self.metrics.total_invalidations.add(affected_count as u64);
715        }
716
717        Ok(())
718    }
719
720    /// Add entries to batch for later invalidation
721    fn add_to_batch<F>(
722        &self,
723        entries: HashSet<CacheKey>,
724        batch_size: usize,
725        max_delay_ms: u64,
726        invalidate_fn: &mut F,
727    ) -> Result<()>
728    where
729        F: FnMut(&CacheKey) -> Result<()>,
730    {
731        let mut pending = self
732            .pending_invalidations
733            .write()
734            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
735
736        // Add new batch
737        pending.push_back(InvalidationBatch {
738            entries: entries.into_iter().collect(),
739            timestamp: Instant::now(),
740        });
741
742        // Flush if batch size exceeded or max delay reached
743        let should_flush = pending.len() >= batch_size
744            || pending
745                .front()
746                .map(|b| b.timestamp.elapsed().as_millis() as u64 >= max_delay_ms)
747                .unwrap_or(false);
748
749        if should_flush {
750            self.flush_batches(&mut pending, invalidate_fn)?;
751        }
752
753        Ok(())
754    }
755
756    /// Flush pending invalidation batches
757    fn flush_batches<F>(
758        &self,
759        pending: &mut VecDeque<InvalidationBatch>,
760        invalidate_fn: &mut F,
761    ) -> Result<()>
762    where
763        F: FnMut(&CacheKey) -> Result<()>,
764    {
765        while let Some(batch) = pending.pop_front() {
766            for cache_key in &batch.entries {
767                invalidate_fn(cache_key)?;
768                self.dependency_graph.remove_entry(cache_key)?;
769            }
770        }
771        Ok(())
772    }
773
774    /// Force flush all pending invalidations
775    pub fn flush_pending<F>(&self, mut invalidate_fn: F) -> Result<()>
776    where
777        F: FnMut(&CacheKey) -> Result<()>,
778    {
779        let mut pending = self
780            .pending_invalidations
781            .write()
782            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
783        self.flush_batches(&mut pending, &mut invalidate_fn)
784    }
785
786    /// Check if entry should be invalidated (cost-based strategy)
787    fn should_invalidate_cost_based(&self, _cache_key: &CacheKey, _threshold: f64) -> Result<bool> {
788        // Simplified: In practice, you'd compare:
789        // - Cost of re-executing query
790        // - Cost of invalidating and warming cache
791        // For now, always invalidate (conservative)
792        Ok(true)
793    }
794
795    /// Get invalidation statistics
796    pub fn statistics(&self) -> InvalidationStatistics {
797        let graph_stats = self.dependency_graph.statistics();
798        let time_stats = self.metrics.invalidation_time.get_stats();
799        let overhead_stats = self.metrics.overhead_ratio.get_stats();
800        let entries_stats = self.metrics.entries_per_update.get_stats();
801        let ttl_cleanup_stats = self.metrics.ttl_cleanup_time.get_stats();
802
803        InvalidationStatistics {
804            strategy: self.strategy,
805            total_invalidations: self.metrics.total_invalidations.get(),
806            avg_invalidation_time_us: time_stats.mean,
807            overhead_ratio: overhead_stats.mean,
808            avg_entries_per_update: entries_stats.mean,
809            ttl_evictions: self.metrics.ttl_evictions.get(),
810            avg_ttl_cleanup_time_us: ttl_cleanup_stats.mean,
811            dependency_graph: graph_stats,
812            memory_usage_bytes: self.dependency_graph.memory_usage(),
813        }
814    }
815
816    /// Clear all state
817    pub fn clear(&self) -> Result<()> {
818        self.dependency_graph.clear();
819        if let Some(bloom) = &self.bloom_filter {
820            bloom.clear();
821        }
822        let mut pending = self
823            .pending_invalidations
824            .write()
825            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
826        pending.clear();
827        Ok(())
828    }
829}
830
831/// Invalidation statistics
832#[derive(Debug, Clone, Serialize, Deserialize)]
833pub struct InvalidationStatistics {
834    pub strategy: InvalidationStrategy,
835    pub total_invalidations: u64,
836    pub avg_invalidation_time_us: f64,
837    pub overhead_ratio: f64,
838    pub avg_entries_per_update: f64,
839    pub ttl_evictions: u64,
840    pub avg_ttl_cleanup_time_us: f64,
841    pub dependency_graph: DependencyGraphStatistics,
842    pub memory_usage_bytes: usize,
843}
844
845impl Clone for InvalidationEngine {
846    fn clone(&self) -> Self {
847        Self {
848            dependency_graph: self.dependency_graph.clone(),
849            strategy: self.strategy,
850            bloom_filter: self.bloom_filter.clone(),
851            pending_invalidations: Arc::new(RwLock::new(VecDeque::new())),
852            metrics: self.metrics.clone(),
853            config: self.config.clone(),
854        }
855    }
856}
857
858/// RDF update listener trait
859pub trait RdfUpdateListener {
860    /// Called when an RDF triple is inserted
861    fn on_insert(&mut self, triple: &TriplePattern) -> Result<()>;
862
863    /// Called when an RDF triple is deleted
864    fn on_delete(&mut self, triple: &TriplePattern) -> Result<()>;
865
866    /// Called when multiple triples are inserted
867    fn on_batch_insert(&mut self, triples: &[TriplePattern]) -> Result<()> {
868        for triple in triples {
869            self.on_insert(triple)?;
870        }
871        Ok(())
872    }
873
874    /// Called when multiple triples are deleted
875    fn on_batch_delete(&mut self, triples: &[TriplePattern]) -> Result<()> {
876        for triple in triples {
877            self.on_delete(triple)?;
878        }
879        Ok(())
880    }
881}
882
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use crate::algebra::{Term, Variable};
887
888    fn create_test_pattern(s: &str, p: &str, o: &str) -> TriplePattern {
889        TriplePattern {
890            subject: Term::Variable(Variable::new(s).expect("valid variable")),
891            predicate: Term::Variable(Variable::new(p).expect("valid variable")),
892            object: Term::Variable(Variable::new(o).expect("valid variable")),
893        }
894    }
895
896    #[test]
897    fn test_dependency_graph_basic() {
898        let graph = DependencyGraph::new();
899
900        let pattern1 = create_test_pattern("s", "p", "o");
901        let pattern2 = create_test_pattern("x", "y", "z");
902
903        graph
904            .register_dependencies("key1".to_string(), vec![pattern1.clone(), pattern2.clone()])
905            .unwrap();
906
907        let stats = graph.statistics();
908        assert_eq!(stats.entry_count, 1);
909        assert_eq!(stats.edge_count, 2);
910    }
911
912    #[test]
913    fn test_invalidation_engine_immediate() {
914        let engine = InvalidationEngine::new(InvalidationStrategy::Immediate);
915
916        let pattern = create_test_pattern("s", "p", "o");
917        engine
918            .register_dependencies("key1".to_string(), vec![pattern.clone()])
919            .unwrap();
920
921        let affected = engine.find_affected_entries(&pattern).unwrap();
922        assert_eq!(affected.len(), 1);
923        assert!(affected.contains("key1"));
924    }
925
926    #[test]
927    fn test_invalidation_engine_batched() {
928        let engine = InvalidationEngine::new(InvalidationStrategy::Batched {
929            batch_size: 10,
930            max_delay_ms: 100,
931        });
932
933        let pattern = create_test_pattern("s", "p", "o");
934        engine
935            .register_dependencies("key1".to_string(), vec![pattern.clone()])
936            .unwrap();
937
938        let mut invalidated = Vec::new();
939        engine
940            .invalidate(&pattern, |key| {
941                invalidated.push(key.clone());
942                Ok(())
943            })
944            .unwrap();
945
946        // Force flush
947        engine
948            .flush_pending(|key| {
949                invalidated.push(key.clone());
950                Ok(())
951            })
952            .unwrap();
953
954        // Should have invalidated key1 (possibly twice if batched)
955        assert!(!invalidated.is_empty());
956    }
957
958    #[test]
959    fn test_bloom_filter() {
960        let filter = BloomFilter::new(1000, 0.01);
961
962        let pattern = create_test_pattern("s", "p", "o");
963        let hash = TriplePatternHash::from_pattern(&pattern);
964
965        // Should not contain before adding
966        assert!(!filter.might_contain(hash));
967
968        // Add and check
969        filter.add(hash);
970        assert!(filter.might_contain(hash));
971    }
972
973    #[test]
974    fn test_remove_entry() {
975        let graph = DependencyGraph::new();
976
977        let pattern = create_test_pattern("s", "p", "o");
978        graph
979            .register_dependencies("key1".to_string(), vec![pattern.clone()])
980            .unwrap();
981
982        assert_eq!(graph.statistics().entry_count, 1);
983
984        graph.remove_entry(&"key1".to_string()).unwrap();
985
986        assert_eq!(graph.statistics().entry_count, 0);
987    }
988
989    #[test]
990    fn test_multiple_dependencies() {
991        let engine = InvalidationEngine::new(InvalidationStrategy::Immediate);
992
993        let pattern1 = create_test_pattern("s", "p", "o");
994        let pattern2 = create_test_pattern("x", "y", "z");
995
996        engine
997            .register_dependencies("key1".to_string(), vec![pattern1.clone()])
998            .unwrap();
999        engine
1000            .register_dependencies("key2".to_string(), vec![pattern1.clone(), pattern2.clone()])
1001            .unwrap();
1002
1003        // Update pattern1 should affect both entries
1004        let affected = engine.find_affected_entries(&pattern1).unwrap();
1005        assert_eq!(affected.len(), 2);
1006
1007        // Update pattern2 should only affect key2
1008        let affected2 = engine.find_affected_entries(&pattern2).unwrap();
1009        assert_eq!(affected2.len(), 1);
1010        assert!(affected2.contains("key2"));
1011    }
1012
1013    #[test]
1014    fn test_ttl_registration() {
1015        let config = InvalidationConfig {
1016            default_ttl: Some(Duration::from_secs(10)),
1017            ..Default::default()
1018        };
1019        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
1020
1021        let pattern = create_test_pattern("s", "p", "o");
1022
1023        // Register with default TTL
1024        engine
1025            .register_dependencies("key1".to_string(), vec![pattern.clone()])
1026            .unwrap();
1027
1028        // Register with custom TTL
1029        engine
1030            .register_dependencies_with_ttl(
1031                "key2".to_string(),
1032                vec![pattern.clone()],
1033                Some(Duration::from_secs(5)),
1034            )
1035            .unwrap();
1036
1037        // Both entries should exist
1038        let stats = engine.statistics();
1039        assert_eq!(stats.dependency_graph.entry_count, 2);
1040    }
1041
1042    #[test]
1043    fn test_ttl_expiration() {
1044        let config = InvalidationConfig {
1045            default_ttl: Some(Duration::from_millis(100)),
1046            enable_ttl_cleanup: true,
1047            ..Default::default()
1048        };
1049        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
1050
1051        let pattern = create_test_pattern("s", "p", "o");
1052
1053        // Register entry with short TTL
1054        engine
1055            .register_dependencies_with_ttl(
1056                "key1".to_string(),
1057                vec![pattern.clone()],
1058                Some(Duration::from_millis(50)),
1059            )
1060            .unwrap();
1061
1062        // Entry should exist initially
1063        let expired = engine.dependency_graph.find_expired_entries();
1064        assert_eq!(expired.len(), 0);
1065
1066        // Wait for expiration
1067        std::thread::sleep(Duration::from_millis(100));
1068
1069        // Entry should be expired now
1070        let expired = engine.dependency_graph.find_expired_entries();
1071        assert_eq!(expired.len(), 1);
1072        assert!(expired.contains(&"key1".to_string()));
1073    }
1074
1075    #[test]
1076    fn test_ttl_cleanup() {
1077        let config = InvalidationConfig {
1078            default_ttl: Some(Duration::from_millis(50)),
1079            enable_ttl_cleanup: true,
1080            ..Default::default()
1081        };
1082        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
1083
1084        let pattern = create_test_pattern("s", "p", "o");
1085
1086        // Register multiple entries
1087        for i in 0..5 {
1088            engine
1089                .register_dependencies(format!("key{}", i), vec![pattern.clone()])
1090                .unwrap();
1091        }
1092
1093        // All entries should exist
1094        assert_eq!(engine.dependency_graph.statistics().entry_count, 5);
1095
1096        // Wait for expiration
1097        std::thread::sleep(Duration::from_millis(100));
1098
1099        // Run cleanup
1100        let mut removed_keys = Vec::new();
1101        let count = engine
1102            .cleanup_expired(|key| {
1103                removed_keys.push(key.clone());
1104                Ok(())
1105            })
1106            .unwrap();
1107
1108        // All entries should be cleaned up
1109        assert_eq!(count, 5);
1110        assert_eq!(removed_keys.len(), 5);
1111        assert_eq!(engine.dependency_graph.statistics().entry_count, 0);
1112    }
1113
1114    #[test]
1115    fn test_ttl_metadata() {
1116        let graph = DependencyGraph::new();
1117
1118        let pattern = create_test_pattern("s", "p", "o");
1119        let ttl = Duration::from_secs(60);
1120
1121        graph
1122            .register_dependencies_with_ttl("key1".to_string(), vec![pattern], Some(ttl))
1123            .unwrap();
1124
1125        // Check TTL info
1126        let ttl_info = graph.get_ttl_info(&"key1".to_string());
1127        assert!(ttl_info.is_some());
1128
1129        let (elapsed, remaining) = ttl_info.unwrap();
1130        assert!(elapsed < ttl);
1131        assert!(remaining.is_some());
1132        assert!(remaining.unwrap() <= ttl);
1133    }
1134
1135    #[test]
1136    fn test_mixed_ttl_no_ttl() {
1137        let config = InvalidationConfig {
1138            default_ttl: None,
1139            enable_ttl_cleanup: true,
1140            ..Default::default()
1141        };
1142        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
1143
1144        let pattern = create_test_pattern("s", "p", "o");
1145
1146        // Register entry with no TTL
1147        engine
1148            .register_dependencies("key_no_ttl".to_string(), vec![pattern.clone()])
1149            .unwrap();
1150
1151        // Register entry with TTL
1152        engine
1153            .register_dependencies_with_ttl(
1154                "key_with_ttl".to_string(),
1155                vec![pattern.clone()],
1156                Some(Duration::from_millis(50)),
1157            )
1158            .unwrap();
1159
1160        // Wait for TTL expiration
1161        std::thread::sleep(Duration::from_millis(100));
1162
1163        // Only one should expire
1164        let expired = engine.dependency_graph.find_expired_entries();
1165        assert_eq!(expired.len(), 1);
1166        assert!(expired.contains(&"key_with_ttl".to_string()));
1167        assert!(!expired.contains(&"key_no_ttl".to_string()));
1168    }
1169
1170    #[test]
1171    fn test_ttl_statistics() {
1172        let config = InvalidationConfig {
1173            default_ttl: Some(Duration::from_millis(50)),
1174            enable_ttl_cleanup: true,
1175            enable_metrics: true,
1176            ..Default::default()
1177        };
1178        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
1179
1180        let pattern = create_test_pattern("s", "p", "o");
1181
1182        // Register entries
1183        for i in 0..3 {
1184            engine
1185                .register_dependencies(format!("key{}", i), vec![pattern.clone()])
1186                .unwrap();
1187        }
1188
1189        // Wait for expiration
1190        std::thread::sleep(Duration::from_millis(100));
1191
1192        // Run cleanup
1193        let _count = engine.cleanup_expired(|_| Ok(())).unwrap();
1194
1195        // Check statistics
1196        let stats = engine.statistics();
1197        assert_eq!(stats.ttl_evictions, 3);
1198        assert!(stats.avg_ttl_cleanup_time_us > 0.0);
1199    }
1200}