Skip to main content

graphrag_core/graph/incremental/
store.rs

1#![allow(unused_imports)]
2
3use crate::core::{
4    DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship, Result, TextChunk,
5};
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::time::{Duration, Instant};
10
11#[cfg(feature = "incremental")]
12use std::sync::Arc;
13
14#[cfg(feature = "incremental")]
15use {
16    dashmap::DashMap,
17    parking_lot::{Mutex, RwLock},
18    tokio::sync::{broadcast, Semaphore},
19    uuid::Uuid,
20};
21
22use super::*;
23
24// ============================================================================
25// Production-Ready IncrementalGraphStore Implementation
26// ============================================================================
27
/// Concurrent, in-memory implementation of `IncrementalGraphStore`.
///
/// The knowledge graph lives behind a read/write lock; every applied change is
/// recorded in `change_log` and has a matching entry in `rollback_data`, both
/// keyed by the change's `change_id`.
///
/// NOTE(review): the original summary promised "full ACID guarantees", but in
/// this file the delete operations, `rollback_delta` and the per-change loop in
/// `rollback_transaction` are still placeholders — treat atomicity and
/// rollback as not yet implemented.
#[cfg(feature = "incremental")]
// Several fields are stored but not read anywhere in this file.
#[allow(dead_code)]
pub struct ProductionGraphStore {
    /// The graph being mutated, shared behind a read/write lock.
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// Open transactions, keyed by transaction id.
    transactions: DashMap<TransactionId, Transaction>,
    /// Every change that was applied, keyed by the change's id.
    change_log: DashMap<UpdateId, ChangeRecord>,
    /// Pre-change snapshots, keyed by the change's id.
    rollback_data: DashMap<UpdateId, RollbackData>,
    /// Resolves conflicts reported by `detect_conflict`.
    conflict_resolver: Arc<ConflictResolver>,
    /// Consulted after entity upserts to compute cache invalidations.
    cache_invalidation: Arc<SelectiveInvalidation>,
    /// Receives start/complete notifications for each applied change.
    monitor: Arc<UpdateMonitor>,
    /// Constructed from `config` in `new`; not used elsewhere in this file.
    batch_processor: Arc<BatchProcessor>,
    /// Told about every entity touched by an entity or relationship upsert.
    incremental_pagerank: Arc<IncrementalPageRank>,
    /// Broadcast channel on which `ChangeEvent`s are published.
    event_publisher: broadcast::Sender<ChangeEvent>,
    /// Configuration the store was created with.
    config: IncrementalConfig,
}
44
/// Bookkeeping entry for one open transaction.
///
/// Entries are created by `begin_transaction` and removed again by
/// `commit_transaction` / `rollback_transaction`.
#[derive(Debug, Clone)]
// Most fields are written but never read back in this file.
#[allow(dead_code)]
struct Transaction {
    /// Identifier under which the transaction is stored.
    id: TransactionId,
    /// Changes attributed to this transaction.
    /// NOTE(review): nothing in this file ever pushes to this list — confirm.
    changes: Vec<ChangeRecord>,
    /// Current lifecycle state.
    status: TransactionStatus,
    /// Wall-clock time at which the transaction was opened.
    created_at: DateTime<Utc>,
    /// Requested isolation level; recorded only, not enforced in this file.
    isolation_level: IsolationLevel,
}
55
/// Lifecycle state of a [`Transaction`].
///
/// The enum is fieldless, so it is `Copy` and totally comparable (`Eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// `Preparing` is declared for completeness but never constructed in this file.
#[allow(dead_code)]
enum TransactionStatus {
    /// Open and accepting changes.
    Active,
    /// Declared but not used in this file.
    Preparing,
    /// Finished successfully.
    Committed,
    /// Rolled back.
    Aborted,
}
64
/// Isolation level recorded on a [`Transaction`].
///
/// The level is only stored by this file; nothing here enforces it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// Only `ReadCommitted` is constructed in this file.
#[allow(dead_code)]
enum IsolationLevel {
    /// Read-uncommitted isolation.
    ReadUncommitted,
    /// Read-committed isolation (the level `begin_transaction` assigns).
    ReadCommitted,
    /// Repeatable-read isolation.
    RepeatableRead,
    /// Serializable isolation.
    Serializable,
}
73
/// Notification broadcast to subscribers whenever the store does something
/// noteworthy (upserts, deletes, transaction lifecycle, conflict resolution).
///
/// Events are informational: they are published after the fact and carry no
/// payload beyond the fields below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    /// Unique identifier for the event (freshly generated per event, not the
    /// id of the change that triggered it)
    pub event_id: UpdateId,
    /// Type of change event
    pub event_type: ChangeEventType,
    /// Optional entity ID associated with the event; relationship events
    /// carry the relationship's source entity here
    pub entity_id: Option<EntityId>,
    /// When the event occurred
    pub timestamp: DateTime<Utc>,
    /// Additional metadata about the event; transaction events store the
    /// transaction id under the `"transaction_id"` key
    pub metadata: HashMap<String, String>,
}
88
89/// Types of change events that can be published
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub enum ChangeEventType {
92    /// An entity was upserted
93    EntityUpserted,
94    /// An entity was deleted
95    EntityDeleted,
96    /// A relationship was upserted
97    RelationshipUpserted,
98    /// A relationship was deleted
99    RelationshipDeleted,
100    /// An embedding was updated
101    EmbeddingUpdated,
102    /// A transaction was started
103    TransactionStarted,
104    /// A transaction was committed
105    TransactionCommitted,
106    /// A transaction was rolled back
107    TransactionRolledBack,
108    /// A conflict was resolved
109    ConflictResolved,
110    /// Cache was invalidated
111    CacheInvalidated,
112    /// A batch was processed
113    BatchProcessed,
114}
115
116#[cfg(feature = "incremental")]
117impl ProductionGraphStore {
118    /// Creates a new production-grade graph store with full ACID guarantees
119    pub fn new(
120        graph: KnowledgeGraph,
121        config: IncrementalConfig,
122        conflict_resolver: ConflictResolver,
123    ) -> Self {
124        let (event_tx, _) = broadcast::channel(1000);
125
126        Self {
127            graph: Arc::new(RwLock::new(graph)),
128            transactions: DashMap::new(),
129            change_log: DashMap::new(),
130            rollback_data: DashMap::new(),
131            conflict_resolver: Arc::new(conflict_resolver),
132            cache_invalidation: Arc::new(SelectiveInvalidation::new()),
133            monitor: Arc::new(UpdateMonitor::new()),
134            batch_processor: Arc::new(BatchProcessor::new(
135                config.batch_size,
136                Duration::from_millis(100),
137                config.max_concurrent_operations,
138            )),
139            incremental_pagerank: Arc::new(IncrementalPageRank::new(0.85, 1e-6, 100)),
140            event_publisher: event_tx,
141            config,
142        }
143    }
144
    /// Subscribes to change events for monitoring.
    ///
    /// Returns a new receiver on the store's broadcast channel; each call
    /// yields an independent receiver.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ChangeEvent> {
        self.event_publisher.subscribe()
    }
149
    /// Broadcasts `event` to all current subscribers (best effort).
    ///
    /// Nothing is awaited here; the function is `async` only so call sites
    /// read uniformly.
    async fn publish_event(&self, event: ChangeEvent) {
        // The send result is deliberately discarded: per the tokio broadcast
        // docs it is an error only when there are no active receivers, and
        // having no listeners must not fail a store operation.
        let _ = self.event_publisher.send(event);
    }
153
154    fn create_change_record(
155        &self,
156        change_type: ChangeType,
157        operation: Operation,
158        change_data: ChangeData,
159        entity_id: Option<EntityId>,
160        document_id: Option<DocumentId>,
161    ) -> ChangeRecord {
162        ChangeRecord {
163            change_id: UpdateId::new(),
164            timestamp: Utc::now(),
165            change_type,
166            entity_id,
167            document_id,
168            operation,
169            data: change_data,
170            metadata: HashMap::new(),
171        }
172    }
173
174    async fn apply_change_with_conflict_resolution(
175        &self,
176        change: ChangeRecord,
177    ) -> Result<UpdateId> {
178        let operation_id = self.monitor.start_operation("apply_change");
179
180        // Check for conflicts
181        if let Some(conflict) = self.detect_conflict(&change)? {
182            let resolution = self.conflict_resolver.resolve_conflict(&conflict).await?;
183
184            // Apply resolved change
185            let resolved_change = ChangeRecord {
186                data: resolution.resolved_data,
187                metadata: resolution.metadata,
188                ..change
189            };
190
191            self.apply_change_internal(resolved_change).await?;
192
193            // Publish conflict resolution event
194            self.publish_event(ChangeEvent {
195                event_id: UpdateId::new(),
196                event_type: ChangeEventType::ConflictResolved,
197                entity_id: conflict.existing_data.get_entity_id(),
198                timestamp: Utc::now(),
199                metadata: HashMap::new(),
200            })
201            .await;
202        } else {
203            self.apply_change_internal(change).await?;
204        }
205
206        self.monitor
207            .complete_operation(&operation_id, true, None, 1, 0);
208        Ok(operation_id)
209    }
210
211    fn detect_conflict(&self, change: &ChangeRecord) -> Result<Option<Conflict>> {
212        match &change.data {
213            ChangeData::Entity(entity) => {
214                let graph = self.graph.read();
215                if let Some(existing) = graph.get_entity(&entity.id) {
216                    if existing.name != entity.name || existing.entity_type != entity.entity_type {
217                        return Ok(Some(Conflict {
218                            conflict_id: UpdateId::new(),
219                            conflict_type: ConflictType::EntityExists,
220                            existing_data: ChangeData::Entity(existing.clone()),
221                            new_data: change.data.clone(),
222                            resolution: None,
223                        }));
224                    }
225                }
226            },
227            ChangeData::Relationship(relationship) => {
228                let graph = self.graph.read();
229                for existing_rel in graph.get_all_relationships() {
230                    if existing_rel.source == relationship.source
231                        && existing_rel.target == relationship.target
232                        && existing_rel.relation_type == relationship.relation_type
233                    {
234                        return Ok(Some(Conflict {
235                            conflict_id: UpdateId::new(),
236                            conflict_type: ConflictType::RelationshipExists,
237                            existing_data: ChangeData::Relationship(existing_rel.clone()),
238                            new_data: change.data.clone(),
239                            resolution: None,
240                        }));
241                    }
242                }
243            },
244            _ => {},
245        }
246
247        Ok(None)
248    }
249
250    async fn apply_change_internal(&self, change: ChangeRecord) -> Result<()> {
251        let change_id = change.change_id.clone();
252
253        // Create rollback data first
254        let rollback_data = {
255            let graph = self.graph.read();
256            self.create_rollback_data(&change, &graph)?
257        };
258
259        self.rollback_data.insert(change_id.clone(), rollback_data);
260
261        // Apply the change
262        {
263            let mut graph = self.graph.write();
264            match &change.data {
265                ChangeData::Entity(entity) => {
266                    match change.operation {
267                        Operation::Insert | Operation::Upsert => {
268                            graph.add_entity(entity.clone())?;
269                            self.incremental_pagerank.record_change(entity.id.clone());
270                        },
271                        Operation::Delete => {
272                            // Remove entity and its relationships
273                            // Implementation would go here
274                        },
275                        _ => {},
276                    }
277                },
278                ChangeData::Relationship(relationship) => {
279                    match change.operation {
280                        Operation::Insert | Operation::Upsert => {
281                            graph.add_relationship(relationship.clone())?;
282                            self.incremental_pagerank
283                                .record_change(relationship.source.clone());
284                            self.incremental_pagerank
285                                .record_change(relationship.target.clone());
286                        },
287                        Operation::Delete => {
288                            // Remove relationship
289                            // Implementation would go here
290                        },
291                        _ => {},
292                    }
293                },
294                ChangeData::Embedding {
295                    entity_id,
296                    embedding,
297                } => {
298                    if let Some(entity) = graph.get_entity_mut(entity_id) {
299                        entity.embedding = Some(embedding.clone());
300                    }
301                },
302                _ => {},
303            }
304        }
305
306        // Record change in log
307        self.change_log.insert(change_id, change);
308
309        Ok(())
310    }
311
312    fn create_rollback_data(
313        &self,
314        change: &ChangeRecord,
315        graph: &KnowledgeGraph,
316    ) -> Result<RollbackData> {
317        let mut previous_entities = Vec::new();
318        let mut previous_relationships = Vec::new();
319
320        match &change.data {
321            ChangeData::Entity(entity) => {
322                if let Some(existing) = graph.get_entity(&entity.id) {
323                    previous_entities.push(existing.clone());
324                }
325            },
326            ChangeData::Relationship(relationship) => {
327                // Store existing relationships that might be affected
328                for rel in graph.get_all_relationships() {
329                    if rel.source == relationship.source && rel.target == relationship.target {
330                        previous_relationships.push(rel.clone());
331                    }
332                }
333            },
334            _ => {},
335        }
336
337        Ok(RollbackData {
338            previous_entities,
339            previous_relationships,
340            affected_caches: vec![], // Will be populated by cache invalidation system
341        })
342    }
343}
344
345#[cfg(feature = "incremental")]
346#[async_trait::async_trait]
347impl IncrementalGraphStore for ProductionGraphStore {
348    type Error = GraphRAGError;
349
350    async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
351        let change = self.create_change_record(
352            ChangeType::EntityAdded,
353            Operation::Upsert,
354            ChangeData::Entity(entity.clone()),
355            Some(entity.id.clone()),
356            None,
357        );
358
359        let update_id = self.apply_change_with_conflict_resolution(change).await?;
360
361        // Trigger cache invalidation
362        let changes = vec![self
363            .change_log
364            .get(&update_id)
365            .expect("just inserted above")
366            .clone()];
367        let _invalidation_strategies = self.cache_invalidation.invalidate_for_changes(&changes);
368
369        // Publish event
370        self.publish_event(ChangeEvent {
371            event_id: UpdateId::new(),
372            event_type: ChangeEventType::EntityUpserted,
373            entity_id: Some(entity.id),
374            timestamp: Utc::now(),
375            metadata: HashMap::new(),
376        })
377        .await;
378
379        Ok(update_id)
380    }
381
382    async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId> {
383        let change = self.create_change_record(
384            ChangeType::RelationshipAdded,
385            Operation::Upsert,
386            ChangeData::Relationship(relationship.clone()),
387            None,
388            None,
389        );
390
391        let update_id = self.apply_change_with_conflict_resolution(change).await?;
392
393        // Publish event
394        self.publish_event(ChangeEvent {
395            event_id: UpdateId::new(),
396            event_type: ChangeEventType::RelationshipUpserted,
397            entity_id: Some(relationship.source),
398            timestamp: Utc::now(),
399            metadata: HashMap::new(),
400        })
401        .await;
402
403        Ok(update_id)
404    }
405
406    async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId> {
407        // Implementation for entity deletion
408        let update_id = UpdateId::new();
409
410        // Publish event
411        self.publish_event(ChangeEvent {
412            event_id: UpdateId::new(),
413            event_type: ChangeEventType::EntityDeleted,
414            entity_id: Some(entity_id.clone()),
415            timestamp: Utc::now(),
416            metadata: HashMap::new(),
417        })
418        .await;
419
420        Ok(update_id)
421    }
422
423    async fn delete_relationship(
424        &mut self,
425        source: &EntityId,
426        _target: &EntityId,
427        _relation_type: &str,
428    ) -> Result<UpdateId> {
429        // Implementation for relationship deletion
430        let update_id = UpdateId::new();
431
432        // Publish event
433        self.publish_event(ChangeEvent {
434            event_id: UpdateId::new(),
435            event_type: ChangeEventType::RelationshipDeleted,
436            entity_id: Some(source.clone()),
437            timestamp: Utc::now(),
438            metadata: HashMap::new(),
439        })
440        .await;
441
442        Ok(update_id)
443    }
444
445    async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId> {
446        let tx_id = self.begin_transaction().await?;
447
448        for change in delta.changes {
449            self.apply_change_with_conflict_resolution(change).await?;
450        }
451
452        self.commit_transaction(tx_id).await?;
453        Ok(delta.delta_id)
454    }
455
    /// Placeholder for rolling back a previously applied delta.
    ///
    /// NOTE(review): this is currently a no-op that always returns `Ok(())`;
    /// `_delta_id` is ignored and nothing is restored from `rollback_data`.
    /// Callers must not rely on it to undo anything yet.
    async fn rollback_delta(&mut self, _delta_id: &UpdateId) -> Result<()> {
        // Implementation for delta rollback
        Ok(())
    }
460
461    async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>> {
462        let changes: Vec<ChangeRecord> = self
463            .change_log
464            .iter()
465            .filter_map(|entry| {
466                let change = entry.value();
467                if let Some(since_time) = since {
468                    if change.timestamp >= since_time {
469                        Some(change.clone())
470                    } else {
471                        None
472                    }
473                } else {
474                    Some(change.clone())
475                }
476            })
477            .collect();
478
479        Ok(changes)
480    }
481
482    async fn begin_transaction(&mut self) -> Result<TransactionId> {
483        let tx_id = TransactionId::new();
484        let transaction = Transaction {
485            id: tx_id.clone(),
486            changes: Vec::new(),
487            status: TransactionStatus::Active,
488            created_at: Utc::now(),
489            isolation_level: IsolationLevel::ReadCommitted,
490        };
491
492        self.transactions.insert(tx_id.clone(), transaction);
493
494        // Publish event
495        self.publish_event(ChangeEvent {
496            event_id: UpdateId::new(),
497            event_type: ChangeEventType::TransactionStarted,
498            entity_id: None,
499            timestamp: Utc::now(),
500            metadata: [("transaction_id".to_string(), tx_id.to_string())]
501                .into_iter()
502                .collect(),
503        })
504        .await;
505
506        Ok(tx_id)
507    }
508
509    async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
510        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
511            tx.status = TransactionStatus::Committed;
512
513            // Publish event
514            self.publish_event(ChangeEvent {
515                event_id: UpdateId::new(),
516                event_type: ChangeEventType::TransactionCommitted,
517                entity_id: None,
518                timestamp: Utc::now(),
519                metadata: [("transaction_id".to_string(), tx_id.to_string())]
520                    .into_iter()
521                    .collect(),
522            })
523            .await;
524
525            Ok(())
526        } else {
527            Err(GraphRAGError::IncrementalUpdate {
528                message: format!("Transaction {tx_id} not found"),
529            })
530        }
531    }
532
533    async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
534        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
535            tx.status = TransactionStatus::Aborted;
536
537            // Rollback all changes in this transaction
538            for _change in &tx.changes {
539                // Implementation for rollback
540            }
541
542            // Publish event
543            self.publish_event(ChangeEvent {
544                event_id: UpdateId::new(),
545                event_type: ChangeEventType::TransactionRolledBack,
546                entity_id: None,
547                timestamp: Utc::now(),
548                metadata: [("transaction_id".to_string(), tx_id.to_string())]
549                    .into_iter()
550                    .collect(),
551            })
552            .await;
553
554            Ok(())
555        } else {
556            Err(GraphRAGError::IncrementalUpdate {
557                message: format!("Transaction {tx_id} not found"),
558            })
559        }
560    }
561
562    async fn batch_upsert_entities(
563        &mut self,
564        entities: Vec<Entity>,
565        _strategy: ConflictStrategy,
566    ) -> Result<Vec<UpdateId>> {
567        let mut update_ids = Vec::new();
568
569        for entity in entities {
570            let update_id = self.upsert_entity(entity).await?;
571            update_ids.push(update_id);
572        }
573
574        Ok(update_ids)
575    }
576
577    async fn batch_upsert_relationships(
578        &mut self,
579        relationships: Vec<Relationship>,
580        _strategy: ConflictStrategy,
581    ) -> Result<Vec<UpdateId>> {
582        let mut update_ids = Vec::new();
583
584        for relationship in relationships {
585            let update_id = self.upsert_relationship(relationship).await?;
586            update_ids.push(update_id);
587        }
588
589        Ok(update_ids)
590    }
591
592    async fn update_entity_embedding(
593        &mut self,
594        entity_id: &EntityId,
595        embedding: Vec<f32>,
596    ) -> Result<UpdateId> {
597        let change = self.create_change_record(
598            ChangeType::EmbeddingUpdated,
599            Operation::Update,
600            ChangeData::Embedding {
601                entity_id: entity_id.clone(),
602                embedding,
603            },
604            Some(entity_id.clone()),
605            None,
606        );
607
608        let update_id = self.apply_change_with_conflict_resolution(change).await?;
609
610        // Publish event
611        self.publish_event(ChangeEvent {
612            event_id: UpdateId::new(),
613            event_type: ChangeEventType::EmbeddingUpdated,
614            entity_id: Some(entity_id.clone()),
615            timestamp: Utc::now(),
616            metadata: HashMap::new(),
617        })
618        .await;
619
620        Ok(update_id)
621    }
622
623    async fn bulk_update_embeddings(
624        &mut self,
625        updates: Vec<(EntityId, Vec<f32>)>,
626    ) -> Result<Vec<UpdateId>> {
627        let mut update_ids = Vec::new();
628
629        for (entity_id, embedding) in updates {
630            let update_id = self.update_entity_embedding(&entity_id, embedding).await?;
631            update_ids.push(update_id);
632        }
633
634        Ok(update_ids)
635    }
636
637    async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>> {
638        let pending: Vec<TransactionId> = self
639            .transactions
640            .iter()
641            .filter(|entry| entry.value().status == TransactionStatus::Active)
642            .map(|entry| entry.key().clone())
643            .collect();
644
645        Ok(pending)
646    }
647
648    async fn get_graph_statistics(&self) -> Result<GraphStatistics> {
649        let graph = self.graph.read();
650        let entities: Vec<_> = graph.entities().collect();
651        let relationships = graph.get_all_relationships();
652
653        let node_count = entities.len();
654        let edge_count = relationships.len();
655
656        // Calculate average degree
657        let total_degree: usize = entities
658            .iter()
659            .map(|entity| graph.get_neighbors(&entity.id).len())
660            .sum();
661
662        let average_degree = if node_count > 0 {
663            total_degree as f64 / node_count as f64
664        } else {
665            0.0
666        };
667
668        // Find max degree
669        let max_degree = entities
670            .iter()
671            .map(|entity| graph.get_neighbors(&entity.id).len())
672            .max()
673            .unwrap_or(0);
674
675        Ok(GraphStatistics {
676            node_count,
677            edge_count,
678            average_degree,
679            max_degree,
680            connected_components: 1,     // Simplified for now
681            clustering_coefficient: 0.0, // Would need complex calculation
682            last_updated: Utc::now(),
683        })
684    }
685
686    async fn validate_consistency(&self) -> Result<ConsistencyReport> {
687        let graph = self.graph.read();
688        let mut orphaned_entities = Vec::new();
689        let mut broken_relationships = Vec::new();
690        let mut missing_embeddings = Vec::new();
691
692        // Check for orphaned entities (entities with no relationships)
693        for entity in graph.entities() {
694            let neighbors = graph.get_neighbors(&entity.id);
695            if neighbors.is_empty() {
696                orphaned_entities.push(entity.id.clone());
697            }
698
699            // Check for missing embeddings
700            if entity.embedding.is_none() {
701                missing_embeddings.push(entity.id.clone());
702            }
703        }
704
705        // Check for broken relationships (references to non-existent entities)
706        for relationship in graph.get_all_relationships() {
707            if graph.get_entity(&relationship.source).is_none()
708                || graph.get_entity(&relationship.target).is_none()
709            {
710                broken_relationships.push((
711                    relationship.source.clone(),
712                    relationship.target.clone(),
713                    relationship.relation_type.clone(),
714                ));
715            }
716        }
717
718        let issues_found =
719            orphaned_entities.len() + broken_relationships.len() + missing_embeddings.len();
720
721        Ok(ConsistencyReport {
722            is_consistent: issues_found == 0,
723            orphaned_entities,
724            broken_relationships,
725            missing_embeddings,
726            validation_time: Utc::now(),
727            issues_found,
728        })
729    }
730}
731
// Helper trait for extracting entity ID from ChangeData
// The only caller in this file is behind the "incremental" feature, hence the
// dead_code allowance.
#[allow(dead_code)]
trait ChangeDataExt {
    /// Returns the id of the entity the data refers to, or `None` when the
    /// data is not tied to a single entity.
    fn get_entity_id(&self) -> Option<EntityId>;
}
737
738impl ChangeDataExt for ChangeData {
739    fn get_entity_id(&self) -> Option<EntityId> {
740        match self {
741            ChangeData::Entity(entity) => Some(entity.id.clone()),
742            ChangeData::Embedding { entity_id, .. } => Some(entity_id.clone()),
743            _ => None,
744        }
745    }
746}