use crate::core::Result;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
/// Tuning knobs for [`LazyPropagationEngine`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LazyPropagationConfig {
/// Queue length at which queuing another update triggers a propagation pass.
pub propagation_threshold: usize,
/// Once at least this many seconds have passed since the last propagation
/// and the queue is non-empty, queuing an update triggers a pass.
pub max_delay_seconds: u64,
/// When `true`, `maybe_propagate_for_query` flushes any pending updates.
pub propagate_on_query: bool,
/// NOTE(review): not read by any code visible in this file; presumably
/// toggles dependency tracking — confirm against callers.
pub track_dependencies: bool,
/// NOTE(review): not read by any code visible in this file; presumably
/// bounds how far an update fans out through dependents — confirm.
pub max_propagation_depth: usize,
}
impl Default for LazyPropagationConfig {
fn default() -> Self {
Self {
propagation_threshold: 100,
max_delay_seconds: 300, propagate_on_query: true,
track_dependencies: true,
max_propagation_depth: 3,
}
}
}
/// Lifecycle state of a [`PendingUpdate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateStatus {
/// Queued and not yet picked up; the state every update is created in.
Pending,
/// Popped from the queue and currently being applied.
InProgress,
/// Applied successfully.
Applied,
/// The most recent attempt to apply the update returned an error.
Failed,
}
/// A queued change awaiting propagation by [`LazyPropagationEngine`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingUpdate {
/// Unique identifier; the engine generates a v4 UUID string when queuing.
pub id: String,
/// What the update touches (node, edge, or batch).
pub update_type: PendingUpdateType,
/// Timestamp taken when the update was queued.
pub created_at: DateTime<Utc>,
/// Initialised at queue time alongside `created_at`.
/// NOTE(review): nothing visible in this file modifies it afterwards.
pub updated_at: DateTime<Utc>,
/// Current lifecycle state.
pub status: UpdateStatus,
/// Number of failed attempts so far; incremented each time applying fails.
pub retry_count: u32,
/// Set to 5 for every update the engine creates.
/// NOTE(review): not consulted for ordering in the visible code — the queue is FIFO.
pub priority: u8,
}
/// Payload describing what a [`PendingUpdate`] changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
// Every variant deliberately carries the `Update` suffix, which is exactly
// what this lint would otherwise flag.
#[allow(clippy::enum_variant_names)]
pub enum PendingUpdateType {
/// A change to a single node.
NodeUpdate {
/// Identifier of the node; its dirty flag is cleared once the update is applied.
node_id: String,
/// Relationship identifiers supplied by the caller when queuing.
/// NOTE(review): stored but not read by the visible code.
affected_relationships: Vec<String>,
},
/// A change to the edge `source_id -> target_id`.
EdgeUpdate {
/// Identifier of the edge's source node.
source_id: String,
/// Identifier of the edge's target node.
target_id: String,
/// Caller-supplied edge kind; not part of the dirty-edge key.
edge_type: String,
},
/// A group of updates referenced by id.
/// NOTE(review): never constructed in the visible code, and applying one
/// clears no dirty flags.
BatchUpdate {
/// Identifiers of the grouped updates.
update_ids: Vec<String>,
},
}
/// Records which nodes, edges and cache keys have been touched since they
/// were last cleared.
#[derive(Debug, Clone, Default)]
pub struct DirtyTracker {
// Identifiers of nodes currently flagged dirty.
dirty_nodes: HashSet<String>,
// Directed `(source, target)` pairs currently flagged dirty.
dirty_edges: HashSet<(String, String)>,
// Cache keys that have been invalidated; only emptied by `clear_all`.
invalidated_caches: HashSet<String>,
// When `clear_all` last ran; `None` until its first call.
last_cleanup: Option<DateTime<Utc>>,
}
impl DirtyTracker {
    /// Creates a tracker with nothing flagged and no cleanup recorded.
    pub fn new() -> Self {
        Default::default()
    }

    /// Flags `node_id` as dirty. Flagging an already-dirty node changes nothing.
    pub fn mark_node_dirty(&mut self, node_id: String) {
        let _ = self.dirty_nodes.insert(node_id);
    }

    /// Flags the directed edge `source -> target` as dirty.
    pub fn mark_edge_dirty(&mut self, source: String, target: String) {
        let edge = (source, target);
        let _ = self.dirty_edges.insert(edge);
    }

    /// Records `cache_key` as invalidated.
    pub fn invalidate_cache(&mut self, cache_key: String) {
        let _ = self.invalidated_caches.insert(cache_key);
    }

    /// Returns `true` when `node_id` is currently flagged dirty.
    pub fn is_node_dirty(&self, node_id: &str) -> bool {
        self.dirty_nodes.get(node_id).is_some()
    }

    /// Returns `true` when the directed edge `source -> target` is flagged dirty.
    pub fn is_edge_dirty(&self, source: &str, target: &str) -> bool {
        // The set stores owned pairs, so the probe key has to be owned as well.
        let probe = (source.to_string(), target.to_string());
        self.dirty_edges.contains(&probe)
    }

    /// Returns a copy of every dirty node id, in the set's iteration order.
    pub fn get_dirty_nodes(&self) -> Vec<String> {
        let mut nodes = Vec::with_capacity(self.dirty_nodes.len());
        nodes.extend(self.dirty_nodes.iter().cloned());
        nodes
    }

    /// Returns a copy of every dirty `(source, target)` pair, in the set's
    /// iteration order.
    pub fn get_dirty_edges(&self) -> Vec<(String, String)> {
        let mut edges = Vec::with_capacity(self.dirty_edges.len());
        edges.extend(self.dirty_edges.iter().cloned());
        edges
    }

    /// Removes the dirty flag from `node_id`, if it was set.
    pub fn clear_node(&mut self, node_id: &str) {
        let _ = self.dirty_nodes.remove(node_id);
    }

    /// Removes the dirty flag from the edge `source -> target`, if it was set.
    pub fn clear_edge(&mut self, source: &str, target: &str) {
        let probe = (source.to_string(), target.to_string());
        let _ = self.dirty_edges.remove(&probe);
    }

    /// Drops every dirty flag and invalidated cache key, then stamps the
    /// cleanup time with the current UTC instant.
    pub fn clear_all(&mut self) {
        let Self {
            dirty_nodes,
            dirty_edges,
            invalidated_caches,
            last_cleanup,
        } = self;
        dirty_nodes.clear();
        dirty_edges.clear();
        invalidated_caches.clear();
        *last_cleanup = Some(Utc::now());
    }

    /// Takes a snapshot of the current counts and the last cleanup time.
    pub fn stats(&self) -> DirtyStats {
        let dirty_node_count = self.dirty_nodes.len();
        let dirty_edge_count = self.dirty_edges.len();
        let invalidated_cache_count = self.invalidated_caches.len();
        DirtyStats {
            dirty_node_count,
            dirty_edge_count,
            invalidated_cache_count,
            last_cleanup: self.last_cleanup,
        }
    }
}
/// Point-in-time snapshot of a [`DirtyTracker`], produced by `DirtyTracker::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirtyStats {
/// Number of nodes flagged dirty when the snapshot was taken.
pub dirty_node_count: usize,
/// Number of edges flagged dirty when the snapshot was taken.
pub dirty_edge_count: usize,
/// Number of cache keys recorded as invalidated.
pub invalidated_cache_count: usize,
/// When the tracker was last fully cleared; `None` if it never was.
pub last_cleanup: Option<DateTime<Utc>>,
}
/// Queues graph updates and applies them in passes instead of one at a time.
///
/// Every piece of mutable state sits behind its own `Arc<RwLock<..>>`, so all
/// methods take `&self`.
pub struct LazyPropagationEngine {
// Thresholds and switches that decide when a pass runs.
config: LazyPropagationConfig,
// FIFO queue of updates: pushed at the back, popped from the front.
pending_updates: Arc<RwLock<VecDeque<PendingUpdate>>>,
// Dirty flags for nodes and edges that have queued updates.
dirty_tracker: Arc<RwLock<DirtyTracker>>,
// Maps a node id to the ids of the nodes that depend on it.
dependencies: Arc<RwLock<HashMap<String, HashSet<String>>>>,
// When the last pass finished; `None` before the first pass and after `clear`.
last_propagation: Arc<RwLock<Option<DateTime<Utc>>>>,
// Running counters exposed through `propagation_stats`.
stats: Arc<RwLock<PropagationStats>>,
}
/// Running counters maintained by [`LazyPropagationEngine`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PropagationStats {
/// Number of updates queued so far.
pub total_queued: u64,
/// Number of updates applied successfully, summed over all passes.
pub total_propagated: u64,
/// Number of failed apply attempts, summed over all passes; an update that
/// is retried and fails again is counted once per failed attempt.
pub total_failed: u64,
/// Running average of the duration of a propagation pass, in milliseconds.
pub avg_propagation_time_ms: f64,
/// When the most recent pass finished; `None` until a pass has run.
pub last_propagation: Option<DateTime<Utc>>,
/// Incremented by `LazyPropagationEngine::propagate_pending_updates`.
pub auto_propagations: u64,
/// Incremented by `LazyPropagationEngine::force_propagate`.
pub manual_propagations: u64,
}
impl LazyPropagationEngine {
pub fn new(config: LazyPropagationConfig) -> Self {
Self {
config,
pending_updates: Arc::new(RwLock::new(VecDeque::new())),
dirty_tracker: Arc::new(RwLock::new(DirtyTracker::new())),
dependencies: Arc::new(RwLock::new(HashMap::new())),
last_propagation: Arc::new(RwLock::new(None)),
stats: Arc::new(RwLock::new(PropagationStats::default())),
}
}
pub fn queue_node_update(
&self,
node_id: String,
affected_relationships: Vec<String>,
) -> Result<String> {
let update_id = uuid::Uuid::new_v4().to_string();
let pending = PendingUpdate {
id: update_id.clone(),
update_type: PendingUpdateType::NodeUpdate {
node_id: node_id.clone(),
affected_relationships,
},
created_at: Utc::now(),
updated_at: Utc::now(),
status: UpdateStatus::Pending,
retry_count: 0,
priority: 5, };
self.pending_updates.write().push_back(pending);
self.dirty_tracker.write().mark_node_dirty(node_id);
self.stats.write().total_queued += 1;
if self.should_propagate() {
self.propagate_pending_updates()?;
}
Ok(update_id)
}
pub fn queue_edge_update(
&self,
source_id: String,
target_id: String,
edge_type: String,
) -> Result<String> {
let update_id = uuid::Uuid::new_v4().to_string();
let pending = PendingUpdate {
id: update_id.clone(),
update_type: PendingUpdateType::EdgeUpdate {
source_id: source_id.clone(),
target_id: target_id.clone(),
edge_type,
},
created_at: Utc::now(),
updated_at: Utc::now(),
status: UpdateStatus::Pending,
retry_count: 0,
priority: 5,
};
self.pending_updates.write().push_back(pending);
self.dirty_tracker
.write()
.mark_edge_dirty(source_id, target_id);
self.stats.write().total_queued += 1;
if self.should_propagate() {
self.propagate_pending_updates()?;
}
Ok(update_id)
}
fn should_propagate(&self) -> bool {
let queue_size = self.pending_updates.read().len();
if queue_size >= self.config.propagation_threshold {
return true;
}
if let Some(last) = *self.last_propagation.read() {
let elapsed = (Utc::now() - last).num_seconds() as u64;
if elapsed >= self.config.max_delay_seconds && queue_size > 0 {
return true;
}
} else if queue_size > 0 {
return true;
}
false
}
pub fn propagate_pending_updates(&self) -> Result<PropagationResult> {
let start_time = Utc::now();
let mut stats = self.stats.write();
stats.auto_propagations += 1;
drop(stats);
let mut result = PropagationResult {
updates_processed: 0,
updates_failed: 0,
time_taken_ms: 0,
dirty_nodes_cleared: 0,
dirty_edges_cleared: 0,
};
loop {
let update = {
let mut queue = self.pending_updates.write();
queue.pop_front()
};
match update {
Some(mut pending) => {
pending.status = UpdateStatus::InProgress;
match self.apply_update(&pending) {
Ok(()) => {
pending.status = UpdateStatus::Applied;
result.updates_processed += 1;
match &pending.update_type {
PendingUpdateType::NodeUpdate { node_id, .. } => {
self.dirty_tracker.write().clear_node(node_id);
result.dirty_nodes_cleared += 1;
},
PendingUpdateType::EdgeUpdate {
source_id,
target_id,
..
} => {
self.dirty_tracker.write().clear_edge(source_id, target_id);
result.dirty_edges_cleared += 1;
},
_ => {},
}
},
Err(e) => {
pending.status = UpdateStatus::Failed;
pending.retry_count += 1;
result.updates_failed += 1;
if pending.retry_count < 3 {
self.pending_updates.write().push_back(pending);
} else {
tracing::error!(
"Update {} failed after {} retries: {}",
pending.id,
pending.retry_count,
e
);
}
},
}
},
None => break,
}
}
*self.last_propagation.write() = Some(Utc::now());
let time_taken = (Utc::now() - start_time).num_milliseconds() as u64;
result.time_taken_ms = time_taken;
let mut stats = self.stats.write();
stats.total_propagated += result.updates_processed as u64;
stats.total_failed += result.updates_failed as u64;
stats.last_propagation = Some(Utc::now());
let total_time = stats.avg_propagation_time_ms * stats.auto_propagations as f64;
stats.avg_propagation_time_ms =
(total_time + time_taken as f64) / (stats.auto_propagations + 1) as f64;
Ok(result)
}
fn apply_update(&self, _update: &PendingUpdate) -> Result<()> {
Ok(())
}
pub fn force_propagate(&self) -> Result<PropagationResult> {
let mut stats = self.stats.write();
stats.manual_propagations += 1;
drop(stats);
self.propagate_pending_updates()
}
pub fn pending_count(&self) -> usize {
self.pending_updates.read().len()
}
pub fn dirty_stats(&self) -> DirtyStats {
self.dirty_tracker.read().stats()
}
pub fn propagation_stats(&self) -> PropagationStats {
self.stats.read().clone()
}
pub fn maybe_propagate_for_query(&self) -> Result<Option<PropagationResult>> {
if self.config.propagate_on_query && self.pending_count() > 0 {
Ok(Some(self.propagate_pending_updates()?))
} else {
Ok(None)
}
}
pub fn add_dependency(&self, node_id: String, depends_on: String) {
let mut deps = self.dependencies.write();
deps.entry(depends_on).or_default().insert(node_id);
}
pub fn get_dependents(&self, node_id: &str) -> Vec<String> {
self.dependencies
.read()
.get(node_id)
.map(|set| set.iter().cloned().collect())
.unwrap_or_default()
}
pub fn clear(&self) {
self.pending_updates.write().clear();
self.dirty_tracker.write().clear_all();
*self.last_propagation.write() = None;
}
}
/// Outcome of a single propagation pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropagationResult {
/// Number of updates applied successfully during the pass.
pub updates_processed: usize,
/// Number of failed apply attempts during the pass; an update that is
/// retried and fails again is counted once per failed attempt.
pub updates_failed: usize,
/// Duration of the pass in milliseconds.
pub time_taken_ms: u64,
/// Incremented once for every node update applied successfully.
pub dirty_nodes_cleared: usize,
/// Incremented once for every edge update applied successfully.
pub dirty_edges_cleared: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the queue through both automatic triggers: the initial flush
    /// (no pass has run yet) and the size threshold.
    #[test]
    fn test_auto_propagation_threshold() {
        let config = LazyPropagationConfig {
            propagation_threshold: 3,
            ..Default::default()
        };
        let engine = LazyPropagationEngine::new(config);

        // No pass has run yet, so the very first update is flushed immediately.
        engine
            .queue_node_update("node_1".to_string(), vec![])
            .unwrap();
        assert_eq!(engine.pending_count(), 0);

        // Below the threshold and well inside the delay window: updates pile up.
        engine
            .queue_node_update("node_2".to_string(), vec![])
            .unwrap();
        assert_eq!(engine.pending_count(), 1);
        engine
            .queue_node_update("node_3".to_string(), vec![])
            .unwrap();
        assert_eq!(engine.pending_count(), 2);

        // The third waiting update reaches the threshold and drains the queue.
        engine
            .queue_node_update("node_4".to_string(), vec![])
            .unwrap();
        assert_eq!(engine.pending_count(), 0);

        let stats = engine.propagation_stats();
        assert_eq!(stats.total_queued, 4);
        assert_eq!(stats.total_propagated, 4);
        assert_eq!(engine.dirty_stats().dirty_node_count, 0);
    }

    /// Dirty flags can be set, queried and cleared independently.
    #[test]
    fn test_dirty_tracker() {
        let mut tracker = DirtyTracker::new();
        tracker.mark_node_dirty("node_1".to_string());
        tracker.mark_edge_dirty("node_1".to_string(), "node_2".to_string());
        assert!(tracker.is_node_dirty("node_1"));
        assert!(tracker.is_edge_dirty("node_1", "node_2"));
        assert!(!tracker.is_node_dirty("node_2"));
        tracker.clear_node("node_1");
        assert!(!tracker.is_node_dirty("node_1"));
        // Clearing the node must leave the edge flag alone.
        assert!(tracker.is_edge_dirty("node_1", "node_2"));
    }

    /// Dependents are looked up by the node they depend on.
    #[test]
    fn test_dependencies() {
        let engine = LazyPropagationEngine::new(LazyPropagationConfig::default());
        engine.add_dependency("child_1".to_string(), "parent".to_string());
        engine.add_dependency("child_2".to_string(), "parent".to_string());
        let dependents = engine.get_dependents("parent");
        assert_eq!(dependents.len(), 2);
        assert!(dependents.contains(&"child_1".to_string()));
        assert!(dependents.contains(&"child_2".to_string()));
        assert!(engine.get_dependents("child_1").is_empty());
    }

    /// A forced pass is recorded as exactly one manual propagation.
    #[test]
    fn test_propagation_stats() {
        let engine = LazyPropagationEngine::new(LazyPropagationConfig::default());
        engine
            .queue_node_update("node_1".to_string(), vec![])
            .unwrap();
        engine.force_propagate().unwrap();
        let stats = engine.propagation_stats();
        assert_eq!(stats.total_queued, 1);
        assert!(stats.total_propagated > 0);
        assert_eq!(stats.manual_propagations, 1);
    }
}