Skip to main content

oxirs_arq/cache/
coordinator.rs

//! Cache Coordinator
//!
//! Unified coordinator for managing a three-level cache hierarchy (query
//! results, query plans, optimizer state) and keeping the levels consistent
//! through coordinated invalidations.

use super::invalidation_engine::{
    InvalidationEngine, InvalidationStatistics, InvalidationStrategy, RdfUpdateListener,
};
use crate::algebra::TriplePattern;
use crate::query_plan_cache::{CacheStats as PlanCacheStats, QueryPlanCache};
use crate::query_result_cache::{CacheStatistics as ResultCacheStatistics, QueryResultCache};
use anyhow::{Context, Result};
use scirs2_core::metrics::{Counter, Timer};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;

19/// Cache level identifier
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub enum CacheLevel {
22    /// Query result cache (L1)
23    Result,
24    /// Query plan cache (L2)
25    Plan,
26    /// Optimizer/statistics cache (L3)
27    Optimizer,
28}
29
30/// Cache invalidation configuration
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct InvalidationConfig {
33    /// Invalidation strategy
34    pub strategy: InvalidationStrategy,
35    /// Enable cross-level invalidation propagation
36    pub propagate_invalidations: bool,
37    /// Enable metrics collection
38    pub enable_metrics: bool,
39    /// Maximum batch size for batched strategy
40    pub max_batch_size: usize,
41    /// Flush interval for batched strategy (milliseconds)
42    pub flush_interval_ms: u64,
43}
44
45impl Default for InvalidationConfig {
46    fn default() -> Self {
47        Self {
48            strategy: InvalidationStrategy::Batched {
49                batch_size: 100,
50                max_delay_ms: 50,
51            },
52            propagate_invalidations: true,
53            enable_metrics: true,
54            max_batch_size: 1000,
55            flush_interval_ms: 50,
56        }
57    }
58}
59
60/// Unified cache coordinator
61pub struct CacheCoordinator {
62    /// Result cache
63    result_cache: Option<Arc<QueryResultCache>>,
64    /// Plan cache
65    plan_cache: Option<Arc<QueryPlanCache>>,
66    /// Optimizer cache (placeholder for now)
67    optimizer_cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
68    /// Invalidation engine
69    invalidation_engine: Arc<InvalidationEngine>,
70    /// Configuration
71    config: InvalidationConfig,
72    /// Coordinator metrics
73    metrics: CoordinatorMetrics,
74    /// Cache entry metadata
75    entry_metadata: Arc<RwLock<HashMap<String, CacheEntryMetadata>>>,
76}
77
78#[derive(Debug, Clone)]
79#[allow(dead_code)]
80struct CacheEntryMetadata {
81    level: CacheLevel,
82    cache_key: String,
83    created_at: Instant,
84    last_accessed: Instant,
85    access_count: usize,
86    size_bytes: usize,
87}
88
89#[derive(Clone)]
90struct CoordinatorMetrics {
91    /// Total invalidations across all levels
92    total_invalidations: Arc<Counter>,
93    /// Invalidations per level
94    result_invalidations: Arc<Counter>,
95    plan_invalidations: Arc<Counter>,
96    optimizer_invalidations: Arc<Counter>,
97    /// Coordination overhead
98    coordination_overhead: Arc<Timer>,
99    /// Cache coherence checks
100    coherence_checks: Arc<Counter>,
101    /// Coherence violations detected
102    coherence_violations: Arc<Counter>,
103}
104
105impl CoordinatorMetrics {
106    fn new() -> Self {
107        Self {
108            total_invalidations: Arc::new(Counter::new("cache_total_invalidations".to_string())),
109            result_invalidations: Arc::new(Counter::new("cache_result_invalidations".to_string())),
110            plan_invalidations: Arc::new(Counter::new("cache_plan_invalidations".to_string())),
111            optimizer_invalidations: Arc::new(Counter::new(
112                "cache_optimizer_invalidations".to_string(),
113            )),
114            coordination_overhead: Arc::new(Timer::new("cache_coordination_overhead".to_string())),
115            coherence_checks: Arc::new(Counter::new("cache_coherence_checks".to_string())),
116            coherence_violations: Arc::new(Counter::new("cache_coherence_violations".to_string())),
117        }
118    }
119}
120
121impl CacheCoordinator {
122    /// Create new cache coordinator
123    pub fn new(config: InvalidationConfig) -> Self {
124        let invalidation_engine = Arc::new(InvalidationEngine::with_config(
125            config.strategy,
126            super::invalidation_engine::InvalidationConfig {
127                enable_metrics: config.enable_metrics,
128                max_pending_batches: config.max_batch_size,
129                aggressive_matching: false,
130                default_ttl: None,         // Coordinator can override this later
131                enable_ttl_cleanup: false, // Disabled by default in coordinator
132                ttl_cleanup_interval_secs: 300,
133            },
134        ));
135
136        Self {
137            result_cache: None,
138            plan_cache: None,
139            optimizer_cache: Arc::new(RwLock::new(HashMap::new())),
140            invalidation_engine,
141            config,
142            metrics: CoordinatorMetrics::new(),
143            entry_metadata: Arc::new(RwLock::new(HashMap::new())),
144        }
145    }
146
147    /// Attach result cache
148    pub fn attach_result_cache(&mut self, cache: Arc<QueryResultCache>) {
149        self.result_cache = Some(cache);
150    }
151
152    /// Attach plan cache
153    pub fn attach_plan_cache(&mut self, cache: Arc<QueryPlanCache>) {
154        self.plan_cache = Some(cache);
155    }
156
157    /// Register a cache entry with its dependencies
158    pub fn register_cache_entry(
159        &self,
160        level: CacheLevel,
161        cache_key: String,
162        patterns: Vec<TriplePattern>,
163        size_bytes: usize,
164    ) -> Result<()> {
165        // Register with invalidation engine
166        self.invalidation_engine
167            .register_dependencies(cache_key.clone(), patterns)?;
168
169        // Store metadata
170        let mut metadata = self
171            .entry_metadata
172            .write()
173            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
174
175        metadata.insert(
176            cache_key.clone(),
177            CacheEntryMetadata {
178                level,
179                cache_key,
180                created_at: Instant::now(),
181                last_accessed: Instant::now(),
182                access_count: 0,
183                size_bytes,
184            },
185        );
186
187        Ok(())
188    }
189
190    /// Invalidate caches based on RDF update
191    pub fn invalidate_on_update(&self, triple: &TriplePattern) -> Result<()> {
192        let start_time = Instant::now();
193
194        // Find affected entries
195        let affected = self
196            .invalidation_engine
197            .find_affected_entries(triple)
198            .context("Failed to find affected entries")?;
199
200        // Group by cache level
201        let mut result_keys = Vec::new();
202        let mut plan_keys = Vec::new();
203        let mut optimizer_keys = Vec::new();
204
205        {
206            let metadata = self
207                .entry_metadata
208                .read()
209                .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
210
211            for cache_key in &affected {
212                if let Some(entry) = metadata.get(cache_key) {
213                    match entry.level {
214                        CacheLevel::Result => result_keys.push(cache_key.clone()),
215                        CacheLevel::Plan => plan_keys.push(cache_key.clone()),
216                        CacheLevel::Optimizer => optimizer_keys.push(cache_key.clone()),
217                    }
218                }
219            }
220        }
221
222        // Invalidate at each level
223        self.invalidate_result_entries(&result_keys)?;
224        self.invalidate_plan_entries(&plan_keys)?;
225        self.invalidate_optimizer_entries(&optimizer_keys)?;
226
227        // Propagate invalidations if configured
228        if self.config.propagate_invalidations {
229            self.propagate_invalidations(&result_keys, &plan_keys, &optimizer_keys)?;
230        }
231
232        // Update metrics
233        if self.config.enable_metrics {
234            let elapsed = start_time.elapsed();
235            self.metrics.coordination_overhead.observe(elapsed);
236            self.metrics.total_invalidations.add(affected.len() as u64);
237        }
238
239        Ok(())
240    }
241
242    /// Invalidate result cache entries
243    fn invalidate_result_entries(&self, keys: &[String]) -> Result<()> {
244        if !keys.is_empty() {
245            self.metrics.result_invalidations.add(keys.len() as u64);
246        }
247        if let Some(cache) = &self.result_cache {
248            for key in keys {
249                cache
250                    .invalidate(key)
251                    .context("Failed to invalidate result cache entry")?;
252                self.remove_metadata(key)?;
253            }
254        }
255        Ok(())
256    }
257
258    /// Invalidate plan cache entries
259    fn invalidate_plan_entries(&self, keys: &[String]) -> Result<()> {
260        if !keys.is_empty() {
261            self.metrics.plan_invalidations.add(keys.len() as u64);
262        }
263        if let Some(cache) = &self.plan_cache {
264            for _key in keys {
265                // QueryPlanCache uses pattern-based invalidation
266                // We'd need to extract pattern from key or store mapping
267                // For now, we can clear the entire cache as a conservative approach
268                // In production, you'd maintain a mapping of keys to patterns
269                cache.clear();
270            }
271        }
272        Ok(())
273    }
274
275    /// Invalidate optimizer cache entries
276    fn invalidate_optimizer_entries(&self, keys: &[String]) -> Result<()> {
277        let mut cache = self
278            .optimizer_cache
279            .write()
280            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
281
282        for key in keys {
283            cache.remove(key);
284            self.remove_metadata(key)?;
285        }
286
287        self.metrics.optimizer_invalidations.add(keys.len() as u64);
288        Ok(())
289    }
290
291    /// Propagate invalidations across cache levels
292    fn propagate_invalidations(
293        &self,
294        _result_keys: &[String],
295        plan_keys: &[String],
296        optimizer_keys: &[String],
297    ) -> Result<()> {
298        // If plan cache entries are invalidated, also invalidate dependent result cache entries
299        if !plan_keys.is_empty() && self.result_cache.is_some() {
300            // Find result cache entries that depend on these plans
301            // This would require additional tracking; simplified for now
302        }
303
304        // If optimizer cache entries are invalidated, invalidate dependent plan cache entries
305        if !optimizer_keys.is_empty() && self.plan_cache.is_some() {
306            // Optimizer changes affect plan cache
307            // For now, clear plan cache conservatively
308            if let Some(cache) = &self.plan_cache {
309                cache.clear();
310            }
311        }
312
313        Ok(())
314    }
315
316    /// Remove entry metadata
317    fn remove_metadata(&self, cache_key: &str) -> Result<()> {
318        let mut metadata = self
319            .entry_metadata
320            .write()
321            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
322        metadata.remove(cache_key);
323        Ok(())
324    }
325
326    /// Check cache coherence (for testing/verification)
327    pub fn check_coherence(&self) -> Result<CoherenceReport> {
328        self.metrics.coherence_checks.inc();
329
330        let violations = Vec::new();
331
332        // Check if any result cache entries reference invalidated plans
333        // This would require additional tracking
334
335        // Check if any plan cache entries reference invalidated optimizer state
336        // This would require additional tracking
337
338        let has_violations = !violations.is_empty();
339        if has_violations {
340            self.metrics
341                .coherence_violations
342                .add(violations.len() as u64);
343        }
344
345        Ok(CoherenceReport {
346            is_coherent: !has_violations,
347            violations,
348            check_time: Instant::now(),
349        })
350    }
351
352    /// Get coordinator statistics
353    pub fn statistics(&self) -> CoordinatorStatistics {
354        let invalidation_stats = self.invalidation_engine.statistics();
355
356        let result_cache_stats = self
357            .result_cache
358            .as_ref()
359            .map(|c| c.statistics())
360            .unwrap_or_default();
361
362        let plan_cache_stats = self
363            .plan_cache
364            .as_ref()
365            .map(|c| c.statistics())
366            .unwrap_or_else(|| PlanCacheStats {
367                hits: 0,
368                misses: 0,
369                evictions: 0,
370                invalidations: 0,
371                size: 0,
372                capacity: 0,
373                hit_rate: 0.0,
374            });
375
376        let metadata = self.entry_metadata.read().ok();
377        let entry_count_by_level = metadata.as_ref().map(|m| {
378            let mut counts = HashMap::new();
379            for entry in m.values() {
380                *counts.entry(entry.level).or_insert(0) += 1;
381            }
382            counts
383        });
384
385        let overhead_stats = self.metrics.coordination_overhead.get_stats();
386
387        CoordinatorStatistics {
388            total_invalidations: self.metrics.total_invalidations.get(),
389            result_invalidations: self.metrics.result_invalidations.get(),
390            plan_invalidations: self.metrics.plan_invalidations.get(),
391            optimizer_invalidations: self.metrics.optimizer_invalidations.get(),
392            avg_coordination_overhead_us: overhead_stats.mean,
393            coherence_checks: self.metrics.coherence_checks.get(),
394            coherence_violations: self.metrics.coherence_violations.get(),
395            invalidation_engine_stats: invalidation_stats,
396            result_cache_stats,
397            plan_cache_stats,
398            entry_count_by_level: entry_count_by_level.unwrap_or_default(),
399        }
400    }
401
402    /// Clear all caches
403    pub fn clear_all(&self) -> Result<()> {
404        if let Some(cache) = &self.result_cache {
405            cache.invalidate_all()?;
406        }
407
408        if let Some(cache) = &self.plan_cache {
409            cache.clear();
410        }
411
412        {
413            let mut optimizer_cache = self
414                .optimizer_cache
415                .write()
416                .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
417            optimizer_cache.clear();
418        }
419
420        {
421            let mut metadata = self
422                .entry_metadata
423                .write()
424                .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
425            metadata.clear();
426        }
427
428        self.invalidation_engine.clear()?;
429
430        Ok(())
431    }
432
433    /// Force flush any pending invalidations
434    pub fn flush_pending(&self) -> Result<()> {
435        self.invalidation_engine.flush_pending(|key| {
436            // Invalidate across all caches
437            if let Some(cache) = &self.result_cache {
438                let _ = cache.invalidate(key);
439            }
440            // Plan cache would need key-to-pattern mapping
441            // Optimizer cache
442            let mut optimizer_cache = self
443                .optimizer_cache
444                .write()
445                .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
446            optimizer_cache.remove(key);
447            Ok(())
448        })
449    }
450
451    /// Get invalidation engine
452    pub fn invalidation_engine(&self) -> Arc<InvalidationEngine> {
453        Arc::clone(&self.invalidation_engine)
454    }
455}
456
457/// Coherence check report
458#[derive(Debug, Clone, Serialize, Deserialize)]
459pub struct CoherenceReport {
460    pub is_coherent: bool,
461    pub violations: Vec<CoherenceViolation>,
462    #[serde(skip, default = "Instant::now")]
463    pub check_time: Instant,
464}
465
466/// Coherence violation details
467#[derive(Debug, Clone, Serialize, Deserialize)]
468pub struct CoherenceViolation {
469    pub level: CacheLevel,
470    pub cache_key: String,
471    pub violation_type: ViolationType,
472    pub details: String,
473}
474
475/// Types of coherence violations
476#[derive(Debug, Clone, Serialize, Deserialize)]
477pub enum ViolationType {
478    /// Result cache references invalidated plan
479    StaleResultReference,
480    /// Plan cache references invalidated optimizer state
481    StalePlanReference,
482    /// Cross-level inconsistency
483    CrossLevelInconsistency,
484}
485
486/// Coordinator statistics
487#[derive(Debug, Clone, Serialize, Deserialize)]
488pub struct CoordinatorStatistics {
489    pub total_invalidations: u64,
490    pub result_invalidations: u64,
491    pub plan_invalidations: u64,
492    pub optimizer_invalidations: u64,
493    pub avg_coordination_overhead_us: f64,
494    pub coherence_checks: u64,
495    pub coherence_violations: u64,
496    pub invalidation_engine_stats: InvalidationStatistics,
497    pub result_cache_stats: ResultCacheStatistics,
498    pub plan_cache_stats: PlanCacheStats,
499    pub entry_count_by_level: HashMap<CacheLevel, usize>,
500}
501
502/// Implement RdfUpdateListener for CacheCoordinator
503impl RdfUpdateListener for CacheCoordinator {
504    fn on_insert(&mut self, triple: &TriplePattern) -> Result<()> {
505        self.invalidate_on_update(triple)
506    }
507
508    fn on_delete(&mut self, triple: &TriplePattern) -> Result<()> {
509        self.invalidate_on_update(triple)
510    }
511
512    fn on_batch_insert(&mut self, triples: &[TriplePattern]) -> Result<()> {
513        for triple in triples {
514            self.invalidate_on_update(triple)?;
515        }
516        // Flush any pending batched invalidations
517        self.flush_pending()?;
518        Ok(())
519    }
520
521    fn on_batch_delete(&mut self, triples: &[TriplePattern]) -> Result<()> {
522        for triple in triples {
523            self.invalidate_on_update(triple)?;
524        }
525        // Flush any pending batched invalidations
526        self.flush_pending()?;
527        Ok(())
528    }
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{Term, Variable};
    use crate::query_result_cache::CacheConfig;

    /// Build a triple pattern whose subject, predicate and object are all
    /// variables with the given names.
    fn create_test_pattern(s: &str, p: &str, o: &str) -> TriplePattern {
        let var = |name: &str| Term::Variable(Variable::new(name).expect("valid variable"));
        TriplePattern {
            subject: var(s),
            predicate: var(p),
            object: var(o),
        }
    }

    #[test]
    fn test_coordinator_creation() {
        let coordinator = CacheCoordinator::new(InvalidationConfig::default());

        assert_eq!(coordinator.statistics().total_invalidations, 0);
    }

    #[test]
    fn test_register_and_invalidate() {
        let coordinator = CacheCoordinator::new(InvalidationConfig::default());
        let pattern = create_test_pattern("s", "p", "o");

        coordinator
            .register_cache_entry(
                CacheLevel::Result,
                "test_key".to_string(),
                vec![pattern.clone()],
                100,
            )
            .unwrap();
        coordinator.invalidate_on_update(&pattern).unwrap();

        assert_eq!(coordinator.statistics().total_invalidations, 1);
    }

    #[test]
    fn test_attach_caches() {
        let mut coordinator = CacheCoordinator::new(InvalidationConfig::default());

        coordinator.attach_result_cache(Arc::new(QueryResultCache::new(CacheConfig::default())));
        coordinator.attach_plan_cache(Arc::new(QueryPlanCache::new()));

        assert!(coordinator.result_cache.is_some());
        assert!(coordinator.plan_cache.is_some());
    }

    #[test]
    fn test_clear_all() {
        let coordinator = CacheCoordinator::new(InvalidationConfig::default());
        let pattern = create_test_pattern("s", "p", "o");

        coordinator
            .register_cache_entry(CacheLevel::Result, "key1".to_string(), vec![pattern], 100)
            .unwrap();
        coordinator.clear_all().unwrap();

        assert!(coordinator.statistics().entry_count_by_level.is_empty());
    }

    #[test]
    fn test_multi_level_invalidation() {
        let coordinator = CacheCoordinator::new(InvalidationConfig {
            propagate_invalidations: true,
            ..Default::default()
        });
        let pattern = create_test_pattern("s", "p", "o");

        // One entry at the result level and one at the plan level, both
        // depending on the same pattern.
        let entries = [
            (CacheLevel::Result, "result_key", 100),
            (CacheLevel::Plan, "plan_key", 50),
        ];
        for (level, key, size) in entries {
            coordinator
                .register_cache_entry(level, key.to_string(), vec![pattern.clone()], size)
                .unwrap();
        }

        coordinator.invalidate_on_update(&pattern).unwrap();

        let stats = coordinator.statistics();
        assert!(stats.result_invalidations > 0 || stats.plan_invalidations > 0);
    }
}