use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
#[allow(dead_code)] pub enum InvalidationStrategy {
Manual,
TTL {
#[allow(dead_code)]
default_ttl: Duration,
#[allow(dead_code)] max_ttl: Duration,
},
TagBased {
#[allow(dead_code)]
sensitive_tags: HashSet<String>,
#[allow(dead_code)] propagation_delay: Duration,
},
Versioned {
#[allow(dead_code)]
track_data_version: bool,
#[allow(dead_code)]
track_schema_version: bool,
},
WriteThrough {
#[allow(dead_code)] watched_tables: HashSet<String>,
#[allow(dead_code)] immediate: bool,
},
DependencyBased {
#[allow(dead_code)]
max_depth: usize,
#[allow(dead_code)] transitive: bool,
},
}
#[derive(Debug, Clone)]
pub enum InvalidationEvent {
DataUpdate {
table: String,
#[allow(dead_code)] affected_rows: u64,
#[allow(dead_code)] columns: Vec<String>,
},
SchemaChange {
table: String,
#[allow(dead_code)] change_type: SchemaChangeType,
},
Manual {
tags: Vec<String>,
#[allow(dead_code)] reason: String,
},
MemoryPressure {
#[allow(dead_code)] current_usage: usize,
#[allow(dead_code)] max_usage: usize,
},
}
#[derive(Debug, Clone)]
#[allow(dead_code)] pub enum SchemaChangeType {
TableCreated,
TableDropped,
ColumnAdded,
ColumnDropped,
ColumnModified,
ConstraintAdded,
ConstraintDropped,
}
#[derive(Debug, Clone)]
#[allow(dead_code)] pub struct InvalidationResult {
pub entries_invalidated: usize,
pub memory_freed: usize,
pub duration: Duration,
pub strategy_used: String,
pub cascade_depth: usize,
}
#[derive(Debug, Clone)]
#[allow(dead_code)] pub struct CacheDependency {
pub entry_key: String,
pub depends_on: HashSet<String>, pub dependency_type: DependencyType,
pub last_validated: Instant,
}
#[derive(Debug, Clone)]
#[allow(dead_code)] pub enum DependencyType {
Table(String),
Schema(String),
Index(String),
Query(String),
Expression(u64), }
#[allow(dead_code)] pub struct InvalidationManager {
strategy: InvalidationStrategy,
dependencies: Arc<RwLock<HashMap<String, CacheDependency>>>,
reverse_deps: Arc<RwLock<HashMap<String, HashSet<String>>>>,
event_history: Arc<RwLock<Vec<(InvalidationEvent, InvalidationResult, Instant)>>>,
max_history_size: usize,
stats: Arc<RwLock<InvalidationStats>>,
}
#[derive(Debug, Default, Clone)]
#[allow(dead_code)] pub struct InvalidationStats {
pub total_events: u64,
pub total_invalidations: u64,
pub total_memory_freed: usize,
pub cascade_invalidations: u64,
pub average_cascade_depth: f64,
pub false_positives: u64, }
impl InvalidationManager {
pub fn new(strategy: InvalidationStrategy, max_history_size: usize) -> Self {
Self {
strategy,
dependencies: Arc::new(RwLock::new(HashMap::new())),
reverse_deps: Arc::new(RwLock::new(HashMap::new())),
event_history: Arc::new(RwLock::new(Vec::new())),
max_history_size,
stats: Arc::new(RwLock::new(InvalidationStats::default())),
}
}
#[allow(dead_code)] pub fn register_dependency(&self, entry_key: String, dependency: CacheDependency) {
let dep_key = self.dependency_key(&dependency.dependency_type);
{
let mut dependencies = self.dependencies.write().unwrap();
dependencies.insert(entry_key.clone(), dependency);
}
{
let mut reverse_deps = self.reverse_deps.write().unwrap();
reverse_deps
.entry(dep_key)
.or_insert_with(HashSet::new)
.insert(entry_key);
}
}
pub fn handle_event(&self, event: InvalidationEvent) -> InvalidationResult {
let start_time = Instant::now();
let mut _entries_invalidated = 0;
let memory_freed = 0;
let mut cascade_depth = 0;
let affected_entries = match &event {
InvalidationEvent::DataUpdate { table, .. } => {
self.find_dependent_entries(&DependencyType::Table(table.clone()))
}
InvalidationEvent::SchemaChange { table, .. } => {
let mut affected =
self.find_dependent_entries(&DependencyType::Table(table.clone()));
affected
.extend(self.find_dependent_entries(&DependencyType::Schema(table.clone())));
affected
}
InvalidationEvent::Manual { tags, .. } => self.find_entries_by_tags(tags),
InvalidationEvent::MemoryPressure { .. } => {
HashSet::new()
}
};
_entries_invalidated = affected_entries.len();
if self.should_cascade(&event) {
let cascaded = self.cascade_invalidation(&affected_entries);
_entries_invalidated += cascaded.len();
cascade_depth = self.calculate_cascade_depth(&affected_entries, &cascaded);
}
let duration = start_time.elapsed();
let result = InvalidationResult {
entries_invalidated: _entries_invalidated,
memory_freed,
duration,
strategy_used: self.strategy_name(),
cascade_depth,
};
self.record_event_result(event, result.clone());
{
let mut stats = self.stats.write().unwrap();
stats.total_events += 1;
stats.total_invalidations += _entries_invalidated as u64;
stats.total_memory_freed += memory_freed;
if cascade_depth > 0 {
stats.cascade_invalidations += 1;
stats.average_cascade_depth = (stats.average_cascade_depth
* (stats.cascade_invalidations - 1) as f64
+ cascade_depth as f64)
/ stats.cascade_invalidations as f64;
}
}
result
}
#[allow(dead_code)] pub fn cleanup_expired_dependencies(&self, max_age: Duration) {
let cutoff_time = Instant::now() - max_age;
let mut expired_keys = Vec::new();
{
let dependencies = self.dependencies.read().unwrap();
for (key, dep) in dependencies.iter() {
if dep.last_validated < cutoff_time {
expired_keys.push(key.clone());
}
}
}
for key in expired_keys {
self.remove_dependency(&key);
}
}
#[allow(dead_code)] pub fn stats(&self) -> InvalidationStats {
self.stats.read().unwrap().clone()
}
#[allow(dead_code)] pub fn recent_events(
&self,
limit: usize,
) -> Vec<(InvalidationEvent, InvalidationResult, Instant)> {
let history = self.event_history.read().unwrap();
let limit = limit.min(history.len());
history.iter().rev().take(limit).cloned().collect()
}
fn find_dependent_entries(&self, dependency_type: &DependencyType) -> HashSet<String> {
let dep_key = self.dependency_key(dependency_type);
self.reverse_deps
.read()
.unwrap()
.get(&dep_key)
.cloned()
.unwrap_or_default()
}
fn find_entries_by_tags(&self, _tags: &[String]) -> HashSet<String> {
HashSet::new()
}
#[allow(dead_code)] fn find_version_sensitive_entries(&self) -> HashSet<String> {
let dependencies = self.dependencies.read().unwrap();
dependencies.keys().cloned().collect()
}
fn should_cascade(&self, _event: &InvalidationEvent) -> bool {
match &self.strategy {
InvalidationStrategy::DependencyBased { transitive, .. } => *transitive,
InvalidationStrategy::TagBased { .. } => true,
_ => false,
}
}
fn cascade_invalidation(&self, initial_entries: &HashSet<String>) -> HashSet<String> {
let mut cascaded = HashSet::new();
let mut to_process: Vec<String> = initial_entries.iter().cloned().collect();
let mut processed = HashSet::new();
let max_depth = match &self.strategy {
InvalidationStrategy::DependencyBased { max_depth, .. } => *max_depth,
_ => 3, };
let mut current_depth = 0;
while !to_process.is_empty() && current_depth < max_depth {
let current_batch = to_process;
to_process = Vec::new();
current_depth += 1;
for entry in current_batch {
if processed.contains(&entry) {
continue;
}
processed.insert(entry.clone());
let dependent_entries = self.find_entries_dependent_on(&entry);
for dep in dependent_entries {
if !cascaded.contains(&dep) && !initial_entries.contains(&dep) {
cascaded.insert(dep.clone());
to_process.push(dep);
}
}
}
}
cascaded
}
fn find_entries_dependent_on(&self, _entry_key: &str) -> HashSet<String> {
HashSet::new()
}
fn calculate_cascade_depth(
&self,
initial: &HashSet<String>,
cascaded: &HashSet<String>,
) -> usize {
if cascaded.is_empty() {
0
} else {
(cascaded.len() as f64 / initial.len() as f64).ceil() as usize
}
}
fn dependency_key(&self, dependency_type: &DependencyType) -> String {
match dependency_type {
DependencyType::Table(name) => format!("table:{}", name),
DependencyType::Schema(name) => format!("schema:{}", name),
DependencyType::Index(name) => format!("index:{}", name),
DependencyType::Query(query) => format!("query:{}", query),
DependencyType::Expression(hash) => format!("expr:{}", hash),
}
}
fn strategy_name(&self) -> String {
match &self.strategy {
InvalidationStrategy::Manual => "Manual".to_string(),
InvalidationStrategy::TTL { .. } => "TTL".to_string(),
InvalidationStrategy::TagBased { .. } => "TagBased".to_string(),
InvalidationStrategy::Versioned { .. } => "Versioned".to_string(),
InvalidationStrategy::WriteThrough { .. } => "WriteThrough".to_string(),
InvalidationStrategy::DependencyBased { .. } => "DependencyBased".to_string(),
}
}
fn record_event_result(&self, event: InvalidationEvent, result: InvalidationResult) {
let mut history = self.event_history.write().unwrap();
if history.len() >= self.max_history_size {
history.remove(0);
}
history.push((event, result, Instant::now()));
}
#[allow(dead_code)] fn remove_dependency(&self, entry_key: &str) {
if let Some(dependency) = self.dependencies.write().unwrap().remove(entry_key) {
let dep_key = self.dependency_key(&dependency.dependency_type);
let mut reverse_deps = self.reverse_deps.write().unwrap();
if let Some(entries) = reverse_deps.get_mut(&dep_key) {
entries.remove(entry_key);
if entries.is_empty() {
reverse_deps.remove(&dep_key);
}
}
}
}
}
impl InvalidationStrategy {
#[allow(dead_code)] pub fn default_ttl() -> Self {
Self::TTL {
default_ttl: Duration::from_secs(1800), max_ttl: Duration::from_secs(7200), }
}
#[allow(dead_code)] pub fn graph_tag_based() -> Self {
let mut sensitive_tags = HashSet::new();
sensitive_tags.insert("nodes".to_string());
sensitive_tags.insert("edges".to_string());
sensitive_tags.insert("schema".to_string());
Self::TagBased {
sensitive_tags,
propagation_delay: Duration::from_millis(100),
}
}
#[allow(dead_code)] pub fn full_versioned() -> Self {
Self::Versioned {
track_data_version: true,
track_schema_version: true,
}
}
#[allow(dead_code)] pub fn dependency_tracking() -> Self {
Self::DependencyBased {
max_depth: 3,
transitive: true,
}
}
}