1use crate::core::{
25 DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship, Result, TextChunk,
26};
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29use std::collections::{HashMap, HashSet};
30use std::time::{Duration, Instant};
31
32#[cfg(feature = "incremental")]
33use std::sync::Arc;
34
35#[cfg(feature = "incremental")]
36use {
37 dashmap::DashMap,
38 parking_lot::{Mutex, RwLock},
39 tokio::sync::{broadcast, Semaphore},
40 uuid::Uuid,
41};
42
/// Opaque string identifier for a single incremental update.
///
/// Backed by a random v4 UUID when the `incremental` feature is enabled,
/// otherwise a timestamp-derived string (see [`UpdateId::new`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UpdateId(String);
50
impl UpdateId {
    /// Creates a fresh identifier.
    ///
    /// With the `incremental` feature: a random v4 UUID string. Without it:
    /// `update_{nanos}` from the current UTC time, falling back to 0 when
    /// the timestamp cannot be represented in nanoseconds.
    pub fn new() -> Self {
        #[cfg(feature = "incremental")]
        {
            Self(Uuid::new_v4().to_string())
        }
        #[cfg(not(feature = "incremental"))]
        {
            Self(format!(
                "update_{}",
                Utc::now().timestamp_nanos_opt().unwrap_or(0)
            ))
        }
    }

    /// Wraps an existing id string (e.g. one read back from persisted data).
    pub fn from_string(id: String) -> Self {
        Self(id)
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
77
78impl std::fmt::Display for UpdateId {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(f, "{}", self.0)
81 }
82}
83
impl Default for UpdateId {
    /// Same as [`UpdateId::new`]: a freshly generated identifier.
    fn default() -> Self {
        Self::new()
    }
}
89
/// One logged mutation of the graph, with its payload and bookkeeping data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeRecord {
    /// Unique id of this change.
    pub change_id: UpdateId,
    /// When the change was recorded (UTC).
    pub timestamp: DateTime<Utc>,
    /// What kind of object changed, and how.
    pub change_type: ChangeType,
    /// Affected entity, when the change targets an entity.
    pub entity_id: Option<EntityId>,
    /// Affected document, when the change targets a document.
    pub document_id: Option<DocumentId>,
    /// CRUD-style operation performed.
    pub operation: Operation,
    /// The changed payload itself.
    pub data: ChangeData,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
}
110
/// The kind of object a [`ChangeRecord`] applies to, and how it changed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ChangeType {
    // Entity lifecycle.
    EntityAdded,
    EntityUpdated,
    EntityRemoved,
    // Relationship lifecycle.
    RelationshipAdded,
    RelationshipUpdated,
    RelationshipRemoved,
    // Document lifecycle.
    DocumentAdded,
    DocumentUpdated,
    DocumentRemoved,
    // Text-chunk lifecycle.
    ChunkAdded,
    ChunkUpdated,
    ChunkRemoved,
    // Embedding-vector lifecycle.
    EmbeddingAdded,
    EmbeddingUpdated,
    EmbeddingRemoved,
}
145
/// CRUD-style operation recorded alongside a change.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Operation {
    Insert,
    Update,
    Delete,
    /// Insert-or-update in a single step.
    Upsert,
}
158
/// Payload carried by a [`ChangeRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeData {
    Entity(Entity),
    Relationship(Relationship),
    Document(Document),
    Chunk(TextChunk),
    /// A (re)computed embedding vector for an entity.
    Embedding {
        entity_id: EntityId,
        embedding: Vec<f32>,
    },
    /// No payload.
    Empty,
}
180
/// Minimal document representation used in change payloads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Document {
    pub id: DocumentId,
    pub title: String,
    /// Full document text.
    pub content: String,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
}
193
/// A batch of change records tracked (applied / rolled back) as one unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphDelta {
    /// Unique id of this delta.
    pub delta_id: UpdateId,
    /// When the delta was created (UTC).
    pub timestamp: DateTime<Utc>,
    /// The ordered changes making up this delta.
    pub changes: Vec<ChangeRecord>,
    /// Ids of deltas this one depends on.
    pub dependencies: Vec<UpdateId>,
    /// Lifecycle state of the delta.
    pub status: DeltaStatus,
    /// Snapshot needed to undo this delta, if one was captured.
    pub rollback_data: Option<RollbackData>,
}
210
/// Lifecycle state of a [`GraphDelta`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DeltaStatus {
    Pending,
    Applied,
    Committed,
    RolledBack,
    /// Application failed; carries the error text.
    Failed {
        error: String
    },
}
228
/// State captured before a delta is applied so it can be undone later.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollbackData {
    /// Entities as they were before the delta.
    pub previous_entities: Vec<Entity>,
    /// Relationships as they were before the delta.
    pub previous_relationships: Vec<Relationship>,
    /// Names of caches the delta touched.
    pub affected_caches: Vec<String>,
}
239
/// Policy for resolving a conflict between existing and incoming data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictStrategy {
    /// Keep what is already stored.
    KeepExisting,
    /// Replace with the incoming data.
    KeepNew,
    /// Merge both sides field-by-field.
    Merge,
    /// Delegate to an LLM (not implemented by `ConflictResolver` here).
    LLMDecision,
    /// Ask the user (not implemented by `ConflictResolver` here).
    UserPrompt,
    /// Dispatch to a named resolver registered on the `ConflictResolver`.
    Custom(String),
}
256
/// A detected clash between data already stored and data being written.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    /// Unique id of this conflict.
    pub conflict_id: UpdateId,
    /// Why the conflict was raised.
    pub conflict_type: ConflictType,
    /// What is currently stored.
    pub existing_data: ChangeData,
    /// What the update wants to write.
    pub new_data: ChangeData,
    /// Outcome, once the conflict has been resolved.
    pub resolution: Option<ConflictResolution>,
}
271
/// Classification of why a [`Conflict`] was raised.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    EntityExists,
    RelationshipExists,
    VersionMismatch,
    DataInconsistency,
    ConstraintViolation,
}
286
/// Decision produced for a conflict: which strategy won and the data to keep.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictResolution {
    /// Strategy that produced this resolution.
    pub strategy: ConflictStrategy,
    /// Data that should be written to the graph.
    pub resolved_data: ChangeData,
    /// Resolver-specific annotations (e.g. which merge path was taken).
    pub metadata: HashMap<String, String>,
}
297
/// Async contract for a store that can apply incremental changes to a
/// knowledge graph: single and batched entity/relationship upserts and
/// deletes, delta application with rollback, transactions, embedding
/// maintenance, and consistency checks.
#[async_trait::async_trait]
pub trait IncrementalGraphStore: Send + Sync {
    /// Backend-specific error type.
    ///
    /// NOTE(review): every method below returns the crate-level `Result`,
    /// so this associated type is never referenced by the trait itself —
    /// confirm whether implementors still need it.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Inserts or updates one entity.
    async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId>;

    /// Inserts or updates one relationship.
    async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId>;

    /// Removes an entity by id.
    async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId>;

    /// Removes the relationship identified by its endpoints and type.
    async fn delete_relationship(
        &mut self,
        source: &EntityId,
        target: &EntityId,
        relation_type: &str,
    ) -> Result<UpdateId>;

    /// Applies a whole delta (a batch of changes).
    async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId>;

    /// Undoes a previously applied delta.
    async fn rollback_delta(&mut self, delta_id: &UpdateId) -> Result<()>;

    /// Lists change records, optionally restricted to those after `since`.
    async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>>;

    /// Opens a transaction and returns its id.
    async fn begin_transaction(&mut self) -> Result<TransactionId>;

    /// Commits the given transaction.
    async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()>;

    /// Aborts the given transaction.
    async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()>;

    /// Upserts many entities, resolving conflicts with `_strategy`.
    async fn batch_upsert_entities(
        &mut self,
        entities: Vec<Entity>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>>;

    /// Upserts many relationships, resolving conflicts with `_strategy`.
    async fn batch_upsert_relationships(
        &mut self,
        relationships: Vec<Relationship>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>>;

    /// Replaces one entity's embedding vector.
    async fn update_entity_embedding(
        &mut self,
        entity_id: &EntityId,
        embedding: Vec<f32>,
    ) -> Result<UpdateId>;

    /// Replaces many embedding vectors in one call.
    async fn bulk_update_embeddings(
        &mut self,
        updates: Vec<(EntityId, Vec<f32>)>,
    ) -> Result<Vec<UpdateId>>;

    /// Ids of transactions that are open but not yet committed.
    async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>>;

    /// Structural metrics for the stored graph.
    async fn get_graph_statistics(&self) -> Result<GraphStatistics>;

    /// Runs consistency checks and reports any problems found.
    async fn validate_consistency(&self) -> Result<ConsistencyReport>;
}
379
/// Opaque string identifier for a store transaction.
///
/// Backed by a UUID with the `incremental` feature, otherwise a
/// timestamp-derived `tx_{nanos}` string (see [`TransactionId::new`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TransactionId(String);
383
impl TransactionId {
    /// Creates a fresh transaction id.
    ///
    /// With the `incremental` feature: a random v4 UUID string. Without it:
    /// `tx_{nanos}` from the current UTC time (0 if unrepresentable).
    pub fn new() -> Self {
        #[cfg(feature = "incremental")]
        {
            Self(Uuid::new_v4().to_string())
        }
        #[cfg(not(feature = "incremental"))]
        {
            Self(format!(
                "tx_{}",
                Utc::now().timestamp_nanos_opt().unwrap_or(0)
            ))
        }
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
405
406impl std::fmt::Display for TransactionId {
407 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408 write!(f, "{}", self.0)
409 }
410}
411
impl Default for TransactionId {
    /// Same as [`TransactionId::new`]: a freshly generated identifier.
    fn default() -> Self {
        Self::new()
    }
}
417
/// Structural metrics describing the current graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphStatistics {
    pub node_count: usize,
    pub edge_count: usize,
    pub average_degree: f64,
    pub max_degree: usize,
    pub connected_components: usize,
    pub clustering_coefficient: f64,
    /// When these statistics were computed (UTC).
    pub last_updated: DateTime<Utc>,
}
436
/// Result of a consistency validation pass over the store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyReport {
    /// True when no issues were found.
    pub is_consistent: bool,
    /// Entities flagged as orphaned (criteria are backend-defined).
    pub orphaned_entities: Vec<EntityId>,
    /// Broken (source, target, relation_type) triples.
    pub broken_relationships: Vec<(EntityId, EntityId, String)>,
    /// Entities that should have an embedding but do not.
    pub missing_embeddings: Vec<EntityId>,
    /// When the validation ran (UTC).
    pub validation_time: DateTime<Utc>,
    /// Total number of problems detected.
    pub issues_found: usize,
}
453
/// How cached results should be invalidated after graph changes.
#[derive(Debug, Clone)]
pub enum InvalidationStrategy {
    /// Invalidate only the listed cache keys.
    Selective(Vec<String>),
    /// Invalidate an entire named cache region.
    Regional(String),
    /// Flush everything.
    Global,
    /// Invalidate relative to an entity with the given radius
    /// (used with 1 and 2 in `SelectiveInvalidation::invalidate_for_changes`).
    Relational(EntityId, u32),
}
470
/// A named group of cached data tracked for selective invalidation.
#[derive(Debug, Clone)]
pub struct CacheRegion {
    /// Unique name of the region.
    pub region_id: String,
    /// Entities whose changes invalidate this region.
    pub entity_ids: HashSet<EntityId>,
    /// Relationship types associated with this region.
    pub relationship_types: HashSet<String>,
    /// Documents associated with this region.
    pub document_ids: HashSet<DocumentId>,
    /// Last modification time (UTC).
    pub last_modified: DateTime<Utc>,
}
485
/// Maps graph changes to the cache regions they affect so only those
/// regions need to be invalidated.
#[cfg(feature = "incremental")]
pub struct SelectiveInvalidation {
    /// Registered regions, keyed by region id.
    cache_regions: DashMap<String, CacheRegion>,
    /// Reverse index: entity id -> ids of regions containing it.
    entity_to_regions: DashMap<EntityId, HashSet<String>>,
    /// Append-only history of issued invalidation strategies.
    invalidation_log: Mutex<Vec<(DateTime<Utc>, InvalidationStrategy)>>,
}
493
#[cfg(feature = "incremental")]
impl Default for SelectiveInvalidation {
    /// Same as `SelectiveInvalidation::new`: an empty tracker.
    fn default() -> Self {
        Self::new()
    }
}
500
#[cfg(feature = "incremental")]
impl SelectiveInvalidation {
    /// Creates a tracker with no regions registered.
    pub fn new() -> Self {
        Self {
            cache_regions: DashMap::new(),
            entity_to_regions: DashMap::new(),
            invalidation_log: Mutex::new(Vec::new()),
        }
    }

    /// Registers `region` and indexes each of its entities back to the
    /// region id, so entity-level changes can be mapped to affected regions.
    ///
    /// Fix: the loop header previously read `®ion.entity_ids` — an
    /// encoding-corrupted `&region.entity_ids` that does not compile.
    pub fn register_cache_region(&self, region: CacheRegion) {
        let region_id = region.region_id.clone();

        for entity_id in &region.entity_ids {
            self.entity_to_regions
                .entry(entity_id.clone())
                .or_default()
                .insert(region_id.clone());
        }

        self.cache_regions.insert(region_id, region);
    }

    /// Computes the invalidation strategies implied by `changes`.
    ///
    /// Entity changes invalidate their registered regions plus a radius-2
    /// relational neighborhood; relationship changes invalidate radius-1
    /// neighborhoods around both endpoints; all other change types fall back
    /// to per-key selective invalidation. Every issued strategy is appended
    /// to the invalidation log.
    pub fn invalidate_for_changes(&self, changes: &[ChangeRecord]) -> Vec<InvalidationStrategy> {
        let mut strategies = Vec::new();
        let mut affected_regions = HashSet::new();

        for change in changes {
            match &change.change_type {
                ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
                    if let Some(entity_id) = &change.entity_id {
                        if let Some(regions) = self.entity_to_regions.get(entity_id) {
                            // Clone the ids out of the map reference instead of
                            // cloning the whole HashSet first.
                            affected_regions.extend(regions.iter().cloned());
                        }
                        strategies.push(InvalidationStrategy::Relational(entity_id.clone(), 2));
                    }
                }
                ChangeType::RelationshipAdded
                | ChangeType::RelationshipUpdated
                | ChangeType::RelationshipRemoved => {
                    if let ChangeData::Relationship(rel) = &change.data {
                        strategies.push(InvalidationStrategy::Relational(rel.source.clone(), 1));
                        strategies.push(InvalidationStrategy::Relational(rel.target.clone(), 1));
                    }
                }
                _ => {
                    let cache_keys = self.generate_cache_keys_for_change(change);
                    if !cache_keys.is_empty() {
                        strategies.push(InvalidationStrategy::Selective(cache_keys));
                    }
                }
            }
        }

        // One regional strategy per distinct affected region.
        for region_id in affected_regions {
            strategies.push(InvalidationStrategy::Regional(region_id));
        }

        // Record everything issued under a single lock acquisition.
        let mut log = self.invalidation_log.lock();
        for strategy in &strategies {
            log.push((Utc::now(), strategy.clone()));
        }

        strategies
    }

    /// Maps a change to the cache keys it dirties.
    ///
    /// NOTE(review): the entity arms here are unreachable from
    /// `invalidate_for_changes` (entity changes are handled there before the
    /// `_` fallback) — kept for completeness since this helper could gain
    /// other callers.
    fn generate_cache_keys_for_change(&self, change: &ChangeRecord) -> Vec<String> {
        let mut keys = Vec::new();

        match &change.change_type {
            ChangeType::EntityAdded | ChangeType::EntityUpdated => {
                if let Some(entity_id) = &change.entity_id {
                    keys.push(format!("entity:{entity_id}"));
                    keys.push(format!("entity_neighbors:{entity_id}"));
                }
            }
            ChangeType::DocumentAdded | ChangeType::DocumentUpdated => {
                if let Some(doc_id) = &change.document_id {
                    keys.push(format!("document:{doc_id}"));
                    keys.push(format!("document_chunks:{doc_id}"));
                }
            }
            ChangeType::EmbeddingAdded | ChangeType::EmbeddingUpdated => {
                if let Some(entity_id) = &change.entity_id {
                    keys.push(format!("embedding:{entity_id}"));
                    keys.push(format!("similarity:{entity_id}"));
                }
            }
            _ => {}
        }

        keys
    }

    /// Returns counters describing invalidation activity so far.
    pub fn get_invalidation_stats(&self) -> InvalidationStats {
        let log = self.invalidation_log.lock();

        InvalidationStats {
            total_invalidations: log.len(),
            cache_regions: self.cache_regions.len(),
            entity_mappings: self.entity_to_regions.len(),
            last_invalidation: log.last().map(|(time, _)| *time),
        }
    }
}
616
/// Counters describing cache-invalidation activity.
#[derive(Debug, Clone)]
pub struct InvalidationStats {
    /// Total strategies ever logged.
    pub total_invalidations: usize,
    /// Number of registered cache regions.
    pub cache_regions: usize,
    /// Number of entity -> region index entries.
    pub entity_mappings: usize,
    /// Timestamp of the most recently logged strategy, if any.
    pub last_invalidation: Option<DateTime<Utc>>,
}
629
/// Resolves conflicts according to a configured [`ConflictStrategy`],
/// optionally dispatching to user-registered named resolver callbacks.
pub struct ConflictResolver {
    /// Strategy applied by `resolve_conflict`.
    strategy: ConflictStrategy,
    /// Named resolvers used for `ConflictStrategy::Custom`.
    custom_resolvers: HashMap<String, ConflictResolverFn>,
}

/// Boxed callback that turns a conflict into a resolution.
type ConflictResolverFn = Box<dyn Fn(&Conflict) -> Result<ConflictResolution> + Send + Sync>;
642
impl ConflictResolver {
    /// Creates a resolver that applies `strategy` by default.
    pub fn new(strategy: ConflictStrategy) -> Self {
        Self {
            strategy,
            custom_resolvers: HashMap::new(),
        }
    }

    /// Registers a named resolver callback, used when the strategy is
    /// `ConflictStrategy::Custom(name)` (builder style).
    pub fn with_custom_resolver<F>(mut self, name: String, resolver: F) -> Self
    where
        F: Fn(&Conflict) -> Result<ConflictResolution> + Send + Sync + 'static,
    {
        self.custom_resolvers.insert(name, Box::new(resolver));
        self
    }

    /// Resolves `conflict` according to the configured strategy.
    ///
    /// # Errors
    /// Returns `GraphRAGError::ConflictResolution` when a named custom
    /// resolver is not registered, or when the strategy has no
    /// implementation here (`LLMDecision`, `UserPrompt`).
    pub async fn resolve_conflict(&self, conflict: &Conflict) -> Result<ConflictResolution> {
        match &self.strategy {
            ConflictStrategy::KeepExisting => Ok(ConflictResolution {
                strategy: ConflictStrategy::KeepExisting,
                resolved_data: conflict.existing_data.clone(),
                metadata: HashMap::new(),
            }),
            ConflictStrategy::KeepNew => Ok(ConflictResolution {
                strategy: ConflictStrategy::KeepNew,
                resolved_data: conflict.new_data.clone(),
                metadata: HashMap::new(),
            }),
            ConflictStrategy::Merge => self.merge_conflict_data(conflict).await,
            ConflictStrategy::Custom(resolver_name) => {
                if let Some(resolver) = self.custom_resolvers.get(resolver_name) {
                    resolver(conflict)
                } else {
                    Err(GraphRAGError::ConflictResolution {
                        message: format!("Custom resolver '{resolver_name}' not found"),
                    })
                }
            }
            _ => Err(GraphRAGError::ConflictResolution {
                message: "Conflict resolution strategy not implemented".to_string(),
            }),
        }
    }

    /// Merges same-typed payloads; mixing payload kinds is an error.
    async fn merge_conflict_data(&self, conflict: &Conflict) -> Result<ConflictResolution> {
        match (&conflict.existing_data, &conflict.new_data) {
            (ChangeData::Entity(existing), ChangeData::Entity(new)) => {
                let merged = self.merge_entities(existing, new)?;
                Ok(ConflictResolution {
                    strategy: ConflictStrategy::Merge,
                    resolved_data: ChangeData::Entity(merged),
                    metadata: [("merge_strategy".to_string(), "entity_merge".to_string())]
                        .into_iter()
                        .collect(),
                })
            }
            (ChangeData::Relationship(existing), ChangeData::Relationship(new)) => {
                let merged = self.merge_relationships(existing, new)?;
                Ok(ConflictResolution {
                    strategy: ConflictStrategy::Merge,
                    resolved_data: ChangeData::Relationship(merged),
                    metadata: [(
                        "merge_strategy".to_string(),
                        "relationship_merge".to_string(),
                    )]
                    .into_iter()
                    .collect(),
                })
            }
            _ => Err(GraphRAGError::ConflictResolution {
                message: "Cannot merge incompatible data types".to_string(),
            }),
        }
    }

    /// Merges two versions of the same entity.
    ///
    /// The higher-confidence side supplies name/type/confidence; mentions
    /// are unioned (deduplicated by chunk id + start offset); a new
    /// embedding, when present, replaces the old one.
    fn merge_entities(&self, existing: &Entity, new: &Entity) -> Result<Entity> {
        let mut merged = existing.clone();

        // The more confident side wins the identity fields.
        if new.confidence > existing.confidence {
            merged.confidence = new.confidence;
            merged.name = new.name.clone();
            merged.entity_type = new.entity_type.clone();
        }

        // Union mentions, skipping exact (chunk, offset) duplicates.
        let mut all_mentions = existing.mentions.clone();
        for new_mention in &new.mentions {
            if !all_mentions.iter().any(|m| {
                m.chunk_id == new_mention.chunk_id && m.start_offset == new_mention.start_offset
            }) {
                all_mentions.push(new_mention.clone());
            }
        }
        merged.mentions = all_mentions;

        // Prefer the incoming embedding when one is provided.
        if new.embedding.is_some() {
            merged.embedding = new.embedding.clone();
        }

        Ok(merged)
    }

    /// Merges two versions of the same relationship: higher confidence wins
    /// confidence/relation type; context strings are unioned.
    fn merge_relationships(
        &self,
        existing: &Relationship,
        new: &Relationship,
    ) -> Result<Relationship> {
        let mut merged = existing.clone();

        if new.confidence > existing.confidence {
            merged.confidence = new.confidence;
            merged.relation_type = new.relation_type.clone();
        }

        // Append any context strings not already present.
        let mut all_contexts = existing.context.clone();
        for new_context in &new.context {
            if !all_contexts.contains(new_context) {
                all_contexts.push(new_context.clone());
            }
        }
        merged.context = all_contexts;

        Ok(merged)
    }
}
775
/// Collects per-operation logs, ad-hoc metrics, and aggregate performance
/// statistics for incremental update operations.
#[cfg(feature = "incremental")]
pub struct UpdateMonitor {
    /// Latest value of each named metric.
    metrics: DashMap<String, UpdateMetric>,
    /// Chronological record of started/completed operations.
    operations_log: Mutex<Vec<OperationLog>>,
    /// Aggregates recomputed after each completed operation.
    performance_stats: RwLock<PerformanceStats>,
}
787
#[cfg(feature = "incremental")]
impl Default for UpdateMonitor {
    /// Same as `UpdateMonitor::new`: an empty monitor with zeroed stats.
    fn default() -> Self {
        Self::new()
    }
}
794
/// A single named, timestamped metric sample.
#[derive(Debug, Clone)]
pub struct UpdateMetric {
    /// Metric name (also the map key in `UpdateMonitor::metrics`).
    pub name: String,
    /// Sampled value.
    pub value: f64,
    /// When the sample was recorded (UTC).
    pub timestamp: DateTime<Utc>,
    /// Free-form dimensional tags.
    pub tags: HashMap<String, String>,
}
807
/// Log entry for one monitored operation, from start to (optional) completion.
#[derive(Debug, Clone)]
pub struct OperationLog {
    /// Id handed back by `start_operation` and used to complete the entry.
    pub operation_id: UpdateId,
    /// Free-form label, e.g. "upsert_entity".
    pub operation_type: String,
    /// When the operation started (monotonic clock).
    pub start_time: Instant,
    /// Set on completion; `None` while still running.
    pub end_time: Option<Instant>,
    /// Set on completion; `None` while still running.
    pub success: Option<bool>,
    /// Error text for failed operations.
    pub error_message: Option<String>,
    /// Entities touched by the operation.
    pub affected_entities: usize,
    /// Relationships touched by the operation.
    pub affected_relationships: usize,
}
828
/// Aggregate performance counters maintained by `UpdateMonitor`.
#[derive(Debug, Clone)]
pub struct PerformanceStats {
    /// Count of completed operations.
    pub total_operations: u64,
    /// Completed operations that reported success.
    pub successful_operations: u64,
    /// Completed operations that reported failure.
    pub failed_operations: u64,
    /// Mean wall-clock duration over completed operations.
    pub average_operation_time: Duration,
    /// NOTE(review): initialized to 0 and never updated in this file —
    /// confirm whether this is computed elsewhere.
    pub peak_operations_per_second: f64,
    /// NOTE(review): initialized to 0 and never updated in this file.
    pub cache_hit_rate: f64,
    /// NOTE(review): initialized to 0 and never updated in this file.
    pub conflict_resolution_rate: f64,
}
847
#[cfg(feature = "incremental")]
impl UpdateMonitor {
    /// Creates a monitor with empty metrics, an empty operation log, and
    /// zeroed performance statistics.
    pub fn new() -> Self {
        Self {
            metrics: DashMap::new(),
            operations_log: Mutex::new(Vec::new()),
            performance_stats: RwLock::new(PerformanceStats {
                total_operations: 0,
                successful_operations: 0,
                failed_operations: 0,
                average_operation_time: Duration::from_millis(0),
                peak_operations_per_second: 0.0,
                cache_hit_rate: 0.0,
                conflict_resolution_rate: 0.0,
            }),
        }
    }

    /// Records the start of an operation and returns the id the caller must
    /// later pass to [`UpdateMonitor::complete_operation`].
    pub fn start_operation(&self, operation_type: &str) -> UpdateId {
        let operation_id = UpdateId::new();
        let log_entry = OperationLog {
            operation_id: operation_id.clone(),
            operation_type: operation_type.to_string(),
            start_time: Instant::now(),
            end_time: None,
            success: None,
            error_message: None,
            affected_entities: 0,
            affected_relationships: 0,
        };

        self.operations_log.lock().push(log_entry);
        operation_id
    }

    /// Marks an operation as finished and refreshes the aggregate stats.
    ///
    /// BUG FIX: the `operations_log` guard is now dropped *before* calling
    /// `update_performance_stats`, which re-locks `operations_log`.
    /// `parking_lot::Mutex` is not reentrant, so the previous code (guard
    /// still in scope at the call site) deadlocked on the second lock.
    pub fn complete_operation(
        &self,
        operation_id: &UpdateId,
        success: bool,
        error: Option<String>,
        affected_entities: usize,
        affected_relationships: usize,
    ) {
        {
            let mut log = self.operations_log.lock();
            if let Some(entry) = log.iter_mut().find(|e| &e.operation_id == operation_id) {
                entry.end_time = Some(Instant::now());
                entry.success = Some(success);
                entry.error_message = error;
                entry.affected_entities = affected_entities;
                entry.affected_relationships = affected_relationships;
            }
        } // guard dropped here so update_performance_stats can take the lock

        self.update_performance_stats();
    }

    /// Recomputes success/failure counts and the mean duration from all
    /// completed entries in the operation log.
    fn update_performance_stats(&self) {
        let log = self.operations_log.lock();
        let completed_ops: Vec<_> = log
            .iter()
            .filter(|op| op.end_time.is_some() && op.success.is_some())
            .collect();

        if completed_ops.is_empty() {
            return;
        }

        let mut stats = self.performance_stats.write();
        stats.total_operations = completed_ops.len() as u64;
        stats.successful_operations = completed_ops
            .iter()
            .filter(|op| op.success == Some(true))
            .count() as u64;
        stats.failed_operations = stats.total_operations - stats.successful_operations;

        // Total wall-clock time across all completed operations.
        let total_time: Duration = completed_ops
            .iter()
            .filter_map(|op| op.end_time.map(|end| end.duration_since(op.start_time)))
            .sum();

        // Non-zero divisor guaranteed by the early return above (the former
        // redundant emptiness re-check has been removed).
        stats.average_operation_time = total_time / completed_ops.len() as u32;
    }

    /// Records (or overwrites) a named point-in-time metric.
    pub fn record_metric(&self, name: &str, value: f64, tags: HashMap<String, String>) {
        let metric = UpdateMetric {
            name: name.to_string(),
            value,
            timestamp: Utc::now(),
            tags,
        };
        self.metrics.insert(name.to_string(), metric);
    }

    /// Returns a snapshot of the aggregate performance statistics.
    pub fn get_performance_stats(&self) -> PerformanceStats {
        self.performance_stats.read().clone()
    }

    /// Returns up to `limit` most recent operations, newest first.
    pub fn get_recent_operations(&self, limit: usize) -> Vec<OperationLog> {
        let log = self.operations_log.lock();
        log.iter().rev().take(limit).cloned().collect()
    }
}
959
/// Orchestrates incremental updates (concurrent variant, `incremental`
/// feature): owns the graph behind a lock plus the change log, pending
/// deltas, cache invalidation, conflict resolution, and monitoring.
#[cfg(feature = "incremental")]
pub struct IncrementalGraphManager {
    /// The knowledge graph, shared and guarded for concurrent access.
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// Applied changes, keyed by change id.
    change_log: DashMap<UpdateId, ChangeRecord>,
    /// Known deltas, keyed by delta id.
    deltas: DashMap<UpdateId, GraphDelta>,
    /// Selective cache-invalidation tracker.
    cache_invalidation: Arc<SelectiveInvalidation>,
    /// Conflict-resolution policy.
    conflict_resolver: Arc<ConflictResolver>,
    /// Operation/metrics monitor.
    monitor: Arc<UpdateMonitor>,
    /// Manager configuration.
    config: IncrementalConfig,
}
975
/// Single-threaded fallback manager used when the `incremental` feature is
/// disabled: plain graph ownership and an in-memory change log.
#[cfg(not(feature = "incremental"))]
pub struct IncrementalGraphManager {
    graph: KnowledgeGraph,
    change_log: Vec<ChangeRecord>,
    config: IncrementalConfig,
}
983
/// Tunables for the incremental update manager.
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Upper bound on retained change-log entries.
    pub max_change_log_size: usize,
    /// Upper bound on changes per delta.
    pub max_delta_size: usize,
    /// Default conflict-resolution policy.
    pub conflict_strategy: ConflictStrategy,
    /// Whether operation monitoring is enabled.
    pub enable_monitoring: bool,
    /// Named cache-invalidation mode (e.g. "selective").
    pub cache_invalidation_strategy: String,
    /// Preferred batch size for bulk operations.
    pub batch_size: usize,
    /// Cap on concurrently running operations.
    pub max_concurrent_operations: usize,
}
1002
1003impl Default for IncrementalConfig {
1004 fn default() -> Self {
1005 Self {
1006 max_change_log_size: 10000,
1007 max_delta_size: 1000,
1008 conflict_strategy: ConflictStrategy::Merge,
1009 enable_monitoring: true,
1010 cache_invalidation_strategy: "selective".to_string(),
1011 batch_size: 100,
1012 max_concurrent_operations: 10,
1013 }
1014 }
1015}
1016
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Wraps `graph` for concurrent use and builds the default collaborators:
    /// empty change log/deltas, a fresh invalidation tracker, a conflict
    /// resolver using `config.conflict_strategy`, and a monitor.
    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
        Self {
            graph: Arc::new(RwLock::new(graph)),
            change_log: DashMap::new(),
            deltas: DashMap::new(),
            cache_invalidation: Arc::new(SelectiveInvalidation::new()),
            conflict_resolver: Arc::new(ConflictResolver::new(config.conflict_strategy.clone())),
            monitor: Arc::new(UpdateMonitor::new()),
            config,
        }
    }

    /// Replaces the conflict resolver (builder style).
    pub fn with_conflict_resolver(mut self, resolver: ConflictResolver) -> Self {
        self.conflict_resolver = Arc::new(resolver);
        self
    }

    /// Shared handle to the underlying graph.
    pub fn graph(&self) -> Arc<RwLock<KnowledgeGraph>> {
        Arc::clone(&self.graph)
    }

    /// Shared handle to the conflict resolver.
    pub fn conflict_resolver(&self) -> Arc<ConflictResolver> {
        Arc::clone(&self.conflict_resolver)
    }

    /// Shared handle to the update monitor.
    pub fn monitor(&self) -> Arc<UpdateMonitor> {
        Arc::clone(&self.monitor)
    }
}
1053
#[cfg(not(feature = "incremental"))]
impl IncrementalGraphManager {
    /// Takes direct ownership of `graph` with an empty change log.
    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
        Self {
            graph,
            change_log: Vec::new(),
            config,
        }
    }

    /// Read-only access to the graph.
    pub fn graph(&self) -> &KnowledgeGraph {
        &self.graph
    }

    /// Mutable access to the graph.
    pub fn graph_mut(&mut self) -> &mut KnowledgeGraph {
        &mut self.graph
    }
}
1075
impl IncrementalGraphManager {
    /// Builds a timestamped change record with a fresh id and empty metadata.
    pub fn create_change_record(
        &self,
        change_type: ChangeType,
        operation: Operation,
        change_data: ChangeData,
        entity_id: Option<EntityId>,
        document_id: Option<DocumentId>,
    ) -> ChangeRecord {
        ChangeRecord {
            change_id: UpdateId::new(),
            timestamp: Utc::now(),
            change_type,
            entity_id,
            document_id,
            operation,
            data: change_data,
            metadata: HashMap::new(),
        }
    }

    /// Read-only access to the manager configuration.
    pub fn config(&self) -> &IncrementalConfig {
        &self.config
    }

    /// Adds an entity to the graph and logs the change.
    ///
    /// NOTE(review): the returned `update_id` is a fresh id, distinct from
    /// the `change_id` stored in the log entry — confirm callers do not
    /// expect to look the logged change up by the returned id.
    pub fn basic_upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
        let update_id = UpdateId::new();

        #[cfg(feature = "incremental")]
        {
            let operation_id = self.monitor.start_operation("upsert_entity");
            // Write lock taken for the insertion; monitoring calls below use
            // separate locks inside the monitor.
            let mut graph = self.graph.write();

            match graph.add_entity(entity.clone()) {
                Ok(_) => {
                    let ent_id = entity.id.clone();
                    let change = self.create_change_record(
                        ChangeType::EntityAdded,
                        Operation::Upsert,
                        ChangeData::Entity(entity),
                        Some(ent_id),
                        None,
                    );
                    self.change_log.insert(change.change_id.clone(), change);
                    self.monitor
                        .complete_operation(&operation_id, true, None, 1, 0);
                    Ok(update_id)
                }
                Err(e) => {
                    // Record the failure before propagating the error.
                    self.monitor.complete_operation(
                        &operation_id,
                        false,
                        Some(e.to_string()),
                        0,
                        0,
                    );
                    Err(e)
                }
            }
        }

        #[cfg(not(feature = "incremental"))]
        {
            self.graph.add_entity(entity.clone())?;
            let ent_id = entity.id.clone();
            let change = self.create_change_record(
                ChangeType::EntityAdded,
                Operation::Upsert,
                ChangeData::Entity(entity),
                Some(ent_id),
                None,
            );
            self.change_log.push(change);
            Ok(update_id)
        }
    }
}
1158
/// Point-in-time snapshot of incremental-update activity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalStatistics {
    /// Total update operations observed.
    pub total_updates: usize,
    /// Updates that succeeded.
    pub successful_updates: usize,
    /// Updates that failed.
    pub failed_updates: usize,
    // Per-kind change counts derived from the change log.
    pub entities_added: usize,
    pub entities_updated: usize,
    pub entities_removed: usize,
    pub relationships_added: usize,
    pub relationships_updated: usize,
    pub relationships_removed: usize,
    /// Conflicts resolved during updates.
    pub conflicts_resolved: usize,
    /// Cache invalidation strategies issued.
    pub cache_invalidations: usize,
    /// Mean update duration, in milliseconds.
    pub average_update_time_ms: f64,
    /// Peak observed update throughput.
    pub peak_updates_per_second: f64,
    /// Current number of change-log entries.
    pub current_change_log_size: usize,
    /// Current number of tracked deltas.
    pub current_delta_count: usize,
}
1197
impl IncrementalStatistics {
    /// Returns a snapshot with every counter zeroed.
    pub fn empty() -> Self {
        Self {
            total_updates: 0,
            successful_updates: 0,
            failed_updates: 0,
            entities_added: 0,
            entities_updated: 0,
            entities_removed: 0,
            relationships_added: 0,
            relationships_updated: 0,
            relationships_removed: 0,
            conflicts_resolved: 0,
            cache_invalidations: 0,
            average_update_time_ms: 0.0,
            peak_updates_per_second: 0.0,
            current_change_log_size: 0,
            current_delta_count: 0,
        }
    }

    /// Prints a human-readable summary to stdout.
    pub fn print(&self) {
        println!("🔄 Incremental Updates Statistics");
        println!(" Total updates: {}", self.total_updates);
        println!(
            " Successful: {} ({:.1}%)",
            self.successful_updates,
            // Guard against division by zero when nothing ran yet.
            if self.total_updates > 0 {
                (self.successful_updates as f64 / self.total_updates as f64) * 100.0
            } else {
                0.0
            }
        );
        println!(" Failed: {}", self.failed_updates);
        println!(
            " Entities: +{} ~{} -{}",
            self.entities_added, self.entities_updated, self.entities_removed
        );
        println!(
            " Relationships: +{} ~{} -{}",
            self.relationships_added, self.relationships_updated, self.relationships_removed
        );
        println!(" Conflicts resolved: {}", self.conflicts_resolved);
        println!(" Cache invalidations: {}", self.cache_invalidations);
        println!(" Avg update time: {:.2}ms", self.average_update_time_ms);
        println!(" Peak updates/sec: {:.1}", self.peak_updates_per_second);
        println!(" Change log size: {}", self.current_change_log_size);
        println!(" Active deltas: {}", self.current_delta_count);
    }
}
1250
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Builds a point-in-time statistics snapshot from the monitor, the
    /// cache-invalidation tracker, and a single pass over the change log.
    pub fn get_statistics(&self) -> IncrementalStatistics {
        let perf = self.monitor.get_performance_stats();
        let invalidation = self.cache_invalidation.get_invalidation_stats();

        // Tally change-log entries by kind in one pass.
        let (mut ent_added, mut ent_updated, mut ent_removed) = (0, 0, 0);
        let (mut rel_added, mut rel_updated, mut rel_removed) = (0, 0, 0);

        for entry in self.change_log.iter() {
            match entry.value().change_type {
                ChangeType::EntityAdded => ent_added += 1,
                ChangeType::EntityUpdated => ent_updated += 1,
                ChangeType::EntityRemoved => ent_removed += 1,
                ChangeType::RelationshipAdded => rel_added += 1,
                ChangeType::RelationshipUpdated => rel_updated += 1,
                ChangeType::RelationshipRemoved => rel_removed += 1,
                _ => {}
            }
        }

        IncrementalStatistics {
            total_updates: perf.total_operations as usize,
            successful_updates: perf.successful_operations as usize,
            failed_updates: perf.failed_operations as usize,
            entities_added: ent_added,
            entities_updated: ent_updated,
            entities_removed: ent_removed,
            relationships_added: rel_added,
            relationships_updated: rel_updated,
            relationships_removed: rel_removed,
            // Conflict tracking is not wired up here; always zero.
            conflicts_resolved: 0,
            cache_invalidations: invalidation.total_invalidations,
            average_update_time_ms: perf.average_operation_time.as_millis() as f64,
            peak_updates_per_second: perf.peak_operations_per_second,
            current_change_log_size: self.change_log.len(),
            current_delta_count: self.deltas.len(),
        }
    }
}
1294
1295#[cfg(not(feature = "incremental"))]
1296impl IncrementalGraphManager {
1297 pub fn get_statistics(&self) -> IncrementalStatistics {
1299 let mut stats = IncrementalStatistics::empty();
1300 stats.current_change_log_size = self.change_log.len();
1301
1302 for change in &self.change_log {
1303 match change.change_type {
1304 ChangeType::EntityAdded => stats.entities_added += 1,
1305 ChangeType::EntityUpdated => stats.entities_updated += 1,
1306 ChangeType::EntityRemoved => stats.entities_removed += 1,
1307 ChangeType::RelationshipAdded => stats.relationships_added += 1,
1308 ChangeType::RelationshipUpdated => stats.relationships_updated += 1,
1309 ChangeType::RelationshipRemoved => stats.relationships_removed += 1,
1310 _ => {}
1311 }
1312 }
1313
1314 stats.total_updates = self.change_log.len();
1315 stats.successful_updates = self.change_log.len(); stats
1317 }
1318}
1319
/// Incrementally maintained PageRank scores over the knowledge graph.
///
/// Scores live in a concurrent map; once the pending-change count exceeds
/// `incremental_threshold`, the next update falls back to a full
/// recomputation instead of a localized one.
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct IncrementalPageRank {
    /// Current score per entity.
    scores: DashMap<EntityId, f64>,
    /// Recorded adjacency deltas per entity (neighbor, weight).
    /// NOTE(review): never written in this file — confirm it is used elsewhere.
    adjacency_changes: DashMap<EntityId, Vec<(EntityId, f64)>>,
    /// PageRank damping factor.
    damping_factor: f64,
    /// Convergence threshold on the max per-node score change.
    tolerance: f64,
    /// Iteration cap for (re)computation.
    max_iterations: usize,
    /// Timestamp of the last full recomputation.
    last_full_computation: DateTime<Utc>,
    /// Pending-change count above which a full recomputation is forced.
    incremental_threshold: usize,
    /// Changes accumulated since the last update.
    pending_changes: RwLock<usize>,
}
1337
1338#[cfg(feature = "incremental")]
1339impl IncrementalPageRank {
    /// Creates a PageRank tracker with the given damping factor, convergence
    /// tolerance, and iteration cap. The incremental-vs-full threshold is
    /// fixed at 1000 pending changes.
    pub fn new(damping_factor: f64, tolerance: f64, max_iterations: usize) -> Self {
        Self {
            scores: DashMap::new(),
            adjacency_changes: DashMap::new(),
            damping_factor,
            tolerance,
            max_iterations,
            last_full_computation: Utc::now(),
            incremental_threshold: 1000,
            pending_changes: RwLock::new(0),
        }
    }
1353
    /// Refreshes scores after a set of entity changes.
    ///
    /// If more than `incremental_threshold` changes are pending, the whole
    /// graph is recomputed; otherwise a localized PageRank is run over the
    /// changed entities plus their one- and two-hop neighborhoods. Resets
    /// the pending-change counter on success.
    pub async fn update_incremental(
        &self,
        changed_entities: &[EntityId],
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let start = Instant::now();

        // Scoped so the read guard is released before any recomputation.
        {
            let pending = *self.pending_changes.read();
            if pending > self.incremental_threshold {
                return self.full_recomputation(graph).await;
            }
        }

        // Collect the affected neighborhood: each changed entity plus every
        // node reachable within two hops of it.
        let mut affected_entities = HashSet::new();

        for entity_id in changed_entities {
            affected_entities.insert(entity_id.clone());

            for (neighbor, _) in graph.get_neighbors(entity_id) {
                affected_entities.insert(neighbor.id.clone());

                for (second_hop, _) in graph.get_neighbors(&neighbor.id) {
                    affected_entities.insert(second_hop.id.clone());
                }
            }
        }

        self.localized_pagerank(&affected_entities, graph).await?;

        // All pending changes are accounted for by this update.
        *self.pending_changes.write() = 0;

        let duration = start.elapsed();
        println!(
            "🔄 Incremental PageRank update completed in {:?} for {} entities",
            duration,
            affected_entities.len()
        );

        Ok(())
    }
1403
    /// Recomputes PageRank from scratch over every entity in the graph using
    /// power iteration, stopping early once the largest per-node score change
    /// drops below `tolerance`.
    ///
    /// Each iteration inspects every (source, target) entity pair via
    /// `get_edge_weight`, so the cost is O(V^2) per iteration times whatever
    /// the edge-weight lookup costs.
    async fn full_recomputation(&self, graph: &KnowledgeGraph) -> Result<()> {
        let start = Instant::now();

        let entities: Vec<EntityId> = graph.entities().map(|e| e.id.clone()).collect();
        let n = entities.len();

        // Empty graph: nothing to rank.
        if n == 0 {
            return Ok(());
        }

        // Seed every node with the uniform distribution 1/n.
        let initial_score = 1.0 / n as f64;
        for entity_id in &entities {
            self.scores.insert(entity_id.clone(), initial_score);
        }

        for iteration in 0..self.max_iterations {
            let mut new_scores = HashMap::new();
            let mut max_diff: f64 = 0.0;

            for entity_id in &entities {
                // Teleportation term.
                let mut score = (1.0 - self.damping_factor) / n as f64;

                // Accumulate rank flowing in from every other entity.
                for other_entity in &entities {
                    if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph) {
                        let other_score = self
                            .scores
                            .get(other_entity)
                            .map(|s| *s.value())
                            .unwrap_or(initial_score);
                        let out_degree = self.get_out_degree(other_entity, graph);

                        if out_degree > 0.0 {
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(initial_score);
                let diff = (score - old_score).abs();
                max_diff = max_diff.max(diff);

                new_scores.insert(entity_id.clone(), score);
            }

            // Publish this iteration's scores in one batch (synchronous,
            // Jacobi-style update: reads above saw last iteration's values).
            for (entity_id, score) in new_scores {
                self.scores.insert(entity_id, score);
            }

            if max_diff < self.tolerance {
                println!(
                    "🎯 PageRank converged after {} iterations (diff: {:.6})",
                    iteration + 1,
                    max_diff
                );
                break;
            }
        }

        let duration = start.elapsed();
        println!(
            "🔄 Full PageRank recomputation completed in {duration:?} for {n} entities"
        );

        Ok(())
    }
1480
    /// Runs PageRank restricted to `entities` (the affected subset), leaving
    /// scores of all other nodes untouched.
    ///
    /// Unlike `full_recomputation`, updated scores are written back to
    /// `self.scores` immediately as each entity is visited (Gauss–Seidel
    /// style), so later entities in the same sweep observe the already-updated
    /// scores of earlier ones; iteration order therefore affects intermediate
    /// values. Out-degrees are computed only over edges whose target stays
    /// inside the subset (`get_localized_out_degree`).
    async fn localized_pagerank(
        &self,
        entities: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let entity_vec: Vec<EntityId> = entities.iter().cloned().collect();
        let n = entity_vec.len();

        // Empty subset: nothing to update.
        if n == 0 {
            return Ok(());
        }

        for _iteration in 0..self.max_iterations {
            let mut max_diff: f64 = 0.0;

            for entity_id in &entity_vec {
                // Teleportation term over the subset size.
                let mut score = (1.0 - self.damping_factor) / n as f64;

                // Accumulate contributions from every subset member that has
                // an edge into this entity.
                for other_entity in &entity_vec {
                    if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph) {
                        let other_score = self
                            .scores
                            .get(other_entity)
                            .map(|s| *s.value())
                            .unwrap_or(1.0 / n as f64);
                        let out_degree =
                            self.get_localized_out_degree(other_entity, entities, graph);

                        if out_degree > 0.0 {
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(1.0 / n as f64);
                let diff = (score - old_score).abs();
                max_diff = max_diff.max(diff);

                // In-place update: visible to subsequent entities this sweep.
                self.scores.insert(entity_id.clone(), score);
            }

            // Converged once the largest per-entity change is below tolerance.
            if max_diff < self.tolerance {
                break;
            }
        }

        Ok(())
    }
1537
1538 fn get_edge_weight(
1539 &self,
1540 from: &EntityId,
1541 to: &EntityId,
1542 graph: &KnowledgeGraph,
1543 ) -> Option<f64> {
1544 for (neighbor, relationship) in graph.get_neighbors(from) {
1546 if neighbor.id == *to {
1547 return Some(relationship.confidence as f64);
1548 }
1549 }
1550 None
1551 }
1552
1553 fn get_out_degree(&self, entity_id: &EntityId, graph: &KnowledgeGraph) -> f64 {
1554 graph
1555 .get_neighbors(entity_id)
1556 .iter()
1557 .map(|(_, rel)| rel.confidence as f64)
1558 .sum()
1559 }
1560
1561 fn get_localized_out_degree(
1562 &self,
1563 entity_id: &EntityId,
1564 subset: &HashSet<EntityId>,
1565 graph: &KnowledgeGraph,
1566 ) -> f64 {
1567 graph
1568 .get_neighbors(entity_id)
1569 .iter()
1570 .filter(|(neighbor, _)| subset.contains(&neighbor.id))
1571 .map(|(_, rel)| rel.confidence as f64)
1572 .sum()
1573 }
1574
1575 pub fn get_score(&self, entity_id: &EntityId) -> Option<f64> {
1577 self.scores.get(entity_id).map(|s| *s.value())
1578 }
1579
1580 pub fn get_top_entities(&self, k: usize) -> Vec<(EntityId, f64)> {
1582 let mut entities: Vec<(EntityId, f64)> = self
1583 .scores
1584 .iter()
1585 .map(|entry| (entry.key().clone(), *entry.value()))
1586 .collect();
1587
1588 entities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1589 entities.truncate(k);
1590 entities
1591 }
1592
1593 pub fn record_change(&self, _entity_id: EntityId) {
1595 *self.pending_changes.write() += 1;
1596 }
1597}
1598
/// Groups incoming change records into batches and processes each batch on a
/// spawned task, bounded by a concurrency semaphore.
#[cfg(feature = "incremental")]
pub struct BatchProcessor {
    // Number of changes that triggers immediate processing of a batch.
    batch_size: usize,
    // Maximum age a batch may reach before it is processed regardless of size.
    max_wait_time: Duration,
    // Batches still being filled, keyed by `get_batch_key` (entity/document/global).
    pending_batches: DashMap<String, PendingBatch>,
    // Caps how many batches may be processed concurrently.
    processing_semaphore: Semaphore,
    // Running throughput/latency statistics, updated after each batch.
    metrics: RwLock<BatchMetrics>,
}
1612
/// A batch of changes accumulating until the size or age threshold is met.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PendingBatch {
    // Changes collected so far under one batch key.
    changes: Vec<ChangeRecord>,
    // When the batch was opened; used for the max-wait timeout.
    created_at: Instant,
    // Unique id reported back to callers of `add_change`.
    batch_id: String,
}
1620
/// Aggregate statistics about batch processing.
#[derive(Debug, Clone)]
pub struct BatchMetrics {
    /// Total number of batches fully processed.
    pub total_batches_processed: u64,
    /// Total number of individual changes processed across all batches.
    pub total_changes_processed: u64,
    /// Running mean of changes per batch.
    pub average_batch_size: f64,
    /// Running mean wall-clock time per batch (millisecond resolution).
    pub average_processing_time: Duration,
    /// Changes per second measured over the most recent batch only.
    pub throughput_per_second: f64,
    /// Timestamp of the most recently completed batch, if any.
    pub last_batch_processed: Option<DateTime<Utc>>,
}
1637
#[cfg(feature = "incremental")]
impl BatchProcessor {
    /// Creates a processor that flushes a batch once it reaches `batch_size`
    /// changes or has been open longer than `max_wait_time`, running at most
    /// `max_concurrent_batches` batches in parallel.
    pub fn new(batch_size: usize, max_wait_time: Duration, max_concurrent_batches: usize) -> Self {
        Self {
            batch_size,
            max_wait_time,
            pending_batches: DashMap::new(),
            processing_semaphore: Semaphore::new(max_concurrent_batches),
            metrics: RwLock::new(BatchMetrics {
                total_batches_processed: 0,
                total_changes_processed: 0,
                average_batch_size: 0.0,
                average_processing_time: Duration::from_millis(0),
                throughput_per_second: 0.0,
                last_batch_processed: None,
            }),
        }
    }

    /// Buffers a change into its batch; when the batch is full (or older than
    /// `max_wait_time`) it is taken out of the pending map and processed on a
    /// spawned task. Returns the id of the batch the change joined.
    pub async fn add_change(&self, change: ChangeRecord) -> Result<String> {
        let batch_key = self.get_batch_key(&change);

        // Mutate the pending batch inside a scope so the DashMap entry guard
        // is dropped before the map is touched again below. Calling
        // `remove` on the same map while the `entry` guard is alive would
        // deadlock on the shard lock (a documented DashMap pitfall).
        let (batch_id, should_process) = {
            let mut entry = self
                .pending_batches
                .entry(batch_key.clone())
                .or_insert_with(|| PendingBatch {
                    changes: Vec::new(),
                    created_at: Instant::now(),
                    batch_id: format!("batch_{}", Uuid::new_v4()),
                });

            entry.changes.push(change);
            let should_process = entry.changes.len() >= self.batch_size
                || entry.created_at.elapsed() > self.max_wait_time;

            (entry.batch_id.clone(), should_process)
        };

        if should_process {
            // Take ownership of the batch. A concurrent caller may already
            // have removed it, in which case there is nothing left to do.
            if let Some((_, batch)) = self.pending_batches.remove(&batch_key) {
                let processor = Arc::new(self.clone());
                tokio::spawn(async move {
                    if let Err(e) = processor.process_batch(batch).await {
                        eprintln!("Batch processing error: {e}");
                    }
                });
            }
        }

        Ok(batch_id)
    }

    /// Processes one batch: acquires a concurrency permit, routes each change
    /// to its category handler, then records metrics.
    async fn process_batch(&self, batch: PendingBatch) -> Result<()> {
        // Bounds the number of batches in flight at once.
        let _permit = self.processing_semaphore.acquire().await.map_err(|_| {
            GraphRAGError::IncrementalUpdate {
                message: "Failed to acquire processing permit".to_string(),
            }
        })?;

        let start = Instant::now();

        // Partition changes by category so each handler sees only its kind.
        let mut entity_changes = Vec::new();
        let mut relationship_changes = Vec::new();
        let mut embedding_changes = Vec::new();

        for change in &batch.changes {
            match &change.change_type {
                ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
                    entity_changes.push(change);
                }
                ChangeType::RelationshipAdded
                | ChangeType::RelationshipUpdated
                | ChangeType::RelationshipRemoved => {
                    relationship_changes.push(change);
                }
                ChangeType::EmbeddingAdded
                | ChangeType::EmbeddingUpdated
                | ChangeType::EmbeddingRemoved => {
                    embedding_changes.push(change);
                }
                // Document/chunk changes are not batch-processed here.
                _ => {}
            }
        }

        self.process_entity_changes(&entity_changes).await?;
        self.process_relationship_changes(&relationship_changes)
            .await?;
        self.process_embedding_changes(&embedding_changes).await?;

        let processing_time = start.elapsed();

        self.update_metrics(&batch, processing_time).await;

        println!(
            "🚀 Processed batch {} with {} changes in {:?}",
            batch.batch_id,
            batch.changes.len(),
            processing_time
        );

        Ok(())
    }

    /// Handler for entity changes — currently a no-op placeholder.
    async fn process_entity_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        Ok(())
    }

    /// Handler for relationship changes — currently a no-op placeholder.
    async fn process_relationship_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        Ok(())
    }

    /// Handler for embedding changes — currently a no-op placeholder.
    async fn process_embedding_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        Ok(())
    }

    /// Derives the grouping key for a change: by entity when one is set,
    /// otherwise by document, otherwise a single global bucket.
    fn get_batch_key(&self, change: &ChangeRecord) -> String {
        match (&change.entity_id, &change.document_id) {
            (Some(entity_id), _) => format!("entity:{entity_id}"),
            (None, Some(doc_id)) => format!("document:{doc_id}"),
            _ => "global".to_string(),
        }
    }

    /// Folds one completed batch into the running metrics (incremental means
    /// for batch size and processing time; throughput from this batch only).
    async fn update_metrics(&self, batch: &PendingBatch, processing_time: Duration) {
        let mut metrics = self.metrics.write();

        metrics.total_batches_processed += 1;
        metrics.total_changes_processed += batch.changes.len() as u64;

        let total_batches = metrics.total_batches_processed as f64;
        metrics.average_batch_size = (metrics.average_batch_size * (total_batches - 1.0)
            + batch.changes.len() as f64)
            / total_batches;

        // Running mean at millisecond resolution; sub-millisecond batches
        // contribute zero.
        let prev_avg_ms = metrics.average_processing_time.as_millis() as f64;
        let new_avg_ms = (prev_avg_ms * (total_batches - 1.0) + processing_time.as_millis() as f64)
            / total_batches;
        metrics.average_processing_time = Duration::from_millis(new_avg_ms as u64);

        if processing_time.as_secs_f64() > 0.0 {
            metrics.throughput_per_second =
                batch.changes.len() as f64 / processing_time.as_secs_f64();
        }

        metrics.last_batch_processed = Some(Utc::now());
    }

    /// Returns a snapshot of the current batch-processing metrics.
    pub fn get_metrics(&self) -> BatchMetrics {
        self.metrics.read().clone()
    }
}
1807
#[cfg(feature = "incremental")]
impl Clone for BatchProcessor {
    /// Clones the processor's configuration, not its runtime state: the clone
    /// starts with an empty pending-batch map, a fresh semaphore sized to the
    /// original's currently available permits, and a snapshot of the metrics.
    ///
    /// NOTE(review): metrics recorded through a clone are invisible to the
    /// original instance — confirm this sharing model is intended.
    fn clone(&self) -> Self {
        let available_permits = self.processing_semaphore.available_permits();
        let metrics_snapshot = self.get_metrics();

        Self {
            batch_size: self.batch_size,
            max_wait_time: self.max_wait_time,
            pending_batches: DashMap::new(),
            processing_semaphore: Semaphore::new(available_permits),
            metrics: RwLock::new(metrics_snapshot),
        }
    }
}
1821
impl GraphRAGError {
    /// Convenience constructor for conflict-resolution failures.
    ///
    /// NOTE(review): both helpers map onto the generic `GraphConstruction`
    /// variant even though an `IncrementalUpdate` variant exists (used
    /// elsewhere in this file under the `incremental` feature). Confirm
    /// whether dedicated variants should be produced here, or whether the
    /// mapping is deliberate for non-feature builds.
    pub fn conflict_resolution(message: String) -> Self {
        GraphRAGError::GraphConstruction { message }
    }

    /// Convenience constructor for incremental-update failures.
    pub fn incremental_update(message: String) -> Self {
        GraphRAGError::GraphConstruction { message }
    }
}
1837
/// Production-grade incremental graph store combining transactional updates,
/// conflict resolution, change logging with rollback snapshots, selective
/// cache invalidation, batching, incremental PageRank, and change-event
/// broadcasting.
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct ProductionGraphStore {
    // The knowledge graph, guarded for concurrent read/write access.
    graph: Arc<RwLock<KnowledgeGraph>>,
    // In-flight transactions keyed by id.
    transactions: DashMap<TransactionId, Transaction>,
    // Applied changes keyed by their change id.
    change_log: DashMap<UpdateId, ChangeRecord>,
    // Pre-change snapshots enabling rollback, keyed by change id.
    rollback_data: DashMap<UpdateId, RollbackData>,
    // Strategy object that reconciles conflicting writes.
    conflict_resolver: Arc<ConflictResolver>,
    // Invalidates caches affected by applied changes.
    cache_invalidation: Arc<SelectiveInvalidation>,
    // Records per-operation timing/success metrics.
    monitor: Arc<UpdateMonitor>,
    // Groups changes into batches for asynchronous processing.
    batch_processor: Arc<BatchProcessor>,
    // Maintains PageRank scores updated incrementally as the graph changes.
    incremental_pagerank: Arc<IncrementalPageRank>,
    // Broadcast channel fanning change events out to subscribers.
    event_publisher: broadcast::Sender<ChangeEvent>,
    // Tunables (batch size, concurrency limits, change-log size, …).
    config: IncrementalConfig,
}
1858
/// An in-flight group of changes with an ACID-style lifecycle state.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct Transaction {
    // Unique transaction identifier.
    id: TransactionId,
    // Changes attributed to this transaction.
    // NOTE(review): nothing in this file ever pushes into this vec — confirm
    // whether per-transaction change tracking is still planned.
    changes: Vec<ChangeRecord>,
    // Current lifecycle state.
    status: TransactionStatus,
    // When the transaction was opened.
    created_at: DateTime<Utc>,
    // Requested isolation level (currently informational only).
    isolation_level: IsolationLevel,
}
1869
/// Lifecycle states of a `Transaction`.
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum TransactionStatus {
    // Open and accepting changes.
    Active,
    // Commit has begun but is not yet complete.
    Preparing,
    // Successfully committed.
    Committed,
    // Rolled back / aborted.
    Aborted,
}
1878
/// Standard transaction isolation levels.
///
/// NOTE(review): only stored on `Transaction`; no code in view enforces any
/// of these semantics yet.
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}
1887
/// Event broadcast to subscribers whenever the store mutates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    /// Unique id of this event (distinct from the underlying change's id).
    pub event_id: UpdateId,
    /// What kind of mutation occurred.
    pub event_type: ChangeEventType,
    /// Entity the event concerns, when applicable.
    pub entity_id: Option<EntityId>,
    /// When the event was emitted.
    pub timestamp: DateTime<Utc>,
    /// Free-form context (e.g. transaction ids).
    pub metadata: HashMap<String, String>,
}
1902
/// Kinds of store mutations announced via `ChangeEvent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeEventType {
    /// An entity was inserted or updated.
    EntityUpserted,
    /// An entity was deleted.
    EntityDeleted,
    /// A relationship was inserted or updated.
    RelationshipUpserted,
    /// A relationship was deleted.
    RelationshipDeleted,
    /// An entity's embedding vector was replaced.
    EmbeddingUpdated,
    /// A transaction was opened.
    TransactionStarted,
    /// A transaction committed successfully.
    TransactionCommitted,
    /// A transaction was rolled back.
    TransactionRolledBack,
    /// A write conflict was detected and resolved.
    ConflictResolved,
    /// Caches were invalidated in response to changes.
    CacheInvalidated,
    /// A batch of changes finished processing.
    BatchProcessed,
}
1929
#[cfg(feature = "incremental")]
impl ProductionGraphStore {
    /// Creates a production-grade incremental store wrapping `graph`.
    pub fn new(
        graph: KnowledgeGraph,
        config: IncrementalConfig,
        conflict_resolver: ConflictResolver,
    ) -> Self {
        // Broadcast capacity of 1000; slow subscribers may miss events
        // (standard broadcast-channel semantics).
        let (event_tx, _) = broadcast::channel(1000);

        Self {
            graph: Arc::new(RwLock::new(graph)),
            transactions: DashMap::new(),
            change_log: DashMap::new(),
            rollback_data: DashMap::new(),
            conflict_resolver: Arc::new(conflict_resolver),
            cache_invalidation: Arc::new(SelectiveInvalidation::new()),
            monitor: Arc::new(UpdateMonitor::new()),
            batch_processor: Arc::new(BatchProcessor::new(
                config.batch_size,
                Duration::from_millis(100),
                config.max_concurrent_operations,
            )),
            // Conventional PageRank parameters: damping 0.85, tolerance
            // 1e-6, at most 100 iterations.
            incremental_pagerank: Arc::new(IncrementalPageRank::new(0.85, 1e-6, 100)),
            event_publisher: event_tx,
            config,
        }
    }

    /// Returns a new receiver for the store's change-event broadcast.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ChangeEvent> {
        self.event_publisher.subscribe()
    }

    /// Publishes an event, ignoring "no subscribers" errors.
    async fn publish_event(&self, event: ChangeEvent) {
        let _ = self.event_publisher.send(event);
    }

    /// Builds a timestamped `ChangeRecord` with a fresh id and empty metadata.
    fn create_change_record(
        &self,
        change_type: ChangeType,
        operation: Operation,
        change_data: ChangeData,
        entity_id: Option<EntityId>,
        document_id: Option<DocumentId>,
    ) -> ChangeRecord {
        ChangeRecord {
            change_id: UpdateId::new(),
            timestamp: Utc::now(),
            change_type,
            entity_id,
            document_id,
            operation,
            data: change_data,
            metadata: HashMap::new(),
        }
    }

    /// Applies a change, resolving any conflict with existing graph data
    /// first.
    ///
    /// Returns the change's own id — the key under which the record is
    /// stored in `change_log` — so callers can look the record back up.
    /// (The monitor's operation id is used only for metrics bookkeeping;
    /// returning it would make `change_log.get(&returned_id)` fail.)
    async fn apply_change_with_conflict_resolution(
        &self,
        change: ChangeRecord,
    ) -> Result<UpdateId> {
        let operation_id = self.monitor.start_operation("apply_change");
        // Keep the change id: it survives conflict resolution (the `..change`
        // spread below preserves it) and keys the change log.
        let change_id = change.change_id.clone();

        if let Some(conflict) = self.detect_conflict(&change)? {
            let resolution = self.conflict_resolver.resolve_conflict(&conflict).await?;

            // Replace payload and metadata with the resolved versions; all
            // other fields (including change_id) carry over.
            let resolved_change = ChangeRecord {
                data: resolution.resolved_data,
                metadata: resolution.metadata,
                ..change
            };

            self.apply_change_internal(resolved_change).await?;

            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::ConflictResolved,
                entity_id: conflict.existing_data.get_entity_id(),
                timestamp: Utc::now(),
                metadata: HashMap::new(),
            })
            .await;
        } else {
            self.apply_change_internal(change).await?;
        }

        self.monitor
            .complete_operation(&operation_id, true, None, 1, 0);
        Ok(change_id)
    }

    /// Checks whether `change` conflicts with data already in the graph:
    /// an entity with the same id but different name/type, or a duplicate
    /// (source, target, relation_type) relationship.
    fn detect_conflict(&self, change: &ChangeRecord) -> Result<Option<Conflict>> {
        match &change.data {
            ChangeData::Entity(entity) => {
                let graph = self.graph.read();
                if let Some(existing) = graph.get_entity(&entity.id) {
                    if existing.name != entity.name || existing.entity_type != entity.entity_type {
                        return Ok(Some(Conflict {
                            conflict_id: UpdateId::new(),
                            conflict_type: ConflictType::EntityExists,
                            existing_data: ChangeData::Entity(existing.clone()),
                            new_data: change.data.clone(),
                            resolution: None,
                        }));
                    }
                }
            }
            ChangeData::Relationship(relationship) => {
                let graph = self.graph.read();
                for existing_rel in graph.get_all_relationships() {
                    if existing_rel.source == relationship.source
                        && existing_rel.target == relationship.target
                        && existing_rel.relation_type == relationship.relation_type
                    {
                        return Ok(Some(Conflict {
                            conflict_id: UpdateId::new(),
                            conflict_type: ConflictType::RelationshipExists,
                            existing_data: ChangeData::Relationship(existing_rel.clone()),
                            new_data: change.data.clone(),
                            resolution: None,
                        }));
                    }
                }
            }
            // Other payload kinds cannot conflict under the current rules.
            _ => {}
        }

        Ok(None)
    }

    /// Applies a change to the graph: snapshots rollback data, mutates the
    /// graph under the write lock, notifies the incremental PageRank of
    /// affected entities, and records the change in the log.
    async fn apply_change_internal(&self, change: ChangeRecord) -> Result<()> {
        let change_id = change.change_id.clone();

        // Snapshot the current state first so the change can be undone.
        let rollback_data = {
            let graph = self.graph.read();
            self.create_rollback_data(&change, &graph)?
        };

        self.rollback_data.insert(change_id.clone(), rollback_data);

        {
            let mut graph = self.graph.write();
            match &change.data {
                ChangeData::Entity(entity) => {
                    match change.operation {
                        Operation::Insert | Operation::Upsert => {
                            graph.add_entity(entity.clone())?;
                            self.incremental_pagerank.record_change(entity.id.clone());
                        }
                        Operation::Delete => {
                            // Entity removal requires a KnowledgeGraph API
                            // that is not available here yet.
                        }
                        _ => {}
                    }
                }
                ChangeData::Relationship(relationship) => {
                    match change.operation {
                        Operation::Insert | Operation::Upsert => {
                            graph.add_relationship(relationship.clone())?;
                            // Both endpoints' scores are affected.
                            self.incremental_pagerank
                                .record_change(relationship.source.clone());
                            self.incremental_pagerank
                                .record_change(relationship.target.clone());
                        }
                        Operation::Delete => {
                            // Relationship removal requires a KnowledgeGraph
                            // API that is not available here yet.
                        }
                        _ => {}
                    }
                }
                ChangeData::Embedding {
                    entity_id,
                    embedding,
                } => {
                    // Replace the embedding in place; silently skipped if the
                    // entity does not exist.
                    if let Some(entity) = graph.get_entity_mut(entity_id) {
                        entity.embedding = Some(embedding.clone());
                    }
                }
                _ => {}
            }
        }

        self.change_log.insert(change_id, change);

        Ok(())
    }

    /// Captures the state that `change` is about to overwrite, so the change
    /// can later be rolled back.
    fn create_rollback_data(
        &self,
        change: &ChangeRecord,
        graph: &KnowledgeGraph,
    ) -> Result<RollbackData> {
        let mut previous_entities = Vec::new();
        let mut previous_relationships = Vec::new();

        match &change.data {
            ChangeData::Entity(entity) => {
                if let Some(existing) = graph.get_entity(&entity.id) {
                    previous_entities.push(existing.clone());
                }
            }
            ChangeData::Relationship(relationship) => {
                // Snapshot every relationship on the same endpoint pair.
                for rel in graph.get_all_relationships() {
                    if rel.source == relationship.source && rel.target == relationship.target {
                        previous_relationships.push(rel.clone());
                    }
                }
            }
            _ => {}
        }

        Ok(RollbackData {
            previous_entities,
            previous_relationships,
            affected_caches: vec![], // no cache keys tracked yet
        })
    }
}
2158
#[cfg(feature = "incremental")]
#[async_trait::async_trait]
impl IncrementalGraphStore for ProductionGraphStore {
    type Error = GraphRAGError;

    /// Inserts or updates an entity via the conflict-resolution pipeline,
    /// triggers cache invalidation for the applied change, and broadcasts an
    /// `EntityUpserted` event.
    async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::EntityAdded,
            Operation::Upsert,
            ChangeData::Entity(entity.clone()),
            Some(entity.id.clone()),
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        // NOTE(review): this assumes the UpdateId returned above is the key
        // the change was logged under in `change_log` — verify, otherwise
        // this unwrap panics.
        let changes = vec![self.change_log.get(&update_id).unwrap().clone()];
        let _invalidation_strategies = self.cache_invalidation.invalidate_for_changes(&changes);

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EntityUpserted,
            entity_id: Some(entity.id),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Inserts or updates a relationship via the conflict-resolution
    /// pipeline and broadcasts a `RelationshipUpserted` event keyed by the
    /// source entity.
    async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::RelationshipAdded,
            Operation::Upsert,
            ChangeData::Relationship(relationship.clone()),
            None,
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::RelationshipUpserted,
            entity_id: Some(relationship.source),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Broadcasts an `EntityDeleted` event.
    ///
    /// NOTE(review): as written this does NOT remove the entity from the
    /// graph — only the event is emitted. Confirm whether deletion support
    /// is pending a KnowledgeGraph removal API.
    async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId> {
        let update_id = UpdateId::new();

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EntityDeleted,
            entity_id: Some(entity_id.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Broadcasts a `RelationshipDeleted` event.
    ///
    /// NOTE(review): as written this does NOT remove the relationship from
    /// the graph; `_target` and `_relation_type` are ignored.
    async fn delete_relationship(
        &mut self,
        source: &EntityId,
        _target: &EntityId,
        _relation_type: &str,
    ) -> Result<UpdateId> {
        let update_id = UpdateId::new();

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::RelationshipDeleted,
            entity_id: Some(source.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Applies every change in `delta` inside one transaction and returns
    /// the delta's id.
    ///
    /// NOTE(review): the changes are applied directly and never attached to
    /// the transaction object, so the commit does not control them.
    async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId> {
        let tx_id = self.begin_transaction().await?;

        for change in delta.changes {
            self.apply_change_with_conflict_resolution(change).await?;
        }

        self.commit_transaction(tx_id).await?;
        Ok(delta.delta_id)
    }

    /// Rollback of an applied delta — currently a no-op placeholder.
    async fn rollback_delta(&mut self, _delta_id: &UpdateId) -> Result<()> {
        Ok(())
    }

    /// Returns logged changes, optionally filtered to those at or after
    /// `since` (inclusive). Order is unspecified (DashMap iteration).
    async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>> {
        let changes: Vec<ChangeRecord> = self
            .change_log
            .iter()
            .filter_map(|entry| {
                let change = entry.value();
                if let Some(since_time) = since {
                    if change.timestamp >= since_time {
                        Some(change.clone())
                    } else {
                        None
                    }
                } else {
                    Some(change.clone())
                }
            })
            .collect();

        Ok(changes)
    }

    /// Opens a new transaction (ReadCommitted by default) and broadcasts a
    /// `TransactionStarted` event carrying the transaction id.
    async fn begin_transaction(&mut self) -> Result<TransactionId> {
        let tx_id = TransactionId::new();
        let transaction = Transaction {
            id: tx_id.clone(),
            changes: Vec::new(),
            status: TransactionStatus::Active,
            created_at: Utc::now(),
            isolation_level: IsolationLevel::ReadCommitted,
        };

        self.transactions.insert(tx_id.clone(), transaction);

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::TransactionStarted,
            entity_id: None,
            timestamp: Utc::now(),
            metadata: [("transaction_id".to_string(), tx_id.to_string())]
                .into_iter()
                .collect(),
        })
        .await;

        Ok(tx_id)
    }

    /// Removes the transaction, marks it committed, and broadcasts a
    /// `TransactionCommitted` event. Errors if the id is unknown.
    async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
            tx.status = TransactionStatus::Committed;

            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::TransactionCommitted,
                entity_id: None,
                timestamp: Utc::now(),
                metadata: [("transaction_id".to_string(), tx_id.to_string())]
                    .into_iter()
                    .collect(),
            })
            .await;

            Ok(())
        } else {
            Err(GraphRAGError::IncrementalUpdate {
                message: format!("Transaction {tx_id} not found"),
            })
        }
    }

    /// Removes the transaction, marks it aborted, and broadcasts a
    /// `TransactionRolledBack` event. Errors if the id is unknown.
    ///
    /// NOTE(review): the loop over `tx.changes` is empty (and `tx.changes`
    /// is never populated), so no change is actually undone here.
    async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
            tx.status = TransactionStatus::Aborted;

            for _change in &tx.changes {
                // Per-change undo is not implemented yet.
            }

            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::TransactionRolledBack,
                entity_id: None,
                timestamp: Utc::now(),
                metadata: [("transaction_id".to_string(), tx_id.to_string())]
                    .into_iter()
                    .collect(),
            })
            .await;

            Ok(())
        } else {
            Err(GraphRAGError::IncrementalUpdate {
                message: format!("Transaction {tx_id} not found"),
            })
        }
    }

    /// Upserts entities one by one, sequentially.
    ///
    /// NOTE(review): `_strategy` is ignored; the store's configured
    /// `conflict_resolver` is what actually decides conflicts.
    async fn batch_upsert_entities(
        &mut self,
        entities: Vec<Entity>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::new();

        for entity in entities {
            let update_id = self.upsert_entity(entity).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Upserts relationships one by one, sequentially.
    ///
    /// NOTE(review): `_strategy` is ignored, as in `batch_upsert_entities`.
    async fn batch_upsert_relationships(
        &mut self,
        relationships: Vec<Relationship>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::new();

        for relationship in relationships {
            let update_id = self.upsert_relationship(relationship).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Replaces one entity's embedding via the change pipeline and
    /// broadcasts an `EmbeddingUpdated` event.
    async fn update_entity_embedding(
        &mut self,
        entity_id: &EntityId,
        embedding: Vec<f32>,
    ) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::EmbeddingUpdated,
            Operation::Update,
            ChangeData::Embedding {
                entity_id: entity_id.clone(),
                embedding,
            },
            Some(entity_id.clone()),
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EmbeddingUpdated,
            entity_id: Some(entity_id.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Applies embedding updates one by one, sequentially.
    async fn bulk_update_embeddings(
        &mut self,
        updates: Vec<(EntityId, Vec<f32>)>,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::new();

        for (entity_id, embedding) in updates {
            let update_id = self.update_entity_embedding(&entity_id, embedding).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Lists ids of transactions that are still `Active`.
    async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>> {
        let pending: Vec<TransactionId> = self
            .transactions
            .iter()
            .filter(|entry| entry.value().status == TransactionStatus::Active)
            .map(|entry| entry.key().clone())
            .collect();

        Ok(pending)
    }

    /// Computes basic size/degree statistics for the current graph under a
    /// read lock. Degrees count out-neighbors as returned by
    /// `get_neighbors`.
    async fn get_graph_statistics(&self) -> Result<GraphStatistics> {
        let graph = self.graph.read();
        let entities: Vec<_> = graph.entities().collect();
        let relationships = graph.get_all_relationships();

        let node_count = entities.len();
        let edge_count = relationships.len();

        let total_degree: usize = entities
            .iter()
            .map(|entity| graph.get_neighbors(&entity.id).len())
            .sum();

        let average_degree = if node_count > 0 {
            total_degree as f64 / node_count as f64
        } else {
            0.0
        };

        let max_degree = entities
            .iter()
            .map(|entity| graph.get_neighbors(&entity.id).len())
            .max()
            .unwrap_or(0);

        Ok(GraphStatistics {
            node_count,
            edge_count,
            average_degree,
            max_degree,
            connected_components: 1,     // placeholder — not computed
            clustering_coefficient: 0.0, // placeholder — not computed
            last_updated: Utc::now(),
        })
    }

    /// Scans the graph for consistency issues: entities with no neighbors
    /// (reported as "orphaned"), entities without embeddings, and
    /// relationships whose endpoints are missing.
    async fn validate_consistency(&self) -> Result<ConsistencyReport> {
        let graph = self.graph.read();
        let mut orphaned_entities = Vec::new();
        let mut broken_relationships = Vec::new();
        let mut missing_embeddings = Vec::new();

        for entity in graph.entities() {
            let neighbors = graph.get_neighbors(&entity.id);
            if neighbors.is_empty() {
                orphaned_entities.push(entity.id.clone());
            }

            if entity.embedding.is_none() {
                missing_embeddings.push(entity.id.clone());
            }
        }

        for relationship in graph.get_all_relationships() {
            if graph.get_entity(&relationship.source).is_none()
                || graph.get_entity(&relationship.target).is_none()
            {
                broken_relationships.push((
                    relationship.source.clone(),
                    relationship.target.clone(),
                    relationship.relation_type.clone(),
                ));
            }
        }

        let issues_found =
            orphaned_entities.len() + broken_relationships.len() + missing_embeddings.len();

        Ok(ConsistencyReport {
            is_consistent: issues_found == 0,
            orphaned_entities,
            broken_relationships,
            missing_embeddings,
            validation_time: Utc::now(),
            issues_found,
        })
    }
}
2541
/// Helper for extracting the entity id embedded in a `ChangeData` payload.
#[allow(dead_code)]
trait ChangeDataExt {
    /// Returns the entity id carried by the payload, if it has one.
    fn get_entity_id(&self) -> Option<EntityId>;
}
2547
2548impl ChangeDataExt for ChangeData {
2549 fn get_entity_id(&self) -> Option<EntityId> {
2550 match self {
2551 ChangeData::Entity(entity) => Some(entity.id.clone()),
2552 ChangeData::Embedding { entity_id, .. } => Some(entity_id.clone()),
2553 _ => None,
2554 }
2555 }
2556}
2557
2558#[cfg(test)]
2561mod tests {
2562 use super::*;
2563
2564 #[test]
2565 fn test_update_id_generation() {
2566 let id1 = UpdateId::new();
2567 let id2 = UpdateId::new();
2568 assert_ne!(id1.as_str(), id2.as_str());
2569 }
2570
2571 #[test]
2572 fn test_transaction_id_generation() {
2573 let tx1 = TransactionId::new();
2574 let tx2 = TransactionId::new();
2575 assert_ne!(tx1.as_str(), tx2.as_str());
2576 }
2577
2578 #[test]
2579 fn test_change_record_creation() {
2580 let entity = Entity::new(
2581 EntityId::new("test".to_string()),
2582 "Test Entity".to_string(),
2583 "Person".to_string(),
2584 0.9,
2585 );
2586
2587 let config = IncrementalConfig::default();
2588 let graph = KnowledgeGraph::new();
2589 let manager = IncrementalGraphManager::new(graph, config);
2590
2591 let change = manager.create_change_record(
2592 ChangeType::EntityAdded,
2593 Operation::Insert,
2594 ChangeData::Entity(entity.clone()),
2595 Some(entity.id.clone()),
2596 None,
2597 );
2598
2599 assert_eq!(change.change_type, ChangeType::EntityAdded);
2600 assert_eq!(change.operation, Operation::Insert);
2601 assert_eq!(change.entity_id, Some(entity.id));
2602 }
2603
2604 #[test]
2605 fn test_conflict_resolver_creation() {
2606 let resolver = ConflictResolver::new(ConflictStrategy::KeepExisting);
2607 assert!(matches!(resolver.strategy, ConflictStrategy::KeepExisting));
2608 }
2609
2610 #[test]
2611 fn test_incremental_config_default() {
2612 let config = IncrementalConfig::default();
2613 assert_eq!(config.max_change_log_size, 10000);
2614 assert_eq!(config.batch_size, 100);
2615 assert!(config.enable_monitoring);
2616 }
2617
2618 #[test]
2619 fn test_statistics_creation() {
2620 let stats = IncrementalStatistics::empty();
2621 assert_eq!(stats.total_updates, 0);
2622 assert_eq!(stats.entities_added, 0);
2623 assert_eq!(stats.average_update_time_ms, 0.0);
2624 }
2625
2626 #[tokio::test]
2627 async fn test_basic_entity_upsert() {
2628 let config = IncrementalConfig::default();
2629 let graph = KnowledgeGraph::new();
2630 let mut manager = IncrementalGraphManager::new(graph, config);
2631
2632 let entity = Entity::new(
2633 EntityId::new("test_entity".to_string()),
2634 "Test Entity".to_string(),
2635 "Person".to_string(),
2636 0.9,
2637 );
2638
2639 let update_id = manager.basic_upsert_entity(entity).unwrap();
2640 assert!(!update_id.as_str().is_empty());
2641
2642 let stats = manager.get_statistics();
2643 assert_eq!(stats.entities_added, 1);
2644 }
2645
2646 #[cfg(feature = "incremental")]
2647 #[tokio::test]
2648 async fn test_production_graph_store_creation() {
2649 let graph = KnowledgeGraph::new();
2650 let config = IncrementalConfig::default();
2651 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2652
2653 let store = ProductionGraphStore::new(graph, config, resolver);
2654 let _receiver = store.subscribe_events();
2655 }
2657
2658 #[cfg(feature = "incremental")]
2659 #[tokio::test]
2660 async fn test_production_graph_store_entity_upsert() {
2661 let graph = KnowledgeGraph::new();
2662 let config = IncrementalConfig::default();
2663 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2664
2665 let mut store = ProductionGraphStore::new(graph, config, resolver);
2666
2667 let entity = Entity::new(
2668 EntityId::new("test_entity".to_string()),
2669 "Test Entity".to_string(),
2670 "Person".to_string(),
2671 0.9,
2672 );
2673
2674 let update_id = store.upsert_entity(entity).await.unwrap();
2675 assert!(!update_id.as_str().is_empty());
2676
2677 let stats = store.get_graph_statistics().await.unwrap();
2678 assert_eq!(stats.node_count, 1);
2679 }
2680
2681 #[cfg(feature = "incremental")]
2682 #[tokio::test]
2683 async fn test_production_graph_store_relationship_upsert() {
2684 let graph = KnowledgeGraph::new();
2685 let config = IncrementalConfig::default();
2686 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2687
2688 let mut store = ProductionGraphStore::new(graph, config, resolver);
2689
2690 let entity1 = Entity::new(
2692 EntityId::new("entity1".to_string()),
2693 "Entity 1".to_string(),
2694 "Person".to_string(),
2695 0.9,
2696 );
2697
2698 let entity2 = Entity::new(
2699 EntityId::new("entity2".to_string()),
2700 "Entity 2".to_string(),
2701 "Person".to_string(),
2702 0.9,
2703 );
2704
2705 store.upsert_entity(entity1.clone()).await.unwrap();
2706 store.upsert_entity(entity2.clone()).await.unwrap();
2707
2708 let relationship = Relationship {
2709 source: entity1.id,
2710 target: entity2.id,
2711 relation_type: "KNOWS".to_string(),
2712 confidence: 0.8,
2713 context: vec![],
2714 };
2715
2716 let update_id = store.upsert_relationship(relationship).await.unwrap();
2717 assert!(!update_id.as_str().is_empty());
2718
2719 let stats = store.get_graph_statistics().await.unwrap();
2720 assert_eq!(stats.edge_count, 1);
2721 }
2722
2723 #[cfg(feature = "incremental")]
2724 #[tokio::test]
2725 async fn test_production_graph_store_transactions() {
2726 let graph = KnowledgeGraph::new();
2727 let config = IncrementalConfig::default();
2728 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2729
2730 let mut store = ProductionGraphStore::new(graph, config, resolver);
2731
2732 let tx_id = store.begin_transaction().await.unwrap();
2733 assert!(!tx_id.as_str().is_empty());
2734
2735 let pending = store.get_pending_transactions().await.unwrap();
2736 assert_eq!(pending.len(), 1);
2737 assert_eq!(pending[0], tx_id);
2738
2739 store.commit_transaction(tx_id).await.unwrap();
2740
2741 let pending_after = store.get_pending_transactions().await.unwrap();
2742 assert_eq!(pending_after.len(), 0);
2743 }
2744
2745 #[cfg(feature = "incremental")]
2746 #[tokio::test]
2747 async fn test_production_graph_store_consistency_validation() {
2748 let graph = KnowledgeGraph::new();
2749 let config = IncrementalConfig::default();
2750 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2751
2752 let store = ProductionGraphStore::new(graph, config, resolver);
2753
2754 let report = store.validate_consistency().await.unwrap();
2755 assert!(report.is_consistent);
2756 assert_eq!(report.issues_found, 0);
2757 }
2758
2759 #[cfg(feature = "incremental")]
2760 #[tokio::test]
2761 async fn test_production_graph_store_event_publishing() {
2762 let graph = KnowledgeGraph::new();
2763 let config = IncrementalConfig::default();
2764 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2765
2766 let store = ProductionGraphStore::new(graph, config, resolver);
2767 let mut event_receiver = store.subscribe_events();
2768
2769 let entity = Entity::new(
2770 EntityId::new("test_entity".to_string()),
2771 "Test Entity".to_string(),
2772 "Person".to_string(),
2773 0.9,
2774 );
2775
2776 let store_clone = Arc::new(tokio::sync::Mutex::new(store));
2778 let store_for_task = Arc::clone(&store_clone);
2779
2780 tokio::spawn(async move {
2781 let mut store = store_for_task.lock().await;
2782 let _ = store.upsert_entity(entity).await;
2783 });
2784
2785 let event = tokio::time::timeout(std::time::Duration::from_millis(100), event_receiver.recv()).await;
2787 assert!(event.is_ok());
2788 }
2789
2790 #[cfg(feature = "incremental")]
2791 #[test]
2792 fn test_incremental_pagerank_creation() {
2793 let pagerank = IncrementalPageRank::new(0.85, 1e-6, 100);
2794 assert!(pagerank.scores.is_empty());
2795 }
2796
2797 #[cfg(feature = "incremental")]
2798 #[test]
2799 fn test_batch_processor_creation() {
2800 let processor = BatchProcessor::new(100, Duration::from_millis(500), 10);
2801 let metrics = processor.get_metrics();
2802 assert_eq!(metrics.total_batches_processed, 0);
2803 }
2804
2805 #[cfg(feature = "incremental")]
2806 #[tokio::test]
2807 async fn test_selective_invalidation() {
2808 let invalidation = SelectiveInvalidation::new();
2809
2810 let region = CacheRegion {
2811 region_id: "test_region".to_string(),
2812 entity_ids: [EntityId::new("entity1".to_string())].into_iter().collect(),
2813 relationship_types: ["KNOWS".to_string()].into_iter().collect(),
2814 document_ids: HashSet::new(),
2815 last_modified: Utc::now(),
2816 };
2817
2818 invalidation.register_cache_region(region);
2819
2820 let entity = Entity::new(
2821 EntityId::new("entity1".to_string()),
2822 "Entity 1".to_string(),
2823 "Person".to_string(),
2824 0.9,
2825 );
2826
2827 let ent_id_for_log = entity.id.clone();
2828 let change = ChangeRecord {
2829 change_id: UpdateId::new(),
2830 timestamp: Utc::now(),
2831 change_type: ChangeType::EntityUpdated,
2832 entity_id: Some(ent_id_for_log),
2833 document_id: None,
2834 operation: Operation::Update,
2835 data: ChangeData::Entity(entity),
2836 metadata: HashMap::new(),
2837 };
2838
2839 let strategies = invalidation.invalidate_for_changes(&[change]);
2840 assert!(!strategies.is_empty());
2841 }
2842
2843 #[cfg(feature = "incremental")]
2844 #[test]
2845 fn test_conflict_resolver_merge() {
2846 let resolver = ConflictResolver::new(ConflictStrategy::Merge);
2847
2848 let entity1 = Entity::new(
2849 EntityId::new("entity1".to_string()),
2850 "Entity 1".to_string(),
2851 "Person".to_string(),
2852 0.8,
2853 );
2854
2855 let entity2 = Entity::new(
2856 EntityId::new("entity1".to_string()),
2857 "Entity 1 Updated".to_string(),
2858 "Person".to_string(),
2859 0.9,
2860 );
2861
2862 let merged = resolver.merge_entities(&entity1, &entity2).unwrap();
2863 assert_eq!(merged.confidence, 0.9); assert_eq!(merged.name, "Entity 1 Updated");
2865 }
2866
2867 #[test]
2868 fn test_graph_statistics_creation() {
2869 let stats = GraphStatistics {
2870 node_count: 100,
2871 edge_count: 150,
2872 average_degree: 3.0,
2873 max_degree: 10,
2874 connected_components: 1,
2875 clustering_coefficient: 0.3,
2876 last_updated: Utc::now(),
2877 };
2878
2879 assert_eq!(stats.node_count, 100);
2880 assert_eq!(stats.edge_count, 150);
2881 }
2882
2883 #[test]
2884 fn test_consistency_report_creation() {
2885 let report = ConsistencyReport {
2886 is_consistent: true,
2887 orphaned_entities: vec![],
2888 broken_relationships: vec![],
2889 missing_embeddings: vec![],
2890 validation_time: Utc::now(),
2891 issues_found: 0,
2892 };
2893
2894 assert!(report.is_consistent);
2895 assert_eq!(report.issues_found, 0);
2896 }
2897
2898 #[test]
2899 fn test_change_event_creation() {
2900 let event = ChangeEvent {
2901 event_id: UpdateId::new(),
2902 event_type: ChangeEventType::EntityUpserted,
2903 entity_id: Some(EntityId::new("entity1".to_string())),
2904 timestamp: Utc::now(),
2905 metadata: HashMap::new(),
2906 };
2907
2908 assert!(matches!(event.event_type, ChangeEventType::EntityUpserted));
2909 assert!(event.entity_id.is_some());
2910 }
2911}