1use super::invalidation_engine::{
7 InvalidationEngine, InvalidationStatistics, InvalidationStrategy, RdfUpdateListener,
8};
9use crate::algebra::TriplePattern;
10use crate::query_plan_cache::{CacheStats as PlanCacheStats, QueryPlanCache};
11use crate::query_result_cache::{CacheStatistics as ResultCacheStatistics, QueryResultCache};
12use anyhow::{Context, Result};
13use scirs2_core::metrics::{Counter, Timer};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17use std::time::Instant;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub enum CacheLevel {
22 Result,
24 Plan,
26 Optimizer,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct InvalidationConfig {
33 pub strategy: InvalidationStrategy,
35 pub propagate_invalidations: bool,
37 pub enable_metrics: bool,
39 pub max_batch_size: usize,
41 pub flush_interval_ms: u64,
43}
44
45impl Default for InvalidationConfig {
46 fn default() -> Self {
47 Self {
48 strategy: InvalidationStrategy::Batched {
49 batch_size: 100,
50 max_delay_ms: 50,
51 },
52 propagate_invalidations: true,
53 enable_metrics: true,
54 max_batch_size: 1000,
55 flush_interval_ms: 50,
56 }
57 }
58}
59
60pub struct CacheCoordinator {
62 result_cache: Option<Arc<QueryResultCache>>,
64 plan_cache: Option<Arc<QueryPlanCache>>,
66 optimizer_cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
68 invalidation_engine: Arc<InvalidationEngine>,
70 config: InvalidationConfig,
72 metrics: CoordinatorMetrics,
74 entry_metadata: Arc<RwLock<HashMap<String, CacheEntryMetadata>>>,
76}
77
78#[derive(Debug, Clone)]
79#[allow(dead_code)]
80struct CacheEntryMetadata {
81 level: CacheLevel,
82 cache_key: String,
83 created_at: Instant,
84 last_accessed: Instant,
85 access_count: usize,
86 size_bytes: usize,
87}
88
89#[derive(Clone)]
90struct CoordinatorMetrics {
91 total_invalidations: Arc<Counter>,
93 result_invalidations: Arc<Counter>,
95 plan_invalidations: Arc<Counter>,
96 optimizer_invalidations: Arc<Counter>,
97 coordination_overhead: Arc<Timer>,
99 coherence_checks: Arc<Counter>,
101 coherence_violations: Arc<Counter>,
103}
104
105impl CoordinatorMetrics {
106 fn new() -> Self {
107 Self {
108 total_invalidations: Arc::new(Counter::new("cache_total_invalidations".to_string())),
109 result_invalidations: Arc::new(Counter::new("cache_result_invalidations".to_string())),
110 plan_invalidations: Arc::new(Counter::new("cache_plan_invalidations".to_string())),
111 optimizer_invalidations: Arc::new(Counter::new(
112 "cache_optimizer_invalidations".to_string(),
113 )),
114 coordination_overhead: Arc::new(Timer::new("cache_coordination_overhead".to_string())),
115 coherence_checks: Arc::new(Counter::new("cache_coherence_checks".to_string())),
116 coherence_violations: Arc::new(Counter::new("cache_coherence_violations".to_string())),
117 }
118 }
119}
120
121impl CacheCoordinator {
122 pub fn new(config: InvalidationConfig) -> Self {
124 let invalidation_engine = Arc::new(InvalidationEngine::with_config(
125 config.strategy,
126 super::invalidation_engine::InvalidationConfig {
127 enable_metrics: config.enable_metrics,
128 max_pending_batches: config.max_batch_size,
129 aggressive_matching: false,
130 default_ttl: None, enable_ttl_cleanup: false, ttl_cleanup_interval_secs: 300,
133 },
134 ));
135
136 Self {
137 result_cache: None,
138 plan_cache: None,
139 optimizer_cache: Arc::new(RwLock::new(HashMap::new())),
140 invalidation_engine,
141 config,
142 metrics: CoordinatorMetrics::new(),
143 entry_metadata: Arc::new(RwLock::new(HashMap::new())),
144 }
145 }
146
147 pub fn attach_result_cache(&mut self, cache: Arc<QueryResultCache>) {
149 self.result_cache = Some(cache);
150 }
151
152 pub fn attach_plan_cache(&mut self, cache: Arc<QueryPlanCache>) {
154 self.plan_cache = Some(cache);
155 }
156
157 pub fn register_cache_entry(
159 &self,
160 level: CacheLevel,
161 cache_key: String,
162 patterns: Vec<TriplePattern>,
163 size_bytes: usize,
164 ) -> Result<()> {
165 self.invalidation_engine
167 .register_dependencies(cache_key.clone(), patterns)?;
168
169 let mut metadata = self
171 .entry_metadata
172 .write()
173 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
174
175 metadata.insert(
176 cache_key.clone(),
177 CacheEntryMetadata {
178 level,
179 cache_key,
180 created_at: Instant::now(),
181 last_accessed: Instant::now(),
182 access_count: 0,
183 size_bytes,
184 },
185 );
186
187 Ok(())
188 }
189
190 pub fn invalidate_on_update(&self, triple: &TriplePattern) -> Result<()> {
192 let start_time = Instant::now();
193
194 let affected = self
196 .invalidation_engine
197 .find_affected_entries(triple)
198 .context("Failed to find affected entries")?;
199
200 let mut result_keys = Vec::new();
202 let mut plan_keys = Vec::new();
203 let mut optimizer_keys = Vec::new();
204
205 {
206 let metadata = self
207 .entry_metadata
208 .read()
209 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
210
211 for cache_key in &affected {
212 if let Some(entry) = metadata.get(cache_key) {
213 match entry.level {
214 CacheLevel::Result => result_keys.push(cache_key.clone()),
215 CacheLevel::Plan => plan_keys.push(cache_key.clone()),
216 CacheLevel::Optimizer => optimizer_keys.push(cache_key.clone()),
217 }
218 }
219 }
220 }
221
222 self.invalidate_result_entries(&result_keys)?;
224 self.invalidate_plan_entries(&plan_keys)?;
225 self.invalidate_optimizer_entries(&optimizer_keys)?;
226
227 if self.config.propagate_invalidations {
229 self.propagate_invalidations(&result_keys, &plan_keys, &optimizer_keys)?;
230 }
231
232 if self.config.enable_metrics {
234 let elapsed = start_time.elapsed();
235 self.metrics.coordination_overhead.observe(elapsed);
236 self.metrics.total_invalidations.add(affected.len() as u64);
237 }
238
239 Ok(())
240 }
241
242 fn invalidate_result_entries(&self, keys: &[String]) -> Result<()> {
244 if !keys.is_empty() {
245 self.metrics.result_invalidations.add(keys.len() as u64);
246 }
247 if let Some(cache) = &self.result_cache {
248 for key in keys {
249 cache
250 .invalidate(key)
251 .context("Failed to invalidate result cache entry")?;
252 self.remove_metadata(key)?;
253 }
254 }
255 Ok(())
256 }
257
258 fn invalidate_plan_entries(&self, keys: &[String]) -> Result<()> {
260 if !keys.is_empty() {
261 self.metrics.plan_invalidations.add(keys.len() as u64);
262 }
263 if let Some(cache) = &self.plan_cache {
264 for _key in keys {
265 cache.clear();
270 }
271 }
272 Ok(())
273 }
274
275 fn invalidate_optimizer_entries(&self, keys: &[String]) -> Result<()> {
277 let mut cache = self
278 .optimizer_cache
279 .write()
280 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
281
282 for key in keys {
283 cache.remove(key);
284 self.remove_metadata(key)?;
285 }
286
287 self.metrics.optimizer_invalidations.add(keys.len() as u64);
288 Ok(())
289 }
290
291 fn propagate_invalidations(
293 &self,
294 _result_keys: &[String],
295 plan_keys: &[String],
296 optimizer_keys: &[String],
297 ) -> Result<()> {
298 if !plan_keys.is_empty() && self.result_cache.is_some() {
300 }
303
304 if !optimizer_keys.is_empty() && self.plan_cache.is_some() {
306 if let Some(cache) = &self.plan_cache {
309 cache.clear();
310 }
311 }
312
313 Ok(())
314 }
315
316 fn remove_metadata(&self, cache_key: &str) -> Result<()> {
318 let mut metadata = self
319 .entry_metadata
320 .write()
321 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
322 metadata.remove(cache_key);
323 Ok(())
324 }
325
326 pub fn check_coherence(&self) -> Result<CoherenceReport> {
328 self.metrics.coherence_checks.inc();
329
330 let violations = Vec::new();
331
332 let has_violations = !violations.is_empty();
339 if has_violations {
340 self.metrics
341 .coherence_violations
342 .add(violations.len() as u64);
343 }
344
345 Ok(CoherenceReport {
346 is_coherent: !has_violations,
347 violations,
348 check_time: Instant::now(),
349 })
350 }
351
352 pub fn statistics(&self) -> CoordinatorStatistics {
354 let invalidation_stats = self.invalidation_engine.statistics();
355
356 let result_cache_stats = self
357 .result_cache
358 .as_ref()
359 .map(|c| c.statistics())
360 .unwrap_or_default();
361
362 let plan_cache_stats = self
363 .plan_cache
364 .as_ref()
365 .map(|c| c.statistics())
366 .unwrap_or_else(|| PlanCacheStats {
367 hits: 0,
368 misses: 0,
369 evictions: 0,
370 invalidations: 0,
371 size: 0,
372 capacity: 0,
373 hit_rate: 0.0,
374 });
375
376 let metadata = self.entry_metadata.read().ok();
377 let entry_count_by_level = metadata.as_ref().map(|m| {
378 let mut counts = HashMap::new();
379 for entry in m.values() {
380 *counts.entry(entry.level).or_insert(0) += 1;
381 }
382 counts
383 });
384
385 let overhead_stats = self.metrics.coordination_overhead.get_stats();
386
387 CoordinatorStatistics {
388 total_invalidations: self.metrics.total_invalidations.get(),
389 result_invalidations: self.metrics.result_invalidations.get(),
390 plan_invalidations: self.metrics.plan_invalidations.get(),
391 optimizer_invalidations: self.metrics.optimizer_invalidations.get(),
392 avg_coordination_overhead_us: overhead_stats.mean,
393 coherence_checks: self.metrics.coherence_checks.get(),
394 coherence_violations: self.metrics.coherence_violations.get(),
395 invalidation_engine_stats: invalidation_stats,
396 result_cache_stats,
397 plan_cache_stats,
398 entry_count_by_level: entry_count_by_level.unwrap_or_default(),
399 }
400 }
401
402 pub fn clear_all(&self) -> Result<()> {
404 if let Some(cache) = &self.result_cache {
405 cache.invalidate_all()?;
406 }
407
408 if let Some(cache) = &self.plan_cache {
409 cache.clear();
410 }
411
412 {
413 let mut optimizer_cache = self
414 .optimizer_cache
415 .write()
416 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
417 optimizer_cache.clear();
418 }
419
420 {
421 let mut metadata = self
422 .entry_metadata
423 .write()
424 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
425 metadata.clear();
426 }
427
428 self.invalidation_engine.clear()?;
429
430 Ok(())
431 }
432
433 pub fn flush_pending(&self) -> Result<()> {
435 self.invalidation_engine.flush_pending(|key| {
436 if let Some(cache) = &self.result_cache {
438 let _ = cache.invalidate(key);
439 }
440 let mut optimizer_cache = self
443 .optimizer_cache
444 .write()
445 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
446 optimizer_cache.remove(key);
447 Ok(())
448 })
449 }
450
451 pub fn invalidation_engine(&self) -> Arc<InvalidationEngine> {
453 Arc::clone(&self.invalidation_engine)
454 }
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize)]
459pub struct CoherenceReport {
460 pub is_coherent: bool,
461 pub violations: Vec<CoherenceViolation>,
462 #[serde(skip, default = "Instant::now")]
463 pub check_time: Instant,
464}
465
466#[derive(Debug, Clone, Serialize, Deserialize)]
468pub struct CoherenceViolation {
469 pub level: CacheLevel,
470 pub cache_key: String,
471 pub violation_type: ViolationType,
472 pub details: String,
473}
474
475#[derive(Debug, Clone, Serialize, Deserialize)]
477pub enum ViolationType {
478 StaleResultReference,
480 StalePlanReference,
482 CrossLevelInconsistency,
484}
485
486#[derive(Debug, Clone, Serialize, Deserialize)]
488pub struct CoordinatorStatistics {
489 pub total_invalidations: u64,
490 pub result_invalidations: u64,
491 pub plan_invalidations: u64,
492 pub optimizer_invalidations: u64,
493 pub avg_coordination_overhead_us: f64,
494 pub coherence_checks: u64,
495 pub coherence_violations: u64,
496 pub invalidation_engine_stats: InvalidationStatistics,
497 pub result_cache_stats: ResultCacheStatistics,
498 pub plan_cache_stats: PlanCacheStats,
499 pub entry_count_by_level: HashMap<CacheLevel, usize>,
500}
501
502impl RdfUpdateListener for CacheCoordinator {
504 fn on_insert(&mut self, triple: &TriplePattern) -> Result<()> {
505 self.invalidate_on_update(triple)
506 }
507
508 fn on_delete(&mut self, triple: &TriplePattern) -> Result<()> {
509 self.invalidate_on_update(triple)
510 }
511
512 fn on_batch_insert(&mut self, triples: &[TriplePattern]) -> Result<()> {
513 for triple in triples {
514 self.invalidate_on_update(triple)?;
515 }
516 self.flush_pending()?;
518 Ok(())
519 }
520
521 fn on_batch_delete(&mut self, triples: &[TriplePattern]) -> Result<()> {
522 for triple in triples {
523 self.invalidate_on_update(triple)?;
524 }
525 self.flush_pending()?;
527 Ok(())
528 }
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{Term, Variable};
    use crate::query_result_cache::CacheConfig;

    /// Builds a variable term with the given name.
    fn variable_term(name: &str) -> Term {
        Term::Variable(Variable::new(name).expect("valid variable"))
    }

    /// Builds a triple pattern made of three variables.
    fn create_test_pattern(s: &str, p: &str, o: &str) -> TriplePattern {
        TriplePattern {
            subject: variable_term(s),
            predicate: variable_term(p),
            object: variable_term(o),
        }
    }

    /// Coordinator with the default configuration and no caches attached.
    fn default_coordinator() -> CacheCoordinator {
        CacheCoordinator::new(InvalidationConfig::default())
    }

    #[test]
    fn test_coordinator_creation() {
        let coordinator = default_coordinator();

        // A fresh coordinator has not invalidated anything.
        assert_eq!(coordinator.statistics().total_invalidations, 0);
    }

    #[test]
    fn test_register_and_invalidate() {
        let coordinator = default_coordinator();
        let pattern = create_test_pattern("s", "p", "o");
        let cache_key = "test_key".to_string();

        coordinator
            .register_cache_entry(
                CacheLevel::Result,
                cache_key.clone(),
                vec![pattern.clone()],
                100,
            )
            .unwrap();
        coordinator.invalidate_on_update(&pattern).unwrap();

        // Exactly the one registered entry is affected.
        assert_eq!(coordinator.statistics().total_invalidations, 1);
    }

    #[test]
    fn test_attach_caches() {
        let mut coordinator = default_coordinator();

        coordinator
            .attach_result_cache(Arc::new(QueryResultCache::new(CacheConfig::default())));
        coordinator.attach_plan_cache(Arc::new(QueryPlanCache::new()));

        assert!(coordinator.result_cache.is_some());
        assert!(coordinator.plan_cache.is_some());
    }

    #[test]
    fn test_clear_all() {
        let coordinator = default_coordinator();

        coordinator
            .register_cache_entry(
                CacheLevel::Result,
                "key1".to_string(),
                vec![create_test_pattern("s", "p", "o")],
                100,
            )
            .unwrap();
        coordinator.clear_all().unwrap();

        // Clearing drops all entry metadata, so no level has a count.
        assert_eq!(coordinator.statistics().entry_count_by_level.len(), 0);
    }

    #[test]
    fn test_multi_level_invalidation() {
        let coordinator = CacheCoordinator::new(InvalidationConfig {
            propagate_invalidations: true,
            ..Default::default()
        });
        let pattern = create_test_pattern("s", "p", "o");

        // Register one entry per level, both depending on the same pattern.
        let registrations = [
            (CacheLevel::Result, "result_key", 100),
            (CacheLevel::Plan, "plan_key", 50),
        ];
        for (level, key, size_bytes) in registrations {
            coordinator
                .register_cache_entry(level, key.to_string(), vec![pattern.clone()], size_bytes)
                .unwrap();
        }

        coordinator.invalidate_on_update(&pattern).unwrap();

        let stats = coordinator.statistics();
        assert!(stats.result_invalidations > 0 || stats.plan_invalidations > 0);
    }
}