use crate::algebra::TriplePattern;
use anyhow::Result;
use dashmap::DashMap;
use scirs2_core::metrics::{Counter, Histogram, Timer};
use serde::{Deserialize, Serialize};
use std::collections::{HashSet, VecDeque};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
/// Strategy used to propagate RDF updates to dependent cache entries.
///
/// Note: derives `PartialEq` but not `Eq` because `BloomFilter` and
/// `CostBased` carry `f64` fields.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum InvalidationStrategy {
    /// Invalidate affected entries synchronously as each update arrives.
    Immediate,
    /// Queue invalidations and flush them once enough work accumulates or
    /// the oldest queued batch exceeds `max_delay_ms`.
    Batched {
        batch_size: usize,
        max_delay_ms: u64,
    },
    /// Pre-screen updates with a Bloom filter so updates that cannot match
    /// any registered dependency skip the dependency-graph lookup entirely.
    BloomFilter {
        expected_elements: usize,
        false_positive_rate: f64,
    },
    /// Invalidate only entries whose recomputation cost exceeds `threshold`.
    /// NOTE(review): the cost check is currently a stub that always
    /// invalidates — see `should_invalidate_cost_based`.
    CostBased {
        threshold: f64,
    },
}

impl Default for InvalidationStrategy {
    /// Default: batched invalidation with a 100-entry budget and a 50 ms
    /// maximum delay before a forced flush.
    fn default() -> Self {
        Self::Batched {
            batch_size: 100,
            max_delay_ms: 50,
        }
    }
}
/// Stable 64-bit digest of a `TriplePattern`, used as a compact key in the
/// dependency graph and Bloom filter instead of the full pattern.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TriplePatternHash(u64);

impl TriplePatternHash {
    /// Hashes `pattern` with the std `DefaultHasher`.
    ///
    /// NOTE(review): `DefaultHasher` output is not guaranteed stable across
    /// Rust releases — fine for in-process keys, do not persist these values.
    pub fn from_pattern(pattern: &TriplePattern) -> Self {
        use std::collections::hash_map::DefaultHasher;
        let mut hasher = DefaultHasher::new();
        pattern.hash(&mut hasher);
        Self(hasher.finish())
    }

    /// Raw 64-bit hash value.
    pub fn value(&self) -> u64 {
        self.0
    }
}

/// Key type of the cache entries this engine invalidates.
pub type CacheKey = String;
/// Per-entry bookkeeping: creation time, optional TTL, and the set of triple
/// patterns the cached result depends on.
#[derive(Debug, Clone)]
pub struct CacheEntryMetadata {
    // Registration instant; the TTL is measured from here.
    pub created_at: Instant,
    // `None` means the entry never expires by time.
    pub ttl: Option<Duration>,
    // Patterns whose updates invalidate this entry.
    pub dependencies: HashSet<TriplePatternHash>,
}
impl CacheEntryMetadata {
    /// Returns true when a TTL is configured and has fully elapsed.
    /// Entries without a TTL never expire.
    pub fn is_expired(&self) -> bool {
        match self.ttl {
            Some(ttl) => self.created_at.elapsed() >= ttl,
            None => false,
        }
    }

    /// Time left before expiry; `None` when no TTL is set or the TTL has
    /// already elapsed (i.e. the entry is expired).
    pub fn remaining_ttl(&self) -> Option<Duration> {
        let ttl = self.ttl?;
        let elapsed = self.created_at.elapsed();
        if elapsed < ttl {
            Some(ttl - elapsed)
        } else {
            None
        }
    }
}
/// Reverse index from triple patterns to the cache entries that depend on
/// them, plus per-entry metadata. All interior maps are concurrent
/// (`DashMap`) and shared via `Arc`, so `&self` methods may be called from
/// multiple threads; clones share the same underlying graph.
#[derive(Debug, Clone)]
pub struct DependencyGraph {
    // pattern hash -> cache keys to invalidate when that pattern changes
    pattern_to_entries: Arc<DashMap<TriplePatternHash, HashSet<CacheKey>>>,
    // cache key -> creation time / TTL / dependency set
    entry_metadata: Arc<DashMap<CacheKey, CacheEntryMetadata>>,
    stats: Arc<DependencyGraphStats>,
}

/// Lock-free size counters; values are best-effort approximations under
/// concurrent mutation (all accesses use `Ordering::Relaxed`).
#[derive(Debug, Default)]
struct DependencyGraphStats {
    pattern_count: AtomicUsize,
    entry_count: AtomicUsize,
    edge_count: AtomicUsize,
    // f64 average smuggled through an AtomicU64 via to_bits/from_bits.
    avg_deps_per_entry: AtomicU64,
}
impl DependencyGraph {
    /// Creates an empty dependency graph.
    pub fn new() -> Self {
        Self {
            pattern_to_entries: Arc::new(DashMap::new()),
            entry_metadata: Arc::new(DashMap::new()),
            stats: Arc::new(DependencyGraphStats::default()),
        }
    }

    /// Registers `patterns` as dependencies of `cache_key` without a TTL.
    pub fn register_dependencies(
        &self,
        cache_key: CacheKey,
        patterns: Vec<TriplePattern>,
    ) -> Result<()> {
        self.register_dependencies_with_ttl(cache_key, patterns, None)
    }

    /// Registers `patterns` as dependencies of `cache_key` with an optional TTL.
    ///
    /// Fixes relative to the previous version:
    /// - Re-registering an existing key now removes its old links first, so
    ///   `pattern_to_entries` holds no stale reverse edges and `edge_count`
    ///   is not double-counted.
    /// - `pattern_count` is incremented when a pattern is indexed for the
    ///   first time; previously it was only ever decremented, so the
    ///   reported count was permanently 0.
    ///
    /// An empty `patterns` list is a no-op (the entry is not tracked).
    pub fn register_dependencies_with_ttl(
        &self,
        cache_key: CacheKey,
        patterns: Vec<TriplePattern>,
        ttl: Option<Duration>,
    ) -> Result<()> {
        if patterns.is_empty() {
            return Ok(());
        }
        // Drop any previous registration so stats and reverse links stay exact.
        if self.entry_metadata.contains_key(&cache_key) {
            self.remove_entry(&cache_key)?;
        }
        let pattern_hashes: HashSet<TriplePatternHash> = patterns
            .iter()
            .map(TriplePatternHash::from_pattern)
            .collect();
        let metadata = CacheEntryMetadata {
            created_at: Instant::now(),
            ttl,
            dependencies: pattern_hashes.clone(),
        };
        self.entry_metadata.insert(cache_key.clone(), metadata);
        for pattern_hash in &pattern_hashes {
            // Racy check-then-act is acceptable: stats are advisory only.
            let is_new_pattern = !self.pattern_to_entries.contains_key(pattern_hash);
            self.pattern_to_entries
                .entry(*pattern_hash)
                .or_default()
                .insert(cache_key.clone());
            if is_new_pattern {
                self.stats.pattern_count.fetch_add(1, Ordering::Relaxed);
            }
        }
        self.stats.entry_count.fetch_add(1, Ordering::Relaxed);
        self.stats
            .edge_count
            .fetch_add(pattern_hashes.len(), Ordering::Relaxed);
        self.update_avg_deps();
        Ok(())
    }

    /// Removes a cache entry and all of its reverse links; a no-op for
    /// unknown keys. Pattern buckets left empty are dropped entirely.
    pub fn remove_entry(&self, cache_key: &CacheKey) -> Result<()> {
        if let Some((_, metadata)) = self.entry_metadata.remove(cache_key) {
            for pattern_hash in &metadata.dependencies {
                if let Some(mut entries) = self.pattern_to_entries.get_mut(pattern_hash) {
                    entries.remove(cache_key);
                    if entries.is_empty() {
                        // Release the shard guard before removing the key —
                        // calling `remove` while holding `get_mut` on the same
                        // DashMap shard would deadlock.
                        drop(entries);
                        self.pattern_to_entries.remove(pattern_hash);
                        let _ = self.stats.pattern_count.fetch_update(
                            Ordering::Relaxed,
                            Ordering::Relaxed,
                            |val| Some(val.saturating_sub(1)),
                        );
                    }
                }
            }
            let _ =
                self.stats
                    .entry_count
                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
                        Some(val.saturating_sub(1))
                    });
            let _ =
                self.stats
                    .edge_count
                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
                        Some(val.saturating_sub(metadata.dependencies.len()))
                    });
            self.update_avg_deps();
        }
        Ok(())
    }

    /// Returns every tracked cache key whose TTL has elapsed.
    pub fn find_expired_entries(&self) -> Vec<CacheKey> {
        self.entry_metadata
            .iter()
            .filter_map(|entry| {
                if entry.value().is_expired() {
                    Some(entry.key().clone())
                } else {
                    None
                }
            })
            .collect()
    }

    /// Returns `(elapsed, remaining)` for a key with a TTL; `None` when the
    /// key is unknown or has no TTL configured.
    pub fn get_ttl_info(&self, cache_key: &CacheKey) -> Option<(Duration, Option<Duration>)> {
        self.entry_metadata.get(cache_key).and_then(|metadata| {
            let elapsed = metadata.created_at.elapsed();
            let remaining = metadata.remaining_ttl();
            metadata.ttl.map(|_| (elapsed, remaining))
        })
    }

    /// Returns all cache keys that depend on `pattern`.
    ///
    /// Fix: the previous version additionally scanned *every* indexed pattern
    /// and compared via `pattern_matches`, which is exactly the hash equality
    /// already used for the direct lookup — an O(P) pass that could only
    /// re-find the same bucket. The scan is removed; `pattern_matches`
    /// remains the single extension point for richer (wildcard/subsumption)
    /// matching later.
    pub fn find_affected_entries(&self, pattern: &TriplePattern) -> HashSet<CacheKey> {
        let pattern_hash = TriplePatternHash::from_pattern(pattern);
        // Sanity-check that direct lookup and the matching predicate agree.
        debug_assert!(self.pattern_matches(pattern_hash, pattern));
        self.pattern_to_entries
            .get(&pattern_hash)
            .map(|entries| entries.clone())
            .unwrap_or_default()
    }

    /// Whether a stored pattern hash matches a query pattern. Currently exact
    /// hash equality; intended extension point for wildcard matching.
    fn pattern_matches(
        &self,
        stored_hash: TriplePatternHash,
        query_pattern: &TriplePattern,
    ) -> bool {
        stored_hash == TriplePatternHash::from_pattern(query_pattern)
    }

    /// Snapshot of the (approximate) size counters.
    pub fn statistics(&self) -> DependencyGraphStatistics {
        DependencyGraphStatistics {
            pattern_count: self.stats.pattern_count.load(Ordering::Relaxed),
            entry_count: self.stats.entry_count.load(Ordering::Relaxed),
            edge_count: self.stats.edge_count.load(Ordering::Relaxed),
            // Stored as raw bits — see `update_avg_deps`.
            avg_deps_per_entry: f64::from_bits(
                self.stats.avg_deps_per_entry.load(Ordering::Relaxed),
            ),
        }
    }

    /// Recomputes edges/entries and stores it bit-cast into the atomic.
    /// Leaves the previous value when there are no entries.
    fn update_avg_deps(&self) {
        let entries = self.stats.entry_count.load(Ordering::Relaxed);
        if entries > 0 {
            let edges = self.stats.edge_count.load(Ordering::Relaxed);
            let avg = edges as f64 / entries as f64;
            self.stats
                .avg_deps_per_entry
                .store(avg.to_bits(), Ordering::Relaxed);
        }
    }

    /// Removes all entries, links and statistics.
    pub fn clear(&self) {
        self.pattern_to_entries.clear();
        self.entry_metadata.clear();
        self.stats.pattern_count.store(0, Ordering::Relaxed);
        self.stats.entry_count.store(0, Ordering::Relaxed);
        self.stats.edge_count.store(0, Ordering::Relaxed);
        self.stats.avg_deps_per_entry.store(0, Ordering::Relaxed);
    }

    /// Rough memory estimate in bytes. The per-pattern (24), per-entry (24)
    /// and per-edge (48) constants are heuristics for map-node overhead, not
    /// measured sizes — treat as an order-of-magnitude figure.
    pub fn memory_usage(&self) -> usize {
        let pattern_count = self.stats.pattern_count.load(Ordering::Relaxed);
        let entry_count = self.stats.entry_count.load(Ordering::Relaxed);
        let edge_count = self.stats.edge_count.load(Ordering::Relaxed);
        pattern_count
            .saturating_mul(24)
            .saturating_add(entry_count.saturating_mul(24))
            .saturating_add(edge_count.saturating_mul(48))
    }
}
impl Default for DependencyGraph {
    fn default() -> Self {
        Self::new()
    }
}

/// Point-in-time snapshot of the graph's size counters, serializable for
/// reporting. Values are approximate under concurrent mutation.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct DependencyGraphStatistics {
    pub pattern_count: usize,
    pub entry_count: usize,
    pub edge_count: usize,
    pub avg_deps_per_entry: f64,
}
/// Thread-safe Bloom filter over pattern hashes: answers "might this pattern
/// have any registered dependents?" without touching the dependency maps.
/// False positives possible, false negatives not.
struct BloomFilter {
    // Bit array packed into AtomicU64 words so `add` needs no lock.
    bits: Vec<AtomicU64>,
    num_hash_functions: usize,
    // Total number of usable bits (may be less than bits.len() * 64).
    bit_count: usize,
}
impl BloomFilter {
    /// Builds a filter sized for `expected_elements` at `false_positive_rate`
    /// using the standard optimal-parameter formulas.
    ///
    /// Fix: degenerate parameters (`expected_elements == 0`, or a rate
    /// outside the open interval (0, 1)) previously produced a filter with
    /// zero bits and zero hash functions, whose `might_contain` vacuously
    /// returned `true` for everything — silently disabling the optimization.
    /// Inputs are now clamped so the filter always has at least one bit and
    /// one hash function; valid inputs are sized exactly as before.
    fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
        let n = expected_elements.max(1);
        // ln() must be finite and negative for the formulas below.
        let p = if false_positive_rate > 0.0 && false_positive_rate < 1.0 {
            false_positive_rate
        } else {
            0.01
        };
        let m = Self::optimal_bit_count(n, p).max(1);
        let k = Self::optimal_hash_count(n, m).max(1);
        // Round up to whole 64-bit words.
        let num_u64s = (m + 63) / 64;
        let bits = (0..num_u64s).map(|_| AtomicU64::new(0)).collect();
        Self {
            bits,
            num_hash_functions: k,
            bit_count: m,
        }
    }

    /// m = -n * ln(p) / (ln 2)^2 — optimal bit count for n elements at rate p.
    fn optimal_bit_count(n: usize, p: f64) -> usize {
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        (-(n as f64 * p.ln()) / ln2_squared).ceil() as usize
    }

    /// k = (m / n) * ln 2 — optimal number of hash functions.
    fn optimal_hash_count(n: usize, m: usize) -> usize {
        ((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as usize
    }

    /// Sets the k bits for `pattern_hash`. Lock-free via fetch_or.
    fn add(&self, pattern_hash: TriplePatternHash) {
        for i in 0..self.num_hash_functions {
            let bit_index = self.hash_i(pattern_hash, i) % self.bit_count;
            let word_index = bit_index / 64;
            let bit_offset = bit_index % 64;
            self.bits[word_index].fetch_or(1u64 << bit_offset, Ordering::Relaxed);
        }
    }

    /// Returns false only when `pattern_hash` was definitely never added;
    /// true means "possibly present" (subject to the false-positive rate).
    fn might_contain(&self, pattern_hash: TriplePatternHash) -> bool {
        for i in 0..self.num_hash_functions {
            let bit_index = self.hash_i(pattern_hash, i) % self.bit_count;
            let word_index = bit_index / 64;
            let bit_offset = bit_index % 64;
            let word = self.bits[word_index].load(Ordering::Relaxed);
            if (word & (1u64 << bit_offset)) == 0 {
                return false;
            }
        }
        true
    }

    /// Double hashing: h_i = h1 + i * h2, with h2 derived from h1 by a
    /// Knuth multiplicative constant. Yields k distinct-ish indices cheaply.
    fn hash_i(&self, pattern_hash: TriplePatternHash, i: usize) -> usize {
        let h1 = pattern_hash.value() as usize;
        let h2 = (pattern_hash.value().wrapping_mul(2654435761)) as usize;
        h1.wrapping_add(i.wrapping_mul(h2))
    }

    /// Resets every bit to zero.
    fn clear(&self) {
        for word in &self.bits {
            word.store(0, Ordering::Relaxed);
        }
    }
}
/// A group of cache keys queued for deferred invalidation.
#[derive(Debug)]
struct InvalidationBatch {
    entries: Vec<CacheKey>,
    // When the batch was queued; used to honour the strategy's max delay.
    timestamp: Instant,
}

/// Orchestrates cache invalidation: tracks dependencies, applies the chosen
/// strategy, and records metrics.
pub struct InvalidationEngine {
    dependency_graph: DependencyGraph,
    strategy: InvalidationStrategy,
    // Populated only for `InvalidationStrategy::BloomFilter`.
    bloom_filter: Option<Arc<BloomFilter>>,
    // Batches awaiting flush under the `Batched` strategy.
    pending_invalidations: Arc<RwLock<VecDeque<InvalidationBatch>>>,
    metrics: InvalidationMetrics,
    config: InvalidationConfig,
}

/// Handles for the engine's metrics.
/// NOTE(review): `overhead_ratio` is never observed anywhere in this file,
/// so its reported mean stays at its initial value — populate or remove.
#[derive(Clone)]
struct InvalidationMetrics {
    total_invalidations: Arc<Counter>,
    invalidation_time: Arc<Timer>,
    overhead_ratio: Arc<Histogram>,
    entries_per_update: Arc<Histogram>,
    ttl_evictions: Arc<Counter>,
    ttl_cleanup_time: Arc<Timer>,
}
impl InvalidationMetrics {
    /// Builds the fixed, named metric set used by the invalidation engine.
    fn new() -> Self {
        // Small local constructors keep the metric names easy to scan.
        let counter = |name: &str| Arc::new(Counter::new(name.to_string()));
        let timer = |name: &str| Arc::new(Timer::new(name.to_string()));
        let histogram = |name: &str| Arc::new(Histogram::new(name.to_string()));
        Self {
            total_invalidations: counter("invalidation_total"),
            invalidation_time: timer("invalidation_time"),
            overhead_ratio: histogram("invalidation_overhead"),
            entries_per_update: histogram("invalidation_entries_per_update"),
            ttl_evictions: counter("invalidation_ttl_evictions"),
            ttl_cleanup_time: timer("invalidation_ttl_cleanup_time"),
        }
    }
}
/// Tunables for the invalidation engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InvalidationConfig {
    // Record counters/timers on invalidation and TTL-cleanup paths.
    pub enable_metrics: bool,
    // Upper bound on queued batches under the `Batched` strategy.
    pub max_pending_batches: usize,
    // NOTE(review): not read anywhere in this file — presumably reserved for
    // wildcard/subsumption matching; confirm before relying on it.
    pub aggressive_matching: bool,
    // TTL applied by `register_dependencies` when the caller supplies none.
    pub default_ttl: Option<Duration>,
    // Master switch for TTL expiry sweeps (`cleanup_expired` is a no-op when false).
    pub enable_ttl_cleanup: bool,
    // Period of the background cleanup thread, in seconds.
    pub ttl_cleanup_interval_secs: u64,
}
impl Default for InvalidationConfig {
fn default() -> Self {
Self {
enable_metrics: true,
max_pending_batches: 100,
aggressive_matching: false,
default_ttl: Some(Duration::from_secs(3600)), enable_ttl_cleanup: true,
ttl_cleanup_interval_secs: 300, }
}
}
impl InvalidationEngine {
    /// Creates an engine with `strategy` and the default configuration.
    pub fn new(strategy: InvalidationStrategy) -> Self {
        Self::with_config(strategy, InvalidationConfig::default())
    }

    /// Creates an engine with an explicit configuration. A Bloom filter is
    /// allocated only for `InvalidationStrategy::BloomFilter`.
    pub fn with_config(strategy: InvalidationStrategy, config: InvalidationConfig) -> Self {
        let bloom_filter = match strategy {
            InvalidationStrategy::BloomFilter {
                expected_elements,
                false_positive_rate,
            } => Some(Arc::new(BloomFilter::new(
                expected_elements,
                false_positive_rate,
            ))),
            _ => None,
        };
        Self {
            dependency_graph: DependencyGraph::new(),
            strategy,
            bloom_filter,
            pending_invalidations: Arc::new(RwLock::new(VecDeque::new())),
            metrics: InvalidationMetrics::new(),
            config,
        }
    }

    /// Registers dependencies for `cache_key` using `config.default_ttl`.
    pub fn register_dependencies(
        &self,
        cache_key: CacheKey,
        patterns: Vec<TriplePattern>,
    ) -> Result<()> {
        let ttl = self.config.default_ttl;
        self.register_dependencies_with_ttl(cache_key, patterns, ttl)
    }

    /// Registers dependencies with an explicit TTL and, under the Bloom
    /// strategy, feeds each pattern hash into the filter.
    pub fn register_dependencies_with_ttl(
        &self,
        cache_key: CacheKey,
        patterns: Vec<TriplePattern>,
        ttl: Option<Duration>,
    ) -> Result<()> {
        self.dependency_graph
            .register_dependencies_with_ttl(cache_key, patterns.clone(), ttl)?;
        if let Some(bloom) = &self.bloom_filter {
            for pattern in &patterns {
                bloom.add(TriplePatternHash::from_pattern(pattern));
            }
        }
        Ok(())
    }

    /// Removes every expired entry, calling `invalidate_fn` for each before
    /// dropping it from the graph. Returns the number of entries removed.
    /// No-op (returns 0) when TTL cleanup is disabled in the config.
    pub fn cleanup_expired<F>(&self, mut invalidate_fn: F) -> Result<usize>
    where
        F: FnMut(&CacheKey) -> Result<()>,
    {
        if !self.config.enable_ttl_cleanup {
            return Ok(0);
        }
        let start_time = Instant::now();
        let expired_entries = self.dependency_graph.find_expired_entries();
        let expired_count = expired_entries.len();
        for cache_key in &expired_entries {
            invalidate_fn(cache_key)?;
            self.dependency_graph.remove_entry(cache_key)?;
        }
        if self.config.enable_metrics {
            let elapsed = start_time.elapsed();
            self.metrics.ttl_cleanup_time.observe(elapsed);
            self.metrics.ttl_evictions.add(expired_count as u64);
        }
        Ok(expired_count)
    }

    /// Spawns a detached background thread that runs `cleanup_expired` every
    /// `ttl_cleanup_interval_secs`. Errors from a sweep are silently dropped.
    /// NOTE(review): the thread has no shutdown handle and lives for the rest
    /// of the process; each call spawns another thread — call at most once.
    pub fn start_ttl_cleanup_task<F>(&self, invalidate_fn: F)
    where
        F: Fn(&CacheKey) -> Result<()> + Send + Sync + 'static,
    {
        if !self.config.enable_ttl_cleanup {
            return;
        }
        // Clone shares the graph/filter but gets an empty pending queue,
        // which is fine here: the thread only runs TTL cleanup.
        let engine_clone = self.clone();
        let interval_secs = self.config.ttl_cleanup_interval_secs;
        let invalidate_fn = Arc::new(invalidate_fn);
        std::thread::spawn(move || loop {
            std::thread::sleep(Duration::from_secs(interval_secs));
            let fn_clone = Arc::clone(&invalidate_fn);
            if let Ok(count) = engine_clone.cleanup_expired(|key| fn_clone(key)) {
                if count > 0 {
                    tracing::debug!("TTL cleanup removed {} expired cache entries", count);
                }
            }
        });
    }

    /// Removes a single entry from the dependency graph.
    pub fn remove_entry(&self, cache_key: &CacheKey) -> Result<()> {
        self.dependency_graph.remove_entry(cache_key)
    }

    /// Returns the cache keys affected by an update to `triple`. Under the
    /// Bloom strategy a negative filter probe short-circuits the graph
    /// lookup entirely (false negatives are impossible by construction).
    pub fn find_affected_entries(&self, triple: &TriplePattern) -> Result<HashSet<CacheKey>> {
        let start_time = Instant::now();
        let affected = match self.strategy {
            InvalidationStrategy::BloomFilter { .. } => {
                if let Some(bloom) = &self.bloom_filter {
                    let pattern_hash = TriplePatternHash::from_pattern(triple);
                    if bloom.might_contain(pattern_hash) {
                        self.dependency_graph.find_affected_entries(triple)
                    } else {
                        HashSet::new()
                    }
                } else {
                    self.dependency_graph.find_affected_entries(triple)
                }
            }
            _ => self.dependency_graph.find_affected_entries(triple),
        };
        if self.config.enable_metrics {
            let elapsed = start_time.elapsed();
            self.metrics.invalidation_time.observe(elapsed);
            self.metrics
                .entries_per_update
                .observe(affected.len() as f64);
        }
        Ok(affected)
    }

    /// Applies the configured strategy to all entries affected by `triple`.
    /// Under `Batched` the work may be queued instead of executed; callers
    /// should eventually drain via `flush_pending`.
    pub fn invalidate<F>(&self, triple: &TriplePattern, mut invalidate_fn: F) -> Result<()>
    where
        F: FnMut(&CacheKey) -> Result<()>,
    {
        let affected = self.find_affected_entries(triple)?;
        let affected_count = affected.len();
        match self.strategy {
            InvalidationStrategy::Immediate => {
                for cache_key in &affected {
                    invalidate_fn(cache_key)?;
                    self.dependency_graph.remove_entry(cache_key)?;
                }
            }
            InvalidationStrategy::Batched {
                batch_size,
                max_delay_ms,
            } => {
                self.add_to_batch(affected, batch_size, max_delay_ms, &mut invalidate_fn)?;
            }
            InvalidationStrategy::BloomFilter { .. } => {
                for cache_key in &affected {
                    invalidate_fn(cache_key)?;
                    self.dependency_graph.remove_entry(cache_key)?;
                }
            }
            InvalidationStrategy::CostBased { threshold } => {
                for cache_key in &affected {
                    if self.should_invalidate_cost_based(cache_key, threshold)? {
                        invalidate_fn(cache_key)?;
                        self.dependency_graph.remove_entry(cache_key)?;
                    }
                }
            }
        }
        if self.config.enable_metrics {
            // Counts affected entries, including those merely queued.
            self.metrics.total_invalidations.add(affected_count as u64);
        }
        Ok(())
    }

    /// Queues `entries` and flushes when a threshold is crossed.
    ///
    /// Fixes relative to the previous version:
    /// - `batch_size` is an entry budget (see `InvalidationStrategy::Batched`),
    ///   but the old code compared it against the number of queued *batches*,
    ///   so many small updates could defer far more than `batch_size`
    ///   invalidations. It now counts queued entries.
    /// - `config.max_pending_batches` was never read; it now bounds the
    ///   queue length with a forced flush.
    ///
    /// Note: the `max_delay_ms` deadline is only checked when a new update
    /// arrives — there is no timer, so a quiet period can exceed the delay
    /// until `flush_pending` is called.
    fn add_to_batch<F>(
        &self,
        entries: HashSet<CacheKey>,
        batch_size: usize,
        max_delay_ms: u64,
        invalidate_fn: &mut F,
    ) -> Result<()>
    where
        F: FnMut(&CacheKey) -> Result<()>,
    {
        let mut pending = self
            .pending_invalidations
            .write()
            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
        pending.push_back(InvalidationBatch {
            entries: entries.into_iter().collect(),
            timestamp: Instant::now(),
        });
        let queued_entries: usize = pending.iter().map(|b| b.entries.len()).sum();
        let should_flush = queued_entries >= batch_size
            || pending.len() >= self.config.max_pending_batches
            || pending
                .front()
                .map(|b| b.timestamp.elapsed().as_millis() as u64 >= max_delay_ms)
                .unwrap_or(false);
        if should_flush {
            self.flush_batches(&mut pending, invalidate_fn)?;
        }
        Ok(())
    }

    /// Drains every queued batch, invalidating and unregistering each key.
    fn flush_batches<F>(
        &self,
        pending: &mut VecDeque<InvalidationBatch>,
        invalidate_fn: &mut F,
    ) -> Result<()>
    where
        F: FnMut(&CacheKey) -> Result<()>,
    {
        while let Some(batch) = pending.pop_front() {
            for cache_key in &batch.entries {
                invalidate_fn(cache_key)?;
                self.dependency_graph.remove_entry(cache_key)?;
            }
        }
        Ok(())
    }

    /// Forces an immediate flush of all queued batches.
    pub fn flush_pending<F>(&self, mut invalidate_fn: F) -> Result<()>
    where
        F: FnMut(&CacheKey) -> Result<()>,
    {
        let mut pending = self
            .pending_invalidations
            .write()
            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
        self.flush_batches(&mut pending, &mut invalidate_fn)
    }

    /// TODO: stub — always invalidates regardless of `threshold`. A real
    /// implementation needs a recomputation-cost estimate per cache entry.
    fn should_invalidate_cost_based(&self, _cache_key: &CacheKey, _threshold: f64) -> Result<bool> {
        Ok(true)
    }

    /// Aggregated snapshot of engine + graph metrics.
    pub fn statistics(&self) -> InvalidationStatistics {
        let graph_stats = self.dependency_graph.statistics();
        let time_stats = self.metrics.invalidation_time.get_stats();
        let overhead_stats = self.metrics.overhead_ratio.get_stats();
        let entries_stats = self.metrics.entries_per_update.get_stats();
        let ttl_cleanup_stats = self.metrics.ttl_cleanup_time.get_stats();
        InvalidationStatistics {
            strategy: self.strategy,
            total_invalidations: self.metrics.total_invalidations.get(),
            avg_invalidation_time_us: time_stats.mean,
            overhead_ratio: overhead_stats.mean,
            avg_entries_per_update: entries_stats.mean,
            ttl_evictions: self.metrics.ttl_evictions.get(),
            avg_ttl_cleanup_time_us: ttl_cleanup_stats.mean,
            dependency_graph: graph_stats,
            memory_usage_bytes: self.dependency_graph.memory_usage(),
        }
    }

    /// Resets the graph, the Bloom filter (if any) and the pending queue.
    /// Metrics are intentionally left intact.
    pub fn clear(&self) -> Result<()> {
        self.dependency_graph.clear();
        if let Some(bloom) = &self.bloom_filter {
            bloom.clear();
        }
        let mut pending = self
            .pending_invalidations
            .write()
            .map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
        pending.clear();
        Ok(())
    }
}
/// Serializable snapshot returned by `InvalidationEngine::statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InvalidationStatistics {
    pub strategy: InvalidationStrategy,
    pub total_invalidations: u64,
    pub avg_invalidation_time_us: f64,
    // NOTE(review): the backing histogram is never populated in this file,
    // so this field reports the histogram's initial mean.
    pub overhead_ratio: f64,
    pub avg_entries_per_update: f64,
    pub ttl_evictions: u64,
    pub avg_ttl_cleanup_time_us: f64,
    pub dependency_graph: DependencyGraphStatistics,
    // Heuristic estimate — see `DependencyGraph::memory_usage`.
    pub memory_usage_bytes: usize,
}
impl Clone for InvalidationEngine {
    /// Clones share the dependency graph, Bloom filter, metrics and config,
    /// but each clone gets an EMPTY pending-batch queue: batches queued on
    /// one handle are not visible to (or flushable from) another.
    /// NOTE(review): confirm this isolation is intended — a clone created
    /// solely for the TTL-cleanup thread never touches the queue, but a
    /// general-purpose clone could strand queued invalidations.
    fn clone(&self) -> Self {
        Self {
            dependency_graph: self.dependency_graph.clone(),
            strategy: self.strategy,
            bloom_filter: self.bloom_filter.clone(),
            pending_invalidations: Arc::new(RwLock::new(VecDeque::new())),
            metrics: self.metrics.clone(),
            config: self.config.clone(),
        }
    }
}
/// Listener interface for RDF store mutations; implementors typically
/// forward each change to `InvalidationEngine::invalidate`.
pub trait RdfUpdateListener {
    /// Called when a triple matching `triple` is inserted.
    fn on_insert(&mut self, triple: &TriplePattern) -> Result<()>;
    /// Called when a triple matching `triple` is deleted.
    fn on_delete(&mut self, triple: &TriplePattern) -> Result<()>;
    /// Batch insert hook; default forwards each triple in order, stopping at
    /// the first error (earlier triples remain processed).
    fn on_batch_insert(&mut self, triples: &[TriplePattern]) -> Result<()> {
        for triple in triples {
            self.on_insert(triple)?;
        }
        Ok(())
    }
    /// Batch delete hook; same default semantics as `on_batch_insert`.
    fn on_batch_delete(&mut self, triples: &[TriplePattern]) -> Result<()> {
        for triple in triples {
            self.on_delete(triple)?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{Term, Variable};

    /// Builds an all-variable pattern; distinct variable names yield
    /// distinct pattern hashes.
    fn create_test_pattern(s: &str, p: &str, o: &str) -> TriplePattern {
        TriplePattern {
            subject: Term::Variable(Variable::new(s).expect("valid variable")),
            predicate: Term::Variable(Variable::new(p).expect("valid variable")),
            object: Term::Variable(Variable::new(o).expect("valid variable")),
        }
    }

    #[test]
    fn test_dependency_graph_basic() {
        let graph = DependencyGraph::new();
        let pattern1 = create_test_pattern("s", "p", "o");
        let pattern2 = create_test_pattern("x", "y", "z");
        graph
            .register_dependencies("key1".to_string(), vec![pattern1.clone(), pattern2.clone()])
            .unwrap();
        let stats = graph.statistics();
        // One entry with two dependency edges.
        assert_eq!(stats.entry_count, 1);
        assert_eq!(stats.edge_count, 2);
    }

    #[test]
    fn test_invalidation_engine_immediate() {
        let engine = InvalidationEngine::new(InvalidationStrategy::Immediate);
        let pattern = create_test_pattern("s", "p", "o");
        engine
            .register_dependencies("key1".to_string(), vec![pattern.clone()])
            .unwrap();
        let affected = engine.find_affected_entries(&pattern).unwrap();
        assert_eq!(affected.len(), 1);
        assert!(affected.contains("key1"));
    }

    #[test]
    fn test_invalidation_engine_batched() {
        let engine = InvalidationEngine::new(InvalidationStrategy::Batched {
            batch_size: 10,
            max_delay_ms: 100,
        });
        let pattern = create_test_pattern("s", "p", "o");
        engine
            .register_dependencies("key1".to_string(), vec![pattern.clone()])
            .unwrap();
        let mut invalidated = Vec::new();
        // With batch_size 10 the single update may be queued, not executed.
        engine
            .invalidate(&pattern, |key| {
                invalidated.push(key.clone());
                Ok(())
            })
            .unwrap();
        // Drain whatever remained queued.
        engine
            .flush_pending(|key| {
                invalidated.push(key.clone());
                Ok(())
            })
            .unwrap();
        assert!(!invalidated.is_empty());
    }

    #[test]
    fn test_bloom_filter() {
        let filter = BloomFilter::new(1000, 0.01);
        let pattern = create_test_pattern("s", "p", "o");
        let hash = TriplePatternHash::from_pattern(&pattern);
        // No false negatives: absent before add, present after.
        assert!(!filter.might_contain(hash));
        filter.add(hash);
        assert!(filter.might_contain(hash));
    }

    #[test]
    fn test_remove_entry() {
        let graph = DependencyGraph::new();
        let pattern = create_test_pattern("s", "p", "o");
        graph
            .register_dependencies("key1".to_string(), vec![pattern.clone()])
            .unwrap();
        assert_eq!(graph.statistics().entry_count, 1);
        graph.remove_entry(&"key1".to_string()).unwrap();
        assert_eq!(graph.statistics().entry_count, 0);
    }

    #[test]
    fn test_multiple_dependencies() {
        let engine = InvalidationEngine::new(InvalidationStrategy::Immediate);
        let pattern1 = create_test_pattern("s", "p", "o");
        let pattern2 = create_test_pattern("x", "y", "z");
        engine
            .register_dependencies("key1".to_string(), vec![pattern1.clone()])
            .unwrap();
        engine
            .register_dependencies("key2".to_string(), vec![pattern1.clone(), pattern2.clone()])
            .unwrap();
        // pattern1 is shared by both keys; pattern2 only by key2.
        let affected = engine.find_affected_entries(&pattern1).unwrap();
        assert_eq!(affected.len(), 2);
        let affected2 = engine.find_affected_entries(&pattern2).unwrap();
        assert_eq!(affected2.len(), 1);
        assert!(affected2.contains("key2"));
    }

    #[test]
    fn test_ttl_registration() {
        let config = InvalidationConfig {
            default_ttl: Some(Duration::from_secs(10)),
            ..Default::default()
        };
        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
        let pattern = create_test_pattern("s", "p", "o");
        // key1 picks up the config default TTL; key2 gets an explicit one.
        engine
            .register_dependencies("key1".to_string(), vec![pattern.clone()])
            .unwrap();
        engine
            .register_dependencies_with_ttl(
                "key2".to_string(),
                vec![pattern.clone()],
                Some(Duration::from_secs(5)),
            )
            .unwrap();
        let stats = engine.statistics();
        assert_eq!(stats.dependency_graph.entry_count, 2);
    }

    #[test]
    fn test_ttl_expiration() {
        let config = InvalidationConfig {
            default_ttl: Some(Duration::from_millis(100)),
            enable_ttl_cleanup: true,
            ..Default::default()
        };
        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
        let pattern = create_test_pattern("s", "p", "o");
        engine
            .register_dependencies_with_ttl(
                "key1".to_string(),
                vec![pattern.clone()],
                Some(Duration::from_millis(50)),
            )
            .unwrap();
        // Fresh entry must not be expired yet.
        let expired = engine.dependency_graph.find_expired_entries();
        assert_eq!(expired.len(), 0);
        // Sleep past the 50 ms TTL.
        std::thread::sleep(Duration::from_millis(100));
        let expired = engine.dependency_graph.find_expired_entries();
        assert_eq!(expired.len(), 1);
        assert!(expired.contains(&"key1".to_string()));
    }

    #[test]
    fn test_ttl_cleanup() {
        let config = InvalidationConfig {
            default_ttl: Some(Duration::from_millis(50)),
            enable_ttl_cleanup: true,
            ..Default::default()
        };
        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
        let pattern = create_test_pattern("s", "p", "o");
        for i in 0..5 {
            engine
                .register_dependencies(format!("key{}", i), vec![pattern.clone()])
                .unwrap();
        }
        assert_eq!(engine.dependency_graph.statistics().entry_count, 5);
        std::thread::sleep(Duration::from_millis(100));
        let mut removed_keys = Vec::new();
        let count = engine
            .cleanup_expired(|key| {
                removed_keys.push(key.clone());
                Ok(())
            })
            .unwrap();
        // All five entries expired and were both reported and removed.
        assert_eq!(count, 5);
        assert_eq!(removed_keys.len(), 5);
        assert_eq!(engine.dependency_graph.statistics().entry_count, 0);
    }

    #[test]
    fn test_ttl_metadata() {
        let graph = DependencyGraph::new();
        let pattern = create_test_pattern("s", "p", "o");
        let ttl = Duration::from_secs(60);
        graph
            .register_dependencies_with_ttl("key1".to_string(), vec![pattern], Some(ttl))
            .unwrap();
        let ttl_info = graph.get_ttl_info(&"key1".to_string());
        assert!(ttl_info.is_some());
        let (elapsed, remaining) = ttl_info.unwrap();
        // elapsed + remaining should bracket the configured TTL.
        assert!(elapsed < ttl);
        assert!(remaining.is_some());
        assert!(remaining.unwrap() <= ttl);
    }

    #[test]
    fn test_mixed_ttl_no_ttl() {
        let config = InvalidationConfig {
            default_ttl: None,
            enable_ttl_cleanup: true,
            ..Default::default()
        };
        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
        let pattern = create_test_pattern("s", "p", "o");
        engine
            .register_dependencies("key_no_ttl".to_string(), vec![pattern.clone()])
            .unwrap();
        engine
            .register_dependencies_with_ttl(
                "key_with_ttl".to_string(),
                vec![pattern.clone()],
                Some(Duration::from_millis(50)),
            )
            .unwrap();
        std::thread::sleep(Duration::from_millis(100));
        // Only the TTL-bearing entry expires; the other lives forever.
        let expired = engine.dependency_graph.find_expired_entries();
        assert_eq!(expired.len(), 1);
        assert!(expired.contains(&"key_with_ttl".to_string()));
        assert!(!expired.contains(&"key_no_ttl".to_string()));
    }

    #[test]
    fn test_ttl_statistics() {
        let config = InvalidationConfig {
            default_ttl: Some(Duration::from_millis(50)),
            enable_ttl_cleanup: true,
            enable_metrics: true,
            ..Default::default()
        };
        let engine = InvalidationEngine::with_config(InvalidationStrategy::Immediate, config);
        let pattern = create_test_pattern("s", "p", "o");
        for i in 0..3 {
            engine
                .register_dependencies(format!("key{}", i), vec![pattern.clone()])
                .unwrap();
        }
        std::thread::sleep(Duration::from_millis(100));
        let _count = engine.cleanup_expired(|_| Ok(())).unwrap();
        let stats = engine.statistics();
        // Metrics recorded the three evictions and a non-zero sweep time.
        assert_eq!(stats.ttl_evictions, 3);
        assert!(stats.avg_ttl_cleanup_time_us > 0.0);
    }
}