1#![allow(unused_imports)]
2
3use crate::core::{
4 DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship, Result, TextChunk,
5};
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::time::{Duration, Instant};
10
11#[cfg(feature = "incremental")]
12use std::sync::Arc;
13
14#[cfg(feature = "incremental")]
15use {
16 dashmap::DashMap,
17 parking_lot::{Mutex, RwLock},
18 tokio::sync::{broadcast, Semaphore},
19 uuid::Uuid,
20};
21
22use super::*;
23
/// Graph store that wraps a [`KnowledgeGraph`] with a change log, rollback snapshots,
/// conflict resolution and change-event broadcasting.
///
/// The type only exists when the `incremental` feature is enabled.
#[cfg(feature = "incremental")]
// NOTE(review): some fields (e.g. `batch_processor`, `config`) are stored but not read in
// this file, which is presumably why dead-code warnings are silenced — confirm.
#[allow(dead_code)]
pub struct ProductionGraphStore {
    /// The wrapped graph, shared behind a read/write lock.
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// Open transactions, keyed by transaction id.
    transactions: DashMap<TransactionId, Transaction>,
    /// Every applied change record, keyed by its change id.
    change_log: DashMap<UpdateId, ChangeRecord>,
    /// Pre-change snapshots, keyed by the id of the change they belong to.
    rollback_data: DashMap<UpdateId, RollbackData>,
    /// Resolver consulted when an incoming change clashes with existing data.
    conflict_resolver: Arc<ConflictResolver>,
    /// Cache invalidation helper fed with applied changes.
    cache_invalidation: Arc<SelectiveInvalidation>,
    /// Monitor that is told when an apply operation starts and completes.
    monitor: Arc<UpdateMonitor>,
    /// Batch processor built from the configuration at construction time.
    batch_processor: Arc<BatchProcessor>,
    /// PageRank helper that is notified about touched entities.
    incremental_pagerank: Arc<IncrementalPageRank>,
    /// Sending half of the change-event broadcast channel.
    event_publisher: broadcast::Sender<ChangeEvent>,
    /// Configuration the store was created with.
    config: IncrementalConfig,
}
44
45#[derive(Debug, Clone)]
47#[allow(dead_code)]
48struct Transaction {
49 id: TransactionId,
50 changes: Vec<ChangeRecord>,
51 status: TransactionStatus,
52 created_at: DateTime<Utc>,
53 isolation_level: IsolationLevel,
54}
55
/// Lifecycle state of a [`Transaction`].
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum TransactionStatus {
    /// Freshly opened; the only state reported as "pending".
    Active,
    /// Not assigned anywhere in this file.
    Preparing,
    /// Assigned when the transaction is committed.
    Committed,
    /// Assigned when the transaction is rolled back.
    Aborted,
}
64
/// Isolation level recorded on a [`Transaction`].
///
/// Only `ReadCommitted` is assigned in this file; the level is stored but not
/// consulted by any code visible here.
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct ChangeEvent {
77 pub event_id: UpdateId,
79 pub event_type: ChangeEventType,
81 pub entity_id: Option<EntityId>,
83 pub timestamp: DateTime<Utc>,
85 pub metadata: HashMap<String, String>,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub enum ChangeEventType {
92 EntityUpserted,
94 EntityDeleted,
96 RelationshipUpserted,
98 RelationshipDeleted,
100 EmbeddingUpdated,
102 TransactionStarted,
104 TransactionCommitted,
106 TransactionRolledBack,
108 ConflictResolved,
110 CacheInvalidated,
112 BatchProcessed,
114}
115
116#[cfg(feature = "incremental")]
117impl ProductionGraphStore {
118 pub fn new(
120 graph: KnowledgeGraph,
121 config: IncrementalConfig,
122 conflict_resolver: ConflictResolver,
123 ) -> Self {
124 let (event_tx, _) = broadcast::channel(1000);
125
126 Self {
127 graph: Arc::new(RwLock::new(graph)),
128 transactions: DashMap::new(),
129 change_log: DashMap::new(),
130 rollback_data: DashMap::new(),
131 conflict_resolver: Arc::new(conflict_resolver),
132 cache_invalidation: Arc::new(SelectiveInvalidation::new()),
133 monitor: Arc::new(UpdateMonitor::new()),
134 batch_processor: Arc::new(BatchProcessor::new(
135 config.batch_size,
136 Duration::from_millis(100),
137 config.max_concurrent_operations,
138 )),
139 incremental_pagerank: Arc::new(IncrementalPageRank::new(0.85, 1e-6, 100)),
140 event_publisher: event_tx,
141 config,
142 }
143 }
144
145 pub fn subscribe_events(&self) -> broadcast::Receiver<ChangeEvent> {
147 self.event_publisher.subscribe()
148 }
149
150 async fn publish_event(&self, event: ChangeEvent) {
151 let _ = self.event_publisher.send(event);
152 }
153
154 fn create_change_record(
155 &self,
156 change_type: ChangeType,
157 operation: Operation,
158 change_data: ChangeData,
159 entity_id: Option<EntityId>,
160 document_id: Option<DocumentId>,
161 ) -> ChangeRecord {
162 ChangeRecord {
163 change_id: UpdateId::new(),
164 timestamp: Utc::now(),
165 change_type,
166 entity_id,
167 document_id,
168 operation,
169 data: change_data,
170 metadata: HashMap::new(),
171 }
172 }
173
174 async fn apply_change_with_conflict_resolution(
175 &self,
176 change: ChangeRecord,
177 ) -> Result<UpdateId> {
178 let operation_id = self.monitor.start_operation("apply_change");
179
180 if let Some(conflict) = self.detect_conflict(&change)? {
182 let resolution = self.conflict_resolver.resolve_conflict(&conflict).await?;
183
184 let resolved_change = ChangeRecord {
186 data: resolution.resolved_data,
187 metadata: resolution.metadata,
188 ..change
189 };
190
191 self.apply_change_internal(resolved_change).await?;
192
193 self.publish_event(ChangeEvent {
195 event_id: UpdateId::new(),
196 event_type: ChangeEventType::ConflictResolved,
197 entity_id: conflict.existing_data.get_entity_id(),
198 timestamp: Utc::now(),
199 metadata: HashMap::new(),
200 })
201 .await;
202 } else {
203 self.apply_change_internal(change).await?;
204 }
205
206 self.monitor
207 .complete_operation(&operation_id, true, None, 1, 0);
208 Ok(operation_id)
209 }
210
211 fn detect_conflict(&self, change: &ChangeRecord) -> Result<Option<Conflict>> {
212 match &change.data {
213 ChangeData::Entity(entity) => {
214 let graph = self.graph.read();
215 if let Some(existing) = graph.get_entity(&entity.id) {
216 if existing.name != entity.name || existing.entity_type != entity.entity_type {
217 return Ok(Some(Conflict {
218 conflict_id: UpdateId::new(),
219 conflict_type: ConflictType::EntityExists,
220 existing_data: ChangeData::Entity(existing.clone()),
221 new_data: change.data.clone(),
222 resolution: None,
223 }));
224 }
225 }
226 },
227 ChangeData::Relationship(relationship) => {
228 let graph = self.graph.read();
229 for existing_rel in graph.get_all_relationships() {
230 if existing_rel.source == relationship.source
231 && existing_rel.target == relationship.target
232 && existing_rel.relation_type == relationship.relation_type
233 {
234 return Ok(Some(Conflict {
235 conflict_id: UpdateId::new(),
236 conflict_type: ConflictType::RelationshipExists,
237 existing_data: ChangeData::Relationship(existing_rel.clone()),
238 new_data: change.data.clone(),
239 resolution: None,
240 }));
241 }
242 }
243 },
244 _ => {},
245 }
246
247 Ok(None)
248 }
249
250 async fn apply_change_internal(&self, change: ChangeRecord) -> Result<()> {
251 let change_id = change.change_id.clone();
252
253 let rollback_data = {
255 let graph = self.graph.read();
256 self.create_rollback_data(&change, &graph)?
257 };
258
259 self.rollback_data.insert(change_id.clone(), rollback_data);
260
261 {
263 let mut graph = self.graph.write();
264 match &change.data {
265 ChangeData::Entity(entity) => {
266 match change.operation {
267 Operation::Insert | Operation::Upsert => {
268 graph.add_entity(entity.clone())?;
269 self.incremental_pagerank.record_change(entity.id.clone());
270 },
271 Operation::Delete => {
272 },
275 _ => {},
276 }
277 },
278 ChangeData::Relationship(relationship) => {
279 match change.operation {
280 Operation::Insert | Operation::Upsert => {
281 graph.add_relationship(relationship.clone())?;
282 self.incremental_pagerank
283 .record_change(relationship.source.clone());
284 self.incremental_pagerank
285 .record_change(relationship.target.clone());
286 },
287 Operation::Delete => {
288 },
291 _ => {},
292 }
293 },
294 ChangeData::Embedding {
295 entity_id,
296 embedding,
297 } => {
298 if let Some(entity) = graph.get_entity_mut(entity_id) {
299 entity.embedding = Some(embedding.clone());
300 }
301 },
302 _ => {},
303 }
304 }
305
306 self.change_log.insert(change_id, change);
308
309 Ok(())
310 }
311
312 fn create_rollback_data(
313 &self,
314 change: &ChangeRecord,
315 graph: &KnowledgeGraph,
316 ) -> Result<RollbackData> {
317 let mut previous_entities = Vec::new();
318 let mut previous_relationships = Vec::new();
319
320 match &change.data {
321 ChangeData::Entity(entity) => {
322 if let Some(existing) = graph.get_entity(&entity.id) {
323 previous_entities.push(existing.clone());
324 }
325 },
326 ChangeData::Relationship(relationship) => {
327 for rel in graph.get_all_relationships() {
329 if rel.source == relationship.source && rel.target == relationship.target {
330 previous_relationships.push(rel.clone());
331 }
332 }
333 },
334 _ => {},
335 }
336
337 Ok(RollbackData {
338 previous_entities,
339 previous_relationships,
340 affected_caches: vec![], })
342 }
343}
344
345#[cfg(feature = "incremental")]
346#[async_trait::async_trait]
347impl IncrementalGraphStore for ProductionGraphStore {
348 type Error = GraphRAGError;
349
350 async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
351 let change = self.create_change_record(
352 ChangeType::EntityAdded,
353 Operation::Upsert,
354 ChangeData::Entity(entity.clone()),
355 Some(entity.id.clone()),
356 None,
357 );
358
359 let update_id = self.apply_change_with_conflict_resolution(change).await?;
360
361 let changes = vec![self
363 .change_log
364 .get(&update_id)
365 .expect("just inserted above")
366 .clone()];
367 let _invalidation_strategies = self.cache_invalidation.invalidate_for_changes(&changes);
368
369 self.publish_event(ChangeEvent {
371 event_id: UpdateId::new(),
372 event_type: ChangeEventType::EntityUpserted,
373 entity_id: Some(entity.id),
374 timestamp: Utc::now(),
375 metadata: HashMap::new(),
376 })
377 .await;
378
379 Ok(update_id)
380 }
381
382 async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId> {
383 let change = self.create_change_record(
384 ChangeType::RelationshipAdded,
385 Operation::Upsert,
386 ChangeData::Relationship(relationship.clone()),
387 None,
388 None,
389 );
390
391 let update_id = self.apply_change_with_conflict_resolution(change).await?;
392
393 self.publish_event(ChangeEvent {
395 event_id: UpdateId::new(),
396 event_type: ChangeEventType::RelationshipUpserted,
397 entity_id: Some(relationship.source),
398 timestamp: Utc::now(),
399 metadata: HashMap::new(),
400 })
401 .await;
402
403 Ok(update_id)
404 }
405
406 async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId> {
407 let update_id = UpdateId::new();
409
410 self.publish_event(ChangeEvent {
412 event_id: UpdateId::new(),
413 event_type: ChangeEventType::EntityDeleted,
414 entity_id: Some(entity_id.clone()),
415 timestamp: Utc::now(),
416 metadata: HashMap::new(),
417 })
418 .await;
419
420 Ok(update_id)
421 }
422
423 async fn delete_relationship(
424 &mut self,
425 source: &EntityId,
426 _target: &EntityId,
427 _relation_type: &str,
428 ) -> Result<UpdateId> {
429 let update_id = UpdateId::new();
431
432 self.publish_event(ChangeEvent {
434 event_id: UpdateId::new(),
435 event_type: ChangeEventType::RelationshipDeleted,
436 entity_id: Some(source.clone()),
437 timestamp: Utc::now(),
438 metadata: HashMap::new(),
439 })
440 .await;
441
442 Ok(update_id)
443 }
444
445 async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId> {
446 let tx_id = self.begin_transaction().await?;
447
448 for change in delta.changes {
449 self.apply_change_with_conflict_resolution(change).await?;
450 }
451
452 self.commit_transaction(tx_id).await?;
453 Ok(delta.delta_id)
454 }
455
456 async fn rollback_delta(&mut self, _delta_id: &UpdateId) -> Result<()> {
457 Ok(())
459 }
460
461 async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>> {
462 let changes: Vec<ChangeRecord> = self
463 .change_log
464 .iter()
465 .filter_map(|entry| {
466 let change = entry.value();
467 if let Some(since_time) = since {
468 if change.timestamp >= since_time {
469 Some(change.clone())
470 } else {
471 None
472 }
473 } else {
474 Some(change.clone())
475 }
476 })
477 .collect();
478
479 Ok(changes)
480 }
481
482 async fn begin_transaction(&mut self) -> Result<TransactionId> {
483 let tx_id = TransactionId::new();
484 let transaction = Transaction {
485 id: tx_id.clone(),
486 changes: Vec::new(),
487 status: TransactionStatus::Active,
488 created_at: Utc::now(),
489 isolation_level: IsolationLevel::ReadCommitted,
490 };
491
492 self.transactions.insert(tx_id.clone(), transaction);
493
494 self.publish_event(ChangeEvent {
496 event_id: UpdateId::new(),
497 event_type: ChangeEventType::TransactionStarted,
498 entity_id: None,
499 timestamp: Utc::now(),
500 metadata: [("transaction_id".to_string(), tx_id.to_string())]
501 .into_iter()
502 .collect(),
503 })
504 .await;
505
506 Ok(tx_id)
507 }
508
509 async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
510 if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
511 tx.status = TransactionStatus::Committed;
512
513 self.publish_event(ChangeEvent {
515 event_id: UpdateId::new(),
516 event_type: ChangeEventType::TransactionCommitted,
517 entity_id: None,
518 timestamp: Utc::now(),
519 metadata: [("transaction_id".to_string(), tx_id.to_string())]
520 .into_iter()
521 .collect(),
522 })
523 .await;
524
525 Ok(())
526 } else {
527 Err(GraphRAGError::IncrementalUpdate {
528 message: format!("Transaction {tx_id} not found"),
529 })
530 }
531 }
532
533 async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
534 if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
535 tx.status = TransactionStatus::Aborted;
536
537 for _change in &tx.changes {
539 }
541
542 self.publish_event(ChangeEvent {
544 event_id: UpdateId::new(),
545 event_type: ChangeEventType::TransactionRolledBack,
546 entity_id: None,
547 timestamp: Utc::now(),
548 metadata: [("transaction_id".to_string(), tx_id.to_string())]
549 .into_iter()
550 .collect(),
551 })
552 .await;
553
554 Ok(())
555 } else {
556 Err(GraphRAGError::IncrementalUpdate {
557 message: format!("Transaction {tx_id} not found"),
558 })
559 }
560 }
561
562 async fn batch_upsert_entities(
563 &mut self,
564 entities: Vec<Entity>,
565 _strategy: ConflictStrategy,
566 ) -> Result<Vec<UpdateId>> {
567 let mut update_ids = Vec::new();
568
569 for entity in entities {
570 let update_id = self.upsert_entity(entity).await?;
571 update_ids.push(update_id);
572 }
573
574 Ok(update_ids)
575 }
576
577 async fn batch_upsert_relationships(
578 &mut self,
579 relationships: Vec<Relationship>,
580 _strategy: ConflictStrategy,
581 ) -> Result<Vec<UpdateId>> {
582 let mut update_ids = Vec::new();
583
584 for relationship in relationships {
585 let update_id = self.upsert_relationship(relationship).await?;
586 update_ids.push(update_id);
587 }
588
589 Ok(update_ids)
590 }
591
592 async fn update_entity_embedding(
593 &mut self,
594 entity_id: &EntityId,
595 embedding: Vec<f32>,
596 ) -> Result<UpdateId> {
597 let change = self.create_change_record(
598 ChangeType::EmbeddingUpdated,
599 Operation::Update,
600 ChangeData::Embedding {
601 entity_id: entity_id.clone(),
602 embedding,
603 },
604 Some(entity_id.clone()),
605 None,
606 );
607
608 let update_id = self.apply_change_with_conflict_resolution(change).await?;
609
610 self.publish_event(ChangeEvent {
612 event_id: UpdateId::new(),
613 event_type: ChangeEventType::EmbeddingUpdated,
614 entity_id: Some(entity_id.clone()),
615 timestamp: Utc::now(),
616 metadata: HashMap::new(),
617 })
618 .await;
619
620 Ok(update_id)
621 }
622
623 async fn bulk_update_embeddings(
624 &mut self,
625 updates: Vec<(EntityId, Vec<f32>)>,
626 ) -> Result<Vec<UpdateId>> {
627 let mut update_ids = Vec::new();
628
629 for (entity_id, embedding) in updates {
630 let update_id = self.update_entity_embedding(&entity_id, embedding).await?;
631 update_ids.push(update_id);
632 }
633
634 Ok(update_ids)
635 }
636
637 async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>> {
638 let pending: Vec<TransactionId> = self
639 .transactions
640 .iter()
641 .filter(|entry| entry.value().status == TransactionStatus::Active)
642 .map(|entry| entry.key().clone())
643 .collect();
644
645 Ok(pending)
646 }
647
648 async fn get_graph_statistics(&self) -> Result<GraphStatistics> {
649 let graph = self.graph.read();
650 let entities: Vec<_> = graph.entities().collect();
651 let relationships = graph.get_all_relationships();
652
653 let node_count = entities.len();
654 let edge_count = relationships.len();
655
656 let total_degree: usize = entities
658 .iter()
659 .map(|entity| graph.get_neighbors(&entity.id).len())
660 .sum();
661
662 let average_degree = if node_count > 0 {
663 total_degree as f64 / node_count as f64
664 } else {
665 0.0
666 };
667
668 let max_degree = entities
670 .iter()
671 .map(|entity| graph.get_neighbors(&entity.id).len())
672 .max()
673 .unwrap_or(0);
674
675 Ok(GraphStatistics {
676 node_count,
677 edge_count,
678 average_degree,
679 max_degree,
680 connected_components: 1, clustering_coefficient: 0.0, last_updated: Utc::now(),
683 })
684 }
685
686 async fn validate_consistency(&self) -> Result<ConsistencyReport> {
687 let graph = self.graph.read();
688 let mut orphaned_entities = Vec::new();
689 let mut broken_relationships = Vec::new();
690 let mut missing_embeddings = Vec::new();
691
692 for entity in graph.entities() {
694 let neighbors = graph.get_neighbors(&entity.id);
695 if neighbors.is_empty() {
696 orphaned_entities.push(entity.id.clone());
697 }
698
699 if entity.embedding.is_none() {
701 missing_embeddings.push(entity.id.clone());
702 }
703 }
704
705 for relationship in graph.get_all_relationships() {
707 if graph.get_entity(&relationship.source).is_none()
708 || graph.get_entity(&relationship.target).is_none()
709 {
710 broken_relationships.push((
711 relationship.source.clone(),
712 relationship.target.clone(),
713 relationship.relation_type.clone(),
714 ));
715 }
716 }
717
718 let issues_found =
719 orphaned_entities.len() + broken_relationships.len() + missing_embeddings.len();
720
721 Ok(ConsistencyReport {
722 is_consistent: issues_found == 0,
723 orphaned_entities,
724 broken_relationships,
725 missing_embeddings,
726 validation_time: Utc::now(),
727 issues_found,
728 })
729 }
730}
731
732#[allow(dead_code)]
734trait ChangeDataExt {
735 fn get_entity_id(&self) -> Option<EntityId>;
736}
737
738impl ChangeDataExt for ChangeData {
739 fn get_entity_id(&self) -> Option<EntityId> {
740 match self {
741 ChangeData::Entity(entity) => Some(entity.id.clone()),
742 ChangeData::Embedding { entity_id, .. } => Some(entity_id.clone()),
743 _ => None,
744 }
745 }
746}