1use crate::core::{
25 DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship, Result, TextChunk,
26};
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29use std::collections::{HashMap, HashSet};
30use std::time::{Duration, Instant};
31
32#[cfg(feature = "incremental")]
33use std::sync::Arc;
34
35#[cfg(feature = "incremental")]
36use {
37 dashmap::DashMap,
38 parking_lot::{Mutex, RwLock},
39 tokio::sync::{broadcast, Semaphore},
40 uuid::Uuid,
41};
42
/// Opaque, globally unique identifier for a single graph update.
///
/// Newtype over `String`; the concrete format depends on the
/// `incremental` feature (UUIDv4 vs. timestamp-derived — see `UpdateId::new`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UpdateId(String);
50
51impl UpdateId {
52 pub fn new() -> Self {
54 #[cfg(feature = "incremental")]
55 {
56 Self(Uuid::new_v4().to_string())
57 }
58 #[cfg(not(feature = "incremental"))]
59 {
60 Self(format!(
61 "update_{}",
62 Utc::now().timestamp_nanos_opt().unwrap_or(0)
63 ))
64 }
65 }
66
67 pub fn from_string(id: String) -> Self {
69 Self(id)
70 }
71
72 pub fn as_str(&self) -> &str {
74 &self.0
75 }
76}
77
/// Displays the raw identifier string.
impl std::fmt::Display for UpdateId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// `Default` generates a brand-new unique id (see `UpdateId::new`).
impl Default for UpdateId {
    fn default() -> Self {
        Self::new()
    }
}
89
/// A single recorded mutation of the knowledge graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeRecord {
    /// Unique identifier of this change.
    pub change_id: UpdateId,
    /// When the change was recorded.
    pub timestamp: DateTime<Utc>,
    /// What kind of graph element was affected.
    pub change_type: ChangeType,
    /// Entity affected by the change, if any.
    pub entity_id: Option<EntityId>,
    /// Document affected by the change, if any.
    pub document_id: Option<DocumentId>,
    /// The low-level operation that was performed.
    pub operation: Operation,
    /// Payload carrying the new/changed data.
    pub data: ChangeData,
    /// Free-form metadata attached to the change.
    pub metadata: HashMap<String, String>,
}

/// Classification of a change by element kind and lifecycle event.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ChangeType {
    EntityAdded,
    EntityUpdated,
    EntityRemoved,
    RelationshipAdded,
    RelationshipUpdated,
    RelationshipRemoved,
    DocumentAdded,
    DocumentUpdated,
    DocumentRemoved,
    ChunkAdded,
    ChunkUpdated,
    ChunkRemoved,
    EmbeddingAdded,
    EmbeddingUpdated,
    EmbeddingRemoved,
}

/// Low-level storage operation applied by a change.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Operation {
    Insert,
    Update,
    Delete,
    /// Insert-or-update in a single step.
    Upsert,
}

/// Payload of a `ChangeRecord`, one variant per element kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeData {
    Entity(Entity),
    Relationship(Relationship),
    Document(Document),
    /// Boxed because `TextChunk` is comparatively large.
    Chunk(Box<TextChunk>),
    /// Embedding vector update for a specific entity.
    Embedding {
        entity_id: EntityId,
        embedding: Vec<f32>,
    },
    /// No payload (e.g. pure deletions).
    Empty,
}

/// Lightweight document representation used in change payloads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Document {
    pub id: DocumentId,
    pub title: String,
    pub content: String,
    pub metadata: HashMap<String, String>,
}

/// A batch of changes that is applied (and rolled back) atomically.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphDelta {
    /// Unique identifier of this delta.
    pub delta_id: UpdateId,
    /// When the delta was created.
    pub timestamp: DateTime<Utc>,
    /// The individual changes that make up the delta.
    pub changes: Vec<ChangeRecord>,
    /// Deltas that must be applied before this one.
    pub dependencies: Vec<UpdateId>,
    /// Current lifecycle state of the delta.
    pub status: DeltaStatus,
    /// Snapshot needed to undo the delta, if captured.
    pub rollback_data: Option<RollbackData>,
}

/// Lifecycle state of a `GraphDelta`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DeltaStatus {
    Pending,
    Applied,
    Committed,
    RolledBack,
    /// Application failed; carries the error text.
    Failed {
        error: String,
    },
}

/// Pre-change snapshot used to undo an applied delta.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollbackData {
    /// Entities as they were before the delta.
    pub previous_entities: Vec<Entity>,
    /// Relationships as they were before the delta.
    pub previous_relationships: Vec<Relationship>,
    /// Cache keys/regions that were invalidated by the delta.
    pub affected_caches: Vec<String>,
}

/// Policy for resolving a conflict between existing and incoming data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictStrategy {
    /// Keep the data already in the graph.
    KeepExisting,
    /// Replace with the incoming data.
    KeepNew,
    /// Merge both sides (see `ConflictResolver::merge_conflict_data`).
    Merge,
    /// Delegate the decision to an LLM (not implemented here).
    LLMDecision,
    /// Ask the user interactively (not implemented here).
    UserPrompt,
    /// Use a named resolver registered on the `ConflictResolver`.
    Custom(String),
}

/// A detected conflict between existing graph data and an incoming change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    pub conflict_id: UpdateId,
    pub conflict_type: ConflictType,
    /// The data currently stored in the graph.
    pub existing_data: ChangeData,
    /// The incoming data that clashes with it.
    pub new_data: ChangeData,
    /// Resolution outcome, once decided.
    pub resolution: Option<ConflictResolution>,
}

/// Categories of conflicts that can be detected.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    EntityExists,
    RelationshipExists,
    VersionMismatch,
    DataInconsistency,
    ConstraintViolation,
}

/// Outcome of resolving a `Conflict`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictResolution {
    /// Strategy that produced this resolution.
    pub strategy: ConflictStrategy,
    /// The data that should be written to the graph.
    pub resolved_data: ChangeData,
    /// Extra details about how the resolution was reached.
    pub metadata: HashMap<String, String>,
}
297
/// Storage backend capable of applying incremental graph updates.
///
/// Implementations must be safe to share across threads. Mutating
/// operations return the `UpdateId` of the recorded change so callers
/// can correlate results with the change log.
#[async_trait::async_trait]
pub trait IncrementalGraphStore: Send + Sync {
    /// Backend-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Inserts or updates a single entity.
    async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId>;

    /// Inserts or updates a single relationship.
    async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId>;

    /// Removes an entity by id.
    async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId>;

    /// Removes the relationship identified by (source, target, type).
    async fn delete_relationship(
        &mut self,
        source: &EntityId,
        target: &EntityId,
        relation_type: &str,
    ) -> Result<UpdateId>;

    /// Applies a whole delta atomically.
    async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId>;

    /// Undoes a previously applied delta.
    async fn rollback_delta(&mut self, delta_id: &UpdateId) -> Result<()>;

    /// Returns recorded changes, optionally only those after `since`.
    async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>>;

    /// Starts a new transaction and returns its id.
    async fn begin_transaction(&mut self) -> Result<TransactionId>;

    /// Commits an open transaction.
    async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()>;

    /// Aborts an open transaction.
    async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()>;

    /// Upserts many entities, resolving collisions with `_strategy`.
    async fn batch_upsert_entities(
        &mut self,
        entities: Vec<Entity>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>>;

    /// Upserts many relationships, resolving collisions with `_strategy`.
    async fn batch_upsert_relationships(
        &mut self,
        relationships: Vec<Relationship>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>>;

    /// Replaces the embedding vector of one entity.
    async fn update_entity_embedding(
        &mut self,
        entity_id: &EntityId,
        embedding: Vec<f32>,
    ) -> Result<UpdateId>;

    /// Replaces embeddings for many entities in one call.
    async fn bulk_update_embeddings(
        &mut self,
        updates: Vec<(EntityId, Vec<f32>)>,
    ) -> Result<Vec<UpdateId>>;

    /// Lists transactions that are begun but not yet committed/rolled back.
    async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>>;

    /// Returns aggregate statistics about the stored graph.
    async fn get_graph_statistics(&self) -> Result<GraphStatistics>;

    /// Validates referential integrity and returns a report.
    async fn validate_consistency(&self) -> Result<ConsistencyReport>;
}
379
/// Opaque unique identifier for a storage transaction.
///
/// Same format rules as `UpdateId` (UUIDv4 with the `incremental`
/// feature, timestamp-based otherwise).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TransactionId(String);
383
384impl TransactionId {
385 pub fn new() -> Self {
387 #[cfg(feature = "incremental")]
388 {
389 Self(Uuid::new_v4().to_string())
390 }
391 #[cfg(not(feature = "incremental"))]
392 {
393 Self(format!(
394 "tx_{}",
395 Utc::now().timestamp_nanos_opt().unwrap_or(0)
396 ))
397 }
398 }
399
400 pub fn as_str(&self) -> &str {
402 &self.0
403 }
404}
405
/// Displays the raw identifier string.
impl std::fmt::Display for TransactionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// `Default` generates a brand-new unique id (see `TransactionId::new`).
impl Default for TransactionId {
    fn default() -> Self {
        Self::new()
    }
}
417
/// Aggregate topology statistics for a stored graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphStatistics {
    pub node_count: usize,
    pub edge_count: usize,
    pub average_degree: f64,
    pub max_degree: usize,
    pub connected_components: usize,
    pub clustering_coefficient: f64,
    /// When these statistics were last refreshed.
    pub last_updated: DateTime<Utc>,
}

/// Result of a consistency validation pass over the graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyReport {
    /// True when no issues were found.
    pub is_consistent: bool,
    /// Entities with no relationships referencing them.
    pub orphaned_entities: Vec<EntityId>,
    /// (source, target, relation type) triples whose endpoints are missing.
    pub broken_relationships: Vec<(EntityId, EntityId, String)>,
    /// Entities that should have an embedding but do not.
    pub missing_embeddings: Vec<EntityId>,
    /// When the validation ran.
    pub validation_time: DateTime<Utc>,
    /// Total number of issues detected.
    pub issues_found: usize,
}

/// How much cached data to drop in response to graph changes.
#[derive(Debug, Clone)]
pub enum InvalidationStrategy {
    /// Drop exactly these cache keys.
    Selective(Vec<String>),
    /// Drop an entire named cache region.
    Regional(String),
    /// Drop everything.
    Global,
    /// Drop caches within N hops of the given entity.
    Relational(EntityId, u32), }

/// A named cache region tracking which graph elements it covers.
#[derive(Debug, Clone)]
pub struct CacheRegion {
    pub region_id: String,
    /// Entities whose cached data lives in this region.
    pub entity_ids: HashSet<EntityId>,
    /// Relationship types cached in this region.
    pub relationship_types: HashSet<String>,
    /// Documents cached in this region.
    pub document_ids: HashSet<DocumentId>,
    pub last_modified: DateTime<Utc>,
}
485
/// Tracks cache regions and computes targeted invalidations for changes.
///
/// Thread-safe via `DashMap` for the region maps and a `parking_lot`
/// mutex for the append-only invalidation log.
#[cfg(feature = "incremental")]
pub struct SelectiveInvalidation {
    // region id -> region descriptor
    cache_regions: DashMap<String, CacheRegion>,
    // reverse index: entity -> region ids containing it
    entity_to_regions: DashMap<EntityId, HashSet<String>>,
    // chronological record of issued invalidations
    invalidation_log: Mutex<Vec<(DateTime<Utc>, InvalidationStrategy)>>,
}
493
/// `Default` builds an empty invalidation tracker (see `new`).
#[cfg(feature = "incremental")]
impl Default for SelectiveInvalidation {
    fn default() -> Self {
        Self::new()
    }
}
500
#[cfg(feature = "incremental")]
impl SelectiveInvalidation {
    /// Creates an empty invalidation tracker.
    pub fn new() -> Self {
        Self {
            cache_regions: DashMap::new(),
            entity_to_regions: DashMap::new(),
            invalidation_log: Mutex::new(Vec::new()),
        }
    }

    /// Registers a cache region and indexes every entity it contains so
    /// later changes can be mapped back to affected regions.
    pub fn register_cache_region(&self, region: CacheRegion) {
        let region_id = region.region_id.clone();

        for entity_id in region.entity_ids.iter() {
            self.entity_to_regions
                .entry(entity_id.clone())
                .or_default()
                .insert(region_id.clone());
        }

        self.cache_regions.insert(region_id, region);
    }

    /// Derives the set of invalidation strategies required by a batch of
    /// changes, records them in the log, and returns them.
    pub fn invalidate_for_changes(&self, changes: &[ChangeRecord]) -> Vec<InvalidationStrategy> {
        let mut strategies = Vec::new();
        let mut touched_regions = HashSet::new();

        for change in changes {
            match &change.change_type {
                // Entity changes invalidate every region containing the
                // entity, plus its 2-hop relational neighborhood.
                ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
                    if let Some(entity_id) = &change.entity_id {
                        if let Some(regions) = self.entity_to_regions.get(entity_id) {
                            touched_regions.extend(regions.clone());
                        }
                        strategies.push(InvalidationStrategy::Relational(entity_id.clone(), 2));
                    }
                },
                // Relationship changes invalidate the 1-hop neighborhood
                // of both endpoints.
                ChangeType::RelationshipAdded
                | ChangeType::RelationshipUpdated
                | ChangeType::RelationshipRemoved => {
                    if let ChangeData::Relationship(rel) = &change.data {
                        strategies.push(InvalidationStrategy::Relational(rel.source.clone(), 1));
                        strategies.push(InvalidationStrategy::Relational(rel.target.clone(), 1));
                    }
                },
                // Everything else maps to specific cache keys, if any.
                _ => {
                    let cache_keys = self.generate_cache_keys_for_change(change);
                    if !cache_keys.is_empty() {
                        strategies.push(InvalidationStrategy::Selective(cache_keys));
                    }
                },
            }
        }

        strategies.extend(
            touched_regions
                .into_iter()
                .map(InvalidationStrategy::Regional),
        );

        // Append every issued strategy to the chronological log.
        let mut log = self.invalidation_log.lock();
        log.extend(strategies.iter().map(|s| (Utc::now(), s.clone())));

        strategies
    }

    /// Maps a single change to the concrete cache keys it invalidates.
    fn generate_cache_keys_for_change(&self, change: &ChangeRecord) -> Vec<String> {
        match &change.change_type {
            ChangeType::EntityAdded | ChangeType::EntityUpdated => change
                .entity_id
                .as_ref()
                .map(|entity_id| {
                    vec![
                        format!("entity:{entity_id}"),
                        format!("entity_neighbors:{entity_id}"),
                    ]
                })
                .unwrap_or_default(),
            ChangeType::DocumentAdded | ChangeType::DocumentUpdated => change
                .document_id
                .as_ref()
                .map(|doc_id| {
                    vec![
                        format!("document:{doc_id}"),
                        format!("document_chunks:{doc_id}"),
                    ]
                })
                .unwrap_or_default(),
            ChangeType::EmbeddingAdded | ChangeType::EmbeddingUpdated => change
                .entity_id
                .as_ref()
                .map(|entity_id| {
                    vec![
                        format!("embedding:{entity_id}"),
                        format!("similarity:{entity_id}"),
                    ]
                })
                .unwrap_or_default(),
            _ => Vec::new(),
        }
    }

    /// Snapshot of invalidation counters for monitoring.
    pub fn get_invalidation_stats(&self) -> InvalidationStats {
        let history = self.invalidation_log.lock();

        InvalidationStats {
            total_invalidations: history.len(),
            cache_regions: self.cache_regions.len(),
            entity_mappings: self.entity_to_regions.len(),
            last_invalidation: history.last().map(|(time, _)| *time),
        }
    }
}
616
/// Counters describing invalidation activity so far.
#[derive(Debug, Clone)]
pub struct InvalidationStats {
    pub total_invalidations: usize,
    pub cache_regions: usize,
    pub entity_mappings: usize,
    /// Timestamp of the most recent invalidation, if any.
    pub last_invalidation: Option<DateTime<Utc>>,
}

/// Applies a configured `ConflictStrategy` to detected conflicts,
/// optionally delegating to user-registered named resolvers.
pub struct ConflictResolver {
    // Default strategy used by `resolve_conflict`.
    strategy: ConflictStrategy,
    // Named resolvers for `ConflictStrategy::Custom(name)`.
    custom_resolvers: HashMap<String, ConflictResolverFn>,
}

/// Boxed callback that turns a conflict into a resolution.
type ConflictResolverFn = Box<dyn Fn(&Conflict) -> Result<ConflictResolution> + Send + Sync>;
642
643impl ConflictResolver {
644 pub fn new(strategy: ConflictStrategy) -> Self {
646 Self {
647 strategy,
648 custom_resolvers: HashMap::new(),
649 }
650 }
651
652 pub fn with_custom_resolver<F>(mut self, name: String, resolver: F) -> Self
654 where
655 F: Fn(&Conflict) -> Result<ConflictResolution> + Send + Sync + 'static,
656 {
657 self.custom_resolvers.insert(name, Box::new(resolver));
658 self
659 }
660
661 pub async fn resolve_conflict(&self, conflict: &Conflict) -> Result<ConflictResolution> {
663 match &self.strategy {
664 ConflictStrategy::KeepExisting => Ok(ConflictResolution {
665 strategy: ConflictStrategy::KeepExisting,
666 resolved_data: conflict.existing_data.clone(),
667 metadata: HashMap::new(),
668 }),
669 ConflictStrategy::KeepNew => Ok(ConflictResolution {
670 strategy: ConflictStrategy::KeepNew,
671 resolved_data: conflict.new_data.clone(),
672 metadata: HashMap::new(),
673 }),
674 ConflictStrategy::Merge => self.merge_conflict_data(conflict).await,
675 ConflictStrategy::Custom(resolver_name) => {
676 if let Some(resolver) = self.custom_resolvers.get(resolver_name) {
677 resolver(conflict)
678 } else {
679 Err(GraphRAGError::ConflictResolution {
680 message: format!("Custom resolver '{resolver_name}' not found"),
681 })
682 }
683 },
684 _ => Err(GraphRAGError::ConflictResolution {
685 message: "Conflict resolution strategy not implemented".to_string(),
686 }),
687 }
688 }
689
690 async fn merge_conflict_data(&self, conflict: &Conflict) -> Result<ConflictResolution> {
691 match (&conflict.existing_data, &conflict.new_data) {
692 (ChangeData::Entity(existing), ChangeData::Entity(new)) => {
693 let merged = self.merge_entities(existing, new)?;
694 Ok(ConflictResolution {
695 strategy: ConflictStrategy::Merge,
696 resolved_data: ChangeData::Entity(merged),
697 metadata: [("merge_strategy".to_string(), "entity_merge".to_string())]
698 .into_iter()
699 .collect(),
700 })
701 },
702 (ChangeData::Relationship(existing), ChangeData::Relationship(new)) => {
703 let merged = self.merge_relationships(existing, new)?;
704 Ok(ConflictResolution {
705 strategy: ConflictStrategy::Merge,
706 resolved_data: ChangeData::Relationship(merged),
707 metadata: [(
708 "merge_strategy".to_string(),
709 "relationship_merge".to_string(),
710 )]
711 .into_iter()
712 .collect(),
713 })
714 },
715 _ => Err(GraphRAGError::ConflictResolution {
716 message: "Cannot merge incompatible data types".to_string(),
717 }),
718 }
719 }
720
721 fn merge_entities(&self, existing: &Entity, new: &Entity) -> Result<Entity> {
722 let mut merged = existing.clone();
723
724 if new.confidence > existing.confidence {
726 merged.confidence = new.confidence;
727 merged.name = new.name.clone();
728 merged.entity_type = new.entity_type.clone();
729 }
730
731 let mut all_mentions = existing.mentions.clone();
733 for new_mention in &new.mentions {
734 if !all_mentions.iter().any(|m| {
735 m.chunk_id == new_mention.chunk_id && m.start_offset == new_mention.start_offset
736 }) {
737 all_mentions.push(new_mention.clone());
738 }
739 }
740 merged.mentions = all_mentions;
741
742 if new.embedding.is_some() {
744 merged.embedding = new.embedding.clone();
745 }
746
747 Ok(merged)
748 }
749
750 fn merge_relationships(
751 &self,
752 existing: &Relationship,
753 new: &Relationship,
754 ) -> Result<Relationship> {
755 let mut merged = existing.clone();
756
757 if new.confidence > existing.confidence {
759 merged.confidence = new.confidence;
760 merged.relation_type = new.relation_type.clone();
761 }
762
763 let mut all_contexts = existing.context.clone();
765 for new_context in &new.context {
766 if !all_contexts.contains(new_context) {
767 all_contexts.push(new_context.clone());
768 }
769 }
770 merged.context = all_contexts;
771
772 Ok(merged)
773 }
774}
775
/// Collects metrics, per-operation logs, and aggregate performance
/// statistics for incremental update operations.
#[cfg(feature = "incremental")]
pub struct UpdateMonitor {
    // latest value per metric name
    metrics: DashMap<String, UpdateMetric>,
    // append-only log of started/completed operations
    operations_log: Mutex<Vec<OperationLog>>,
    // aggregates recomputed after each completed operation
    performance_stats: RwLock<PerformanceStats>,
}

/// `Default` builds an empty monitor (see `new`).
#[cfg(feature = "incremental")]
impl Default for UpdateMonitor {
    fn default() -> Self {
        Self::new()
    }
}
794
/// A single named, tagged metric sample.
#[derive(Debug, Clone)]
pub struct UpdateMetric {
    pub name: String,
    pub value: f64,
    pub timestamp: DateTime<Utc>,
    /// Arbitrary key/value tags attached to the sample.
    pub tags: HashMap<String, String>,
}

/// Lifecycle record of one monitored operation.
#[derive(Debug, Clone)]
pub struct OperationLog {
    pub operation_id: UpdateId,
    pub operation_type: String,
    pub start_time: Instant,
    /// Set when the operation completes.
    pub end_time: Option<Instant>,
    /// Set when the operation completes.
    pub success: Option<bool>,
    pub error_message: Option<String>,
    pub affected_entities: usize,
    pub affected_relationships: usize,
}

/// Aggregate performance counters over completed operations.
#[derive(Debug, Clone)]
pub struct PerformanceStats {
    pub total_operations: u64,
    pub successful_operations: u64,
    pub failed_operations: u64,
    pub average_operation_time: Duration,
    pub peak_operations_per_second: f64,
    pub cache_hit_rate: f64,
    pub conflict_resolution_rate: f64,
}
847
#[cfg(feature = "incremental")]
impl UpdateMonitor {
    /// Creates a monitor with zeroed statistics and empty logs.
    pub fn new() -> Self {
        Self {
            metrics: DashMap::new(),
            operations_log: Mutex::new(Vec::new()),
            performance_stats: RwLock::new(PerformanceStats {
                total_operations: 0,
                successful_operations: 0,
                failed_operations: 0,
                average_operation_time: Duration::from_millis(0),
                peak_operations_per_second: 0.0,
                cache_hit_rate: 0.0,
                conflict_resolution_rate: 0.0,
            }),
        }
    }

    /// Records the start of an operation and returns the id to pass to
    /// `complete_operation` later.
    pub fn start_operation(&self, operation_type: &str) -> UpdateId {
        let operation_id = UpdateId::new();
        let log_entry = OperationLog {
            operation_id: operation_id.clone(),
            operation_type: operation_type.to_string(),
            start_time: Instant::now(),
            end_time: None,
            success: None,
            error_message: None,
            affected_entities: 0,
            affected_relationships: 0,
        };

        self.operations_log.lock().push(log_entry);
        operation_id
    }

    /// Marks an operation as finished and refreshes aggregate stats.
    ///
    /// Unknown `operation_id`s are ignored silently.
    pub fn complete_operation(
        &self,
        operation_id: &UpdateId,
        success: bool,
        error: Option<String>,
        affected_entities: usize,
        affected_relationships: usize,
    ) {
        // Scope the log lock tightly: `update_performance_stats` re-locks
        // `operations_log`, and parking_lot mutexes are not reentrant —
        // calling it while still holding the guard would deadlock.
        {
            let mut log = self.operations_log.lock();
            if let Some(entry) = log.iter_mut().find(|e| &e.operation_id == operation_id) {
                entry.end_time = Some(Instant::now());
                entry.success = Some(success);
                entry.error_message = error;
                entry.affected_entities = affected_entities;
                entry.affected_relationships = affected_relationships;
            }
        }

        self.update_performance_stats();
    }

    /// Recomputes aggregate counters from all completed log entries.
    fn update_performance_stats(&self) {
        let log = self.operations_log.lock();
        let completed_ops: Vec<_> = log
            .iter()
            .filter(|op| op.end_time.is_some() && op.success.is_some())
            .collect();

        if completed_ops.is_empty() {
            return;
        }

        let mut stats = self.performance_stats.write();
        stats.total_operations = completed_ops.len() as u64;
        stats.successful_operations = completed_ops
            .iter()
            .filter(|op| op.success == Some(true))
            .count() as u64;
        stats.failed_operations = stats.total_operations - stats.successful_operations;

        let total_time: Duration = completed_ops
            .iter()
            .filter_map(|op| op.end_time.map(|end| end.duration_since(op.start_time)))
            .sum();

        // completed_ops is non-empty here (early return above), so the
        // division is safe.
        stats.average_operation_time = total_time / completed_ops.len() as u32;
    }

    /// Stores the latest sample for a named metric, replacing any
    /// previous sample with the same name.
    pub fn record_metric(&self, name: &str, value: f64, tags: HashMap<String, String>) {
        let metric = UpdateMetric {
            name: name.to_string(),
            value,
            timestamp: Utc::now(),
            tags,
        };
        self.metrics.insert(name.to_string(), metric);
    }

    /// Returns a snapshot of the aggregate performance counters.
    pub fn get_performance_stats(&self) -> PerformanceStats {
        self.performance_stats.read().clone()
    }

    /// Returns up to `limit` most recent operations, newest first.
    pub fn get_recent_operations(&self, limit: usize) -> Vec<OperationLog> {
        let log = self.operations_log.lock();
        log.iter().rev().take(limit).cloned().collect()
    }
}
959
/// Central coordinator for incremental graph updates (concurrent build).
///
/// Wraps the graph in `Arc<RwLock<_>>` and shares the invalidation
/// tracker, conflict resolver, and monitor via `Arc`.
#[cfg(feature = "incremental")]
pub struct IncrementalGraphManager {
    graph: Arc<RwLock<KnowledgeGraph>>,
    change_log: DashMap<UpdateId, ChangeRecord>,
    deltas: DashMap<UpdateId, GraphDelta>,
    cache_invalidation: Arc<SelectiveInvalidation>,
    conflict_resolver: Arc<ConflictResolver>,
    monitor: Arc<UpdateMonitor>,
    config: IncrementalConfig,
}

/// Fallback single-threaded coordinator used without the `incremental`
/// feature: owns the graph directly and keeps a plain change-log vector.
#[cfg(not(feature = "incremental"))]
pub struct IncrementalGraphManager {
    graph: KnowledgeGraph,
    change_log: Vec<ChangeRecord>,
    config: IncrementalConfig,
}

/// Tuning knobs for the incremental update pipeline.
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Maximum number of retained change-log entries.
    pub max_change_log_size: usize,
    /// Maximum number of changes in a single delta.
    pub max_delta_size: usize,
    /// Default strategy for resolving conflicts.
    pub conflict_strategy: ConflictStrategy,
    /// Whether performance monitoring is active.
    pub enable_monitoring: bool,
    /// Named cache-invalidation policy (e.g. "selective").
    pub cache_invalidation_strategy: String,
    /// Batch size for bulk operations.
    pub batch_size: usize,
    /// Concurrency limit for in-flight operations.
    pub max_concurrent_operations: usize,
}
1002
/// Conservative defaults: merge conflicts, selective invalidation,
/// monitoring on, moderate batch/concurrency limits.
impl Default for IncrementalConfig {
    fn default() -> Self {
        Self {
            max_change_log_size: 10000,
            max_delta_size: 1000,
            conflict_strategy: ConflictStrategy::Merge,
            enable_monitoring: true,
            cache_invalidation_strategy: "selective".to_string(),
            batch_size: 100,
            max_concurrent_operations: 10,
        }
    }
}
1016
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Creates a manager around `graph`, wiring up an invalidation
    /// tracker, a resolver using the configured conflict strategy, and a
    /// fresh monitor.
    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
        let conflict_resolver = Arc::new(ConflictResolver::new(config.conflict_strategy.clone()));
        Self {
            graph: Arc::new(RwLock::new(graph)),
            change_log: DashMap::new(),
            deltas: DashMap::new(),
            cache_invalidation: Arc::new(SelectiveInvalidation::new()),
            conflict_resolver,
            monitor: Arc::new(UpdateMonitor::new()),
            config,
        }
    }

    /// Builder-style replacement of the conflict resolver.
    pub fn with_conflict_resolver(mut self, resolver: ConflictResolver) -> Self {
        self.conflict_resolver = Arc::new(resolver);
        self
    }

    /// Shared handle to the underlying knowledge graph.
    pub fn graph(&self) -> Arc<RwLock<KnowledgeGraph>> {
        Arc::clone(&self.graph)
    }

    /// Shared handle to the conflict resolver.
    pub fn conflict_resolver(&self) -> Arc<ConflictResolver> {
        Arc::clone(&self.conflict_resolver)
    }

    /// Shared handle to the update monitor.
    pub fn monitor(&self) -> Arc<UpdateMonitor> {
        Arc::clone(&self.monitor)
    }
}
1053
1054#[cfg(not(feature = "incremental"))]
1055impl IncrementalGraphManager {
1056 pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
1058 Self {
1059 graph,
1060 change_log: Vec::new(),
1061 config,
1062 }
1063 }
1064
1065 pub fn graph(&self) -> &KnowledgeGraph {
1067 &self.graph
1068 }
1069
1070 pub fn graph_mut(&mut self) -> &mut KnowledgeGraph {
1072 &mut self.graph
1073 }
1074}
1075
1076impl IncrementalGraphManager {
1078 pub fn create_change_record(
1080 &self,
1081 change_type: ChangeType,
1082 operation: Operation,
1083 change_data: ChangeData,
1084 entity_id: Option<EntityId>,
1085 document_id: Option<DocumentId>,
1086 ) -> ChangeRecord {
1087 ChangeRecord {
1088 change_id: UpdateId::new(),
1089 timestamp: Utc::now(),
1090 change_type,
1091 entity_id,
1092 document_id,
1093 operation,
1094 data: change_data,
1095 metadata: HashMap::new(),
1096 }
1097 }
1098
1099 pub fn config(&self) -> &IncrementalConfig {
1101 &self.config
1102 }
1103
1104 pub fn basic_upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
1106 let update_id = UpdateId::new();
1107
1108 #[cfg(feature = "incremental")]
1109 {
1110 let operation_id = self.monitor.start_operation("upsert_entity");
1111 let mut graph = self.graph.write();
1112
1113 match graph.add_entity(entity.clone()) {
1114 Ok(_) => {
1115 let ent_id = entity.id.clone();
1116 let change = self.create_change_record(
1117 ChangeType::EntityAdded,
1118 Operation::Upsert,
1119 ChangeData::Entity(entity),
1120 Some(ent_id),
1121 None,
1122 );
1123 self.change_log.insert(change.change_id.clone(), change);
1124 self.monitor
1125 .complete_operation(&operation_id, true, None, 1, 0);
1126 Ok(update_id)
1127 },
1128 Err(e) => {
1129 self.monitor.complete_operation(
1130 &operation_id,
1131 false,
1132 Some(e.to_string()),
1133 0,
1134 0,
1135 );
1136 Err(e)
1137 },
1138 }
1139 }
1140
1141 #[cfg(not(feature = "incremental"))]
1142 {
1143 self.graph.add_entity(entity.clone())?;
1144 let ent_id = entity.id.clone();
1146 let change = self.create_change_record(
1147 ChangeType::EntityAdded,
1148 Operation::Upsert,
1149 ChangeData::Entity(entity),
1150 Some(ent_id),
1151 None,
1152 );
1153 self.change_log.push(change);
1154 Ok(update_id)
1155 }
1156 }
1157}
1158
/// Aggregated counters describing incremental-update activity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalStatistics {
    pub total_updates: usize,
    pub successful_updates: usize,
    pub failed_updates: usize,
    pub entities_added: usize,
    pub entities_updated: usize,
    pub entities_removed: usize,
    pub relationships_added: usize,
    pub relationships_updated: usize,
    pub relationships_removed: usize,
    pub conflicts_resolved: usize,
    pub cache_invalidations: usize,
    pub average_update_time_ms: f64,
    pub peak_updates_per_second: f64,
    pub current_change_log_size: usize,
    pub current_delta_count: usize,
}
1197
1198impl IncrementalStatistics {
1199 pub fn empty() -> Self {
1201 Self {
1202 total_updates: 0,
1203 successful_updates: 0,
1204 failed_updates: 0,
1205 entities_added: 0,
1206 entities_updated: 0,
1207 entities_removed: 0,
1208 relationships_added: 0,
1209 relationships_updated: 0,
1210 relationships_removed: 0,
1211 conflicts_resolved: 0,
1212 cache_invalidations: 0,
1213 average_update_time_ms: 0.0,
1214 peak_updates_per_second: 0.0,
1215 current_change_log_size: 0,
1216 current_delta_count: 0,
1217 }
1218 }
1219
1220 pub fn print(&self) {
1222 println!("🔄 Incremental Updates Statistics");
1223 println!(" Total updates: {}", self.total_updates);
1224 println!(
1225 " Successful: {} ({:.1}%)",
1226 self.successful_updates,
1227 if self.total_updates > 0 {
1228 (self.successful_updates as f64 / self.total_updates as f64) * 100.0
1229 } else {
1230 0.0
1231 }
1232 );
1233 println!(" Failed: {}", self.failed_updates);
1234 println!(
1235 " Entities: +{} ~{} -{}",
1236 self.entities_added, self.entities_updated, self.entities_removed
1237 );
1238 println!(
1239 " Relationships: +{} ~{} -{}",
1240 self.relationships_added, self.relationships_updated, self.relationships_removed
1241 );
1242 println!(" Conflicts resolved: {}", self.conflicts_resolved);
1243 println!(" Cache invalidations: {}", self.cache_invalidations);
1244 println!(" Avg update time: {:.2}ms", self.average_update_time_ms);
1245 println!(" Peak updates/sec: {:.1}", self.peak_updates_per_second);
1246 println!(" Change log size: {}", self.current_change_log_size);
1247 println!(" Active deltas: {}", self.current_delta_count);
1248 }
1249}
1250
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Builds a statistics snapshot from the monitor, the invalidation
    /// tracker, and a scan of the change log.
    pub fn get_statistics(&self) -> IncrementalStatistics {
        let perf = self.monitor.get_performance_stats();
        let invalidation = self.cache_invalidation.get_invalidation_stats();

        // (added, updated, removed) tallies from the change log.
        let (mut ent_added, mut ent_updated, mut ent_removed) = (0, 0, 0);
        let (mut rel_added, mut rel_updated, mut rel_removed) = (0, 0, 0);
        let conflicts_resolved = 0;

        for entry in self.change_log.iter() {
            match entry.value().change_type {
                ChangeType::EntityAdded => ent_added += 1,
                ChangeType::EntityUpdated => ent_updated += 1,
                ChangeType::EntityRemoved => ent_removed += 1,
                ChangeType::RelationshipAdded => rel_added += 1,
                ChangeType::RelationshipUpdated => rel_updated += 1,
                ChangeType::RelationshipRemoved => rel_removed += 1,
                _ => {},
            }
        }

        IncrementalStatistics {
            total_updates: perf.total_operations as usize,
            successful_updates: perf.successful_operations as usize,
            failed_updates: perf.failed_operations as usize,
            entities_added: ent_added,
            entities_updated: ent_updated,
            entities_removed: ent_removed,
            relationships_added: rel_added,
            relationships_updated: rel_updated,
            relationships_removed: rel_removed,
            conflicts_resolved,
            cache_invalidations: invalidation.total_invalidations,
            average_update_time_ms: perf.average_operation_time.as_millis() as f64,
            peak_updates_per_second: perf.peak_operations_per_second,
            current_change_log_size: self.change_log.len(),
            current_delta_count: self.deltas.len(),
        }
    }
}
1294
1295#[cfg(not(feature = "incremental"))]
1296impl IncrementalGraphManager {
1297 pub fn get_statistics(&self) -> IncrementalStatistics {
1299 let mut stats = IncrementalStatistics::empty();
1300 stats.current_change_log_size = self.change_log.len();
1301
1302 for change in &self.change_log {
1303 match change.change_type {
1304 ChangeType::EntityAdded => stats.entities_added += 1,
1305 ChangeType::EntityUpdated => stats.entities_updated += 1,
1306 ChangeType::EntityRemoved => stats.entities_removed += 1,
1307 ChangeType::RelationshipAdded => stats.relationships_added += 1,
1308 ChangeType::RelationshipUpdated => stats.relationships_updated += 1,
1309 ChangeType::RelationshipRemoved => stats.relationships_removed += 1,
1310 _ => {},
1311 }
1312 }
1313
1314 stats.total_updates = self.change_log.len();
1315 stats.successful_updates = self.change_log.len(); stats
1317 }
1318}
1319
/// PageRank scores maintained incrementally as the graph changes.
///
/// Small change batches trigger a localized recomputation around the
/// affected entities; once `incremental_threshold` pending changes
/// accumulate, a full recomputation is performed instead.
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct IncrementalPageRank {
    // current score per entity
    scores: DashMap<EntityId, f64>,
    // buffered adjacency deltas (target, weight) per source entity
    adjacency_changes: DashMap<EntityId, Vec<(EntityId, f64)>>, damping_factor: f64,
    tolerance: f64,
    max_iterations: usize,
    last_full_computation: DateTime<Utc>,
    // pending-change count above which a full recomputation is forced
    incremental_threshold: usize, pending_changes: RwLock<usize>,
}
1337
1338#[cfg(feature = "incremental")]
1339impl IncrementalPageRank {
    /// Creates an incremental PageRank tracker with empty scores.
    ///
    /// `damping_factor` is the usual PageRank damping (commonly 0.85),
    /// `tolerance` the convergence threshold on the max score delta, and
    /// `max_iterations` the iteration cap. The fallback-to-full threshold
    /// is fixed at 1000 pending changes.
    pub fn new(damping_factor: f64, tolerance: f64, max_iterations: usize) -> Self {
        Self {
            scores: DashMap::new(),
            adjacency_changes: DashMap::new(),
            damping_factor,
            tolerance,
            max_iterations,
            last_full_computation: Utc::now(),
            incremental_threshold: 1000,
            pending_changes: RwLock::new(0),
        }
    }
1353
    /// Refreshes PageRank scores after a batch of entity changes.
    ///
    /// If the accumulated pending-change count exceeds
    /// `incremental_threshold`, delegates to `full_recomputation`.
    /// Otherwise the affected set is expanded to the 2-hop neighborhood
    /// of every changed entity and a localized PageRank pass is run on
    /// just that set, after which the pending counter is reset.
    pub async fn update_incremental(
        &self,
        changed_entities: &[EntityId],
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let start = Instant::now();

        // Too many buffered changes: localized updates would drift too
        // far from the true scores, so recompute everything instead.
        {
            let pending = *self.pending_changes.read();
            if pending > self.incremental_threshold {
                return self.full_recomputation(graph).await;
            }
        }

        // Changed entities plus their 1- and 2-hop neighbors.
        let mut affected_entities = HashSet::new();

        for entity_id in changed_entities {
            affected_entities.insert(entity_id.clone());

            for (neighbor, _) in graph.get_neighbors(entity_id) {
                affected_entities.insert(neighbor.id.clone());

                for (second_hop, _) in graph.get_neighbors(&neighbor.id) {
                    affected_entities.insert(second_hop.id.clone());
                }
            }
        }

        self.localized_pagerank(&affected_entities, graph).await?;

        // Scores are fresh again; clear the pending-change counter.
        *self.pending_changes.write() = 0;

        let duration = start.elapsed();
        println!(
            "🔄 Incremental PageRank update completed in {:?} for {} entities",
            duration,
            affected_entities.len()
        );

        Ok(())
    }
1403
    /// Recomputes PageRank for the entire graph from a uniform start.
    ///
    /// Standard power iteration: every score starts at 1/n and iterations
    /// stop once the largest per-entity change falls below `tolerance`
    /// or `max_iterations` is reached. Note each iteration scans every
    /// entity pair via `get_edge_weight`, i.e. O(n^2) per iteration —
    /// presumably acceptable for the intended graph sizes (TODO confirm;
    /// `get_edge_weight`/`get_out_degree` are defined outside this view).
    async fn full_recomputation(&self, graph: &KnowledgeGraph) -> Result<()> {
        let start = Instant::now();

        let entities: Vec<EntityId> = graph.entities().map(|e| e.id.clone()).collect();
        let n = entities.len();

        // Empty graph: nothing to rank.
        if n == 0 {
            return Ok(());
        }

        // Uniform initial distribution.
        let initial_score = 1.0 / n as f64;
        for entity_id in &entities {
            self.scores.insert(entity_id.clone(), initial_score);
        }

        for iteration in 0..self.max_iterations {
            let mut new_scores = HashMap::new();
            let mut max_diff: f64 = 0.0;

            for entity_id in &entities {
                // Teleportation term.
                let mut score = (1.0 - self.damping_factor) / n as f64;

                // Contribution from every in-neighbor, weighted by edge
                // weight and normalized by the neighbor's out-degree.
                for other_entity in &entities {
                    if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph) {
                        let other_score = self
                            .scores
                            .get(other_entity)
                            .map(|s| *s.value())
                            .unwrap_or(initial_score);
                        let out_degree = self.get_out_degree(other_entity, graph);

                        if out_degree > 0.0 {
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(initial_score);
                let diff = (score - old_score).abs();
                max_diff = max_diff.max(diff);

                new_scores.insert(entity_id.clone(), score);
            }

            // Publish the iteration's scores before convergence check.
            for (entity_id, score) in new_scores {
                self.scores.insert(entity_id, score);
            }

            if max_diff < self.tolerance {
                println!(
                    "🎯 PageRank converged after {} iterations (diff: {:.6})",
                    iteration + 1,
                    max_diff
                );
                break;
            }
        }

        let duration = start.elapsed();
        println!("🔄 Full PageRank recomputation completed in {duration:?} for {n} entities");

        Ok(())
    }
1478
    /// Run PageRank restricted to `entities`, treating the subset as a
    /// self-contained subgraph (out-degrees count only edges that stay
    /// inside the subset).
    ///
    /// Scores are written back into `self.scores` *in place* during each
    /// sweep (Gauss-Seidel style), so later entities in a sweep observe the
    /// already-updated scores of earlier ones. Because `entities` is a
    /// `HashSet`, the sweep order — and therefore the exact intermediate
    /// values — is not deterministic across runs; only the converged result
    /// is stable. NOTE(review): confirm callers do not rely on bitwise
    /// reproducibility here.
    async fn localized_pagerank(
        &self,
        entities: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let entity_vec: Vec<EntityId> = entities.iter().cloned().collect();
        let n = entity_vec.len();

        // Empty subset: nothing to do.
        if n == 0 {
            return Ok(());
        }

        for _iteration in 0..self.max_iterations {
            let mut max_diff: f64 = 0.0;

            for entity_id in &entity_vec {
                // Teleportation term, normalized by the subset size.
                let mut score = (1.0 - self.damping_factor) / n as f64;

                for other_entity in &entity_vec {
                    if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph) {
                        // Entities with no score yet default to uniform 1/n.
                        let other_score = self
                            .scores
                            .get(other_entity)
                            .map(|s| *s.value())
                            .unwrap_or(1.0 / n as f64);
                        // Only edges remaining inside the subset count, so
                        // rank mass stays within the local subgraph.
                        let out_degree =
                            self.get_localized_out_degree(other_entity, entities, graph);

                        if out_degree > 0.0 {
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(1.0 / n as f64);
                let diff = (score - old_score).abs();
                max_diff = max_diff.max(diff);

                // In-place update: later entities in this sweep see it.
                self.scores.insert(entity_id.clone(), score);
            }

            // Converged once the largest per-entity change is below tolerance.
            if max_diff < self.tolerance {
                break;
            }
        }

        Ok(())
    }
1535
1536 fn get_edge_weight(
1537 &self,
1538 from: &EntityId,
1539 to: &EntityId,
1540 graph: &KnowledgeGraph,
1541 ) -> Option<f64> {
1542 for (neighbor, relationship) in graph.get_neighbors(from) {
1544 if neighbor.id == *to {
1545 return Some(relationship.confidence as f64);
1546 }
1547 }
1548 None
1549 }
1550
1551 fn get_out_degree(&self, entity_id: &EntityId, graph: &KnowledgeGraph) -> f64 {
1552 graph
1553 .get_neighbors(entity_id)
1554 .iter()
1555 .map(|(_, rel)| rel.confidence as f64)
1556 .sum()
1557 }
1558
1559 fn get_localized_out_degree(
1560 &self,
1561 entity_id: &EntityId,
1562 subset: &HashSet<EntityId>,
1563 graph: &KnowledgeGraph,
1564 ) -> f64 {
1565 graph
1566 .get_neighbors(entity_id)
1567 .iter()
1568 .filter(|(neighbor, _)| subset.contains(&neighbor.id))
1569 .map(|(_, rel)| rel.confidence as f64)
1570 .sum()
1571 }
1572
1573 pub fn get_score(&self, entity_id: &EntityId) -> Option<f64> {
1575 self.scores.get(entity_id).map(|s| *s.value())
1576 }
1577
1578 pub fn get_top_entities(&self, k: usize) -> Vec<(EntityId, f64)> {
1580 let mut entities: Vec<(EntityId, f64)> = self
1581 .scores
1582 .iter()
1583 .map(|entry| (entry.key().clone(), *entry.value()))
1584 .collect();
1585
1586 entities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1587 entities.truncate(k);
1588 entities
1589 }
1590
1591 pub fn record_change(&self, _entity_id: EntityId) {
1593 *self.pending_changes.write() += 1;
1594 }
1595}
1596
/// Groups incoming change records into per-scope batches and processes them
/// on background tasks, bounded by a concurrency semaphore.
#[cfg(feature = "incremental")]
pub struct BatchProcessor {
    // Number of changes that triggers an immediate flush of a batch.
    batch_size: usize,
    // Maximum age a pending batch may reach before it is flushed.
    max_wait_time: Duration,
    // Batches currently accumulating, keyed by entity/document scope key.
    pending_batches: DashMap<String, PendingBatch>,
    // Caps the number of batches processed concurrently.
    processing_semaphore: Semaphore,
    // Running aggregates about processed batches.
    metrics: RwLock<BatchMetrics>,
}
1610
/// A batch still accumulating changes before being flushed.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PendingBatch {
    // Changes collected so far, in arrival order.
    changes: Vec<ChangeRecord>,
    // When the batch was opened; compared against `max_wait_time`.
    created_at: Instant,
    // Stable identifier reported back to callers of `add_change`.
    batch_id: String,
}
1618
/// Aggregate statistics about batch processing.
#[derive(Debug, Clone)]
pub struct BatchMetrics {
    /// Total number of batches fully processed.
    pub total_batches_processed: u64,
    /// Total number of individual changes processed across all batches.
    pub total_changes_processed: u64,
    /// Running mean of changes per batch.
    pub average_batch_size: f64,
    /// Running mean wall-clock processing time per batch.
    pub average_processing_time: Duration,
    /// Changes per second observed for the most recent batch.
    pub throughput_per_second: f64,
    /// Wall-clock time at which the last batch finished, if any.
    pub last_batch_processed: Option<DateTime<Utc>>,
}
1635
#[cfg(feature = "incremental")]
impl BatchProcessor {
    /// Create a processor that flushes a batch once it holds `batch_size`
    /// changes or has been pending longer than `max_wait_time`, processing
    /// at most `max_concurrent_batches` batches in parallel.
    pub fn new(batch_size: usize, max_wait_time: Duration, max_concurrent_batches: usize) -> Self {
        Self {
            batch_size,
            max_wait_time,
            pending_batches: DashMap::new(),
            processing_semaphore: Semaphore::new(max_concurrent_batches),
            metrics: RwLock::new(BatchMetrics {
                total_batches_processed: 0,
                total_changes_processed: 0,
                average_batch_size: 0.0,
                average_processing_time: Duration::from_millis(0),
                throughput_per_second: 0.0,
                last_batch_processed: None,
            }),
        }
    }

    /// Queue a change and return the id of the batch it joined.
    ///
    /// When the batch becomes full (or exceeds `max_wait_time`) it is
    /// removed from the pending map and processed on a spawned task.
    pub async fn add_change(&self, change: ChangeRecord) -> Result<String> {
        let batch_key = self.get_batch_key(&change);

        // Push the change while holding the entry guard, then RELEASE the
        // guard before touching the map again: DashMap shard locks are not
        // reentrant, so calling `remove` for the same key while its entry
        // guard is alive deadlocks — the previous implementation did exactly
        // that on the flush path.
        let (batch_id, should_process) = {
            let mut entry = self
                .pending_batches
                .entry(batch_key.clone())
                .or_insert_with(|| PendingBatch {
                    changes: Vec::new(),
                    created_at: Instant::now(),
                    batch_id: format!("batch_{}", Uuid::new_v4()),
                });

            entry.changes.push(change);
            let should_process = entry.changes.len() >= self.batch_size
                || entry.created_at.elapsed() > self.max_wait_time;

            (entry.batch_id.clone(), should_process)
        }; // entry guard dropped here

        if should_process {
            // Take the whole batch out of the map; this also picks up any
            // changes other tasks appended after the guard above was
            // released, so no change is lost in the hand-off.
            if let Some((_, batch)) = self.pending_batches.remove(&batch_key) {
                // NOTE(review): `self.clone()` gives the spawned task its own
                // semaphore, so the concurrency cap is per-clone rather than
                // global — consider holding the processor in an `Arc`
                // end-to-end instead.
                let processor = Arc::new(self.clone());
                tokio::spawn(async move {
                    if let Err(e) = processor.process_batch(batch).await {
                        eprintln!("Batch processing error: {e}");
                    }
                });
            }
        }

        Ok(batch_id)
    }

    /// Process one batch: partition its changes by kind, dispatch them to
    /// the kind-specific handlers, and fold timings into the metrics.
    async fn process_batch(&self, batch: PendingBatch) -> Result<()> {
        // Bound overall concurrency; a closed semaphore surfaces as an
        // incremental-update error.
        let _permit = self.processing_semaphore.acquire().await.map_err(|_| {
            GraphRAGError::IncrementalUpdate {
                message: "Failed to acquire processing permit".to_string(),
            }
        })?;

        let start = Instant::now();

        let mut entity_changes = Vec::new();
        let mut relationship_changes = Vec::new();
        let mut embedding_changes = Vec::new();

        for change in &batch.changes {
            match &change.change_type {
                ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
                    entity_changes.push(change);
                },
                ChangeType::RelationshipAdded
                | ChangeType::RelationshipUpdated
                | ChangeType::RelationshipRemoved => {
                    relationship_changes.push(change);
                },
                ChangeType::EmbeddingAdded
                | ChangeType::EmbeddingUpdated
                | ChangeType::EmbeddingRemoved => {
                    embedding_changes.push(change);
                },
                // Document/chunk changes have no batch handler yet.
                _ => {},
            }
        }

        self.process_entity_changes(&entity_changes).await?;
        self.process_relationship_changes(&relationship_changes)
            .await?;
        self.process_embedding_changes(&embedding_changes).await?;

        let processing_time = start.elapsed();

        self.update_metrics(&batch, processing_time).await;

        println!(
            "🚀 Processed batch {} with {} changes in {:?}",
            batch.batch_id,
            batch.changes.len(),
            processing_time
        );

        Ok(())
    }

    /// Handle entity changes in a batch. Stub: no-op for now.
    async fn process_entity_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        Ok(())
    }

    /// Handle relationship changes in a batch. Stub: no-op for now.
    async fn process_relationship_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        Ok(())
    }

    /// Handle embedding changes in a batch. Stub: no-op for now.
    async fn process_embedding_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        Ok(())
    }

    /// Batch key: changes touching the same entity (or, failing that, the
    /// same document) are grouped together; everything else shares "global".
    fn get_batch_key(&self, change: &ChangeRecord) -> String {
        match (&change.entity_id, &change.document_id) {
            (Some(entity_id), _) => format!("entity:{entity_id}"),
            (None, Some(doc_id)) => format!("document:{doc_id}"),
            _ => "global".to_string(),
        }
    }

    /// Fold one processed batch into the running metrics (means are updated
    /// incrementally; throughput reflects the most recent batch only).
    async fn update_metrics(&self, batch: &PendingBatch, processing_time: Duration) {
        // parking_lot lock; held only across synchronous arithmetic — no
        // await happens while the guard is alive.
        let mut metrics = self.metrics.write();

        metrics.total_batches_processed += 1;
        metrics.total_changes_processed += batch.changes.len() as u64;

        let total_batches = metrics.total_batches_processed as f64;
        metrics.average_batch_size = (metrics.average_batch_size * (total_batches - 1.0)
            + batch.changes.len() as f64)
            / total_batches;

        let prev_avg_ms = metrics.average_processing_time.as_millis() as f64;
        let new_avg_ms = (prev_avg_ms * (total_batches - 1.0) + processing_time.as_millis() as f64)
            / total_batches;
        metrics.average_processing_time = Duration::from_millis(new_avg_ms as u64);

        if processing_time.as_secs_f64() > 0.0 {
            metrics.throughput_per_second =
                batch.changes.len() as f64 / processing_time.as_secs_f64();
        }

        metrics.last_batch_processed = Some(Utc::now());
    }

    /// Snapshot of the current batch-processing metrics.
    pub fn get_metrics(&self) -> BatchMetrics {
        self.metrics.read().clone()
    }
}
1805
#[cfg(feature = "incremental")]
impl Clone for BatchProcessor {
    /// Clone configuration plus a snapshot of the metrics.
    ///
    /// NOTE(review): the clone gets an *empty* pending-batch map and its own
    /// semaphore (seeded with the currently available permits), so clones do
    /// not share queueing state or the concurrency limit with the original.
    fn clone(&self) -> Self {
        Self {
            batch_size: self.batch_size,
            max_wait_time: self.max_wait_time,
            pending_batches: DashMap::new(),
            processing_semaphore: Semaphore::new(self.processing_semaphore.available_permits()),
            metrics: RwLock::new(self.get_metrics()),
        }
    }
}
1819
1820impl GraphRAGError {
1825 pub fn conflict_resolution(message: String) -> Self {
1827 GraphRAGError::GraphConstruction { message }
1828 }
1829
1830 pub fn incremental_update(message: String) -> Self {
1832 GraphRAGError::GraphConstruction { message }
1833 }
1834}
1835
/// Production-grade incremental graph store: transactional change
/// application with conflict resolution, rollback bookkeeping, selective
/// cache invalidation, monitoring, batching, incremental PageRank, and
/// change-event broadcasting.
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct ProductionGraphStore {
    // The knowledge graph, guarded for concurrent read/write access.
    graph: Arc<RwLock<KnowledgeGraph>>,
    // Active (not yet committed/rolled-back) transactions by id.
    transactions: DashMap<TransactionId, Transaction>,
    // Applied changes, keyed by their change id.
    change_log: DashMap<UpdateId, ChangeRecord>,
    // Pre-change snapshots enabling rollback of individual changes.
    rollback_data: DashMap<UpdateId, RollbackData>,
    // Decides how conflicting changes are merged.
    conflict_resolver: Arc<ConflictResolver>,
    // Invalidates only the caches affected by a given set of changes.
    cache_invalidation: Arc<SelectiveInvalidation>,
    // Tracks operation timings and outcomes.
    monitor: Arc<UpdateMonitor>,
    // Groups changes into batches for background processing.
    batch_processor: Arc<BatchProcessor>,
    // Maintains PageRank scores under incremental updates.
    incremental_pagerank: Arc<IncrementalPageRank>,
    // Broadcast channel for `ChangeEvent` notifications.
    event_publisher: broadcast::Sender<ChangeEvent>,
    // Store-wide tuning knobs.
    config: IncrementalConfig,
}
1856
/// An in-flight group of changes intended to commit or abort as a unit.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct Transaction {
    // Unique transaction identifier.
    id: TransactionId,
    // Changes recorded under this transaction.
    // NOTE(review): `apply_delta` applies changes directly and never appends
    // them here — confirm intended bookkeeping.
    changes: Vec<ChangeRecord>,
    // Current lifecycle state.
    status: TransactionStatus,
    // When the transaction was begun.
    created_at: DateTime<Utc>,
    // Requested isolation level.
    // NOTE(review): no enforcement of isolation is visible in this file.
    isolation_level: IsolationLevel,
}
1867
/// Lifecycle states of a [`Transaction`].
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum TransactionStatus {
    // Open and accepting changes.
    Active,
    // Commit in progress.
    Preparing,
    // Successfully committed.
    Committed,
    // Rolled back / aborted.
    Aborted,
}
1876
/// Classic SQL-style isolation levels, from weakest to strongest.
/// NOTE(review): currently informational only — nothing in this file
/// enforces a level.
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}
1885
/// A notification broadcast to subscribers whenever the store changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    /// Unique id of this event (not of the underlying change).
    pub event_id: UpdateId,
    /// What kind of change occurred.
    pub event_type: ChangeEventType,
    /// The entity concerned, when the event relates to one.
    pub entity_id: Option<EntityId>,
    /// When the event was emitted.
    pub timestamp: DateTime<Utc>,
    /// Free-form extra context (e.g. a transaction id).
    pub metadata: HashMap<String, String>,
}
1900
/// Kinds of events a [`ProductionGraphStore`] broadcasts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeEventType {
    // Graph mutations.
    EntityUpserted,
    EntityDeleted,
    RelationshipUpserted,
    RelationshipDeleted,
    EmbeddingUpdated,
    // Transaction lifecycle.
    TransactionStarted,
    TransactionCommitted,
    TransactionRolledBack,
    // Housekeeping.
    ConflictResolved,
    CacheInvalidated,
    BatchProcessed,
}
1927
#[cfg(feature = "incremental")]
impl ProductionGraphStore {
    /// Build a production store around an existing graph, wiring up conflict
    /// resolution, selective cache invalidation, monitoring, batch
    /// processing, incremental PageRank, and a broadcast channel
    /// (capacity 1000) for change events.
    pub fn new(
        graph: KnowledgeGraph,
        config: IncrementalConfig,
        conflict_resolver: ConflictResolver,
    ) -> Self {
        let (event_tx, _) = broadcast::channel(1000);

        Self {
            graph: Arc::new(RwLock::new(graph)),
            transactions: DashMap::new(),
            change_log: DashMap::new(),
            rollback_data: DashMap::new(),
            conflict_resolver: Arc::new(conflict_resolver),
            cache_invalidation: Arc::new(SelectiveInvalidation::new()),
            monitor: Arc::new(UpdateMonitor::new()),
            batch_processor: Arc::new(BatchProcessor::new(
                config.batch_size,
                Duration::from_millis(100),
                config.max_concurrent_operations,
            )),
            incremental_pagerank: Arc::new(IncrementalPageRank::new(0.85, 1e-6, 100)),
            event_publisher: event_tx,
            config,
        }
    }

    /// Subscribe to the stream of change events emitted by this store.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ChangeEvent> {
        self.event_publisher.subscribe()
    }

    /// Broadcast an event; send errors (no active receivers) are ignored.
    async fn publish_event(&self, event: ChangeEvent) {
        let _ = self.event_publisher.send(event);
    }

    /// Assemble a `ChangeRecord` with a fresh id and the current timestamp.
    fn create_change_record(
        &self,
        change_type: ChangeType,
        operation: Operation,
        change_data: ChangeData,
        entity_id: Option<EntityId>,
        document_id: Option<DocumentId>,
    ) -> ChangeRecord {
        ChangeRecord {
            change_id: UpdateId::new(),
            timestamp: Utc::now(),
            change_type,
            entity_id,
            document_id,
            operation,
            data: change_data,
            metadata: HashMap::new(),
        }
    }

    /// Apply a change, routing it through the conflict resolver when it
    /// collides with existing graph state.
    ///
    /// Returns the change's own id — the key under which the applied record
    /// is stored in `change_log`. (Previously this returned the monitor's
    /// operation id, which made subsequent `change_log` lookups fail and
    /// panicked callers that `unwrap`ped the lookup.)
    async fn apply_change_with_conflict_resolution(
        &self,
        change: ChangeRecord,
    ) -> Result<UpdateId> {
        let operation_id = self.monitor.start_operation("apply_change");
        let change_id = change.change_id.clone();

        if let Some(conflict) = self.detect_conflict(&change)? {
            let resolution = self.conflict_resolver.resolve_conflict(&conflict).await?;

            // Keep the original id/timestamp; only data and metadata are
            // replaced by the resolution outcome.
            let resolved_change = ChangeRecord {
                data: resolution.resolved_data,
                metadata: resolution.metadata,
                ..change
            };

            self.apply_change_internal(resolved_change).await?;

            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::ConflictResolved,
                entity_id: conflict.existing_data.get_entity_id(),
                timestamp: Utc::now(),
                metadata: HashMap::new(),
            })
            .await;
        } else {
            self.apply_change_internal(change).await?;
        }

        self.monitor
            .complete_operation(&operation_id, true, None, 1, 0);
        Ok(change_id)
    }

    /// Check whether `change` collides with existing graph state.
    ///
    /// Entity changes conflict when an entity with the same id already
    /// exists with a different name or type; relationship changes conflict
    /// when an edge with the same source, target, and relation type exists.
    fn detect_conflict(&self, change: &ChangeRecord) -> Result<Option<Conflict>> {
        match &change.data {
            ChangeData::Entity(entity) => {
                let graph = self.graph.read();
                if let Some(existing) = graph.get_entity(&entity.id) {
                    if existing.name != entity.name || existing.entity_type != entity.entity_type {
                        return Ok(Some(Conflict {
                            conflict_id: UpdateId::new(),
                            conflict_type: ConflictType::EntityExists,
                            existing_data: ChangeData::Entity(existing.clone()),
                            new_data: change.data.clone(),
                            resolution: None,
                        }));
                    }
                }
            },
            ChangeData::Relationship(relationship) => {
                let graph = self.graph.read();
                for existing_rel in graph.get_all_relationships() {
                    if existing_rel.source == relationship.source
                        && existing_rel.target == relationship.target
                        && existing_rel.relation_type == relationship.relation_type
                    {
                        return Ok(Some(Conflict {
                            conflict_id: UpdateId::new(),
                            conflict_type: ConflictType::RelationshipExists,
                            existing_data: ChangeData::Relationship(existing_rel.clone()),
                            new_data: change.data.clone(),
                            resolution: None,
                        }));
                    }
                }
            },
            // Embeddings and other data kinds never conflict.
            _ => {},
        }

        Ok(None)
    }

    /// Apply a change to the graph, saving rollback data first and recording
    /// the change in the log afterwards.
    ///
    /// NOTE(review): the `Operation::Delete` arms are empty stubs — deletes
    /// are accepted and logged but not applied to the graph.
    async fn apply_change_internal(&self, change: ChangeRecord) -> Result<()> {
        let change_id = change.change_id.clone();

        // Snapshot the state we are about to overwrite, before mutating.
        let rollback_data = {
            let graph = self.graph.read();
            self.create_rollback_data(&change, &graph)?
        };

        self.rollback_data.insert(change_id.clone(), rollback_data);

        {
            let mut graph = self.graph.write();
            match &change.data {
                ChangeData::Entity(entity) => {
                    match change.operation {
                        Operation::Insert | Operation::Upsert => {
                            graph.add_entity(entity.clone())?;
                            // Mark the entity stale for incremental PageRank.
                            self.incremental_pagerank.record_change(entity.id.clone());
                        },
                        Operation::Delete => {
                            // TODO: entity removal not implemented yet.
                        },
                        _ => {},
                    }
                },
                ChangeData::Relationship(relationship) => {
                    match change.operation {
                        Operation::Insert | Operation::Upsert => {
                            graph.add_relationship(relationship.clone())?;
                            // Both endpoints' scores are affected.
                            self.incremental_pagerank
                                .record_change(relationship.source.clone());
                            self.incremental_pagerank
                                .record_change(relationship.target.clone());
                        },
                        Operation::Delete => {
                            // TODO: relationship removal not implemented yet.
                        },
                        _ => {},
                    }
                },
                ChangeData::Embedding {
                    entity_id,
                    embedding,
                } => {
                    // A missing entity is silently skipped.
                    if let Some(entity) = graph.get_entity_mut(entity_id) {
                        entity.embedding = Some(embedding.clone());
                    }
                },
                _ => {},
            }
        }

        self.change_log.insert(change_id, change);

        Ok(())
    }

    /// Capture the pre-change state needed to undo `change`.
    ///
    /// NOTE(review): relationship matching here ignores `relation_type`, so
    /// all edges between the same endpoints are snapshotted — confirm this
    /// is intended.
    fn create_rollback_data(
        &self,
        change: &ChangeRecord,
        graph: &KnowledgeGraph,
    ) -> Result<RollbackData> {
        let mut previous_entities = Vec::new();
        let mut previous_relationships = Vec::new();

        match &change.data {
            ChangeData::Entity(entity) => {
                if let Some(existing) = graph.get_entity(&entity.id) {
                    previous_entities.push(existing.clone());
                }
            },
            ChangeData::Relationship(relationship) => {
                for rel in graph.get_all_relationships() {
                    if rel.source == relationship.source && rel.target == relationship.target {
                        previous_relationships.push(rel.clone());
                    }
                }
            },
            _ => {},
        }

        Ok(RollbackData {
            previous_entities,
            previous_relationships,
            affected_caches: vec![],
        })
    }
}
2156
#[cfg(feature = "incremental")]
#[async_trait::async_trait]
impl IncrementalGraphStore for ProductionGraphStore {
    type Error = GraphRAGError;

    /// Insert or update an entity: resolve conflicts, invalidate affected
    /// caches, and broadcast an `EntityUpserted` event.
    async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::EntityAdded,
            Operation::Upsert,
            ChangeData::Entity(entity.clone()),
            Some(entity.id.clone()),
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        // Look the applied change back up defensively: if the returned id is
        // not present in the change log, skip cache invalidation instead of
        // panicking (the previous `.unwrap()` aborted whenever the returned
        // id and the logged change id diverged).
        let logged = self
            .change_log
            .get(&update_id)
            .map(|entry| entry.value().clone());
        if let Some(change) = logged {
            let changes = vec![change];
            let _invalidation_strategies =
                self.cache_invalidation.invalidate_for_changes(&changes);
        }

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EntityUpserted,
            entity_id: Some(entity.id),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Insert or update a relationship and broadcast a
    /// `RelationshipUpserted` event (keyed on the source entity).
    async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::RelationshipAdded,
            Operation::Upsert,
            ChangeData::Relationship(relationship.clone()),
            None,
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::RelationshipUpserted,
            entity_id: Some(relationship.source),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Delete an entity.
    ///
    /// NOTE(review): stub — publishes an `EntityDeleted` event but does not
    /// mutate the graph (mirrors the empty `Operation::Delete` arms in
    /// `apply_change_internal`).
    async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId> {
        let update_id = UpdateId::new();

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EntityDeleted,
            entity_id: Some(entity_id.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Delete a relationship.
    ///
    /// NOTE(review): stub — publishes a `RelationshipDeleted` event only;
    /// the target and relation type are currently ignored.
    async fn delete_relationship(
        &mut self,
        source: &EntityId,
        _target: &EntityId,
        _relation_type: &str,
    ) -> Result<UpdateId> {
        let update_id = UpdateId::new();

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::RelationshipDeleted,
            entity_id: Some(source.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Apply every change in `delta` under a fresh transaction.
    ///
    /// NOTE(review): changes are applied one by one and are not recorded on
    /// the transaction, so a mid-delta failure leaves earlier changes
    /// applied with no automatic undo.
    async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId> {
        let tx_id = self.begin_transaction().await?;

        for change in delta.changes {
            self.apply_change_with_conflict_resolution(change).await?;
        }

        self.commit_transaction(tx_id).await?;
        Ok(delta.delta_id)
    }

    /// Roll back a previously applied delta. Stub: currently a no-op.
    async fn rollback_delta(&mut self, _delta_id: &UpdateId) -> Result<()> {
        Ok(())
    }

    /// Return logged changes, optionally restricted to those with
    /// `timestamp >= since`. Order is unspecified (DashMap iteration).
    async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>> {
        let changes: Vec<ChangeRecord> = self
            .change_log
            .iter()
            .filter_map(|entry| {
                let change = entry.value();
                if let Some(since_time) = since {
                    if change.timestamp >= since_time {
                        Some(change.clone())
                    } else {
                        None
                    }
                } else {
                    Some(change.clone())
                }
            })
            .collect();

        Ok(changes)
    }

    /// Open a new transaction (ReadCommitted) and broadcast
    /// `TransactionStarted`.
    async fn begin_transaction(&mut self) -> Result<TransactionId> {
        let tx_id = TransactionId::new();
        let transaction = Transaction {
            id: tx_id.clone(),
            changes: Vec::new(),
            status: TransactionStatus::Active,
            created_at: Utc::now(),
            isolation_level: IsolationLevel::ReadCommitted,
        };

        self.transactions.insert(tx_id.clone(), transaction);

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::TransactionStarted,
            entity_id: None,
            timestamp: Utc::now(),
            metadata: [("transaction_id".to_string(), tx_id.to_string())]
                .into_iter()
                .collect(),
        })
        .await;

        Ok(tx_id)
    }

    /// Commit and remove a transaction; errors if it is unknown.
    async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
            tx.status = TransactionStatus::Committed;

            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::TransactionCommitted,
                entity_id: None,
                timestamp: Utc::now(),
                metadata: [("transaction_id".to_string(), tx_id.to_string())]
                    .into_iter()
                    .collect(),
            })
            .await;

            Ok(())
        } else {
            Err(GraphRAGError::IncrementalUpdate {
                message: format!("Transaction {tx_id} not found"),
            })
        }
    }

    /// Abort and remove a transaction; errors if it is unknown.
    ///
    /// NOTE(review): the per-change undo loop is empty — recorded changes
    /// are not actually reverted yet.
    async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
            tx.status = TransactionStatus::Aborted;

            for _change in &tx.changes {
                // TODO: apply the stored RollbackData for each change.
            }

            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::TransactionRolledBack,
                entity_id: None,
                timestamp: Utc::now(),
                metadata: [("transaction_id".to_string(), tx_id.to_string())]
                    .into_iter()
                    .collect(),
            })
            .await;

            Ok(())
        } else {
            Err(GraphRAGError::IncrementalUpdate {
                message: format!("Transaction {tx_id} not found"),
            })
        }
    }

    /// Upsert entities sequentially; `_strategy` is currently unused (each
    /// upsert goes through the store's configured conflict resolver).
    async fn batch_upsert_entities(
        &mut self,
        entities: Vec<Entity>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::with_capacity(entities.len());

        for entity in entities {
            let update_id = self.upsert_entity(entity).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Upsert relationships sequentially; `_strategy` is currently unused.
    async fn batch_upsert_relationships(
        &mut self,
        relationships: Vec<Relationship>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::with_capacity(relationships.len());

        for relationship in relationships {
            let update_id = self.upsert_relationship(relationship).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Replace an entity's embedding via the change pipeline and broadcast
    /// an `EmbeddingUpdated` event.
    async fn update_entity_embedding(
        &mut self,
        entity_id: &EntityId,
        embedding: Vec<f32>,
    ) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::EmbeddingUpdated,
            Operation::Update,
            ChangeData::Embedding {
                entity_id: entity_id.clone(),
                embedding,
            },
            Some(entity_id.clone()),
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EmbeddingUpdated,
            entity_id: Some(entity_id.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Apply many embedding updates sequentially.
    async fn bulk_update_embeddings(
        &mut self,
        updates: Vec<(EntityId, Vec<f32>)>,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::with_capacity(updates.len());

        for (entity_id, embedding) in updates {
            let update_id = self.update_entity_embedding(&entity_id, embedding).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Ids of all transactions still in the `Active` state.
    async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>> {
        let pending: Vec<TransactionId> = self
            .transactions
            .iter()
            .filter(|entry| entry.value().status == TransactionStatus::Active)
            .map(|entry| entry.key().clone())
            .collect();

        Ok(pending)
    }

    /// Basic size/degree statistics for the current graph.
    ///
    /// NOTE(review): `connected_components` and `clustering_coefficient` are
    /// placeholders (1 and 0.0), not computed values.
    async fn get_graph_statistics(&self) -> Result<GraphStatistics> {
        let graph = self.graph.read();
        let entities: Vec<_> = graph.entities().collect();
        let relationships = graph.get_all_relationships();

        let node_count = entities.len();
        let edge_count = relationships.len();

        let total_degree: usize = entities
            .iter()
            .map(|entity| graph.get_neighbors(&entity.id).len())
            .sum();

        let average_degree = if node_count > 0 {
            total_degree as f64 / node_count as f64
        } else {
            0.0
        };

        let max_degree = entities
            .iter()
            .map(|entity| graph.get_neighbors(&entity.id).len())
            .max()
            .unwrap_or(0);

        Ok(GraphStatistics {
            node_count,
            edge_count,
            average_degree,
            max_degree,
            connected_components: 1,
            clustering_coefficient: 0.0,
            last_updated: Utc::now(),
        })
    }

    /// Scan the graph for orphaned entities (no neighbors), relationships
    /// whose endpoints are missing, and entities without embeddings.
    async fn validate_consistency(&self) -> Result<ConsistencyReport> {
        let graph = self.graph.read();
        let mut orphaned_entities = Vec::new();
        let mut broken_relationships = Vec::new();
        let mut missing_embeddings = Vec::new();

        for entity in graph.entities() {
            let neighbors = graph.get_neighbors(&entity.id);
            if neighbors.is_empty() {
                orphaned_entities.push(entity.id.clone());
            }

            if entity.embedding.is_none() {
                missing_embeddings.push(entity.id.clone());
            }
        }

        for relationship in graph.get_all_relationships() {
            if graph.get_entity(&relationship.source).is_none()
                || graph.get_entity(&relationship.target).is_none()
            {
                broken_relationships.push((
                    relationship.source.clone(),
                    relationship.target.clone(),
                    relationship.relation_type.clone(),
                ));
            }
        }

        let issues_found =
            orphaned_entities.len() + broken_relationships.len() + missing_embeddings.len();

        Ok(ConsistencyReport {
            is_consistent: issues_found == 0,
            orphaned_entities,
            broken_relationships,
            missing_embeddings,
            validation_time: Utc::now(),
            issues_found,
        })
    }
}
2539
/// Helper for extracting the entity id (if any) out of a `ChangeData`.
#[allow(dead_code)]
trait ChangeDataExt {
    // The entity this change concerns, when the variant carries one.
    fn get_entity_id(&self) -> Option<EntityId>;
}
2545
2546impl ChangeDataExt for ChangeData {
2547 fn get_entity_id(&self) -> Option<EntityId> {
2548 match self {
2549 ChangeData::Entity(entity) => Some(entity.id.clone()),
2550 ChangeData::Embedding { entity_id, .. } => Some(entity_id.clone()),
2551 _ => None,
2552 }
2553 }
2554}
2555
2556#[cfg(test)]
2559mod tests {
2560 use super::*;
2561
2562 #[test]
2563 fn test_update_id_generation() {
2564 let id1 = UpdateId::new();
2565 let id2 = UpdateId::new();
2566 assert_ne!(id1.as_str(), id2.as_str());
2567 }
2568
2569 #[test]
2570 fn test_transaction_id_generation() {
2571 let tx1 = TransactionId::new();
2572 let tx2 = TransactionId::new();
2573 assert_ne!(tx1.as_str(), tx2.as_str());
2574 }
2575
2576 #[test]
2577 fn test_change_record_creation() {
2578 let entity = Entity::new(
2579 EntityId::new("test".to_string()),
2580 "Test Entity".to_string(),
2581 "Person".to_string(),
2582 0.9,
2583 );
2584
2585 let config = IncrementalConfig::default();
2586 let graph = KnowledgeGraph::new();
2587 let manager = IncrementalGraphManager::new(graph, config);
2588
2589 let change = manager.create_change_record(
2590 ChangeType::EntityAdded,
2591 Operation::Insert,
2592 ChangeData::Entity(entity.clone()),
2593 Some(entity.id.clone()),
2594 None,
2595 );
2596
2597 assert_eq!(change.change_type, ChangeType::EntityAdded);
2598 assert_eq!(change.operation, Operation::Insert);
2599 assert_eq!(change.entity_id, Some(entity.id));
2600 }
2601
2602 #[test]
2603 fn test_conflict_resolver_creation() {
2604 let resolver = ConflictResolver::new(ConflictStrategy::KeepExisting);
2605 assert!(matches!(resolver.strategy, ConflictStrategy::KeepExisting));
2606 }
2607
2608 #[test]
2609 fn test_incremental_config_default() {
2610 let config = IncrementalConfig::default();
2611 assert_eq!(config.max_change_log_size, 10000);
2612 assert_eq!(config.batch_size, 100);
2613 assert!(config.enable_monitoring);
2614 }
2615
2616 #[test]
2617 fn test_statistics_creation() {
2618 let stats = IncrementalStatistics::empty();
2619 assert_eq!(stats.total_updates, 0);
2620 assert_eq!(stats.entities_added, 0);
2621 assert_eq!(stats.average_update_time_ms, 0.0);
2622 }
2623
2624 #[tokio::test]
2625 async fn test_basic_entity_upsert() {
2626 let config = IncrementalConfig::default();
2627 let graph = KnowledgeGraph::new();
2628 let mut manager = IncrementalGraphManager::new(graph, config);
2629
2630 let entity = Entity::new(
2631 EntityId::new("test_entity".to_string()),
2632 "Test Entity".to_string(),
2633 "Person".to_string(),
2634 0.9,
2635 );
2636
2637 let update_id = manager.basic_upsert_entity(entity).unwrap();
2638 assert!(!update_id.as_str().is_empty());
2639
2640 let stats = manager.get_statistics();
2641 assert_eq!(stats.entities_added, 1);
2642 }
2643
2644 #[cfg(feature = "incremental")]
2645 #[tokio::test]
2646 async fn test_production_graph_store_creation() {
2647 let graph = KnowledgeGraph::new();
2648 let config = IncrementalConfig::default();
2649 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2650
2651 let store = ProductionGraphStore::new(graph, config, resolver);
2652 let _receiver = store.subscribe_events();
2653 }
2655
2656 #[cfg(feature = "incremental")]
2657 #[tokio::test]
2658 async fn test_production_graph_store_entity_upsert() {
2659 let graph = KnowledgeGraph::new();
2660 let config = IncrementalConfig::default();
2661 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2662
2663 let mut store = ProductionGraphStore::new(graph, config, resolver);
2664
2665 let entity = Entity::new(
2666 EntityId::new("test_entity".to_string()),
2667 "Test Entity".to_string(),
2668 "Person".to_string(),
2669 0.9,
2670 );
2671
2672 let update_id = store.upsert_entity(entity).await.unwrap();
2673 assert!(!update_id.as_str().is_empty());
2674
2675 let stats = store.get_graph_statistics().await.unwrap();
2676 assert_eq!(stats.node_count, 1);
2677 }
2678
2679 #[cfg(feature = "incremental")]
2680 #[tokio::test]
2681 async fn test_production_graph_store_relationship_upsert() {
2682 let graph = KnowledgeGraph::new();
2683 let config = IncrementalConfig::default();
2684 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2685
2686 let mut store = ProductionGraphStore::new(graph, config, resolver);
2687
2688 let entity1 = Entity::new(
2690 EntityId::new("entity1".to_string()),
2691 "Entity 1".to_string(),
2692 "Person".to_string(),
2693 0.9,
2694 );
2695
2696 let entity2 = Entity::new(
2697 EntityId::new("entity2".to_string()),
2698 "Entity 2".to_string(),
2699 "Person".to_string(),
2700 0.9,
2701 );
2702
2703 store.upsert_entity(entity1.clone()).await.unwrap();
2704 store.upsert_entity(entity2.clone()).await.unwrap();
2705
2706 let relationship = Relationship::new(entity1.id, entity2.id, "KNOWS".to_string(), 0.8);
2707
2708 let update_id = store.upsert_relationship(relationship).await.unwrap();
2709 assert!(!update_id.as_str().is_empty());
2710
2711 let stats = store.get_graph_statistics().await.unwrap();
2712 assert_eq!(stats.edge_count, 1);
2713 }
2714
2715 #[cfg(feature = "incremental")]
2716 #[tokio::test]
2717 async fn test_production_graph_store_transactions() {
2718 let graph = KnowledgeGraph::new();
2719 let config = IncrementalConfig::default();
2720 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2721
2722 let mut store = ProductionGraphStore::new(graph, config, resolver);
2723
2724 let tx_id = store.begin_transaction().await.unwrap();
2725 assert!(!tx_id.as_str().is_empty());
2726
2727 let pending = store.get_pending_transactions().await.unwrap();
2728 assert_eq!(pending.len(), 1);
2729 assert_eq!(pending[0], tx_id);
2730
2731 store.commit_transaction(tx_id).await.unwrap();
2732
2733 let pending_after = store.get_pending_transactions().await.unwrap();
2734 assert_eq!(pending_after.len(), 0);
2735 }
2736
2737 #[cfg(feature = "incremental")]
2738 #[tokio::test]
2739 async fn test_production_graph_store_consistency_validation() {
2740 let graph = KnowledgeGraph::new();
2741 let config = IncrementalConfig::default();
2742 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2743
2744 let store = ProductionGraphStore::new(graph, config, resolver);
2745
2746 let report = store.validate_consistency().await.unwrap();
2747 assert!(report.is_consistent);
2748 assert_eq!(report.issues_found, 0);
2749 }
2750
2751 #[cfg(feature = "incremental")]
2752 #[tokio::test]
2753 async fn test_production_graph_store_event_publishing() {
2754 let graph = KnowledgeGraph::new();
2755 let config = IncrementalConfig::default();
2756 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2757
2758 let store = ProductionGraphStore::new(graph, config, resolver);
2759 let mut event_receiver = store.subscribe_events();
2760
2761 let entity = Entity::new(
2762 EntityId::new("test_entity".to_string()),
2763 "Test Entity".to_string(),
2764 "Person".to_string(),
2765 0.9,
2766 );
2767
2768 let store_clone = Arc::new(tokio::sync::Mutex::new(store));
2770 let store_for_task = Arc::clone(&store_clone);
2771
2772 tokio::spawn(async move {
2773 let mut store = store_for_task.lock().await;
2774 let _ = store.upsert_entity(entity).await;
2775 });
2776
2777 let event =
2779 tokio::time::timeout(std::time::Duration::from_millis(100), event_receiver.recv())
2780 .await;
2781 assert!(event.is_ok());
2782 }
2783
2784 #[cfg(feature = "incremental")]
2785 #[test]
2786 fn test_incremental_pagerank_creation() {
2787 let pagerank = IncrementalPageRank::new(0.85, 1e-6, 100);
2788 assert!(pagerank.scores.is_empty());
2789 }
2790
2791 #[cfg(feature = "incremental")]
2792 #[test]
2793 fn test_batch_processor_creation() {
2794 let processor = BatchProcessor::new(100, Duration::from_millis(500), 10);
2795 let metrics = processor.get_metrics();
2796 assert_eq!(metrics.total_batches_processed, 0);
2797 }
2798
2799 #[cfg(feature = "incremental")]
2800 #[tokio::test]
2801 async fn test_selective_invalidation() {
2802 let invalidation = SelectiveInvalidation::new();
2803
2804 let region = CacheRegion {
2805 region_id: "test_region".to_string(),
2806 entity_ids: [EntityId::new("entity1".to_string())].into_iter().collect(),
2807 relationship_types: ["KNOWS".to_string()].into_iter().collect(),
2808 document_ids: HashSet::new(),
2809 last_modified: Utc::now(),
2810 };
2811
2812 invalidation.register_cache_region(region);
2813
2814 let entity = Entity::new(
2815 EntityId::new("entity1".to_string()),
2816 "Entity 1".to_string(),
2817 "Person".to_string(),
2818 0.9,
2819 );
2820
2821 let ent_id_for_log = entity.id.clone();
2822 let change = ChangeRecord {
2823 change_id: UpdateId::new(),
2824 timestamp: Utc::now(),
2825 change_type: ChangeType::EntityUpdated,
2826 entity_id: Some(ent_id_for_log),
2827 document_id: None,
2828 operation: Operation::Update,
2829 data: ChangeData::Entity(entity),
2830 metadata: HashMap::new(),
2831 };
2832
2833 let strategies = invalidation.invalidate_for_changes(&[change]);
2834 assert!(!strategies.is_empty());
2835 }
2836
2837 #[cfg(feature = "incremental")]
2838 #[test]
2839 fn test_conflict_resolver_merge() {
2840 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2841
2842 let entity1 = Entity::new(
2843 EntityId::new("entity1".to_string()),
2844 "Entity 1".to_string(),
2845 "Person".to_string(),
2846 0.8,
2847 );
2848
2849 let entity2 = Entity::new(
2850 EntityId::new("entity1".to_string()),
2851 "Entity 1 Updated".to_string(),
2852 "Person".to_string(),
2853 0.9,
2854 );
2855
2856 let merged = resolver.merge_entities(&entity1, &entity2).unwrap();
2857 assert_eq!(merged.confidence, 0.9); assert_eq!(merged.name, "Entity 1 Updated");
2859 }
2860
2861 #[test]
2862 fn test_graph_statistics_creation() {
2863 let stats = GraphStatistics {
2864 node_count: 100,
2865 edge_count: 150,
2866 average_degree: 3.0,
2867 max_degree: 10,
2868 connected_components: 1,
2869 clustering_coefficient: 0.3,
2870 last_updated: Utc::now(),
2871 };
2872
2873 assert_eq!(stats.node_count, 100);
2874 assert_eq!(stats.edge_count, 150);
2875 }
2876
2877 #[test]
2878 fn test_consistency_report_creation() {
2879 let report = ConsistencyReport {
2880 is_consistent: true,
2881 orphaned_entities: vec![],
2882 broken_relationships: vec![],
2883 missing_embeddings: vec![],
2884 validation_time: Utc::now(),
2885 issues_found: 0,
2886 };
2887
2888 assert!(report.is_consistent);
2889 assert_eq!(report.issues_found, 0);
2890 }
2891
2892 #[test]
2893 fn test_change_event_creation() {
2894 let event = ChangeEvent {
2895 event_id: UpdateId::new(),
2896 event_type: ChangeEventType::EntityUpserted,
2897 entity_id: Some(EntityId::new("entity1".to_string())),
2898 timestamp: Utc::now(),
2899 metadata: HashMap::new(),
2900 };
2901
2902 assert!(matches!(event.event_type, ChangeEventType::EntityUpserted));
2903 assert!(event.entity_id.is_some());
2904 }
2905}