pub mod async_batch;
pub mod delta_computation;
pub mod lazy_propagation;
pub use lazy_propagation::{
LazyPropagationConfig, LazyPropagationEngine, PendingUpdate, PropagationResult,
PropagationStats, UpdateStatus,
};
pub use delta_computation::{
ChangeType, DeltaComputationConfig, DeltaComputer, DeltaStatistics, EdgeModification,
EdgeSnapshot, GraphDelta, GraphSnapshot, HashAlgorithm, NodeModification, NodeSnapshot,
PropertyChange,
};
pub use async_batch::{
AsyncBatchConfig, AsyncBatchUpdater, BatchResult, BatchStatistics, BatchStatus, OperationType,
UpdateBatch, UpdateData, UpdateOperation,
};
use crate::{GraphRAGError, Result};
use parking_lot::RwLock;
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
/// Maintains a directed knowledge graph that can be updated incrementally as
/// documents change.
///
/// Cloning is cheap and clones share state: every field except `config` is held
/// behind an `Arc`, so all clones observe each other's writes.
#[derive(Clone)]
pub struct IncrementalGraphManager {
    /// The graph itself; node weights carry their string id in `GraphNode::id`.
    graph: Arc<RwLock<DiGraph<GraphNode, GraphEdge>>>,
    /// Maps `GraphNode::id` to the node's current petgraph index.
    node_index: Arc<RwLock<HashMap<String, NodeIndex>>>,
    /// Append-only log of applied updates (oldest entries are trimmed by
    /// `record_update`); consumed by `stats` and `rollback`.
    update_history: Arc<RwLock<Vec<UpdateRecord>>>,
    /// Behaviour switches supplied at construction time.
    config: IncrementalConfig,
    /// Per-document content hashes used to skip unchanged documents.
    change_detector: Arc<RwLock<ChangeDetector>>,
    /// Engine that queues and later propagates deferred updates.
    lazy_propagation: Arc<LazyPropagationEngine>,
    /// Builds snapshots and computes deltas between them.
    delta_computer: Arc<DeltaComputer>,
    /// Baseline for `compute_delta_since_last_snapshot`; set by `update_snapshot`.
    last_snapshot: Arc<RwLock<Option<GraphSnapshot>>>,
}
/// Configuration for [`IncrementalGraphManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalConfig {
    /// When `true`, `add_content` skips a document whose text hash equals the hash
    /// recorded for the same document id; when `false` every document is processed.
    pub auto_detect_changes: bool,
    /// NOTE(review): not read by `IncrementalGraphManager` in this file; presumably
    /// meant as an entity-extraction threshold — confirm against other callers.
    pub min_entity_confidence: f32,
    /// NOTE(review): not read by `IncrementalGraphManager` in this file.
    pub max_batch_size: usize,
    /// Forwarded to the delta computer as its `parallel_computation` flag.
    pub parallel_updates: bool,
    /// How `update_node` reconciles an update with an existing node.
    pub conflict_resolution: ConflictResolution,
    /// Gates `force_propagate_updates` and the internal lazy-update queue.
    pub enable_lazy_propagation: bool,
    /// Forwarded to the lazy-propagation engine as `propagation_threshold`.
    pub lazy_propagation_threshold: usize,
    /// Gates `update_snapshot` and `compute_delta_since_last_snapshot`.
    pub enable_delta_computation: bool,
    /// Forwarded to the delta computer as `use_bloom_filter`.
    pub delta_use_bloom_filter: bool,
}
impl Default for IncrementalConfig {
fn default() -> Self {
Self {
auto_detect_changes: true,
min_entity_confidence: 0.7,
max_batch_size: 1000,
parallel_updates: true,
conflict_resolution: ConflictResolution::LatestWins,
enable_lazy_propagation: true,
lazy_propagation_threshold: 100,
enable_delta_computation: true,
delta_use_bloom_filter: true,
}
}
}
/// Policy `IncrementalGraphManager::update_node` applies when updating a node
/// that already exists.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// Incoming label and embeddings replace the stored ones; incoming attributes
    /// are merged in, overwriting existing keys.
    LatestWins,
    /// Currently leaves the node's content unchanged (the update carries no
    /// confidence to compare); only the version and timestamp advance.
    HighestConfidence,
    /// Only attributes are applied, and only for keys the node does not have yet.
    Merge,
    /// Updates to existing nodes are rejected with an error.
    Manual,
}
/// A node stored in the incremental graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphNode {
    /// Unique string id; the key under which the manager indexes this node.
    pub id: String,
    /// Display label; also used (case-insensitively) to match extracted entities.
    pub label: String,
    /// Category of the node.
    pub node_type: NodeType,
    /// Free-form string properties.
    pub attributes: HashMap<String, String>,
    /// Optional embedding vector.
    pub embeddings: Option<Vec<f32>>,
    /// Creation time.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Time of the last modification; used as `last_modified` in snapshots.
    pub updated_at: chrono::DateTime<chrono::Utc>,
    /// Starts at 1 for nodes the manager creates and is incremented by each
    /// successful `update_node` on an existing node.
    pub version: u32,
}
/// Category of a [`GraphNode`]. `Entity` is the fallback used when a node is
/// created without an explicit type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NodeType {
    Entity,
    Concept,
    Document,
    Chunk,
    Summary,
}
/// Payload of a directed edge in the incremental graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphEdge {
    /// Kind of relationship the edge represents.
    pub edge_type: EdgeType,
    /// Edge weight; extracted relationships store their confidence here.
    pub weight: f32,
    /// Free-form string properties.
    pub attributes: HashMap<String, String>,
    /// Creation time; reported as `last_modified` in snapshots.
    pub created_at: chrono::DateTime<chrono::Utc>,
}
/// Kind of a [`GraphEdge`]. Its `Debug` name is used as the edge-type string in
/// snapshots, so renaming a variant changes snapshot hashes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EdgeType {
    Related,
    Contains,
    References,
    Derived,
    Similar,
}
/// One entry in the manager's update history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateRecord {
    /// Identifier passed to `rollback` (a UUID v4 string for records created by
    /// `add_content`).
    pub id: String,
    /// When the update started.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// What kind of change this record describes.
    pub update_type: UpdateType,
    /// Ids of nodes touched by the update.
    pub affected_nodes: Vec<String>,
    /// `(source, target)` id pairs of edges touched by the update.
    pub affected_edges: Vec<(String, String)>,
    /// Free-form extra information.
    pub metadata: HashMap<String, String>,
}
/// Kind of change recorded in an [`UpdateRecord`]. `add_content` records
/// `BatchUpdate`; `rollback` can currently only undo `AddNode` records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateType {
    AddNode,
    UpdateNode,
    RemoveNode,
    AddEdge,
    UpdateEdge,
    RemoveEdge,
    BatchUpdate,
}
/// Remembers what has already been ingested so unchanged documents can be skipped.
#[derive(Clone, Debug)]
struct ChangeDetector {
    /// Document id -> hex SHA-256 digest of that document's text as last ingested.
    document_hashes: HashMap<String, String>,
    /// Per-entity version counters; only initialised so far, never read, which is
    /// why the `dead_code` lint is silenced for this field.
    #[allow(dead_code)]
    entity_versions: HashMap<String, u32>,
}
impl IncrementalGraphManager {
pub fn new(config: IncrementalConfig) -> Self {
let lazy_propagation = Arc::new(LazyPropagationEngine::new(LazyPropagationConfig {
propagation_threshold: config.lazy_propagation_threshold,
max_delay_seconds: 300, propagate_on_query: true,
track_dependencies: true,
max_propagation_depth: 3,
}));
let delta_computer = Arc::new(DeltaComputer::new(DeltaComputationConfig {
use_bloom_filter: config.delta_use_bloom_filter,
bloom_false_positive_rate: 0.01,
parallel_computation: config.parallel_updates,
parallel_chunk_size: 1000,
detailed_tracking: true,
hash_algorithm: HashAlgorithm::Sha256,
}));
Self {
graph: Arc::new(RwLock::new(DiGraph::new())),
node_index: Arc::new(RwLock::new(HashMap::new())),
update_history: Arc::new(RwLock::new(Vec::new())),
config,
change_detector: Arc::new(RwLock::new(ChangeDetector {
document_hashes: HashMap::new(),
entity_versions: HashMap::new(),
})),
lazy_propagation,
delta_computer,
last_snapshot: Arc::new(RwLock::new(None)),
}
}
pub fn add_content(&mut self, content: &DocumentContent) -> Result<UpdateSummary> {
let start_time = chrono::Utc::now();
if !self.has_content_changed(content) {
return Ok(UpdateSummary {
nodes_added: 0,
nodes_updated: 0,
nodes_removed: 0,
edges_added: 0,
edges_updated: 0,
edges_removed: 0,
time_taken_ms: 0,
});
}
let extraction = self.extract_from_content(content)?;
let summary = self.apply_incremental_update(extraction)?;
self.record_update(UpdateRecord {
id: uuid::Uuid::new_v4().to_string(),
timestamp: start_time,
update_type: UpdateType::BatchUpdate,
affected_nodes: summary.get_affected_nodes(),
affected_edges: summary.get_affected_edges(),
metadata: HashMap::new(),
})?;
self.update_change_detector(content)?;
let time_taken = (chrono::Utc::now() - start_time).num_milliseconds() as u64;
Ok(UpdateSummary {
time_taken_ms: time_taken,
..summary
})
}
pub fn update_node(&mut self, node_id: &str, updates: NodeUpdate) -> Result<()> {
let index = self.node_index.read();
let node_idx = index.get(node_id).copied();
drop(index);
if let Some(node_idx) = node_idx {
let mut graph = self.graph.write();
if let Some(node) = graph.node_weight_mut(node_idx) {
match self.config.conflict_resolution {
ConflictResolution::LatestWins => {
if let Some(label) = updates.label {
node.label = label;
}
if let Some(attrs) = updates.attributes {
node.attributes.extend(attrs);
}
if let Some(emb) = updates.embeddings {
node.embeddings = Some(emb);
}
},
ConflictResolution::HighestConfidence => {
},
ConflictResolution::Merge => {
if let Some(attrs) = updates.attributes {
for (key, value) in attrs {
node.attributes.entry(key).or_insert(value);
}
}
},
ConflictResolution::Manual => {
return Err(GraphRAGError::IncrementalUpdate {
message: "Manual conflict resolution required".to_string(),
});
},
}
node.updated_at = chrono::Utc::now();
node.version += 1;
}
drop(graph);
} else {
self.add_node(GraphNode {
id: node_id.to_string(),
label: updates.label.unwrap_or_default(),
node_type: updates.node_type.unwrap_or(NodeType::Entity),
attributes: updates.attributes.unwrap_or_default(),
embeddings: updates.embeddings,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
version: 1,
})?;
}
Ok(())
}
pub fn add_edge(&mut self, source: &str, target: &str, edge: GraphEdge) -> Result<()> {
let mut graph = self.graph.write();
let index = self.node_index.read();
if let (Some(&source_idx), Some(&target_idx)) = (index.get(source), index.get(target)) {
graph.add_edge(source_idx, target_idx, edge);
} else {
return Err(GraphRAGError::NotFound {
resource: "Node".to_string(),
id: format!("{} or {}", source, target),
});
}
Ok(())
}
pub fn remove_node(&mut self, node_id: &str) -> Result<()> {
let mut graph = self.graph.write();
let mut index = self.node_index.write();
if let Some(&node_idx) = index.get(node_id) {
graph.remove_node(node_idx);
index.remove(node_id);
}
Ok(())
}
pub fn stats(&self) -> GraphStats {
let graph = self.graph.read();
let history = self.update_history.read();
GraphStats {
node_count: graph.node_count(),
edge_count: graph.edge_count(),
update_count: history.len(),
last_update: history.last().map(|r| r.timestamp),
}
}
pub fn force_propagate_updates(&self) -> Result<PropagationResult> {
if !self.config.enable_lazy_propagation {
return Ok(PropagationResult {
updates_processed: 0,
updates_failed: 0,
time_taken_ms: 0,
dirty_nodes_cleared: 0,
dirty_edges_cleared: 0,
});
}
self.lazy_propagation
.propagate_pending_updates()
.map_err(|e| GraphRAGError::IncrementalUpdate {
message: format!("Lazy propagation failed: {}", e),
})
}
pub fn get_propagation_stats(&self) -> PropagationStats {
self.lazy_propagation.propagation_stats()
}
#[allow(dead_code)]
fn queue_lazy_update(
&self,
node_id: String,
affected_relationships: Vec<String>,
) -> Result<String> {
if !self.config.enable_lazy_propagation {
return Ok(String::new()); }
self.lazy_propagation
.queue_node_update(node_id, affected_relationships)
.map_err(|e| GraphRAGError::IncrementalUpdate {
message: format!("Failed to queue lazy update: {}", e),
})
}
pub fn create_snapshot(&self) -> GraphSnapshot {
let graph = self.graph.read();
let node_index = self.node_index.read();
let mut nodes = HashMap::new();
let mut edges = HashMap::new();
for (node_id, &node_idx) in node_index.iter() {
if let Some(node) = graph.node_weight(node_idx) {
let content_hash = self
.delta_computer
.hash_node_content(node_id, &node.attributes);
nodes.insert(
node_id.clone(),
NodeSnapshot {
node_id: node_id.clone(),
content_hash,
properties: node.attributes.clone(),
last_modified: node.updated_at,
},
);
}
}
for edge_ref in graph.edge_references() {
let source_id = Self::get_node_id_from_index(&node_index, &graph, edge_ref.source());
let target_id = Self::get_node_id_from_index(&node_index, &graph, edge_ref.target());
if let (Some(source), Some(target)) = (source_id, target_id) {
let edge_data = edge_ref.weight();
let content_hash = format!("{}-{}-{:?}", source, target, edge_data.edge_type);
edges.insert(
(source.clone(), target.clone()),
EdgeSnapshot {
source: source.clone(),
target: target.clone(),
edge_type: format!("{:?}", edge_data.edge_type),
content_hash,
properties: edge_data.attributes.clone(),
last_modified: edge_data.created_at,
},
);
}
}
self.delta_computer
.create_snapshot(uuid::Uuid::new_v4().to_string(), nodes, edges)
}
pub fn compute_delta_since_last_snapshot(&self) -> Result<Option<GraphDelta>> {
if !self.config.enable_delta_computation {
return Ok(None);
}
let last_snapshot = self.last_snapshot.read();
if last_snapshot.is_none() {
return Ok(None);
}
let before = last_snapshot.as_ref().expect("checked above");
let after = self.create_snapshot();
self.delta_computer
.compute_delta(before, &after)
.map(Some)
.map_err(|e| GraphRAGError::IncrementalUpdate {
message: format!("Delta computation failed: {}", e),
})
}
pub fn update_snapshot(&self) {
if self.config.enable_delta_computation {
let snapshot = self.create_snapshot();
*self.last_snapshot.write() = Some(snapshot);
}
}
fn get_node_id_from_index(
node_index: &HashMap<String, NodeIndex>,
_graph: &DiGraph<GraphNode, GraphEdge>,
idx: NodeIndex,
) -> Option<String> {
for (node_id, &node_idx) in node_index.iter() {
if node_idx == idx {
return Some(node_id.clone());
}
}
None
}
pub fn rollback(&mut self, version_id: &str) -> Result<()> {
let history = self.update_history.read();
let rollback_point = history
.iter()
.position(|r| r.id == version_id)
.ok_or_else(|| GraphRAGError::NotFound {
resource: "Version".to_string(),
id: version_id.to_string(),
})?;
let records_to_rollback: Vec<UpdateRecord> = history
.iter()
.skip(rollback_point + 1)
.rev()
.cloned()
.collect();
drop(history);
for record in &records_to_rollback {
self.apply_inverse_update(record)?;
}
let mut history_mut = self.update_history.write();
history_mut.truncate(rollback_point + 1);
Ok(())
}
pub fn add_node(&mut self, node: GraphNode) -> Result<NodeIndex> {
let mut graph = self.graph.write();
let mut index = self.node_index.write();
let node_id = node.id.clone();
let node_idx = graph.add_node(node);
index.insert(node_id, node_idx);
Ok(node_idx)
}
fn has_content_changed(&self, content: &DocumentContent) -> bool {
if !self.config.auto_detect_changes {
return true; }
let content_hash = self.hash_content(content);
self.change_detector
.read()
.document_hashes
.get(&content.id)
.map(|existing_hash| existing_hash != &content_hash)
.unwrap_or(true)
}
fn hash_content(&self, content: &DocumentContent) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(content.text.as_bytes());
format!("{:x}", hasher.finalize())
}
fn extract_from_content(&self, _content: &DocumentContent) -> Result<ExtractionResult> {
Ok(ExtractionResult {
entities: vec![],
relationships: vec![],
concepts: vec![],
})
}
fn apply_incremental_update(&mut self, extraction: ExtractionResult) -> Result<UpdateSummary> {
let mut summary = UpdateSummary::default();
for entity in extraction.entities {
if let Some(existing_id) = self.find_similar_entity(&entity) {
self.update_node(
&existing_id,
NodeUpdate {
label: Some(entity.name),
attributes: Some(entity.attributes),
embeddings: None,
node_type: None,
},
)?;
summary.nodes_updated += 1;
} else {
self.add_node(GraphNode {
id: uuid::Uuid::new_v4().to_string(),
label: entity.name,
node_type: NodeType::Entity,
attributes: entity.attributes,
embeddings: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
version: 1,
})?;
summary.nodes_added += 1;
}
}
for relationship in extraction.relationships {
self.add_edge(
&relationship.source,
&relationship.target,
GraphEdge {
edge_type: EdgeType::Related,
weight: relationship.confidence,
attributes: HashMap::new(),
created_at: chrono::Utc::now(),
},
)?;
summary.edges_added += 1;
}
Ok(summary)
}
fn find_similar_entity(&self, entity: &ExtractedEntity) -> Option<String> {
let index = self.node_index.read();
let graph = self.graph.read();
for (id, &node_idx) in index.iter() {
if let Some(node) = graph.node_weight(node_idx) {
if node.label.to_lowercase() == entity.name.to_lowercase() {
return Some(id.clone());
}
}
}
None
}
fn record_update(&mut self, record: UpdateRecord) -> Result<()> {
let mut history = self.update_history.write();
history.push(record);
if history.len() > 1000 {
history.drain(0..100);
}
Ok(())
}
fn update_change_detector(&mut self, content: &DocumentContent) -> Result<()> {
let hash = self.hash_content(content);
self.change_detector
.write()
.document_hashes
.insert(content.id.clone(), hash);
Ok(())
}
fn apply_inverse_update(&mut self, record: &UpdateRecord) -> Result<()> {
match record.update_type {
UpdateType::AddNode => {
for node_id in &record.affected_nodes {
self.remove_node(node_id)?;
}
},
UpdateType::RemoveNode => {
},
_ => {},
}
Ok(())
}
}
/// A document submitted to `IncrementalGraphManager::add_content`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentContent {
    /// Stable document id; the key for change detection.
    pub id: String,
    /// Document text; its SHA-256 hash decides whether the document changed.
    pub text: String,
    /// Extra metadata. Not part of the change-detection hash.
    pub metadata: HashMap<String, String>,
}
/// A partial update for a [`GraphNode`], applied by
/// `IncrementalGraphManager::update_node`.
///
/// Every field is optional: `None` leaves the stored value alone (or, when the node
/// is missing and gets created, falls back to a default). Deriving [`Default`] lets
/// callers set only what they need, e.g.
/// `NodeUpdate { label: Some(name), ..Default::default() }`.
#[derive(Debug, Clone, Default)]
pub struct NodeUpdate {
    /// Replacement label.
    pub label: Option<String>,
    /// Attributes to apply; how they combine with existing ones depends on the
    /// configured conflict-resolution policy.
    pub attributes: Option<HashMap<String, String>>,
    /// Replacement embedding vector.
    pub embeddings: Option<Vec<f32>>,
    /// Node type; only consulted when `update_node` creates a missing node.
    pub node_type: Option<NodeType>,
}
/// Counters describing what an incremental update changed.
///
/// In this file only `nodes_added`, `nodes_updated` and `edges_added` are ever
/// incremented; the other counters stay 0.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UpdateSummary {
    pub nodes_added: usize,
    pub nodes_updated: usize,
    pub nodes_removed: usize,
    pub edges_added: usize,
    pub edges_updated: usize,
    pub edges_removed: usize,
    /// Wall-clock duration of the update in milliseconds (0 when it was skipped).
    pub time_taken_ms: u64,
}
impl UpdateSummary {
    /// Ids of the nodes touched by this update.
    ///
    /// Placeholder: the summary only keeps counts, so this is always empty.
    fn get_affected_nodes(&self) -> Vec<String> {
        Vec::new()
    }

    /// `(source, target)` pairs of the edges touched by this update.
    ///
    /// Placeholder: the summary only keeps counts, so this is always empty.
    fn get_affected_edges(&self) -> Vec<(String, String)> {
        Vec::new()
    }
}
/// Point-in-time statistics returned by `IncrementalGraphManager::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphStats {
    /// Number of nodes in the graph.
    pub node_count: usize,
    /// Number of edges in the graph.
    pub edge_count: usize,
    /// Number of entries currently kept in the update history.
    pub update_count: usize,
    /// Timestamp of the newest history entry, if any.
    pub last_update: Option<chrono::DateTime<chrono::Utc>>,
}
/// Everything extracted from one document.
struct ExtractionResult {
    /// Entities to insert into, or merge with, the graph.
    entities: Vec<ExtractedEntity>,
    /// Concepts found in the document; not consumed by the update path yet, hence
    /// the `dead_code` allowance.
    #[allow(dead_code)]
    concepts: Vec<ExtractedConcept>,
    /// Relationships that become edges.
    relationships: Vec<ExtractedRelationship>,
}
/// An entity mention pulled out of a document.
struct ExtractedEntity {
    /// Entity name; matched case-insensitively against node labels.
    name: String,
    /// Properties copied onto the node.
    attributes: HashMap<String, String>,
    /// Entity category; not used when building nodes yet, hence the allowance.
    #[allow(dead_code)]
    entity_type: String,
}
/// A directed relationship pulled out of a document.
struct ExtractedRelationship {
    /// Confidence score; stored as the edge weight.
    confidence: f32,
    /// Id of the node the edge starts at.
    source: String,
    /// Id of the node the edge ends at.
    target: String,
    /// Relationship label; edges are currently always `Related`, so unused.
    #[allow(dead_code)]
    relationship_type: String,
}
/// An abstract concept pulled out of a document.
// Concepts are extracted but not yet turned into nodes, so the whole type is unused.
#[allow(dead_code)]
struct ExtractedConcept {
    /// Free-text explanation of the concept.
    description: String,
    /// Concept name.
    name: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal entity node with the given id and label.
    fn entity_node(id: &str, label: &str) -> GraphNode {
        let now = chrono::Utc::now();
        GraphNode {
            id: id.to_string(),
            label: label.to_string(),
            node_type: NodeType::Entity,
            attributes: HashMap::new(),
            embeddings: None,
            created_at: now,
            updated_at: now,
            version: 1,
        }
    }

    /// Builds a plain `Related` edge.
    fn related_edge() -> GraphEdge {
        GraphEdge {
            edge_type: EdgeType::Related,
            weight: 1.0,
            attributes: HashMap::new(),
            created_at: chrono::Utc::now(),
        }
    }

    /// Builds a document with the given id and text.
    fn document(id: &str, text: &str) -> DocumentContent {
        DocumentContent {
            id: id.to_string(),
            text: text.to_string(),
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn test_incremental_update() {
        let mut manager = IncrementalGraphManager::new(IncrementalConfig::default());
        let summary = manager.add_content(&document("doc1", "Test content")).unwrap();
        // Extraction is still a stub, so nothing is added.
        assert_eq!(summary.nodes_added, 0);
    }

    #[test]
    fn test_unchanged_content_is_skipped() {
        let mut manager = IncrementalGraphManager::new(IncrementalConfig::default());
        let content = document("doc1", "Test content");
        manager.add_content(&content).unwrap();
        assert_eq!(manager.stats().update_count, 1);
        // Same id and text: change detection short-circuits, no history entry.
        manager.add_content(&content).unwrap();
        assert_eq!(manager.stats().update_count, 1);
    }

    #[test]
    fn test_node_operations() {
        let mut manager = IncrementalGraphManager::new(IncrementalConfig::default());
        manager.add_node(entity_node("node1", "Test Node")).unwrap();
        assert_eq!(manager.stats().node_count, 1);
        manager.remove_node("node1").unwrap();
        assert_eq!(manager.stats().node_count, 0);
    }

    #[test]
    fn test_remove_unknown_node_is_noop() {
        let mut manager = IncrementalGraphManager::new(IncrementalConfig::default());
        manager.add_node(entity_node("node1", "Test Node")).unwrap();
        manager.remove_node("missing").unwrap();
        assert_eq!(manager.stats().node_count, 1);
    }

    #[test]
    fn test_add_edge_requires_both_endpoints() {
        let mut manager = IncrementalGraphManager::new(IncrementalConfig::default());
        manager.add_node(entity_node("a", "A")).unwrap();
        assert!(manager.add_edge("a", "missing", related_edge()).is_err());
        assert_eq!(manager.stats().edge_count, 0);
        manager.add_node(entity_node("b", "B")).unwrap();
        manager.add_edge("a", "b", related_edge()).unwrap();
        assert_eq!(manager.stats().edge_count, 1);
    }

    #[test]
    fn test_update_node_upserts_and_bumps_version() {
        let mut manager = IncrementalGraphManager::new(IncrementalConfig::default());
        let update = |label: &str| NodeUpdate {
            label: Some(label.to_string()),
            attributes: None,
            embeddings: None,
            node_type: None,
        };
        manager.update_node("n1", update("first")).unwrap();
        manager.update_node("n1", update("second")).unwrap();
        assert_eq!(manager.stats().node_count, 1);
        let idx = manager.node_index.read()["n1"];
        let graph = manager.graph.read();
        assert_eq!(graph[idx].label, "second");
        assert_eq!(graph[idx].version, 2);
    }
}