1use crate::algebra::TriplePattern;
7use anyhow::Result;
8use dashmap::DashMap;
9use scirs2_core::metrics::{Counter, Histogram, Timer};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashSet, VecDeque};
12use std::hash::{Hash, Hasher};
13use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant};
16
17#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
19pub enum InvalidationStrategy {
20 Immediate,
22 Batched {
24 batch_size: usize,
26 max_delay_ms: u64,
28 },
29 BloomFilter {
31 expected_elements: usize,
33 false_positive_rate: f64,
35 },
36 CostBased {
38 threshold: f64,
40 },
41}
42
43impl Default for InvalidationStrategy {
44 fn default() -> Self {
45 Self::Batched {
46 batch_size: 100,
47 max_delay_ms: 50,
48 }
49 }
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
54pub struct TriplePatternHash(u64);
55
56impl TriplePatternHash {
57 pub fn from_pattern(pattern: &TriplePattern) -> Self {
59 use std::collections::hash_map::DefaultHasher;
60 let mut hasher = DefaultHasher::new();
61 pattern.hash(&mut hasher);
62 Self(hasher.finish())
63 }
64
65 pub fn value(&self) -> u64 {
67 self.0
68 }
69}
70
/// Identifier of a cached result; a plain owned string.
pub type CacheKey = String;
73
74#[derive(Debug, Clone)]
76pub struct CacheEntryMetadata {
77 pub created_at: Instant,
79 pub ttl: Option<Duration>,
81 pub dependencies: HashSet<TriplePatternHash>,
83}
84
85impl CacheEntryMetadata {
86 pub fn is_expired(&self) -> bool {
88 if let Some(ttl) = self.ttl {
89 self.created_at.elapsed() >= ttl
90 } else {
91 false
92 }
93 }
94
95 pub fn remaining_ttl(&self) -> Option<Duration> {
97 self.ttl.and_then(|ttl| {
98 let elapsed = self.created_at.elapsed();
99 if elapsed < ttl {
100 Some(ttl - elapsed)
101 } else {
102 None
103 }
104 })
105 }
106}
107
108#[derive(Debug, Clone)]
110pub struct DependencyGraph {
111 pattern_to_entries: Arc<DashMap<TriplePatternHash, HashSet<CacheKey>>>,
113 entry_metadata: Arc<DashMap<CacheKey, CacheEntryMetadata>>,
115 stats: Arc<DependencyGraphStats>,
117}
118
119#[derive(Debug, Default)]
120struct DependencyGraphStats {
121 pattern_count: AtomicUsize,
123 entry_count: AtomicUsize,
125 edge_count: AtomicUsize,
127 avg_deps_per_entry: AtomicU64,
129}
130
131impl DependencyGraph {
132 pub fn new() -> Self {
134 Self {
135 pattern_to_entries: Arc::new(DashMap::new()),
136 entry_metadata: Arc::new(DashMap::new()),
137 stats: Arc::new(DependencyGraphStats::default()),
138 }
139 }
140
141 pub fn register_dependencies(
143 &self,
144 cache_key: CacheKey,
145 patterns: Vec<TriplePattern>,
146 ) -> Result<()> {
147 self.register_dependencies_with_ttl(cache_key, patterns, None)
148 }
149
150 pub fn register_dependencies_with_ttl(
152 &self,
153 cache_key: CacheKey,
154 patterns: Vec<TriplePattern>,
155 ttl: Option<Duration>,
156 ) -> Result<()> {
157 if patterns.is_empty() {
158 return Ok(());
159 }
160
161 let pattern_hashes: HashSet<TriplePatternHash> = patterns
162 .iter()
163 .map(TriplePatternHash::from_pattern)
164 .collect();
165
166 let metadata = CacheEntryMetadata {
168 created_at: Instant::now(),
169 ttl,
170 dependencies: pattern_hashes.clone(),
171 };
172
173 let is_new_entry = !self.entry_metadata.contains_key(&cache_key);
175 self.entry_metadata.insert(cache_key.clone(), metadata);
176
177 for pattern_hash in &pattern_hashes {
179 self.pattern_to_entries
180 .entry(*pattern_hash)
181 .or_default()
182 .insert(cache_key.clone());
183 }
184
185 if is_new_entry {
187 self.stats.entry_count.fetch_add(1, Ordering::Relaxed);
188 }
189 self.stats
190 .edge_count
191 .fetch_add(pattern_hashes.len(), Ordering::Relaxed);
192 self.update_avg_deps();
193
194 Ok(())
195 }
196
197 pub fn remove_entry(&self, cache_key: &CacheKey) -> Result<()> {
199 if let Some((_, metadata)) = self.entry_metadata.remove(cache_key) {
201 for pattern_hash in &metadata.dependencies {
203 if let Some(mut entries) = self.pattern_to_entries.get_mut(pattern_hash) {
204 entries.remove(cache_key);
205 if entries.is_empty() {
206 drop(entries);
207 self.pattern_to_entries.remove(pattern_hash);
208 let _ = self.stats.pattern_count.fetch_update(
210 Ordering::Relaxed,
211 Ordering::Relaxed,
212 |val| Some(val.saturating_sub(1)),
213 );
214 }
215 }
216 }
217
218 let _ =
220 self.stats
221 .entry_count
222 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
223 Some(val.saturating_sub(1))
224 });
225 let _ =
226 self.stats
227 .edge_count
228 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
229 Some(val.saturating_sub(metadata.dependencies.len()))
230 });
231 self.update_avg_deps();
232 }
233
234 Ok(())
235 }
236
237 pub fn find_expired_entries(&self) -> Vec<CacheKey> {
239 self.entry_metadata
240 .iter()
241 .filter_map(|entry| {
242 if entry.value().is_expired() {
243 Some(entry.key().clone())
244 } else {
245 None
246 }
247 })
248 .collect()
249 }
250
251 pub fn get_ttl_info(&self, cache_key: &CacheKey) -> Option<(Duration, Option<Duration>)> {
253 self.entry_metadata.get(cache_key).and_then(|metadata| {
254 let elapsed = metadata.created_at.elapsed();
255 let remaining = metadata.remaining_ttl();
256 metadata.ttl.map(|_| (elapsed, remaining))
257 })
258 }
259
260 pub fn find_affected_entries(&self, pattern: &TriplePattern) -> HashSet<CacheKey> {
262 let pattern_hash = TriplePatternHash::from_pattern(pattern);
263
264 let mut affected = self
266 .pattern_to_entries
267 .get(&pattern_hash)
268 .map(|entries| entries.clone())
269 .unwrap_or_default();
270
271 for entry in self.pattern_to_entries.iter() {
274 if self.pattern_matches(*entry.key(), pattern) {
275 affected.extend(entry.value().iter().cloned());
276 }
277 }
278
279 affected
280 }
281
282 fn pattern_matches(
284 &self,
285 stored_hash: TriplePatternHash,
286 query_pattern: &TriplePattern,
287 ) -> bool {
288 stored_hash == TriplePatternHash::from_pattern(query_pattern)
292 }
293
294 pub fn statistics(&self) -> DependencyGraphStatistics {
296 DependencyGraphStatistics {
297 pattern_count: self.stats.pattern_count.load(Ordering::Relaxed),
298 entry_count: self.stats.entry_count.load(Ordering::Relaxed),
299 edge_count: self.stats.edge_count.load(Ordering::Relaxed),
300 avg_deps_per_entry: f64::from_bits(
301 self.stats.avg_deps_per_entry.load(Ordering::Relaxed),
302 ),
303 }
304 }
305
306 fn update_avg_deps(&self) {
308 let entries = self.stats.entry_count.load(Ordering::Relaxed);
309 if entries > 0 {
310 let edges = self.stats.edge_count.load(Ordering::Relaxed);
311 let avg = edges as f64 / entries as f64;
312 self.stats
313 .avg_deps_per_entry
314 .store(avg.to_bits(), Ordering::Relaxed);
315 }
316 }
317
318 pub fn clear(&self) {
320 self.pattern_to_entries.clear();
321 self.entry_metadata.clear();
322 self.stats.pattern_count.store(0, Ordering::Relaxed);
323 self.stats.entry_count.store(0, Ordering::Relaxed);
324 self.stats.edge_count.store(0, Ordering::Relaxed);
325 self.stats.avg_deps_per_entry.store(0, Ordering::Relaxed);
326 }
327
328 pub fn memory_usage(&self) -> usize {
330 let pattern_count = self.stats.pattern_count.load(Ordering::Relaxed);
331 let entry_count = self.stats.entry_count.load(Ordering::Relaxed);
332 let edge_count = self.stats.edge_count.load(Ordering::Relaxed);
333
334 pattern_count
340 .saturating_mul(24)
341 .saturating_add(entry_count.saturating_mul(24))
342 .saturating_add(edge_count.saturating_mul(48))
343 }
344}
345
346impl Default for DependencyGraph {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
354pub struct DependencyGraphStatistics {
355 pub pattern_count: usize,
356 pub entry_count: usize,
357 pub edge_count: usize,
358 pub avg_deps_per_entry: f64,
359}
360
361struct BloomFilter {
363 bits: Vec<AtomicU64>,
364 num_hash_functions: usize,
365 bit_count: usize,
366}
367
368impl BloomFilter {
369 fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
371 let m = Self::optimal_bit_count(expected_elements, false_positive_rate);
373 let k = Self::optimal_hash_count(expected_elements, m);
374
375 let num_u64s = (m + 63) / 64;
376 let bits = (0..num_u64s).map(|_| AtomicU64::new(0)).collect();
377
378 Self {
379 bits,
380 num_hash_functions: k,
381 bit_count: m,
382 }
383 }
384
385 fn optimal_bit_count(n: usize, p: f64) -> usize {
387 let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
388 (-(n as f64 * p.ln()) / ln2_squared).ceil() as usize
389 }
390
391 fn optimal_hash_count(n: usize, m: usize) -> usize {
393 ((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as usize
394 }
395
396 fn add(&self, pattern_hash: TriplePatternHash) {
398 for i in 0..self.num_hash_functions {
399 let bit_index = self.hash_i(pattern_hash, i) % self.bit_count;
400 let word_index = bit_index / 64;
401 let bit_offset = bit_index % 64;
402 self.bits[word_index].fetch_or(1u64 << bit_offset, Ordering::Relaxed);
403 }
404 }
405
406 fn might_contain(&self, pattern_hash: TriplePatternHash) -> bool {
408 for i in 0..self.num_hash_functions {
409 let bit_index = self.hash_i(pattern_hash, i) % self.bit_count;
410 let word_index = bit_index / 64;
411 let bit_offset = bit_index % 64;
412 let word = self.bits[word_index].load(Ordering::Relaxed);
413 if (word & (1u64 << bit_offset)) == 0 {
414 return false;
415 }
416 }
417 true
418 }
419
420 fn hash_i(&self, pattern_hash: TriplePatternHash, i: usize) -> usize {
422 let h1 = pattern_hash.value() as usize;
424 let h2 = (pattern_hash.value().wrapping_mul(2654435761)) as usize;
425 h1.wrapping_add(i.wrapping_mul(h2))
426 }
427
428 fn clear(&self) {
430 for word in &self.bits {
431 word.store(0, Ordering::Relaxed);
432 }
433 }
434}
435
436#[derive(Debug)]
438struct InvalidationBatch {
439 entries: Vec<CacheKey>,
440 timestamp: Instant,
441}
442
443pub struct InvalidationEngine {
445 dependency_graph: DependencyGraph,
447 strategy: InvalidationStrategy,
449 bloom_filter: Option<Arc<BloomFilter>>,
451 pending_invalidations: Arc<RwLock<VecDeque<InvalidationBatch>>>,
453 metrics: InvalidationMetrics,
455 config: InvalidationConfig,
457}
458
459#[derive(Clone)]
460struct InvalidationMetrics {
461 total_invalidations: Arc<Counter>,
463 invalidation_time: Arc<Timer>,
465 overhead_ratio: Arc<Histogram>,
467 entries_per_update: Arc<Histogram>,
469 ttl_evictions: Arc<Counter>,
471 ttl_cleanup_time: Arc<Timer>,
473}
474
475impl InvalidationMetrics {
476 fn new() -> Self {
477 Self {
478 total_invalidations: Arc::new(Counter::new("invalidation_total".to_string())),
479 invalidation_time: Arc::new(Timer::new("invalidation_time".to_string())),
480 overhead_ratio: Arc::new(Histogram::new("invalidation_overhead".to_string())),
481 entries_per_update: Arc::new(Histogram::new(
482 "invalidation_entries_per_update".to_string(),
483 )),
484 ttl_evictions: Arc::new(Counter::new("invalidation_ttl_evictions".to_string())),
485 ttl_cleanup_time: Arc::new(Timer::new("invalidation_ttl_cleanup_time".to_string())),
486 }
487 }
488}
489
490#[derive(Debug, Clone, Serialize, Deserialize)]
491pub struct InvalidationConfig {
492 pub enable_metrics: bool,
494 pub max_pending_batches: usize,
496 pub aggressive_matching: bool,
498 pub default_ttl: Option<Duration>,
500 pub enable_ttl_cleanup: bool,
502 pub ttl_cleanup_interval_secs: u64,
504}
505
506impl Default for InvalidationConfig {
507 fn default() -> Self {
508 Self {
509 enable_metrics: true,
510 max_pending_batches: 100,
511 aggressive_matching: false,
512 default_ttl: Some(Duration::from_secs(3600)), enable_ttl_cleanup: true,
514 ttl_cleanup_interval_secs: 300, }
516 }
517}
518
519impl InvalidationEngine {
520 pub fn new(strategy: InvalidationStrategy) -> Self {
522 Self::with_config(strategy, InvalidationConfig::default())
523 }
524
525 pub fn with_config(strategy: InvalidationStrategy, config: InvalidationConfig) -> Self {
527 let bloom_filter = match strategy {
528 InvalidationStrategy::BloomFilter {
529 expected_elements,
530 false_positive_rate,
531 } => Some(Arc::new(BloomFilter::new(
532 expected_elements,
533 false_positive_rate,
534 ))),
535 _ => None,
536 };
537
538 Self {
539 dependency_graph: DependencyGraph::new(),
540 strategy,
541 bloom_filter,
542 pending_invalidations: Arc::new(RwLock::new(VecDeque::new())),
543 metrics: InvalidationMetrics::new(),
544 config,
545 }
546 }
547
548 pub fn register_dependencies(
550 &self,
551 cache_key: CacheKey,
552 patterns: Vec<TriplePattern>,
553 ) -> Result<()> {
554 let ttl = self.config.default_ttl;
555 self.register_dependencies_with_ttl(cache_key, patterns, ttl)
556 }
557
558 pub fn register_dependencies_with_ttl(
560 &self,
561 cache_key: CacheKey,
562 patterns: Vec<TriplePattern>,
563 ttl: Option<Duration>,
564 ) -> Result<()> {
565 self.dependency_graph
567 .register_dependencies_with_ttl(cache_key, patterns.clone(), ttl)?;
568
569 if let Some(bloom) = &self.bloom_filter {
571 for pattern in &patterns {
572 bloom.add(TriplePatternHash::from_pattern(pattern));
573 }
574 }
575
576 Ok(())
577 }
578
579 pub fn cleanup_expired<F>(&self, mut invalidate_fn: F) -> Result<usize>
581 where
582 F: FnMut(&CacheKey) -> Result<()>,
583 {
584 if !self.config.enable_ttl_cleanup {
585 return Ok(0);
586 }
587
588 let start_time = Instant::now();
589 let expired_entries = self.dependency_graph.find_expired_entries();
590 let expired_count = expired_entries.len();
591
592 for cache_key in &expired_entries {
594 invalidate_fn(cache_key)?;
595 self.dependency_graph.remove_entry(cache_key)?;
596 }
597
598 if self.config.enable_metrics {
600 let elapsed = start_time.elapsed();
601 self.metrics.ttl_cleanup_time.observe(elapsed);
602 self.metrics.ttl_evictions.add(expired_count as u64);
603 }
604
605 Ok(expired_count)
606 }
607
608 pub fn start_ttl_cleanup_task<F>(&self, invalidate_fn: F)
610 where
611 F: Fn(&CacheKey) -> Result<()> + Send + Sync + 'static,
612 {
613 if !self.config.enable_ttl_cleanup {
614 return;
615 }
616
617 let engine_clone = self.clone();
618 let interval_secs = self.config.ttl_cleanup_interval_secs;
619 let invalidate_fn = Arc::new(invalidate_fn);
620
621 std::thread::spawn(move || loop {
622 std::thread::sleep(Duration::from_secs(interval_secs));
623
624 let fn_clone = Arc::clone(&invalidate_fn);
625 if let Ok(count) = engine_clone.cleanup_expired(|key| fn_clone(key)) {
626 if count > 0 {
627 tracing::debug!("TTL cleanup removed {} expired cache entries", count);
628 }
629 }
630 });
631 }
632
633 pub fn remove_entry(&self, cache_key: &CacheKey) -> Result<()> {
635 self.dependency_graph.remove_entry(cache_key)
636 }
637
638 pub fn find_affected_entries(&self, triple: &TriplePattern) -> Result<HashSet<CacheKey>> {
640 let start_time = Instant::now();
641
642 let affected = match self.strategy {
643 InvalidationStrategy::BloomFilter { .. } => {
644 if let Some(bloom) = &self.bloom_filter {
646 let pattern_hash = TriplePatternHash::from_pattern(triple);
647 if bloom.might_contain(pattern_hash) {
648 self.dependency_graph.find_affected_entries(triple)
649 } else {
650 HashSet::new()
651 }
652 } else {
653 self.dependency_graph.find_affected_entries(triple)
654 }
655 }
656 _ => self.dependency_graph.find_affected_entries(triple),
657 };
658
659 if self.config.enable_metrics {
661 let elapsed = start_time.elapsed();
662 self.metrics.invalidation_time.observe(elapsed);
663 self.metrics
664 .entries_per_update
665 .observe(affected.len() as f64);
666 }
667
668 Ok(affected)
669 }
670
671 pub fn invalidate<F>(&self, triple: &TriplePattern, mut invalidate_fn: F) -> Result<()>
673 where
674 F: FnMut(&CacheKey) -> Result<()>,
675 {
676 let affected = self.find_affected_entries(triple)?;
677 let affected_count = affected.len();
678
679 match self.strategy {
680 InvalidationStrategy::Immediate => {
681 for cache_key in &affected {
683 invalidate_fn(cache_key)?;
684 self.dependency_graph.remove_entry(cache_key)?;
685 }
686 }
687 InvalidationStrategy::Batched {
688 batch_size,
689 max_delay_ms,
690 } => {
691 self.add_to_batch(affected, batch_size, max_delay_ms, &mut invalidate_fn)?;
693 }
694 InvalidationStrategy::BloomFilter { .. } => {
695 for cache_key in &affected {
697 invalidate_fn(cache_key)?;
698 self.dependency_graph.remove_entry(cache_key)?;
699 }
700 }
701 InvalidationStrategy::CostBased { threshold } => {
702 for cache_key in &affected {
704 if self.should_invalidate_cost_based(cache_key, threshold)? {
705 invalidate_fn(cache_key)?;
706 self.dependency_graph.remove_entry(cache_key)?;
707 }
708 }
709 }
710 }
711
712 if self.config.enable_metrics {
714 self.metrics.total_invalidations.add(affected_count as u64);
715 }
716
717 Ok(())
718 }
719
720 fn add_to_batch<F>(
722 &self,
723 entries: HashSet<CacheKey>,
724 batch_size: usize,
725 max_delay_ms: u64,
726 invalidate_fn: &mut F,
727 ) -> Result<()>
728 where
729 F: FnMut(&CacheKey) -> Result<()>,
730 {
731 let mut pending = self
732 .pending_invalidations
733 .write()
734 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
735
736 pending.push_back(InvalidationBatch {
738 entries: entries.into_iter().collect(),
739 timestamp: Instant::now(),
740 });
741
742 let should_flush = pending.len() >= batch_size
744 || pending
745 .front()
746 .map(|b| b.timestamp.elapsed().as_millis() as u64 >= max_delay_ms)
747 .unwrap_or(false);
748
749 if should_flush {
750 self.flush_batches(&mut pending, invalidate_fn)?;
751 }
752
753 Ok(())
754 }
755
756 fn flush_batches<F>(
758 &self,
759 pending: &mut VecDeque<InvalidationBatch>,
760 invalidate_fn: &mut F,
761 ) -> Result<()>
762 where
763 F: FnMut(&CacheKey) -> Result<()>,
764 {
765 while let Some(batch) = pending.pop_front() {
766 for cache_key in &batch.entries {
767 invalidate_fn(cache_key)?;
768 self.dependency_graph.remove_entry(cache_key)?;
769 }
770 }
771 Ok(())
772 }
773
774 pub fn flush_pending<F>(&self, mut invalidate_fn: F) -> Result<()>
776 where
777 F: FnMut(&CacheKey) -> Result<()>,
778 {
779 let mut pending = self
780 .pending_invalidations
781 .write()
782 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
783 self.flush_batches(&mut pending, &mut invalidate_fn)
784 }
785
786 fn should_invalidate_cost_based(&self, _cache_key: &CacheKey, _threshold: f64) -> Result<bool> {
788 Ok(true)
793 }
794
795 pub fn statistics(&self) -> InvalidationStatistics {
797 let graph_stats = self.dependency_graph.statistics();
798 let time_stats = self.metrics.invalidation_time.get_stats();
799 let overhead_stats = self.metrics.overhead_ratio.get_stats();
800 let entries_stats = self.metrics.entries_per_update.get_stats();
801 let ttl_cleanup_stats = self.metrics.ttl_cleanup_time.get_stats();
802
803 InvalidationStatistics {
804 strategy: self.strategy,
805 total_invalidations: self.metrics.total_invalidations.get(),
806 avg_invalidation_time_us: time_stats.mean,
807 overhead_ratio: overhead_stats.mean,
808 avg_entries_per_update: entries_stats.mean,
809 ttl_evictions: self.metrics.ttl_evictions.get(),
810 avg_ttl_cleanup_time_us: ttl_cleanup_stats.mean,
811 dependency_graph: graph_stats,
812 memory_usage_bytes: self.dependency_graph.memory_usage(),
813 }
814 }
815
816 pub fn clear(&self) -> Result<()> {
818 self.dependency_graph.clear();
819 if let Some(bloom) = &self.bloom_filter {
820 bloom.clear();
821 }
822 let mut pending = self
823 .pending_invalidations
824 .write()
825 .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
826 pending.clear();
827 Ok(())
828 }
829}
830
831#[derive(Debug, Clone, Serialize, Deserialize)]
833pub struct InvalidationStatistics {
834 pub strategy: InvalidationStrategy,
835 pub total_invalidations: u64,
836 pub avg_invalidation_time_us: f64,
837 pub overhead_ratio: f64,
838 pub avg_entries_per_update: f64,
839 pub ttl_evictions: u64,
840 pub avg_ttl_cleanup_time_us: f64,
841 pub dependency_graph: DependencyGraphStatistics,
842 pub memory_usage_bytes: usize,
843}
844
845impl Clone for InvalidationEngine {
846 fn clone(&self) -> Self {
847 Self {
848 dependency_graph: self.dependency_graph.clone(),
849 strategy: self.strategy,
850 bloom_filter: self.bloom_filter.clone(),
851 pending_invalidations: Arc::new(RwLock::new(VecDeque::new())),
852 metrics: self.metrics.clone(),
853 config: self.config.clone(),
854 }
855 }
856}
857
858pub trait RdfUpdateListener {
860 fn on_insert(&mut self, triple: &TriplePattern) -> Result<()>;
862
863 fn on_delete(&mut self, triple: &TriplePattern) -> Result<()>;
865
866 fn on_batch_insert(&mut self, triples: &[TriplePattern]) -> Result<()> {
868 for triple in triples {
869 self.on_insert(triple)?;
870 }
871 Ok(())
872 }
873
874 fn on_batch_delete(&mut self, triples: &[TriplePattern]) -> Result<()> {
876 for triple in triples {
877 self.on_delete(triple)?;
878 }
879 Ok(())
880 }
881}
882
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{Term, Variable};

    /// Builds a triple pattern whose three positions are the named variables.
    fn create_test_pattern(s: &str, p: &str, o: &str) -> TriplePattern {
        let var = |name: &str| Term::Variable(Variable::new(name).expect("valid variable"));
        TriplePattern {
            subject: var(s),
            predicate: var(p),
            object: var(o),
        }
    }

    /// Builds an engine with the `Immediate` strategy and the given configuration.
    fn immediate_engine(config: InvalidationConfig) -> InvalidationEngine {
        InvalidationEngine::with_config(InvalidationStrategy::Immediate, config)
    }

    /// One entry with two distinct patterns yields one entry and two edges.
    #[test]
    fn test_dependency_graph_basic() {
        let graph = DependencyGraph::new();
        let deps = vec![
            create_test_pattern("s", "p", "o"),
            create_test_pattern("x", "y", "z"),
        ];

        graph
            .register_dependencies("key1".to_string(), deps)
            .unwrap();

        let snapshot = graph.statistics();
        assert_eq!(snapshot.entry_count, 1);
        assert_eq!(snapshot.edge_count, 2);
    }

    /// A registered key is reported as affected by its own pattern.
    #[test]
    fn test_invalidation_engine_immediate() {
        let engine = InvalidationEngine::new(InvalidationStrategy::Immediate);
        let spo = create_test_pattern("s", "p", "o");

        engine
            .register_dependencies("key1".to_string(), vec![spo.clone()])
            .unwrap();

        let hits = engine.find_affected_entries(&spo).unwrap();
        assert_eq!(hits.len(), 1);
        assert!(hits.contains("key1"));
    }

    /// Batched invalidation followed by an explicit flush invalidates the key.
    #[test]
    fn test_invalidation_engine_batched() {
        let engine = InvalidationEngine::new(InvalidationStrategy::Batched {
            batch_size: 10,
            max_delay_ms: 100,
        });
        let spo = create_test_pattern("s", "p", "o");

        engine
            .register_dependencies("key1".to_string(), vec![spo.clone()])
            .unwrap();

        let mut invalidated = Vec::new();
        let mut record = |key: &CacheKey| -> Result<()> {
            invalidated.push(key.clone());
            Ok(())
        };

        engine.invalidate(&spo, &mut record).unwrap();
        // Force out anything still queued.
        engine.flush_pending(&mut record).unwrap();

        assert!(!invalidated.is_empty());
    }

    /// A hash is absent before `add` and reported afterwards.
    #[test]
    fn test_bloom_filter() {
        let filter = BloomFilter::new(1000, 0.01);
        let hash = TriplePatternHash::from_pattern(&create_test_pattern("s", "p", "o"));

        assert!(!filter.might_contain(hash));

        filter.add(hash);
        assert!(filter.might_contain(hash));
    }

    /// Removing the only entry brings the entry count back to zero.
    #[test]
    fn test_remove_entry() {
        let graph = DependencyGraph::new();
        let key = "key1".to_string();

        graph
            .register_dependencies(key.clone(), vec![create_test_pattern("s", "p", "o")])
            .unwrap();
        assert_eq!(graph.statistics().entry_count, 1);

        graph.remove_entry(&key).unwrap();
        assert_eq!(graph.statistics().entry_count, 0);
    }

    /// Entries sharing a pattern are both affected; a private pattern affects one.
    #[test]
    fn test_multiple_dependencies() {
        let engine = InvalidationEngine::new(InvalidationStrategy::Immediate);
        let shared = create_test_pattern("s", "p", "o");
        let private = create_test_pattern("x", "y", "z");

        engine
            .register_dependencies("key1".to_string(), vec![shared.clone()])
            .unwrap();
        engine
            .register_dependencies("key2".to_string(), vec![shared.clone(), private.clone()])
            .unwrap();

        let via_shared = engine.find_affected_entries(&shared).unwrap();
        assert_eq!(via_shared.len(), 2);

        let via_private = engine.find_affected_entries(&private).unwrap();
        assert_eq!(via_private.len(), 1);
        assert!(via_private.contains("key2"));
    }

    /// Entries registered with the default and with an explicit TTL are both counted.
    #[test]
    fn test_ttl_registration() {
        let engine = immediate_engine(InvalidationConfig {
            default_ttl: Some(Duration::from_secs(10)),
            ..Default::default()
        });
        let spo = create_test_pattern("s", "p", "o");

        // Uses the configured default TTL.
        engine
            .register_dependencies("key1".to_string(), vec![spo.clone()])
            .unwrap();

        // Uses an explicit TTL.
        engine
            .register_dependencies_with_ttl(
                "key2".to_string(),
                vec![spo.clone()],
                Some(Duration::from_secs(5)),
            )
            .unwrap();

        let snapshot = engine.statistics();
        assert_eq!(snapshot.dependency_graph.entry_count, 2);
    }

    /// An entry is reported as expired only after its TTL has elapsed.
    #[test]
    fn test_ttl_expiration() {
        let engine = immediate_engine(InvalidationConfig {
            default_ttl: Some(Duration::from_millis(100)),
            enable_ttl_cleanup: true,
            ..Default::default()
        });

        engine
            .register_dependencies_with_ttl(
                "key1".to_string(),
                vec![create_test_pattern("s", "p", "o")],
                Some(Duration::from_millis(50)),
            )
            .unwrap();

        let before = engine.dependency_graph.find_expired_entries();
        assert_eq!(before.len(), 0);

        std::thread::sleep(Duration::from_millis(100));

        let after = engine.dependency_graph.find_expired_entries();
        assert_eq!(after.len(), 1);
        assert!(after.contains(&"key1".to_string()));
    }

    /// Cleanup invalidates and unregisters every expired entry.
    #[test]
    fn test_ttl_cleanup() {
        let engine = immediate_engine(InvalidationConfig {
            default_ttl: Some(Duration::from_millis(50)),
            enable_ttl_cleanup: true,
            ..Default::default()
        });
        let spo = create_test_pattern("s", "p", "o");

        for i in 0..5 {
            engine
                .register_dependencies(format!("key{}", i), vec![spo.clone()])
                .unwrap();
        }
        assert_eq!(engine.dependency_graph.statistics().entry_count, 5);

        std::thread::sleep(Duration::from_millis(100));

        let mut removed_keys = Vec::new();
        let evicted = engine
            .cleanup_expired(|key| {
                removed_keys.push(key.clone());
                Ok(())
            })
            .unwrap();

        assert_eq!(evicted, 5);
        assert_eq!(removed_keys.len(), 5);
        assert_eq!(engine.dependency_graph.statistics().entry_count, 0);
    }

    /// TTL info reports elapsed and remaining time for a fresh entry.
    #[test]
    fn test_ttl_metadata() {
        let graph = DependencyGraph::new();
        let ttl = Duration::from_secs(60);

        graph
            .register_dependencies_with_ttl(
                "key1".to_string(),
                vec![create_test_pattern("s", "p", "o")],
                Some(ttl),
            )
            .unwrap();

        let info = graph.get_ttl_info(&"key1".to_string());
        assert!(info.is_some());

        let (elapsed, remaining) = info.unwrap();
        assert!(elapsed < ttl);
        assert!(remaining.is_some());
        assert!(remaining.unwrap() <= ttl);
    }

    /// Only entries that carry a TTL can expire.
    #[test]
    fn test_mixed_ttl_no_ttl() {
        let engine = immediate_engine(InvalidationConfig {
            default_ttl: None,
            enable_ttl_cleanup: true,
            ..Default::default()
        });
        let spo = create_test_pattern("s", "p", "o");

        // No default TTL is configured, so this entry never expires.
        engine
            .register_dependencies("key_no_ttl".to_string(), vec![spo.clone()])
            .unwrap();

        engine
            .register_dependencies_with_ttl(
                "key_with_ttl".to_string(),
                vec![spo.clone()],
                Some(Duration::from_millis(50)),
            )
            .unwrap();

        std::thread::sleep(Duration::from_millis(100));

        let expired = engine.dependency_graph.find_expired_entries();
        assert_eq!(expired.len(), 1);
        assert!(expired.contains(&"key_with_ttl".to_string()));
        assert!(!expired.contains(&"key_no_ttl".to_string()));
    }

    /// Cleanup updates the eviction counter and the cleanup timer.
    #[test]
    fn test_ttl_statistics() {
        let engine = immediate_engine(InvalidationConfig {
            default_ttl: Some(Duration::from_millis(50)),
            enable_ttl_cleanup: true,
            enable_metrics: true,
            ..Default::default()
        });
        let spo = create_test_pattern("s", "p", "o");

        for i in 0..3 {
            engine
                .register_dependencies(format!("key{}", i), vec![spo.clone()])
                .unwrap();
        }

        std::thread::sleep(Duration::from_millis(100));

        let _evicted = engine.cleanup_expired(|_| Ok(())).unwrap();

        let snapshot = engine.statistics();
        assert_eq!(snapshot.ttl_evictions, 3);
        assert!(snapshot.avg_ttl_cleanup_time_us > 0.0);
    }
}