// graphrag_core/graph/incremental.rs
1//! Comprehensive incremental updates architecture for GraphRAG-RS
2//!
3//! This module provides zero-downtime incremental updates with ACID-like guarantees,
4//! intelligent cache invalidation, conflict resolution, and comprehensive monitoring.
5//!
6//! ## Architecture Goals
7//!
8//! - **Zero-downtime updates**: System remains available during modifications
9//! - **Consistency guarantees**: ACID-like properties for graph operations
10//! - **Performance**: Updates should be 10x+ faster than full reconstruction
11//! - **Scalability**: Handle thousands of concurrent updates per second
12//! - **Observability**: Complete audit trail of all changes
13//!
14//! ## Key Components
15//!
16//! - `IncrementalGraphStore` trait for atomic update operations
17//! - `ChangeRecord` and `ChangeLog` for tracking modifications
18//! - `GraphDelta` for representing atomic change sets
19//! - `ConflictResolver` for handling concurrent modifications
20//! - `SelectiveInvalidation` for cache management
21//! - `UpdateMonitor` for change tracking and metrics
22//! - `IncrementalPageRank` for efficient graph algorithm updates
23
24use crate::core::{
25    DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship, Result, TextChunk,
26};
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29use std::collections::{HashMap, HashSet};
30use std::time::{Duration, Instant};
31
32#[cfg(feature = "incremental")]
33use std::sync::Arc;
34
35#[cfg(feature = "incremental")]
36use {
37    dashmap::DashMap,
38    parking_lot::{Mutex, RwLock},
39    tokio::sync::{broadcast, Semaphore},
40    uuid::Uuid,
41};
42
43// ============================================================================
44// Core Types and Enums
45// ============================================================================
46
47/// Unique identifier for update operations
48#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub struct UpdateId(String);
50
51impl UpdateId {
52    /// Creates a new unique update identifier
53    pub fn new() -> Self {
54        #[cfg(feature = "incremental")]
55        {
56            Self(Uuid::new_v4().to_string())
57        }
58        #[cfg(not(feature = "incremental"))]
59        {
60            Self(format!(
61                "update_{}",
62                Utc::now().timestamp_nanos_opt().unwrap_or(0)
63            ))
64        }
65    }
66
67    /// Creates an update identifier from an existing string
68    pub fn from_string(id: String) -> Self {
69        Self(id)
70    }
71
72    /// Returns the update ID as a string slice
73    pub fn as_str(&self) -> &str {
74        &self.0
75    }
76}
77
78impl std::fmt::Display for UpdateId {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        write!(f, "{}", self.0)
81    }
82}
83
84impl Default for UpdateId {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
/// Change record for tracking individual modifications.
///
/// A single `ChangeRecord` captures one mutation to the graph together with
/// the context needed to audit, replay, or invalidate it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeRecord {
    /// Unique identifier for this change
    pub change_id: UpdateId,
    /// Timestamp when the change occurred (UTC)
    pub timestamp: DateTime<Utc>,
    /// Type of change performed
    pub change_type: ChangeType,
    /// Optional entity ID affected by this change
    pub entity_id: Option<EntityId>,
    /// Optional document ID affected by this change
    pub document_id: Option<DocumentId>,
    /// Operation type (insert, update, delete, upsert)
    pub operation: Operation,
    /// Data associated with the change
    pub data: ChangeData,
    /// Additional metadata for the change (free-form key/value pairs)
    pub metadata: HashMap<String, String>,
}

/// Types of changes that can occur
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ChangeType {
    /// An entity was added to the graph
    EntityAdded,
    /// An existing entity was updated
    EntityUpdated,
    /// An entity was removed from the graph
    EntityRemoved,
    /// A relationship was added to the graph
    RelationshipAdded,
    /// An existing relationship was updated
    RelationshipUpdated,
    /// A relationship was removed from the graph
    RelationshipRemoved,
    /// A document was added
    DocumentAdded,
    /// An existing document was updated
    DocumentUpdated,
    /// A document was removed
    DocumentRemoved,
    /// A text chunk was added
    ChunkAdded,
    /// An existing text chunk was updated
    ChunkUpdated,
    /// A text chunk was removed
    ChunkRemoved,
    /// An embedding was added
    EmbeddingAdded,
    /// An existing embedding was updated
    EmbeddingUpdated,
    /// An embedding was removed
    EmbeddingRemoved,
}

/// Operations that can be performed
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Operation {
    /// Insert a new item
    Insert,
    /// Update an existing item
    Update,
    /// Delete an item
    Delete,
    /// Insert or update (upsert) an item
    Upsert,
}

/// Data associated with a change.
///
/// NOTE(review): the variant is presumably expected to agree with
/// [`ChangeRecord::change_type`] — confirm with producers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeData {
    /// Entity data
    Entity(Entity),
    /// Relationship data
    Relationship(Relationship),
    /// Document data
    Document(Document),
    /// Text chunk data
    Chunk(TextChunk),
    /// Embedding data with entity ID and vector
    Embedding {
        /// Entity ID for the embedding
        entity_id: EntityId,
        /// Embedding vector
        embedding: Vec<f32>,
    },
    /// Empty change data placeholder
    Empty,
}

/// Document type for incremental updates
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Document {
    /// Unique identifier for the document
    pub id: DocumentId,
    /// Document title
    pub title: String,
    /// Document content
    pub content: String,
    /// Additional metadata
    pub metadata: HashMap<String, String>,
}
193
/// Atomic change set representing a transaction.
///
/// A delta is applied as a unit; `rollback_data`, when captured, allows the
/// whole set to be undone.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphDelta {
    /// Unique identifier for this delta
    pub delta_id: UpdateId,
    /// Timestamp when the delta was created (UTC)
    pub timestamp: DateTime<Utc>,
    /// List of changes in this delta
    pub changes: Vec<ChangeRecord>,
    /// Delta IDs that this delta depends on
    pub dependencies: Vec<UpdateId>,
    /// Current status of the delta
    pub status: DeltaStatus,
    /// Data needed to rollback this delta, if captured
    pub rollback_data: Option<RollbackData>,
}

/// Status of a delta operation
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DeltaStatus {
    /// Delta is pending application
    Pending,
    /// Delta has been applied but not committed
    Applied,
    /// Delta has been committed
    Committed,
    /// Delta has been rolled back
    RolledBack,
    /// Delta failed with error message
    Failed {
        /// Error message describing the failure
        error: String
    },
}

/// Data needed for rollback operations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollbackData {
    /// Previous state of entities before the change
    pub previous_entities: Vec<Entity>,
    /// Previous state of relationships before the change
    pub previous_relationships: Vec<Relationship>,
    /// Cache keys affected by the change
    pub affected_caches: Vec<String>,
}

/// Conflict resolution strategies
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictStrategy {
    /// Keep the existing data, discard new changes
    KeepExisting,
    /// Keep the new data, discard existing
    KeepNew,
    /// Merge existing and new data intelligently
    Merge,
    /// Use LLM to decide how to resolve conflict
    LLMDecision,
    /// Prompt user to resolve conflict
    UserPrompt,
    /// Use a custom resolver registered under the given name
    Custom(String),
}

/// Conflict detected during update
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    /// Unique identifier for this conflict
    pub conflict_id: UpdateId,
    /// Type of conflict detected
    pub conflict_type: ConflictType,
    /// Existing data in the graph
    pub existing_data: ChangeData,
    /// New data attempting to be applied
    pub new_data: ChangeData,
    /// Resolution if already resolved; `None` while unresolved
    pub resolution: Option<ConflictResolution>,
}

/// Types of conflicts
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    /// Entity already exists with different data
    EntityExists,
    /// Relationship already exists with different data
    RelationshipExists,
    /// Version mismatch between expected and actual
    VersionMismatch,
    /// Data is inconsistent with graph state
    DataInconsistency,
    /// Change violates a constraint
    ConstraintViolation,
}

/// Resolution for a conflict
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictResolution {
    /// Strategy used to resolve the conflict
    pub strategy: ConflictStrategy,
    /// Resolved data after applying strategy
    pub resolved_data: ChangeData,
    /// Metadata about the resolution
    pub metadata: HashMap<String, String>,
}
297
298// ============================================================================
299// IncrementalGraphStore Trait
300// ============================================================================
301
302/// Extended trait for incremental graph operations with production-ready features
303#[async_trait::async_trait]
304pub trait IncrementalGraphStore: Send + Sync {
305    /// The error type for incremental graph operations
306    type Error: std::error::Error + Send + Sync + 'static;
307
308    /// Upsert an entity (insert or update)
309    async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId>;
310
311    /// Upsert a relationship
312    async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId>;
313
314    /// Delete an entity and its relationships
315    async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId>;
316
317    /// Delete a relationship
318    async fn delete_relationship(
319        &mut self,
320        source: &EntityId,
321        target: &EntityId,
322        relation_type: &str,
323    ) -> Result<UpdateId>;
324
325    /// Apply a batch of changes atomically
326    async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId>;
327
328    /// Rollback a delta
329    async fn rollback_delta(&mut self, delta_id: &UpdateId) -> Result<()>;
330
331    /// Get change history
332    async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>>;
333
334    /// Start a transaction for atomic operations
335    async fn begin_transaction(&mut self) -> Result<TransactionId>;
336
337    /// Commit a transaction
338    async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()>;
339
340    /// Rollback a transaction
341    async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()>;
342
343    /// Batch upsert entities with conflict resolution
344    async fn batch_upsert_entities(
345        &mut self,
346        entities: Vec<Entity>,
347        _strategy: ConflictStrategy,
348    ) -> Result<Vec<UpdateId>>;
349
350    /// Batch upsert relationships with conflict resolution
351    async fn batch_upsert_relationships(
352        &mut self,
353        relationships: Vec<Relationship>,
354        _strategy: ConflictStrategy,
355    ) -> Result<Vec<UpdateId>>;
356
357    /// Update entity embeddings incrementally
358    async fn update_entity_embedding(
359        &mut self,
360        entity_id: &EntityId,
361        embedding: Vec<f32>,
362    ) -> Result<UpdateId>;
363
364    /// Bulk update embeddings for performance
365    async fn bulk_update_embeddings(
366        &mut self,
367        updates: Vec<(EntityId, Vec<f32>)>,
368    ) -> Result<Vec<UpdateId>>;
369
370    /// Get pending transactions
371    async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>>;
372
373    /// Get graph statistics
374    async fn get_graph_statistics(&self) -> Result<GraphStatistics>;
375
376    /// Validate graph consistency
377    async fn validate_consistency(&self) -> Result<ConsistencyReport>;
378}
379
380/// Transaction identifier for atomic operations
381#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
382pub struct TransactionId(String);
383
384impl TransactionId {
385    /// Creates a new unique transaction identifier
386    pub fn new() -> Self {
387        #[cfg(feature = "incremental")]
388        {
389            Self(Uuid::new_v4().to_string())
390        }
391        #[cfg(not(feature = "incremental"))]
392        {
393            Self(format!(
394                "tx_{}",
395                Utc::now().timestamp_nanos_opt().unwrap_or(0)
396            ))
397        }
398    }
399
400    /// Returns the transaction ID as a string slice
401    pub fn as_str(&self) -> &str {
402        &self.0
403    }
404}
405
406impl std::fmt::Display for TransactionId {
407    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408        write!(f, "{}", self.0)
409    }
410}
411
412impl Default for TransactionId {
413    fn default() -> Self {
414        Self::new()
415    }
416}
417
/// Graph statistics for monitoring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphStatistics {
    /// Total number of nodes (entities)
    pub node_count: usize,
    /// Total number of edges (relationships)
    pub edge_count: usize,
    /// Average degree of nodes
    pub average_degree: f64,
    /// Maximum degree of any node
    pub max_degree: usize,
    /// Number of connected components
    pub connected_components: usize,
    /// Clustering coefficient
    pub clustering_coefficient: f64,
    /// When statistics were last updated (UTC)
    pub last_updated: DateTime<Utc>,
}

/// Consistency validation report produced by `validate_consistency`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyReport {
    /// Whether the graph is consistent (no issues found)
    pub is_consistent: bool,
    /// Entities with no relationships
    pub orphaned_entities: Vec<EntityId>,
    /// Relationships referencing non-existent entities, as
    /// (source, target, relation type)
    pub broken_relationships: Vec<(EntityId, EntityId, String)>,
    /// Entities missing embeddings
    pub missing_embeddings: Vec<EntityId>,
    /// When validation was performed (UTC)
    pub validation_time: DateTime<Utc>,
    /// Total number of issues found
    pub issues_found: usize,
}
453
454// ============================================================================
455// Cache Management
456// ============================================================================
457
/// Cache invalidation strategies
#[derive(Debug, Clone)]
pub enum InvalidationStrategy {
    /// Invalidate specific cache keys
    Selective(Vec<String>),
    /// Invalidate all caches in a region (by region id)
    Regional(String),
    /// Invalidate all caches
    Global,
    /// Invalidate based on entity relationships; tuple is
    /// (entity id, traversal depth)
    Relational(EntityId, u32), // entity_id, depth
}

/// Cache region affected by changes
#[derive(Debug, Clone)]
pub struct CacheRegion {
    /// Unique identifier for the cache region
    pub region_id: String,
    /// Entity IDs in this region
    pub entity_ids: HashSet<EntityId>,
    /// Relationship types in this region
    pub relationship_types: HashSet<String>,
    /// Document IDs in this region
    pub document_ids: HashSet<DocumentId>,
    /// When the region was last modified (UTC)
    pub last_modified: DateTime<Utc>,
}
485
/// Selective cache invalidation manager.
///
/// Maps registered cache regions to the entities they contain so that a set
/// of change records can be translated into targeted invalidation strategies.
#[cfg(feature = "incremental")]
pub struct SelectiveInvalidation {
    /// Registered cache regions, keyed by region id.
    cache_regions: DashMap<String, CacheRegion>,
    /// Reverse index: entity id -> ids of the regions containing it.
    entity_to_regions: DashMap<EntityId, HashSet<String>>,
    /// Append-only log of issued invalidation strategies with timestamps.
    invalidation_log: Mutex<Vec<(DateTime<Utc>, InvalidationStrategy)>>,
}

#[cfg(feature = "incremental")]
impl Default for SelectiveInvalidation {
    fn default() -> Self {
        Self::new()
    }
}
500
#[cfg(feature = "incremental")]
impl SelectiveInvalidation {
    /// Creates a new selective invalidation manager with no registered regions.
    pub fn new() -> Self {
        Self {
            cache_regions: DashMap::new(),
            entity_to_regions: DashMap::new(),
            invalidation_log: Mutex::new(Vec::new()),
        }
    }

    /// Registers a cache region for invalidation tracking.
    ///
    /// Also updates the reverse entity -> region index so entity-level changes
    /// can be mapped back to the regions they affect.
    pub fn register_cache_region(&self, region: CacheRegion) {
        let region_id = region.region_id.clone();

        // Update entity mappings
        for entity_id in &region.entity_ids {
            self.entity_to_regions
                .entry(entity_id.clone())
                .or_default()
                .insert(region_id.clone());
        }

        self.cache_regions.insert(region_id, region);
    }

    /// Determines invalidation strategies for a set of changes.
    ///
    /// Entity changes produce relational invalidation (depth 2) plus regional
    /// invalidation of every region containing the entity; relationship
    /// changes produce relational invalidation (depth 1) around both
    /// endpoints; all other changes fall back to selective key invalidation.
    pub fn invalidate_for_changes(&self, changes: &[ChangeRecord]) -> Vec<InvalidationStrategy> {
        let mut strategies = Vec::new();
        let mut affected_regions = HashSet::new();

        for change in changes {
            match &change.change_type {
                ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
                    if let Some(entity_id) = &change.entity_id {
                        if let Some(regions) = self.entity_to_regions.get(entity_id) {
                            // Copy region ids directly instead of cloning the
                            // whole HashSet into a temporary just to move out
                            // of it (avoids an intermediate allocation).
                            affected_regions.extend(regions.iter().cloned());
                        }
                        strategies.push(InvalidationStrategy::Relational(entity_id.clone(), 2));
                    }
                }
                ChangeType::RelationshipAdded
                | ChangeType::RelationshipUpdated
                | ChangeType::RelationshipRemoved => {
                    // Invalidate based on relationship endpoints
                    if let ChangeData::Relationship(rel) = &change.data {
                        strategies.push(InvalidationStrategy::Relational(rel.source.clone(), 1));
                        strategies.push(InvalidationStrategy::Relational(rel.target.clone(), 1));
                    }
                }
                _ => {
                    // For other changes, use selective invalidation
                    let cache_keys = self.generate_cache_keys_for_change(change);
                    if !cache_keys.is_empty() {
                        strategies.push(InvalidationStrategy::Selective(cache_keys));
                    }
                }
            }
        }

        // Add regional invalidation for affected regions
        for region_id in affected_regions {
            strategies.push(InvalidationStrategy::Regional(region_id));
        }

        // Record every issued strategy. The timestamp is taken once (the
        // original called `Utc::now()` per strategy inside the loop).
        let now = Utc::now();
        self.invalidation_log
            .lock()
            .extend(strategies.iter().map(|s| (now, s.clone())));

        strategies
    }

    /// Builds the cache keys affected by a single change record.
    fn generate_cache_keys_for_change(&self, change: &ChangeRecord) -> Vec<String> {
        let mut keys = Vec::new();

        // Generate cache keys based on change type and data
        match &change.change_type {
            ChangeType::EntityAdded | ChangeType::EntityUpdated => {
                if let Some(entity_id) = &change.entity_id {
                    keys.push(format!("entity:{entity_id}"));
                    keys.push(format!("entity_neighbors:{entity_id}"));
                }
            }
            ChangeType::DocumentAdded | ChangeType::DocumentUpdated => {
                if let Some(doc_id) = &change.document_id {
                    keys.push(format!("document:{doc_id}"));
                    keys.push(format!("document_chunks:{doc_id}"));
                }
            }
            ChangeType::EmbeddingAdded | ChangeType::EmbeddingUpdated => {
                if let Some(entity_id) = &change.entity_id {
                    keys.push(format!("embedding:{entity_id}"));
                    keys.push(format!("similarity:{entity_id}"));
                }
            }
            _ => {}
        }

        keys
    }

    /// Gets statistics about cache invalidations performed so far.
    pub fn get_invalidation_stats(&self) -> InvalidationStats {
        let log = self.invalidation_log.lock();

        InvalidationStats {
            total_invalidations: log.len(),
            cache_regions: self.cache_regions.len(),
            entity_mappings: self.entity_to_regions.len(),
            last_invalidation: log.last().map(|(time, _)| *time),
        }
    }
}
616
/// Statistics about cache invalidations, as reported by
/// `SelectiveInvalidation::get_invalidation_stats`
#[derive(Debug, Clone)]
pub struct InvalidationStats {
    /// Total number of invalidations performed
    pub total_invalidations: usize,
    /// Number of cache regions registered
    pub cache_regions: usize,
    /// Number of entity-to-region mappings
    pub entity_mappings: usize,
    /// Timestamp of last invalidation; `None` if no invalidation has occurred
    pub last_invalidation: Option<DateTime<Utc>>,
}
629
630// ============================================================================
631// Conflict Resolution
632// ============================================================================
633
/// Conflict resolver with multiple strategies
pub struct ConflictResolver {
    /// Default strategy applied by `resolve_conflict`.
    strategy: ConflictStrategy,
    /// Named resolver callbacks, looked up for `ConflictStrategy::Custom`.
    custom_resolvers: HashMap<String, ConflictResolverFn>,
}

// Reduce type complexity for custom resolver function type:
// a boxed, thread-safe callback from a conflict to its resolution.
type ConflictResolverFn = Box<dyn Fn(&Conflict) -> Result<ConflictResolution> + Send + Sync>;
642
643impl ConflictResolver {
644    /// Creates a new conflict resolver with the given strategy
645    pub fn new(strategy: ConflictStrategy) -> Self {
646        Self {
647            strategy,
648            custom_resolvers: HashMap::new(),
649        }
650    }
651
652    /// Adds a custom resolver function by name
653    pub fn with_custom_resolver<F>(mut self, name: String, resolver: F) -> Self
654    where
655        F: Fn(&Conflict) -> Result<ConflictResolution> + Send + Sync + 'static,
656    {
657        self.custom_resolvers.insert(name, Box::new(resolver));
658        self
659    }
660
661    /// Resolves a conflict using the configured strategy
662    pub async fn resolve_conflict(&self, conflict: &Conflict) -> Result<ConflictResolution> {
663        match &self.strategy {
664            ConflictStrategy::KeepExisting => Ok(ConflictResolution {
665                strategy: ConflictStrategy::KeepExisting,
666                resolved_data: conflict.existing_data.clone(),
667                metadata: HashMap::new(),
668            }),
669            ConflictStrategy::KeepNew => Ok(ConflictResolution {
670                strategy: ConflictStrategy::KeepNew,
671                resolved_data: conflict.new_data.clone(),
672                metadata: HashMap::new(),
673            }),
674            ConflictStrategy::Merge => self.merge_conflict_data(conflict).await,
675            ConflictStrategy::Custom(resolver_name) => {
676                if let Some(resolver) = self.custom_resolvers.get(resolver_name) {
677                    resolver(conflict)
678                } else {
679                    Err(GraphRAGError::ConflictResolution {
680                        message: format!("Custom resolver '{resolver_name}' not found"),
681                    })
682                }
683            }
684            _ => Err(GraphRAGError::ConflictResolution {
685                message: "Conflict resolution strategy not implemented".to_string(),
686            }),
687        }
688    }
689
690    async fn merge_conflict_data(&self, conflict: &Conflict) -> Result<ConflictResolution> {
691        match (&conflict.existing_data, &conflict.new_data) {
692            (ChangeData::Entity(existing), ChangeData::Entity(new)) => {
693                let merged = self.merge_entities(existing, new)?;
694                Ok(ConflictResolution {
695                    strategy: ConflictStrategy::Merge,
696                    resolved_data: ChangeData::Entity(merged),
697                    metadata: [("merge_strategy".to_string(), "entity_merge".to_string())]
698                        .into_iter()
699                        .collect(),
700                })
701            }
702            (ChangeData::Relationship(existing), ChangeData::Relationship(new)) => {
703                let merged = self.merge_relationships(existing, new)?;
704                Ok(ConflictResolution {
705                    strategy: ConflictStrategy::Merge,
706                    resolved_data: ChangeData::Relationship(merged),
707                    metadata: [(
708                        "merge_strategy".to_string(),
709                        "relationship_merge".to_string(),
710                    )]
711                    .into_iter()
712                    .collect(),
713                })
714            }
715            _ => Err(GraphRAGError::ConflictResolution {
716                message: "Cannot merge incompatible data types".to_string(),
717            }),
718        }
719    }
720
721    fn merge_entities(&self, existing: &Entity, new: &Entity) -> Result<Entity> {
722        let mut merged = existing.clone();
723
724        // Use higher confidence
725        if new.confidence > existing.confidence {
726            merged.confidence = new.confidence;
727            merged.name = new.name.clone();
728            merged.entity_type = new.entity_type.clone();
729        }
730
731        // Merge mentions
732        let mut all_mentions = existing.mentions.clone();
733        for new_mention in &new.mentions {
734            if !all_mentions.iter().any(|m| {
735                m.chunk_id == new_mention.chunk_id && m.start_offset == new_mention.start_offset
736            }) {
737                all_mentions.push(new_mention.clone());
738            }
739        }
740        merged.mentions = all_mentions;
741
742        // Prefer new embedding if available
743        if new.embedding.is_some() {
744            merged.embedding = new.embedding.clone();
745        }
746
747        Ok(merged)
748    }
749
750    fn merge_relationships(
751        &self,
752        existing: &Relationship,
753        new: &Relationship,
754    ) -> Result<Relationship> {
755        let mut merged = existing.clone();
756
757        // Use higher confidence
758        if new.confidence > existing.confidence {
759            merged.confidence = new.confidence;
760            merged.relation_type = new.relation_type.clone();
761        }
762
763        // Merge contexts
764        let mut all_contexts = existing.context.clone();
765        for new_context in &new.context {
766            if !all_contexts.contains(new_context) {
767                all_contexts.push(new_context.clone());
768            }
769        }
770        merged.context = all_contexts;
771
772        Ok(merged)
773    }
774}
775
776// ============================================================================
777// Update Monitor and Metrics
778// ============================================================================
779
/// Monitor for tracking update operations and performance
#[cfg(feature = "incremental")]
pub struct UpdateMonitor {
    /// Recorded metrics; presumably keyed by metric name (see
    /// `record_metric`) — confirm at the insertion site.
    metrics: DashMap<String, UpdateMetric>,
    /// Append-only log of started/completed operations.
    operations_log: Mutex<Vec<OperationLog>>,
    /// Aggregate statistics recomputed after each completed operation.
    performance_stats: RwLock<PerformanceStats>,
}

#[cfg(feature = "incremental")]
impl Default for UpdateMonitor {
    fn default() -> Self {
        Self::new()
    }
}
794
/// Metric for tracking update operations
#[derive(Debug, Clone)]
pub struct UpdateMetric {
    /// Name of the metric
    pub name: String,
    /// Metric value
    pub value: f64,
    /// When the metric was recorded (UTC)
    pub timestamp: DateTime<Utc>,
    /// Tags for categorizing the metric
    pub tags: HashMap<String, String>,
}

/// Log entry for an operation
#[derive(Debug, Clone)]
pub struct OperationLog {
    /// Unique operation identifier
    pub operation_id: UpdateId,
    /// When the operation started (monotonic clock, not wall time)
    pub start_time: Instant,
    /// Type of operation performed
    pub operation_type: String,
    /// When the operation ended; `None` while still in flight
    pub end_time: Option<Instant>,
    /// Whether the operation succeeded; `None` while still in flight
    pub success: Option<bool>,
    /// Error message if failed
    pub error_message: Option<String>,
    /// Number of entities affected
    pub affected_entities: usize,
    /// Number of relationships affected
    pub affected_relationships: usize,
}

/// Performance statistics for monitoring
#[derive(Debug, Clone)]
pub struct PerformanceStats {
    /// Total number of operations performed
    pub total_operations: u64,
    /// Number of successful operations
    pub successful_operations: u64,
    /// Number of failed operations
    pub failed_operations: u64,
    /// Average time per operation
    pub average_operation_time: Duration,
    /// Peak throughput in operations per second
    pub peak_operations_per_second: f64,
    /// Cache hit rate (0.0 to 1.0)
    pub cache_hit_rate: f64,
    /// Conflict resolution rate (0.0 to 1.0)
    pub conflict_resolution_rate: f64,
}
847
#[cfg(feature = "incremental")]
impl UpdateMonitor {
    /// Creates a new update monitor with empty metrics and an empty operation log.
    pub fn new() -> Self {
        Self {
            metrics: DashMap::new(),
            operations_log: Mutex::new(Vec::new()),
            performance_stats: RwLock::new(PerformanceStats {
                total_operations: 0,
                successful_operations: 0,
                failed_operations: 0,
                average_operation_time: Duration::from_millis(0),
                peak_operations_per_second: 0.0,
                cache_hit_rate: 0.0,
                conflict_resolution_rate: 0.0,
            }),
        }
    }

    /// Starts tracking a new operation and returns its ID.
    ///
    /// NOTE(review): the operation log grows without bound; long-running
    /// processes may want periodic truncation — confirm retention needs.
    pub fn start_operation(&self, operation_type: &str) -> UpdateId {
        let operation_id = UpdateId::new();
        let log_entry = OperationLog {
            operation_id: operation_id.clone(),
            operation_type: operation_type.to_string(),
            start_time: Instant::now(),
            end_time: None,
            success: None,
            error_message: None,
            affected_entities: 0,
            affected_relationships: 0,
        };

        self.operations_log.lock().push(log_entry);
        operation_id
    }

    /// Marks an operation as complete with its outcome and affected counts.
    ///
    /// Unknown `operation_id`s leave the log untouched, but aggregate
    /// statistics are still refreshed.
    pub fn complete_operation(
        &self,
        operation_id: &UpdateId,
        success: bool,
        error: Option<String>,
        affected_entities: usize,
        affected_relationships: usize,
    ) {
        // Scope the lock so the guard is dropped before
        // `update_performance_stats` re-acquires it below.
        // `parking_lot::Mutex` is NOT reentrant: re-locking it on the same
        // thread deadlocks, so holding the guard across that call would hang.
        {
            let mut log = self.operations_log.lock();
            if let Some(entry) = log.iter_mut().find(|e| &e.operation_id == operation_id) {
                entry.end_time = Some(Instant::now());
                entry.success = Some(success);
                entry.error_message = error;
                entry.affected_entities = affected_entities;
                entry.affected_relationships = affected_relationships;
            }
        }

        // Update performance stats (locks the operation log internally).
        self.update_performance_stats();
    }

    /// Recomputes aggregate statistics from completed log entries.
    fn update_performance_stats(&self) {
        let log = self.operations_log.lock();
        let completed_ops: Vec<_> = log
            .iter()
            .filter(|op| op.end_time.is_some() && op.success.is_some())
            .collect();

        if completed_ops.is_empty() {
            return;
        }

        let mut stats = self.performance_stats.write();
        stats.total_operations = completed_ops.len() as u64;
        stats.successful_operations = completed_ops
            .iter()
            .filter(|op| op.success == Some(true))
            .count() as u64;
        stats.failed_operations = stats.total_operations - stats.successful_operations;

        // Average wall-clock time over completed operations. The early return
        // above guarantees the divisor is non-zero, so no second emptiness
        // check is needed.
        let total_time: Duration = completed_ops
            .iter()
            .filter_map(|op| op.end_time.map(|end| end.duration_since(op.start_time)))
            .sum();
        stats.average_operation_time = total_time / completed_ops.len() as u32;
    }

    /// Records (or overwrites) a named metric with tags.
    pub fn record_metric(&self, name: &str, value: f64, tags: HashMap<String, String>) {
        let metric = UpdateMetric {
            name: name.to_string(),
            value,
            timestamp: Utc::now(),
            tags,
        };
        // Keyed by name: only the most recent sample per metric is retained.
        self.metrics.insert(name.to_string(), metric);
    }

    /// Gets a snapshot of the current performance statistics.
    pub fn get_performance_stats(&self) -> PerformanceStats {
        self.performance_stats.read().clone()
    }

    /// Gets the most recent operations (newest first) up to `limit`.
    pub fn get_recent_operations(&self, limit: usize) -> Vec<OperationLog> {
        let log = self.operations_log.lock();
        log.iter().rev().take(limit).cloned().collect()
    }
}
959
960// ============================================================================
961// Main Incremental Graph Manager
962// ============================================================================
963
/// Comprehensive incremental graph manager with production features
#[cfg(feature = "incremental")]
pub struct IncrementalGraphManager {
    // Shared, lock-protected knowledge graph; handed out via `graph()`.
    graph: Arc<RwLock<KnowledgeGraph>>,
    // Audit trail of applied changes, keyed by change ID.
    change_log: DashMap<UpdateId, ChangeRecord>,
    // Atomic change sets, keyed by update ID.
    deltas: DashMap<UpdateId, GraphDelta>,
    // Cache invalidation engine.
    cache_invalidation: Arc<SelectiveInvalidation>,
    // Strategy object for resolving concurrent-modification conflicts.
    conflict_resolver: Arc<ConflictResolver>,
    // Metrics / operation-log collector.
    monitor: Arc<UpdateMonitor>,
    // Tunables (log limits, batch sizes, strategies).
    config: IncrementalConfig,
}
975
#[cfg(not(feature = "incremental"))]
/// Incremental graph manager (simplified version without incremental feature)
pub struct IncrementalGraphManager {
    // Owned graph; no locking in the non-concurrent fallback.
    graph: KnowledgeGraph,
    // Append-only list of change records.
    change_log: Vec<ChangeRecord>,
    // Tunables (kept for API parity with the feature-gated version).
    config: IncrementalConfig,
}
983
/// Configuration for incremental operations
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Maximum number of changes to keep in the log (default: 10_000)
    pub max_change_log_size: usize,
    /// Maximum number of changes in a single delta (default: 1_000)
    pub max_delta_size: usize,
    /// Default conflict resolution strategy (default: `Merge`)
    pub conflict_strategy: ConflictStrategy,
    /// Whether to enable performance monitoring (default: true)
    pub enable_monitoring: bool,
    /// Cache invalidation strategy name (default: "selective")
    pub cache_invalidation_strategy: String,
    /// Default batch size for batch operations (default: 100)
    pub batch_size: usize,
    /// Maximum number of concurrent operations (default: 10)
    pub max_concurrent_operations: usize,
}
1002
1003impl Default for IncrementalConfig {
1004    fn default() -> Self {
1005        Self {
1006            max_change_log_size: 10000,
1007            max_delta_size: 1000,
1008            conflict_strategy: ConflictStrategy::Merge,
1009            enable_monitoring: true,
1010            cache_invalidation_strategy: "selective".to_string(),
1011            batch_size: 100,
1012            max_concurrent_operations: 10,
1013        }
1014    }
1015}
1016
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Creates a new incremental graph manager with feature-gated capabilities
    /// (monitoring, conflict resolution, selective cache invalidation).
    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
        let conflict_resolver =
            Arc::new(ConflictResolver::new(config.conflict_strategy.clone()));
        Self {
            graph: Arc::new(RwLock::new(graph)),
            change_log: DashMap::new(),
            deltas: DashMap::new(),
            cache_invalidation: Arc::new(SelectiveInvalidation::new()),
            conflict_resolver,
            monitor: Arc::new(UpdateMonitor::new()),
            config,
        }
    }

    /// Replaces the conflict resolver (builder-style).
    pub fn with_conflict_resolver(mut self, resolver: ConflictResolver) -> Self {
        self.conflict_resolver = Arc::new(resolver);
        self
    }

    /// Returns a shared handle to the lock-protected knowledge graph.
    pub fn graph(&self) -> Arc<RwLock<KnowledgeGraph>> {
        Arc::clone(&self.graph)
    }

    /// Returns a shared handle to the conflict resolver.
    pub fn conflict_resolver(&self) -> Arc<ConflictResolver> {
        Arc::clone(&self.conflict_resolver)
    }

    /// Returns a shared handle to the update monitor.
    pub fn monitor(&self) -> Arc<UpdateMonitor> {
        Arc::clone(&self.monitor)
    }
}
1053
1054#[cfg(not(feature = "incremental"))]
1055impl IncrementalGraphManager {
1056    /// Creates a new incremental graph manager without advanced features
1057    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
1058        Self {
1059            graph,
1060            change_log: Vec::new(),
1061            config,
1062        }
1063    }
1064
1065    /// Gets a reference to the knowledge graph
1066    pub fn graph(&self) -> &KnowledgeGraph {
1067        &self.graph
1068    }
1069
1070    /// Gets a mutable reference to the knowledge graph
1071    pub fn graph_mut(&mut self) -> &mut KnowledgeGraph {
1072        &mut self.graph
1073    }
1074}
1075
// Common implementation for both feature-gated and non-feature-gated versions
impl IncrementalGraphManager {
    /// Create a new change record
    ///
    /// Builds an audit-trail entry with a fresh ID and the current UTC
    /// timestamp; `metadata` starts empty and can be filled by callers.
    pub fn create_change_record(
        &self,
        change_type: ChangeType,
        operation: Operation,
        change_data: ChangeData,
        entity_id: Option<EntityId>,
        document_id: Option<DocumentId>,
    ) -> ChangeRecord {
        ChangeRecord {
            change_id: UpdateId::new(),
            timestamp: Utc::now(),
            change_type,
            entity_id,
            document_id,
            operation,
            data: change_data,
            metadata: HashMap::new(),
        }
    }

    /// Get configuration
    pub fn config(&self) -> &IncrementalConfig {
        &self.config
    }

    /// Basic entity upsert (works without incremental feature)
    ///
    /// Adds `entity` to the graph and appends an `EntityAdded` change record.
    ///
    /// NOTE(review): the returned `UpdateId` is distinct from the logged
    /// record's `change_id`, so callers cannot use it to look the change up
    /// later — confirm whether that linkage matters.
    pub fn basic_upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
        let update_id = UpdateId::new();

        #[cfg(feature = "incremental")]
        {
            // Track the operation in the monitor and hold the graph write
            // lock for the duration of the insert.
            let operation_id = self.monitor.start_operation("upsert_entity");
            let mut graph = self.graph.write();

            match graph.add_entity(entity.clone()) {
                Ok(_) => {
                    // Capture the ID before `entity` is moved into ChangeData.
                    let ent_id = entity.id.clone();
                    let change = self.create_change_record(
                        ChangeType::EntityAdded,
                        Operation::Upsert,
                        ChangeData::Entity(entity),
                        Some(ent_id),
                        None,
                    );
                    self.change_log.insert(change.change_id.clone(), change);
                    self.monitor
                        .complete_operation(&operation_id, true, None, 1, 0);
                    Ok(update_id)
                }
                Err(e) => {
                    // Record the failure (zero affected rows) before propagating.
                    self.monitor.complete_operation(
                        &operation_id,
                        false,
                        Some(e.to_string()),
                        0,
                        0,
                    );
                    Err(e)
                }
            }
        }

        #[cfg(not(feature = "incremental"))]
        {
            self.graph.add_entity(entity.clone())?;
            // Capture ID before moving `entity` into ChangeData
            let ent_id = entity.id.clone();
            let change = self.create_change_record(
                ChangeType::EntityAdded,
                Operation::Upsert,
                ChangeData::Entity(entity),
                Some(ent_id),
                None,
            );
            self.change_log.push(change);
            Ok(update_id)
        }
    }
}
1158
1159// ============================================================================
1160// Statistics and Monitoring
1161// ============================================================================
1162
1163/// Comprehensive statistics for incremental operations
1164#[derive(Debug, Clone, Serialize, Deserialize)]
1165pub struct IncrementalStatistics {
1166    /// Total number of update operations
1167    pub total_updates: usize,
1168    /// Number of successful updates
1169    pub successful_updates: usize,
1170    /// Number of failed updates
1171    pub failed_updates: usize,
1172    /// Number of entities added
1173    pub entities_added: usize,
1174    /// Number of entities updated
1175    pub entities_updated: usize,
1176    /// Number of entities removed
1177    pub entities_removed: usize,
1178    /// Number of relationships added
1179    pub relationships_added: usize,
1180    /// Number of relationships updated
1181    pub relationships_updated: usize,
1182    /// Number of relationships removed
1183    pub relationships_removed: usize,
1184    /// Number of conflicts resolved
1185    pub conflicts_resolved: usize,
1186    /// Number of cache invalidations performed
1187    pub cache_invalidations: usize,
1188    /// Average update time in milliseconds
1189    pub average_update_time_ms: f64,
1190    /// Peak updates per second achieved
1191    pub peak_updates_per_second: f64,
1192    /// Current size of the change log
1193    pub current_change_log_size: usize,
1194    /// Current number of active deltas
1195    pub current_delta_count: usize,
1196}
1197
1198impl IncrementalStatistics {
1199    /// Creates an empty statistics instance
1200    pub fn empty() -> Self {
1201        Self {
1202            total_updates: 0,
1203            successful_updates: 0,
1204            failed_updates: 0,
1205            entities_added: 0,
1206            entities_updated: 0,
1207            entities_removed: 0,
1208            relationships_added: 0,
1209            relationships_updated: 0,
1210            relationships_removed: 0,
1211            conflicts_resolved: 0,
1212            cache_invalidations: 0,
1213            average_update_time_ms: 0.0,
1214            peak_updates_per_second: 0.0,
1215            current_change_log_size: 0,
1216            current_delta_count: 0,
1217        }
1218    }
1219
1220    /// Prints statistics to stdout in a formatted way
1221    pub fn print(&self) {
1222        println!("🔄 Incremental Updates Statistics");
1223        println!("  Total updates: {}", self.total_updates);
1224        println!(
1225            "  Successful: {} ({:.1}%)",
1226            self.successful_updates,
1227            if self.total_updates > 0 {
1228                (self.successful_updates as f64 / self.total_updates as f64) * 100.0
1229            } else {
1230                0.0
1231            }
1232        );
1233        println!("  Failed: {}", self.failed_updates);
1234        println!(
1235            "  Entities: +{} ~{} -{}",
1236            self.entities_added, self.entities_updated, self.entities_removed
1237        );
1238        println!(
1239            "  Relationships: +{} ~{} -{}",
1240            self.relationships_added, self.relationships_updated, self.relationships_removed
1241        );
1242        println!("  Conflicts resolved: {}", self.conflicts_resolved);
1243        println!("  Cache invalidations: {}", self.cache_invalidations);
1244        println!("  Avg update time: {:.2}ms", self.average_update_time_ms);
1245        println!("  Peak updates/sec: {:.1}", self.peak_updates_per_second);
1246        println!("  Change log size: {}", self.current_change_log_size);
1247        println!("  Active deltas: {}", self.current_delta_count);
1248    }
1249}
1250
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Gets comprehensive statistics about incremental operations by combining
    /// monitor data, cache-invalidation counters, and the change log.
    pub fn get_statistics(&self) -> IncrementalStatistics {
        let perf_stats = self.monitor.get_performance_stats();
        let invalidation_stats = self.cache_invalidation.get_invalidation_stats();

        // Tally per-kind change counts from the audit log.
        let mut entities_added = 0;
        let mut entities_updated = 0;
        let mut entities_removed = 0;
        let mut relationships_added = 0;
        let mut relationships_updated = 0;
        let mut relationships_removed = 0;

        for change in self.change_log.iter() {
            match change.value().change_type {
                ChangeType::EntityAdded => entities_added += 1,
                ChangeType::EntityUpdated => entities_updated += 1,
                ChangeType::EntityRemoved => entities_removed += 1,
                ChangeType::RelationshipAdded => relationships_added += 1,
                ChangeType::RelationshipUpdated => relationships_updated += 1,
                ChangeType::RelationshipRemoved => relationships_removed += 1,
                _ => {}
            }
        }

        IncrementalStatistics {
            total_updates: perf_stats.total_operations as usize,
            successful_updates: perf_stats.successful_operations as usize,
            failed_updates: perf_stats.failed_operations as usize,
            entities_added,
            entities_updated,
            entities_removed,
            relationships_added,
            relationships_updated,
            relationships_removed,
            // NOTE(review): resolver conflict counts are not surfaced here,
            // so this always reports zero — confirm that is intended.
            conflicts_resolved: 0,
            cache_invalidations: invalidation_stats.total_invalidations,
            average_update_time_ms: perf_stats.average_operation_time.as_millis() as f64,
            peak_updates_per_second: perf_stats.peak_operations_per_second,
            current_change_log_size: self.change_log.len(),
            current_delta_count: self.deltas.len(),
        }
    }
}
1294
1295#[cfg(not(feature = "incremental"))]
1296impl IncrementalGraphManager {
1297    /// Gets basic statistics about incremental operations (non-feature version)
1298    pub fn get_statistics(&self) -> IncrementalStatistics {
1299        let mut stats = IncrementalStatistics::empty();
1300        stats.current_change_log_size = self.change_log.len();
1301
1302        for change in &self.change_log {
1303            match change.change_type {
1304                ChangeType::EntityAdded => stats.entities_added += 1,
1305                ChangeType::EntityUpdated => stats.entities_updated += 1,
1306                ChangeType::EntityRemoved => stats.entities_removed += 1,
1307                ChangeType::RelationshipAdded => stats.relationships_added += 1,
1308                ChangeType::RelationshipUpdated => stats.relationships_updated += 1,
1309                ChangeType::RelationshipRemoved => stats.relationships_removed += 1,
1310                _ => {}
1311            }
1312        }
1313
1314        stats.total_updates = self.change_log.len();
1315        stats.successful_updates = self.change_log.len(); // Assume all succeeded in basic mode
1316        stats
1317    }
1318}
1319
1320// ============================================================================
1321// Incremental PageRank Implementation
1322// ============================================================================
1323
/// Incremental PageRank calculator for efficient updates
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct IncrementalPageRank {
    // Current PageRank score per entity.
    scores: DashMap<EntityId, f64>,
    adjacency_changes: DashMap<EntityId, Vec<(EntityId, f64)>>, // Node -> [(neighbor, weight)]
    // Probability of following a link (classic PageRank "d").
    damping_factor: f64,
    // Convergence threshold on the max per-node score delta.
    tolerance: f64,
    // Upper bound on power-iteration sweeps.
    max_iterations: usize,
    // NOTE(review): set at construction but never refreshed in this module —
    // confirm whether full recomputation should update it.
    last_full_computation: DateTime<Utc>,
    incremental_threshold: usize, // Number of changes before full recomputation
    // Count of recorded-but-unapplied graph changes.
    pending_changes: RwLock<usize>,
}
1337
#[cfg(feature = "incremental")]
impl IncrementalPageRank {
    /// Creates a new incremental PageRank calculator.
    ///
    /// * `damping_factor` - probability of following a link (classic "d").
    /// * `tolerance` - convergence threshold on the max per-node score delta.
    /// * `max_iterations` - upper bound on power-iteration sweeps.
    pub fn new(damping_factor: f64, tolerance: f64, max_iterations: usize) -> Self {
        Self {
            scores: DashMap::new(),
            adjacency_changes: DashMap::new(),
            damping_factor,
            tolerance,
            max_iterations,
            last_full_computation: Utc::now(),
            incremental_threshold: 1000,
            pending_changes: RwLock::new(0),
        }
    }

    /// Update PageRank incrementally for a specific subgraph.
    ///
    /// Falls back to full recomputation once more than
    /// `incremental_threshold` changes have accumulated; otherwise runs a
    /// localized power iteration over the 2-hop neighborhood of the changes.
    pub async fn update_incremental(
        &self,
        changed_entities: &[EntityId],
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let start = Instant::now();

        // If too many changes accumulated, do full recomputation
        {
            let pending = *self.pending_changes.read();
            if pending > self.incremental_threshold {
                return self.full_recomputation(graph).await;
            }
        }

        // Collect the changed entities plus their 2-hop neighborhood.
        let mut affected_entities = HashSet::new();
        for entity_id in changed_entities {
            affected_entities.insert(entity_id.clone());

            // Direct neighbors.
            for (neighbor, _) in graph.get_neighbors(entity_id) {
                affected_entities.insert(neighbor.id.clone());

                // Second-hop neighbors.
                for (second_hop, _) in graph.get_neighbors(&neighbor.id) {
                    affected_entities.insert(second_hop.id.clone());
                }
            }
        }

        // Perform localized PageRank computation
        self.localized_pagerank(&affected_entities, graph).await?;

        // Reset pending changes counter
        *self.pending_changes.write() = 0;

        let duration = start.elapsed();
        println!(
            "🔄 Incremental PageRank update completed in {:?} for {} entities",
            duration,
            affected_entities.len()
        );

        Ok(())
    }

    /// Perform full PageRank recomputation over the whole graph.
    ///
    /// The reverse adjacency and weighted out-degrees are materialized once up
    /// front, so each power-iteration sweep touches every edge in O(1) instead
    /// of rescanning neighbor lists per (source, target) pair — the previous
    /// version was O(n² · degree) per sweep.
    async fn full_recomputation(&self, graph: &KnowledgeGraph) -> Result<()> {
        let start = Instant::now();

        let entities: Vec<EntityId> = graph.entities().map(|e| e.id.clone()).collect();
        let n = entities.len();
        if n == 0 {
            return Ok(());
        }

        // Initialize scores uniformly.
        let initial_score = 1.0 / n as f64;
        for entity_id in &entities {
            self.scores.insert(entity_id.clone(), initial_score);
        }

        // Build incoming-edge lists and weighted out-degrees in one pass.
        // Only the FIRST relationship between a given pair contributes a
        // weight (mirroring `get_edge_weight`'s first-match semantics), while
        // the out-degree sums every parallel edge (mirroring `get_out_degree`).
        let mut incoming: HashMap<EntityId, Vec<(EntityId, f64)>> = HashMap::with_capacity(n);
        let mut out_degrees: HashMap<EntityId, f64> = HashMap::with_capacity(n);
        for source in &entities {
            let mut degree = 0.0;
            let mut seen: HashSet<EntityId> = HashSet::new();
            for (neighbor, relationship) in graph.get_neighbors(source) {
                let weight = relationship.confidence as f64;
                degree += weight;
                if seen.insert(neighbor.id.clone()) {
                    incoming
                        .entry(neighbor.id.clone())
                        .or_default()
                        .push((source.clone(), weight));
                }
            }
            out_degrees.insert(source.clone(), degree);
        }

        // Jacobi-style power iteration: each sweep reads the previous sweep's
        // scores and publishes a fresh table at the end.
        for iteration in 0..self.max_iterations {
            let mut new_scores = HashMap::new();
            let mut max_diff: f64 = 0.0;

            for entity_id in &entities {
                let mut score = (1.0 - self.damping_factor) / n as f64;

                // Sum contributions from incoming links only.
                if let Some(sources) = incoming.get(entity_id) {
                    for (source, weight) in sources {
                        let out_degree = out_degrees.get(source).copied().unwrap_or(0.0);
                        if out_degree > 0.0 {
                            let source_score = self
                                .scores
                                .get(source)
                                .map(|s| *s.value())
                                .unwrap_or(initial_score);
                            score += self.damping_factor * source_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(initial_score);
                max_diff = max_diff.max((score - old_score).abs());
                new_scores.insert(entity_id.clone(), score);
            }

            // Publish this sweep's scores.
            for (entity_id, score) in new_scores {
                self.scores.insert(entity_id, score);
            }

            // Check convergence
            if max_diff < self.tolerance {
                println!(
                    "🎯 PageRank converged after {} iterations (diff: {:.6})",
                    iteration + 1,
                    max_diff
                );
                break;
            }
        }

        let duration = start.elapsed();
        println!("🔄 Full PageRank recomputation completed in {duration:?} for {n} entities");

        Ok(())
    }

    /// Perform localized PageRank over `entities`, treating the subset as a
    /// closed subgraph (edges leaving the subset are ignored).
    ///
    /// Uses Gauss-Seidel-style in-place updates — scores written during a
    /// sweep are visible to later nodes in the same sweep — matching the
    /// original localized behavior.
    async fn localized_pagerank(
        &self,
        entities: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let entity_vec: Vec<EntityId> = entities.iter().cloned().collect();
        let n = entity_vec.len();
        if n == 0 {
            return Ok(());
        }

        // Precompute subset-restricted incoming edges and localized
        // out-degrees once, instead of rescanning neighbor lists per
        // (source, target) pair inside the iteration.
        let mut incoming: HashMap<EntityId, Vec<(EntityId, f64)>> = HashMap::with_capacity(n);
        let mut localized_out: HashMap<EntityId, f64> = HashMap::with_capacity(n);
        for source in &entity_vec {
            let mut degree = 0.0;
            let mut seen: HashSet<EntityId> = HashSet::new();
            for (neighbor, relationship) in graph.get_neighbors(source) {
                if !entities.contains(&neighbor.id) {
                    continue;
                }
                let weight = relationship.confidence as f64;
                degree += weight;
                if seen.insert(neighbor.id.clone()) {
                    incoming
                        .entry(neighbor.id.clone())
                        .or_default()
                        .push((source.clone(), weight));
                }
            }
            localized_out.insert(source.clone(), degree);
        }

        let default_score = 1.0 / n as f64;

        // Localized power iteration.
        for _iteration in 0..self.max_iterations {
            let mut max_diff: f64 = 0.0;

            for entity_id in &entity_vec {
                let mut score = (1.0 - self.damping_factor) / n as f64;

                if let Some(sources) = incoming.get(entity_id) {
                    for (source, weight) in sources {
                        let out_degree = localized_out.get(source).copied().unwrap_or(0.0);
                        if out_degree > 0.0 {
                            let source_score = self
                                .scores
                                .get(source)
                                .map(|s| *s.value())
                                .unwrap_or(default_score);
                            score += self.damping_factor * source_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(default_score);
                max_diff = max_diff.max((score - old_score).abs());

                self.scores.insert(entity_id.clone(), score);
            }

            // Check convergence
            if max_diff < self.tolerance {
                break;
            }
        }

        Ok(())
    }

    /// Returns the weight of the first relationship `from -> to`, if any.
    ///
    /// Retained for targeted single-edge queries; the batch recomputations
    /// above now build adjacency tables up front instead of calling this.
    #[allow(dead_code)]
    fn get_edge_weight(
        &self,
        from: &EntityId,
        to: &EntityId,
        graph: &KnowledgeGraph,
    ) -> Option<f64> {
        for (neighbor, relationship) in graph.get_neighbors(from) {
            if neighbor.id == *to {
                return Some(relationship.confidence as f64);
            }
        }
        None
    }

    /// Weighted out-degree: sum of confidences over all outgoing edges.
    #[allow(dead_code)]
    fn get_out_degree(&self, entity_id: &EntityId, graph: &KnowledgeGraph) -> f64 {
        graph
            .get_neighbors(entity_id)
            .iter()
            .map(|(_, rel)| rel.confidence as f64)
            .sum()
    }

    /// Weighted out-degree counting only edges whose target is in `subset`.
    #[allow(dead_code)]
    fn get_localized_out_degree(
        &self,
        entity_id: &EntityId,
        subset: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> f64 {
        graph
            .get_neighbors(entity_id)
            .iter()
            .filter(|(neighbor, _)| subset.contains(&neighbor.id))
            .map(|(_, rel)| rel.confidence as f64)
            .sum()
    }

    /// Get PageRank score for an entity, if one has been computed.
    pub fn get_score(&self, entity_id: &EntityId) -> Option<f64> {
        self.scores.get(entity_id).map(|s| *s.value())
    }

    /// Get top-k entities by PageRank score, highest first.
    pub fn get_top_entities(&self, k: usize) -> Vec<(EntityId, f64)> {
        let mut entities: Vec<(EntityId, f64)> = self
            .scores
            .iter()
            .map(|entry| (entry.key().clone(), *entry.value()))
            .collect();

        entities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        entities.truncate(k);
        entities
    }

    /// Record a graph change; once enough accumulate, the next update
    /// triggers a full recomputation.
    pub fn record_change(&self, _entity_id: EntityId) {
        *self.pending_changes.write() += 1;
    }
}
1598
1599// ============================================================================
1600// Batch Processing System
1601// ============================================================================
1602
/// High-throughput batch processor for incremental updates
#[cfg(feature = "incremental")]
pub struct BatchProcessor {
    // Number of changes that makes a batch ready for processing.
    batch_size: usize,
    // Maximum age a pending batch may reach before it is processed.
    max_wait_time: Duration,
    // Batches accumulating changes, keyed by batch key.
    pending_batches: DashMap<String, PendingBatch>,
    // Bounds the number of batches processed concurrently.
    processing_semaphore: Semaphore,
    // Aggregate throughput/latency metrics.
    metrics: RwLock<BatchMetrics>,
}
1612
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PendingBatch {
    // Changes accumulated so far for this batch key.
    changes: Vec<ChangeRecord>,
    // When the batch was opened; used to enforce the max wait time.
    created_at: Instant,
    // Unique identifier reported back to callers.
    batch_id: String,
}
1620
/// Batch metrics for monitoring
#[derive(Debug, Clone)]
pub struct BatchMetrics {
    /// Total number of batches processed
    pub total_batches_processed: u64,
    /// Total number of changes processed across all batches
    pub total_changes_processed: u64,
    /// Average size of batches (changes per batch)
    pub average_batch_size: f64,
    /// Average wall-clock time to process a batch
    pub average_processing_time: Duration,
    /// Throughput in changes per second
    pub throughput_per_second: f64,
    /// Timestamp of last batch processed (`None` until the first batch)
    pub last_batch_processed: Option<DateTime<Utc>>,
}
1637
1638#[cfg(feature = "incremental")]
1639impl BatchProcessor {
1640    /// Creates a new batch processor with specified configuration
1641    pub fn new(batch_size: usize, max_wait_time: Duration, max_concurrent_batches: usize) -> Self {
1642        Self {
1643            batch_size,
1644            max_wait_time,
1645            pending_batches: DashMap::new(),
1646            processing_semaphore: Semaphore::new(max_concurrent_batches),
1647            metrics: RwLock::new(BatchMetrics {
1648                total_batches_processed: 0,
1649                total_changes_processed: 0,
1650                average_batch_size: 0.0,
1651                average_processing_time: Duration::from_millis(0),
1652                throughput_per_second: 0.0,
1653                last_batch_processed: None,
1654            }),
1655        }
1656    }
1657
1658    /// Adds a change to be processed in batches
1659    pub async fn add_change(&self, change: ChangeRecord) -> Result<String> {
1660        let batch_key = self.get_batch_key(&change);
1661
1662        let batch_id = {
1663            let mut entry = self
1664                .pending_batches
1665                .entry(batch_key.clone())
1666                .or_insert_with(|| PendingBatch {
1667                    changes: Vec::new(),
1668                    created_at: Instant::now(),
1669                    batch_id: format!("batch_{}", Uuid::new_v4()),
1670                });
1671
1672            entry.changes.push(change);
1673            let should_process = entry.changes.len() >= self.batch_size
1674                || entry.created_at.elapsed() > self.max_wait_time;
1675
1676            let batch_id = entry.batch_id.clone();
1677
1678            if should_process {
1679                // Move batch out for processing
1680                let batch = entry.clone();
1681                self.pending_batches.remove(&batch_key);
1682
1683                // Process batch asynchronously
1684                let processor = Arc::new(self.clone());
1685                tokio::spawn(async move {
1686                    if let Err(e) = processor.process_batch(batch).await {
1687                        eprintln!("Batch processing error: {e}");
1688                    }
1689                });
1690            }
1691
1692            batch_id
1693        };
1694
1695        Ok(batch_id)
1696    }
1697
1698    async fn process_batch(&self, batch: PendingBatch) -> Result<()> {
1699        let _permit = self.processing_semaphore.acquire().await.map_err(|_| {
1700            GraphRAGError::IncrementalUpdate {
1701                message: "Failed to acquire processing permit".to_string(),
1702            }
1703        })?;
1704
1705        let start = Instant::now();
1706
1707        // Group changes by type for optimized processing
1708        let mut entity_changes = Vec::new();
1709        let mut relationship_changes = Vec::new();
1710        let mut embedding_changes = Vec::new();
1711
1712        for change in &batch.changes {
1713            match &change.change_type {
1714                ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
1715                    entity_changes.push(change);
1716                }
1717                ChangeType::RelationshipAdded
1718                | ChangeType::RelationshipUpdated
1719                | ChangeType::RelationshipRemoved => {
1720                    relationship_changes.push(change);
1721                }
1722                ChangeType::EmbeddingAdded
1723                | ChangeType::EmbeddingUpdated
1724                | ChangeType::EmbeddingRemoved => {
1725                    embedding_changes.push(change);
1726                }
1727                _ => {}
1728            }
1729        }
1730
1731        // Process each type of change optimally
1732        self.process_entity_changes(&entity_changes).await?;
1733        self.process_relationship_changes(&relationship_changes)
1734            .await?;
1735        self.process_embedding_changes(&embedding_changes).await?;
1736
1737        let processing_time = start.elapsed();
1738
1739        // Update metrics
1740        self.update_metrics(&batch, processing_time).await;
1741
1742        println!(
1743            "🚀 Processed batch {} with {} changes in {:?}",
1744            batch.batch_id,
1745            batch.changes.len(),
1746            processing_time
1747        );
1748
1749        Ok(())
1750    }
1751
    /// Applies a slice of entity-level changes (add/update/remove).
    ///
    /// TODO: currently a stub — the optimized bulk entity path is not implemented.
    async fn process_entity_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        // Implementation would go here - process entity changes efficiently
        Ok(())
    }
1756
    /// Applies a slice of relationship-level changes (add/update/remove).
    ///
    /// TODO: currently a stub — the optimized bulk relationship path is not implemented.
    async fn process_relationship_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        // Implementation would go here - process relationship changes efficiently
        Ok(())
    }
1761
    /// Applies a slice of embedding-level changes (add/update/remove).
    ///
    /// TODO: currently a stub — the optimized bulk embedding path is not implemented.
    async fn process_embedding_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
        // Implementation would go here - process embedding changes efficiently
        Ok(())
    }
1766
1767    fn get_batch_key(&self, change: &ChangeRecord) -> String {
1768        // Group changes by entity or document for batching efficiency
1769        match (&change.entity_id, &change.document_id) {
1770            (Some(entity_id), _) => format!("entity:{entity_id}"),
1771            (None, Some(doc_id)) => format!("document:{doc_id}"),
1772            _ => "global".to_string(),
1773        }
1774    }
1775
1776    async fn update_metrics(&self, batch: &PendingBatch, processing_time: Duration) {
1777        let mut metrics = self.metrics.write();
1778
1779        metrics.total_batches_processed += 1;
1780        metrics.total_changes_processed += batch.changes.len() as u64;
1781
1782        // Update running averages
1783        let total_batches = metrics.total_batches_processed as f64;
1784        metrics.average_batch_size = (metrics.average_batch_size * (total_batches - 1.0)
1785            + batch.changes.len() as f64)
1786            / total_batches;
1787
1788        let prev_avg_ms = metrics.average_processing_time.as_millis() as f64;
1789        let new_avg_ms = (prev_avg_ms * (total_batches - 1.0) + processing_time.as_millis() as f64)
1790            / total_batches;
1791        metrics.average_processing_time = Duration::from_millis(new_avg_ms as u64);
1792
1793        // Calculate throughput
1794        if processing_time.as_secs_f64() > 0.0 {
1795            metrics.throughput_per_second =
1796                batch.changes.len() as f64 / processing_time.as_secs_f64();
1797        }
1798
1799        metrics.last_batch_processed = Some(Utc::now());
1800    }
1801
    /// Gets the current batch processing metrics
    ///
    /// Returns a point-in-time snapshot (clone) taken under the metrics read
    /// lock; the caller never holds the lock.
    pub fn get_metrics(&self) -> BatchMetrics {
        self.metrics.read().clone()
    }
1806}
1807
// Clone impl for BatchProcessor (required for Arc usage)
//
// NOTE(review): this is a *partial* clone. The copy starts with an empty
// pending-batch map and a brand-new semaphore sized from the original's
// currently-available permits, so a cloned processor does not share in-flight
// batches or the concurrency limit with the original. Clones spawned for
// async batch processing therefore only see the batch handed to them
// explicitly — confirm this is the intended behavior (sharing these fields
// would require wrapping them in `Arc` on the struct itself).
#[cfg(feature = "incremental")]
impl Clone for BatchProcessor {
    fn clone(&self) -> Self {
        Self {
            batch_size: self.batch_size,
            max_wait_time: self.max_wait_time,
            pending_batches: DashMap::new(), // New instance starts empty
            processing_semaphore: Semaphore::new(self.processing_semaphore.available_permits()),
            // Metrics are snapshotted at clone time, not shared thereafter.
            metrics: RwLock::new(self.get_metrics()),
        }
    }
}
1821
1822// ============================================================================
1823// Error Extensions
1824// ============================================================================
1825
impl GraphRAGError {
    /// Creates a conflict resolution error
    ///
    /// NOTE(review): maps onto the generic `GraphConstruction` variant rather
    /// than a conflict-specific one — confirm whether a dedicated variant
    /// exists or should be added.
    pub fn conflict_resolution(message: String) -> Self {
        GraphRAGError::GraphConstruction { message }
    }

    /// Creates an incremental update error
    ///
    /// NOTE(review): other code in this module constructs
    /// `GraphRAGError::IncrementalUpdate { .. }` directly; consider returning
    /// that variant here instead of `GraphConstruction`. Left unchanged in
    /// case the variant is only available under the "incremental" feature
    /// (this impl is not feature-gated).
    pub fn incremental_update(message: String) -> Self {
        GraphRAGError::GraphConstruction { message }
    }
}
1837
1838// ============================================================================
1839// Production-Ready IncrementalGraphStore Implementation
1840// ============================================================================
1841
/// Production implementation of IncrementalGraphStore with full ACID guarantees
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct ProductionGraphStore {
    /// The live knowledge graph, guarded for concurrent readers and writers.
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// In-flight transactions keyed by their transaction id.
    transactions: DashMap<TransactionId, Transaction>,
    /// Log of applied changes, keyed by change id.
    change_log: DashMap<UpdateId, ChangeRecord>,
    /// Pre-change snapshots used to undo an applied change, keyed by change id.
    rollback_data: DashMap<UpdateId, RollbackData>,
    /// Strategy object that resolves concurrent-modification conflicts.
    conflict_resolver: Arc<ConflictResolver>,
    /// Selective cache-invalidation engine.
    cache_invalidation: Arc<SelectiveInvalidation>,
    /// Operation monitor for timing and auditing updates.
    monitor: Arc<UpdateMonitor>,
    /// Batches small changes for amortized processing.
    batch_processor: Arc<BatchProcessor>,
    /// Incrementally-maintained PageRank over the graph.
    incremental_pagerank: Arc<IncrementalPageRank>,
    /// Broadcast channel on which change events are published.
    event_publisher: broadcast::Sender<ChangeEvent>,
    /// Tunables (batch size, concurrency limits, ...).
    config: IncrementalConfig,
}
1858
/// Transaction state for ACID operations
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct Transaction {
    /// Unique transaction identifier.
    id: TransactionId,
    /// Changes recorded against this transaction.
    /// NOTE(review): not populated by the code paths visible in this module —
    /// `apply_delta` applies changes directly; confirm staging is intended.
    changes: Vec<ChangeRecord>,
    /// Current lifecycle state (see `TransactionStatus`).
    status: TransactionStatus,
    /// Creation timestamp (UTC).
    created_at: DateTime<Utc>,
    /// Isolation level requested for this transaction.
    isolation_level: IsolationLevel,
}
1869
/// Lifecycle states a transaction moves through.
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum TransactionStatus {
    /// Open and accepting changes.
    Active,
    /// Preparing to commit (presumably a two-phase-commit stage — not
    /// constructed in the visible code paths).
    Preparing,
    /// Successfully committed.
    Committed,
    /// Rolled back / aborted.
    Aborted,
}
1878
/// Standard ANSI SQL-style isolation levels.
///
/// Currently only `ReadCommitted` is constructed (by `begin_transaction`).
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum IsolationLevel {
    /// May observe uncommitted changes from other transactions.
    ReadUncommitted,
    /// Only committed changes are visible.
    ReadCommitted,
    /// Repeated reads within the transaction return the same data.
    RepeatableRead,
    /// Full serializable isolation.
    Serializable,
}
1887
/// Change event for monitoring and debugging
///
/// Published on the store's broadcast channel so subscribers can observe
/// mutations, transaction boundaries, conflict resolutions, and cache
/// invalidations as they happen.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    /// Unique identifier for the event
    pub event_id: UpdateId,
    /// Type of change event
    pub event_type: ChangeEventType,
    /// Optional entity ID associated with the event
    pub entity_id: Option<EntityId>,
    /// When the event occurred
    pub timestamp: DateTime<Utc>,
    /// Additional metadata about the event
    pub metadata: HashMap<String, String>,
}
1902
/// Types of change events that can be published
///
/// One variant per observable lifecycle moment; carried inside a
/// [`ChangeEvent`] on the broadcast channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeEventType {
    /// An entity was upserted
    EntityUpserted,
    /// An entity was deleted
    EntityDeleted,
    /// A relationship was upserted
    RelationshipUpserted,
    /// A relationship was deleted
    RelationshipDeleted,
    /// An embedding was updated
    EmbeddingUpdated,
    /// A transaction was started
    TransactionStarted,
    /// A transaction was committed
    TransactionCommitted,
    /// A transaction was rolled back
    TransactionRolledBack,
    /// A conflict was resolved
    ConflictResolved,
    /// Cache was invalidated
    CacheInvalidated,
    /// A batch was processed
    BatchProcessed,
}
1929
#[cfg(feature = "incremental")]
impl ProductionGraphStore {
    /// Creates a new production-grade graph store with full ACID guarantees
    pub fn new(
        graph: KnowledgeGraph,
        config: IncrementalConfig,
        conflict_resolver: ConflictResolver,
    ) -> Self {
        // Bounded broadcast channel; lagging subscribers may miss events.
        let (event_tx, _) = broadcast::channel(1000);

        Self {
            graph: Arc::new(RwLock::new(graph)),
            transactions: DashMap::new(),
            change_log: DashMap::new(),
            rollback_data: DashMap::new(),
            conflict_resolver: Arc::new(conflict_resolver),
            cache_invalidation: Arc::new(SelectiveInvalidation::new()),
            monitor: Arc::new(UpdateMonitor::new()),
            batch_processor: Arc::new(BatchProcessor::new(
                config.batch_size,
                Duration::from_millis(100),
                config.max_concurrent_operations,
            )),
            // Damping 0.85, convergence epsilon 1e-6, max 100 iterations.
            incremental_pagerank: Arc::new(IncrementalPageRank::new(0.85, 1e-6, 100)),
            event_publisher: event_tx,
            config,
        }
    }

    /// Subscribes to change events for monitoring
    pub fn subscribe_events(&self) -> broadcast::Receiver<ChangeEvent> {
        self.event_publisher.subscribe()
    }

    /// Publishes an event, ignoring send failures (i.e. no active subscribers).
    async fn publish_event(&self, event: ChangeEvent) {
        let _ = self.event_publisher.send(event);
    }

    /// Builds a timestamped `ChangeRecord` with a fresh change id.
    fn create_change_record(
        &self,
        change_type: ChangeType,
        operation: Operation,
        change_data: ChangeData,
        entity_id: Option<EntityId>,
        document_id: Option<DocumentId>,
    ) -> ChangeRecord {
        ChangeRecord {
            change_id: UpdateId::new(),
            timestamp: Utc::now(),
            change_type,
            entity_id,
            document_id,
            operation,
            data: change_data,
            metadata: HashMap::new(),
        }
    }

    /// Applies a change, first resolving any conflict with existing data.
    ///
    /// Returns the id of the applied change record — the same key it is
    /// stored under in `change_log`. (Bug fix: this previously returned the
    /// monitor's operation id, which is *not* the change-log key, so callers
    /// that looked the change up by the returned id would panic.)
    async fn apply_change_with_conflict_resolution(
        &self,
        change: ChangeRecord,
    ) -> Result<UpdateId> {
        let operation_id = self.monitor.start_operation("apply_change");
        // The change keeps its id through conflict resolution (`..change`
        // below), so this is the key it will be logged under either way.
        let change_id = change.change_id.clone();

        // Check for conflicts
        if let Some(conflict) = self.detect_conflict(&change)? {
            let resolution = self.conflict_resolver.resolve_conflict(&conflict).await?;

            // Apply resolved change (data/metadata replaced, identity kept)
            let resolved_change = ChangeRecord {
                data: resolution.resolved_data,
                metadata: resolution.metadata,
                ..change
            };

            self.apply_change_internal(resolved_change).await?;

            // Publish conflict resolution event
            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::ConflictResolved,
                entity_id: conflict.existing_data.get_entity_id(),
                timestamp: Utc::now(),
                metadata: HashMap::new(),
            })
            .await;
        } else {
            self.apply_change_internal(change).await?;
        }

        self.monitor
            .complete_operation(&operation_id, true, None, 1, 0);
        Ok(change_id)
    }

    /// Checks whether `change` collides with data already in the graph.
    ///
    /// Only entity and relationship payloads can conflict today:
    /// - an entity with the same id but a different name or type, or
    /// - a relationship with identical (source, target, relation_type).
    fn detect_conflict(&self, change: &ChangeRecord) -> Result<Option<Conflict>> {
        match &change.data {
            ChangeData::Entity(entity) => {
                let graph = self.graph.read();
                if let Some(existing) = graph.get_entity(&entity.id) {
                    if existing.name != entity.name || existing.entity_type != entity.entity_type {
                        return Ok(Some(Conflict {
                            conflict_id: UpdateId::new(),
                            conflict_type: ConflictType::EntityExists,
                            existing_data: ChangeData::Entity(existing.clone()),
                            new_data: change.data.clone(),
                            resolution: None,
                        }));
                    }
                }
            }
            ChangeData::Relationship(relationship) => {
                let graph = self.graph.read();
                // Linear scan over all edges; acceptable for now but O(E)
                // per upsert — revisit if relationship counts grow large.
                for existing_rel in graph.get_all_relationships() {
                    if existing_rel.source == relationship.source
                        && existing_rel.target == relationship.target
                        && existing_rel.relation_type == relationship.relation_type
                    {
                        return Ok(Some(Conflict {
                            conflict_id: UpdateId::new(),
                            conflict_type: ConflictType::RelationshipExists,
                            existing_data: ChangeData::Relationship(existing_rel.clone()),
                            new_data: change.data.clone(),
                            resolution: None,
                        }));
                    }
                }
            }
            _ => {}
        }

        Ok(None)
    }

    /// Applies a change to the graph, recording rollback data first and the
    /// change-log entry (keyed by the change's id) afterwards.
    async fn apply_change_internal(&self, change: ChangeRecord) -> Result<()> {
        let change_id = change.change_id.clone();

        // Create rollback data first (under a read lock, released before write)
        let rollback_data = {
            let graph = self.graph.read();
            self.create_rollback_data(&change, &graph)?
        };

        self.rollback_data.insert(change_id.clone(), rollback_data);

        // Apply the change
        {
            let mut graph = self.graph.write();
            match &change.data {
                ChangeData::Entity(entity) => {
                    match change.operation {
                        Operation::Insert | Operation::Upsert => {
                            graph.add_entity(entity.clone())?;
                            // Mark the node dirty for the incremental PageRank pass.
                            self.incremental_pagerank.record_change(entity.id.clone());
                        }
                        Operation::Delete => {
                            // Remove entity and its relationships
                            // Implementation would go here
                        }
                        _ => {}
                    }
                }
                ChangeData::Relationship(relationship) => {
                    match change.operation {
                        Operation::Insert | Operation::Upsert => {
                            graph.add_relationship(relationship.clone())?;
                            // Both endpoints are affected by a new edge.
                            self.incremental_pagerank
                                .record_change(relationship.source.clone());
                            self.incremental_pagerank
                                .record_change(relationship.target.clone());
                        }
                        Operation::Delete => {
                            // Remove relationship
                            // Implementation would go here
                        }
                        _ => {}
                    }
                }
                ChangeData::Embedding {
                    entity_id,
                    embedding,
                } => {
                    // Embedding updates apply in place; unknown entities are
                    // silently skipped.
                    if let Some(entity) = graph.get_entity_mut(entity_id) {
                        entity.embedding = Some(embedding.clone());
                    }
                }
                _ => {}
            }
        }

        // Record change in log
        self.change_log.insert(change_id, change);

        Ok(())
    }

    /// Snapshots the graph state that `change` is about to overwrite so the
    /// change can later be rolled back.
    fn create_rollback_data(
        &self,
        change: &ChangeRecord,
        graph: &KnowledgeGraph,
    ) -> Result<RollbackData> {
        let mut previous_entities = Vec::new();
        let mut previous_relationships = Vec::new();

        match &change.data {
            ChangeData::Entity(entity) => {
                if let Some(existing) = graph.get_entity(&entity.id) {
                    previous_entities.push(existing.clone());
                }
            }
            ChangeData::Relationship(relationship) => {
                // Store existing relationships that might be affected
                for rel in graph.get_all_relationships() {
                    if rel.source == relationship.source && rel.target == relationship.target {
                        previous_relationships.push(rel.clone());
                    }
                }
            }
            _ => {}
        }

        Ok(RollbackData {
            previous_entities,
            previous_relationships,
            affected_caches: vec![], // Will be populated by cache invalidation system
        })
    }
}
2158
#[cfg(feature = "incremental")]
#[async_trait::async_trait]
impl IncrementalGraphStore for ProductionGraphStore {
    type Error = GraphRAGError;

    /// Inserts or updates an entity, resolving conflicts, invalidating
    /// affected caches, and publishing an `EntityUpserted` event.
    async fn upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::EntityAdded,
            Operation::Upsert,
            ChangeData::Entity(entity.clone()),
            Some(entity.id.clone()),
            None,
        );

        // Keep a copy for cache invalidation instead of re-reading it from
        // `change_log` by the returned id afterwards. (Bug fix: the old
        // `change_log.get(&update_id).unwrap()` panicked whenever the
        // returned id was not the change-log key.)
        let change_for_invalidation = change.clone();

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        // Trigger cache invalidation
        let changes = vec![change_for_invalidation];
        let _invalidation_strategies = self.cache_invalidation.invalidate_for_changes(&changes);

        // Publish event
        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EntityUpserted,
            entity_id: Some(entity.id),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Inserts or updates a relationship, resolving conflicts and publishing
    /// a `RelationshipUpserted` event (tagged with the source entity).
    async fn upsert_relationship(&mut self, relationship: Relationship) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::RelationshipAdded,
            Operation::Upsert,
            ChangeData::Relationship(relationship.clone()),
            None,
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        // Publish event
        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::RelationshipUpserted,
            entity_id: Some(relationship.source),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Deletes an entity.
    ///
    /// TODO: actual graph mutation is not implemented yet; this currently
    /// only mints an update id and publishes the event.
    async fn delete_entity(&mut self, entity_id: &EntityId) -> Result<UpdateId> {
        // Implementation for entity deletion
        let update_id = UpdateId::new();

        // Publish event
        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EntityDeleted,
            entity_id: Some(entity_id.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Deletes a relationship identified by (source, target, relation_type).
    ///
    /// TODO: actual graph mutation is not implemented yet; this currently
    /// only mints an update id and publishes the event.
    async fn delete_relationship(
        &mut self,
        source: &EntityId,
        _target: &EntityId,
        _relation_type: &str,
    ) -> Result<UpdateId> {
        // Implementation for relationship deletion
        let update_id = UpdateId::new();

        // Publish event
        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::RelationshipDeleted,
            entity_id: Some(source.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Applies a whole delta inside a transaction, committing on success.
    ///
    /// NOTE(review): changes are applied eagerly, not staged on the
    /// transaction, so a mid-delta failure leaves earlier changes applied and
    /// the transaction uncommitted — confirm this is the intended semantics.
    async fn apply_delta(&mut self, delta: GraphDelta) -> Result<UpdateId> {
        let tx_id = self.begin_transaction().await?;

        for change in delta.changes {
            self.apply_change_with_conflict_resolution(change).await?;
        }

        self.commit_transaction(tx_id).await?;
        Ok(delta.delta_id)
    }

    /// Rolls back a previously applied delta.
    ///
    /// TODO: not implemented yet; always succeeds without mutating state.
    async fn rollback_delta(&mut self, _delta_id: &UpdateId) -> Result<()> {
        // Implementation for delta rollback
        Ok(())
    }

    /// Returns the change log, optionally filtered to changes with
    /// `timestamp >= since` (inclusive). Ordering is unspecified (DashMap
    /// iteration order).
    async fn get_change_log(&self, since: Option<DateTime<Utc>>) -> Result<Vec<ChangeRecord>> {
        let changes: Vec<ChangeRecord> = self
            .change_log
            .iter()
            .filter_map(|entry| {
                let change = entry.value();
                if let Some(since_time) = since {
                    if change.timestamp >= since_time {
                        Some(change.clone())
                    } else {
                        None
                    }
                } else {
                    Some(change.clone())
                }
            })
            .collect();

        Ok(changes)
    }

    /// Opens a new transaction (ReadCommitted isolation) and publishes a
    /// `TransactionStarted` event.
    async fn begin_transaction(&mut self) -> Result<TransactionId> {
        let tx_id = TransactionId::new();
        let transaction = Transaction {
            id: tx_id.clone(),
            changes: Vec::new(),
            status: TransactionStatus::Active,
            created_at: Utc::now(),
            isolation_level: IsolationLevel::ReadCommitted,
        };

        self.transactions.insert(tx_id.clone(), transaction);

        // Publish event
        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::TransactionStarted,
            entity_id: None,
            timestamp: Utc::now(),
            metadata: [("transaction_id".to_string(), tx_id.to_string())]
                .into_iter()
                .collect(),
        })
        .await;

        Ok(tx_id)
    }

    /// Commits a transaction: removes it from the pending set and publishes
    /// a `TransactionCommitted` event. Errors if the id is unknown.
    async fn commit_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
            // Status set for symmetry/debugging; tx is dropped right after.
            tx.status = TransactionStatus::Committed;

            // Publish event
            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::TransactionCommitted,
                entity_id: None,
                timestamp: Utc::now(),
                metadata: [("transaction_id".to_string(), tx_id.to_string())]
                    .into_iter()
                    .collect(),
            })
            .await;

            Ok(())
        } else {
            Err(GraphRAGError::IncrementalUpdate {
                message: format!("Transaction {tx_id} not found"),
            })
        }
    }

    /// Aborts a transaction and publishes a `TransactionRolledBack` event.
    ///
    /// TODO: the actual per-change undo is not implemented yet.
    async fn rollback_transaction(&mut self, tx_id: TransactionId) -> Result<()> {
        if let Some((_, mut tx)) = self.transactions.remove(&tx_id) {
            tx.status = TransactionStatus::Aborted;

            // Rollback all changes in this transaction
            for _change in &tx.changes {
                // Implementation for rollback
            }

            // Publish event
            self.publish_event(ChangeEvent {
                event_id: UpdateId::new(),
                event_type: ChangeEventType::TransactionRolledBack,
                entity_id: None,
                timestamp: Utc::now(),
                metadata: [("transaction_id".to_string(), tx_id.to_string())]
                    .into_iter()
                    .collect(),
            })
            .await;

            Ok(())
        } else {
            Err(GraphRAGError::IncrementalUpdate {
                message: format!("Transaction {tx_id} not found"),
            })
        }
    }

    /// Upserts entities one by one, collecting their update ids.
    /// The conflict strategy parameter is currently unused (the store's own
    /// resolver applies).
    async fn batch_upsert_entities(
        &mut self,
        entities: Vec<Entity>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::new();

        for entity in entities {
            let update_id = self.upsert_entity(entity).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Upserts relationships one by one, collecting their update ids.
    /// The conflict strategy parameter is currently unused.
    async fn batch_upsert_relationships(
        &mut self,
        relationships: Vec<Relationship>,
        _strategy: ConflictStrategy,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::new();

        for relationship in relationships {
            let update_id = self.upsert_relationship(relationship).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Replaces an entity's embedding and publishes an `EmbeddingUpdated`
    /// event. A missing entity is silently skipped by the apply path.
    async fn update_entity_embedding(
        &mut self,
        entity_id: &EntityId,
        embedding: Vec<f32>,
    ) -> Result<UpdateId> {
        let change = self.create_change_record(
            ChangeType::EmbeddingUpdated,
            Operation::Update,
            ChangeData::Embedding {
                entity_id: entity_id.clone(),
                embedding,
            },
            Some(entity_id.clone()),
            None,
        );

        let update_id = self.apply_change_with_conflict_resolution(change).await?;

        // Publish event
        self.publish_event(ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EmbeddingUpdated,
            entity_id: Some(entity_id.clone()),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        })
        .await;

        Ok(update_id)
    }

    /// Applies embedding updates one by one, collecting their update ids.
    async fn bulk_update_embeddings(
        &mut self,
        updates: Vec<(EntityId, Vec<f32>)>,
    ) -> Result<Vec<UpdateId>> {
        let mut update_ids = Vec::new();

        for (entity_id, embedding) in updates {
            let update_id = self.update_entity_embedding(&entity_id, embedding).await?;
            update_ids.push(update_id);
        }

        Ok(update_ids)
    }

    /// Lists ids of transactions still in the `Active` state.
    async fn get_pending_transactions(&self) -> Result<Vec<TransactionId>> {
        let pending: Vec<TransactionId> = self
            .transactions
            .iter()
            .filter(|entry| entry.value().status == TransactionStatus::Active)
            .map(|entry| entry.key().clone())
            .collect();

        Ok(pending)
    }

    /// Computes coarse structural statistics (node/edge counts, degree
    /// distribution summary) over a read-locked snapshot of the graph.
    async fn get_graph_statistics(&self) -> Result<GraphStatistics> {
        let graph = self.graph.read();
        let entities: Vec<_> = graph.entities().collect();
        let relationships = graph.get_all_relationships();

        let node_count = entities.len();
        let edge_count = relationships.len();

        // Calculate average degree
        let total_degree: usize = entities
            .iter()
            .map(|entity| graph.get_neighbors(&entity.id).len())
            .sum();

        let average_degree = if node_count > 0 {
            total_degree as f64 / node_count as f64
        } else {
            0.0
        };

        // Find max degree
        let max_degree = entities
            .iter()
            .map(|entity| graph.get_neighbors(&entity.id).len())
            .max()
            .unwrap_or(0);

        Ok(GraphStatistics {
            node_count,
            edge_count,
            average_degree,
            max_degree,
            connected_components: 1,     // Simplified for now
            clustering_coefficient: 0.0, // Would need complex calculation
            last_updated: Utc::now(),
        })
    }

    /// Scans the graph for integrity problems: orphaned entities (no
    /// neighbors), relationships referencing missing entities, and entities
    /// without embeddings.
    async fn validate_consistency(&self) -> Result<ConsistencyReport> {
        let graph = self.graph.read();
        let mut orphaned_entities = Vec::new();
        let mut broken_relationships = Vec::new();
        let mut missing_embeddings = Vec::new();

        // Check for orphaned entities (entities with no relationships)
        for entity in graph.entities() {
            let neighbors = graph.get_neighbors(&entity.id);
            if neighbors.is_empty() {
                orphaned_entities.push(entity.id.clone());
            }

            // Check for missing embeddings
            if entity.embedding.is_none() {
                missing_embeddings.push(entity.id.clone());
            }
        }

        // Check for broken relationships (references to non-existent entities)
        for relationship in graph.get_all_relationships() {
            if graph.get_entity(&relationship.source).is_none()
                || graph.get_entity(&relationship.target).is_none()
            {
                broken_relationships.push((
                    relationship.source.clone(),
                    relationship.target.clone(),
                    relationship.relation_type.clone(),
                ));
            }
        }

        let issues_found =
            orphaned_entities.len() + broken_relationships.len() + missing_embeddings.len();

        Ok(ConsistencyReport {
            is_consistent: issues_found == 0,
            orphaned_entities,
            broken_relationships,
            missing_embeddings,
            validation_time: Utc::now(),
            issues_found,
        })
    }
}
2541
// Helper trait for extracting entity ID from ChangeData
#[allow(dead_code)]
trait ChangeDataExt {
    /// Returns the entity id referenced by the change payload, if any.
    fn get_entity_id(&self) -> Option<EntityId>;
}
2547
2548impl ChangeDataExt for ChangeData {
2549    fn get_entity_id(&self) -> Option<EntityId> {
2550        match self {
2551            ChangeData::Entity(entity) => Some(entity.id.clone()),
2552            ChangeData::Embedding { entity_id, .. } => Some(entity_id.clone()),
2553            _ => None,
2554        }
2555    }
2556}
2557
// NOTE: a backward-compatibility re-export previously lived here; it was removed because it duplicated an existing definition.
2559
#[cfg(test)]
mod tests {
    //! Unit tests for the incremental-update machinery: identifier
    //! generation, change records, conflict resolution, and — behind the
    //! `incremental` feature — the production graph store, incremental
    //! PageRank, batch processing, and selective cache invalidation.

    use super::*;

    /// Every generated `UpdateId` must be unique.
    #[test]
    fn test_update_id_generation() {
        let id1 = UpdateId::new();
        let id2 = UpdateId::new();
        assert_ne!(id1.as_str(), id2.as_str());
    }

    /// Every generated `TransactionId` must be unique.
    #[test]
    fn test_transaction_id_generation() {
        let tx1 = TransactionId::new();
        let tx2 = TransactionId::new();
        assert_ne!(tx1.as_str(), tx2.as_str());
    }

    /// `create_change_record` must echo back the change type, operation, and
    /// entity id it was handed.
    #[test]
    fn test_change_record_creation() {
        let entity = Entity::new(
            EntityId::new("test".to_string()),
            "Test Entity".to_string(),
            "Person".to_string(),
            0.9,
        );

        let config = IncrementalConfig::default();
        let graph = KnowledgeGraph::new();
        let manager = IncrementalGraphManager::new(graph, config);

        let change = manager.create_change_record(
            ChangeType::EntityAdded,
            Operation::Insert,
            ChangeData::Entity(entity.clone()),
            Some(entity.id.clone()),
            None,
        );

        assert_eq!(change.change_type, ChangeType::EntityAdded);
        assert_eq!(change.operation, Operation::Insert);
        assert_eq!(change.entity_id, Some(entity.id));
    }

    /// The resolver must retain the strategy it was constructed with.
    #[test]
    fn test_conflict_resolver_creation() {
        let resolver = ConflictResolver::new(ConflictStrategy::KeepExisting);
        assert!(matches!(resolver.strategy, ConflictStrategy::KeepExisting));
    }

    /// Pin the documented defaults of `IncrementalConfig` so accidental
    /// changes to them are caught.
    #[test]
    fn test_incremental_config_default() {
        let config = IncrementalConfig::default();
        assert_eq!(config.max_change_log_size, 10000);
        assert_eq!(config.batch_size, 100);
        assert!(config.enable_monitoring);
    }

    /// `IncrementalStatistics::empty` must start every counter at zero.
    #[test]
    fn test_statistics_creation() {
        let stats = IncrementalStatistics::empty();
        assert_eq!(stats.total_updates, 0);
        assert_eq!(stats.entities_added, 0);
        assert_eq!(stats.average_update_time_ms, 0.0);
    }

    /// A basic upsert must return a non-empty update id and bump the
    /// `entities_added` counter.
    ///
    /// This is a plain `#[test]`: `basic_upsert_entity` and `get_statistics`
    /// are synchronous, so there is no need to spin up a tokio runtime.
    #[test]
    fn test_basic_entity_upsert() {
        let config = IncrementalConfig::default();
        let graph = KnowledgeGraph::new();
        let mut manager = IncrementalGraphManager::new(graph, config);

        let entity = Entity::new(
            EntityId::new("test_entity".to_string()),
            "Test Entity".to_string(),
            "Person".to_string(),
            0.9,
        );

        let update_id = manager.basic_upsert_entity(entity).unwrap();
        assert!(!update_id.as_str().is_empty());

        let stats = manager.get_statistics();
        assert_eq!(stats.entities_added, 1);
    }

    /// Constructing a `ProductionGraphStore` must yield a working event
    /// subscription channel.
    #[cfg(feature = "incremental")]
    #[tokio::test]
    async fn test_production_graph_store_creation() {
        let graph = KnowledgeGraph::new();
        let config = IncrementalConfig::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Merge);

        let store = ProductionGraphStore::new(graph, config, resolver);
        let _receiver = store.subscribe_events();
        // If we reached here, subscription succeeded; no further assertion needed.
    }

    /// An entity upsert through the production store must succeed and be
    /// reflected in the graph's node count.
    #[cfg(feature = "incremental")]
    #[tokio::test]
    async fn test_production_graph_store_entity_upsert() {
        let graph = KnowledgeGraph::new();
        let config = IncrementalConfig::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Merge);

        let mut store = ProductionGraphStore::new(graph, config, resolver);

        let entity = Entity::new(
            EntityId::new("test_entity".to_string()),
            "Test Entity".to_string(),
            "Person".to_string(),
            0.9,
        );

        let update_id = store.upsert_entity(entity).await.unwrap();
        assert!(!update_id.as_str().is_empty());

        let stats = store.get_graph_statistics().await.unwrap();
        assert_eq!(stats.node_count, 1);
    }

    /// A relationship upsert between two previously inserted entities must
    /// succeed and be reflected in the graph's edge count.
    #[cfg(feature = "incremental")]
    #[tokio::test]
    async fn test_production_graph_store_relationship_upsert() {
        let graph = KnowledgeGraph::new();
        let config = IncrementalConfig::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Merge);

        let mut store = ProductionGraphStore::new(graph, config, resolver);

        // Add entities first — the relationship references both endpoints.
        let entity1 = Entity::new(
            EntityId::new("entity1".to_string()),
            "Entity 1".to_string(),
            "Person".to_string(),
            0.9,
        );

        let entity2 = Entity::new(
            EntityId::new("entity2".to_string()),
            "Entity 2".to_string(),
            "Person".to_string(),
            0.9,
        );

        store.upsert_entity(entity1.clone()).await.unwrap();
        store.upsert_entity(entity2.clone()).await.unwrap();

        let relationship = Relationship {
            source: entity1.id,
            target: entity2.id,
            relation_type: "KNOWS".to_string(),
            confidence: 0.8,
            context: vec![],
        };

        let update_id = store.upsert_relationship(relationship).await.unwrap();
        assert!(!update_id.as_str().is_empty());

        let stats = store.get_graph_statistics().await.unwrap();
        assert_eq!(stats.edge_count, 1);
    }

    /// A begun transaction must appear in the pending set and disappear
    /// once committed.
    #[cfg(feature = "incremental")]
    #[tokio::test]
    async fn test_production_graph_store_transactions() {
        let graph = KnowledgeGraph::new();
        let config = IncrementalConfig::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Merge);

        let mut store = ProductionGraphStore::new(graph, config, resolver);

        let tx_id = store.begin_transaction().await.unwrap();
        assert!(!tx_id.as_str().is_empty());

        let pending = store.get_pending_transactions().await.unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0], tx_id);

        store.commit_transaction(tx_id).await.unwrap();

        let pending_after = store.get_pending_transactions().await.unwrap();
        assert_eq!(pending_after.len(), 0);
    }

    /// An empty store must validate as fully consistent.
    #[cfg(feature = "incremental")]
    #[tokio::test]
    async fn test_production_graph_store_consistency_validation() {
        let graph = KnowledgeGraph::new();
        let config = IncrementalConfig::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Merge);

        let store = ProductionGraphStore::new(graph, config, resolver);

        let report = store.validate_consistency().await.unwrap();
        assert!(report.is_consistent);
        assert_eq!(report.issues_found, 0);
    }

    /// An entity upsert must publish a `ChangeEvent` to subscribers that
    /// registered before the change.
    #[cfg(feature = "incremental")]
    #[tokio::test]
    async fn test_production_graph_store_event_publishing() {
        let graph = KnowledgeGraph::new();
        let config = IncrementalConfig::default();
        let resolver = ConflictResolver::new(ConflictStrategy::Merge);

        let store = ProductionGraphStore::new(graph, config, resolver);
        // Subscribe *before* the change so the broadcast channel buffers the
        // event for this receiver.
        let mut event_receiver = store.subscribe_events();

        let entity = Entity::new(
            EntityId::new("test_entity".to_string()),
            "Test Entity".to_string(),
            "Person".to_string(),
            0.9,
        );

        // Run the upsert in a separate task, but wait for it to complete
        // before polling the receiver so the test cannot race the sender.
        let store_clone = Arc::new(tokio::sync::Mutex::new(store));
        let store_for_task = Arc::clone(&store_clone);

        let handle = tokio::spawn(async move {
            let mut store = store_for_task.lock().await;
            store.upsert_entity(entity).await
        });
        handle
            .await
            .expect("upsert task panicked")
            .expect("entity upsert failed");

        // The event is already buffered; the timeout is only a safety net.
        let event =
            tokio::time::timeout(Duration::from_millis(100), event_receiver.recv()).await;
        assert!(event.is_ok(), "timed out waiting for change event");
        assert!(event.unwrap().is_ok(), "event channel closed or lagged");
    }

    /// A fresh `IncrementalPageRank` must start with no scores.
    #[cfg(feature = "incremental")]
    #[test]
    fn test_incremental_pagerank_creation() {
        let pagerank = IncrementalPageRank::new(0.85, 1e-6, 100);
        assert!(pagerank.scores.is_empty());
    }

    /// A fresh `BatchProcessor` must report zero processed batches.
    #[cfg(feature = "incremental")]
    #[test]
    fn test_batch_processor_creation() {
        let processor = BatchProcessor::new(100, Duration::from_millis(500), 10);
        let metrics = processor.get_metrics();
        assert_eq!(metrics.total_batches_processed, 0);
    }

    /// A change touching an entity inside a registered cache region must
    /// produce at least one invalidation strategy.
    #[cfg(feature = "incremental")]
    #[tokio::test]
    async fn test_selective_invalidation() {
        let invalidation = SelectiveInvalidation::new();

        let region = CacheRegion {
            region_id: "test_region".to_string(),
            entity_ids: [EntityId::new("entity1".to_string())].into_iter().collect(),
            relationship_types: ["KNOWS".to_string()].into_iter().collect(),
            document_ids: HashSet::new(),
            last_modified: Utc::now(),
        };

        invalidation.register_cache_region(region);

        let entity = Entity::new(
            EntityId::new("entity1".to_string()),
            "Entity 1".to_string(),
            "Person".to_string(),
            0.9,
        );

        let ent_id_for_log = entity.id.clone();
        let change = ChangeRecord {
            change_id: UpdateId::new(),
            timestamp: Utc::now(),
            change_type: ChangeType::EntityUpdated,
            entity_id: Some(ent_id_for_log),
            document_id: None,
            operation: Operation::Update,
            data: ChangeData::Entity(entity),
            metadata: HashMap::new(),
        };

        let strategies = invalidation.invalidate_for_changes(&[change]);
        assert!(!strategies.is_empty());
    }

    /// Merging two versions of the same entity must keep the higher
    /// confidence and the newer name.
    #[cfg(feature = "incremental")]
    #[test]
    fn test_conflict_resolver_merge() {
        let resolver = ConflictResolver::new(ConflictStrategy::Merge);

        let entity1 = Entity::new(
            EntityId::new("entity1".to_string()),
            "Entity 1".to_string(),
            "Person".to_string(),
            0.8,
        );

        let entity2 = Entity::new(
            EntityId::new("entity1".to_string()),
            "Entity 1 Updated".to_string(),
            "Person".to_string(),
            0.9,
        );

        let merged = resolver.merge_entities(&entity1, &entity2).unwrap();
        assert_eq!(merged.confidence, 0.9); // Should take higher confidence
        assert_eq!(merged.name, "Entity 1 Updated");
    }

    /// `GraphStatistics` is a plain data struct; construction must preserve
    /// the supplied counts.
    #[test]
    fn test_graph_statistics_creation() {
        let stats = GraphStatistics {
            node_count: 100,
            edge_count: 150,
            average_degree: 3.0,
            max_degree: 10,
            connected_components: 1,
            clustering_coefficient: 0.3,
            last_updated: Utc::now(),
        };

        assert_eq!(stats.node_count, 100);
        assert_eq!(stats.edge_count, 150);
    }

    /// `ConsistencyReport` construction must preserve the supplied fields.
    #[test]
    fn test_consistency_report_creation() {
        let report = ConsistencyReport {
            is_consistent: true,
            orphaned_entities: vec![],
            broken_relationships: vec![],
            missing_embeddings: vec![],
            validation_time: Utc::now(),
            issues_found: 0,
        };

        assert!(report.is_consistent);
        assert_eq!(report.issues_found, 0);
    }

    /// `ChangeEvent` construction must preserve the event type and carry the
    /// entity id it was given.
    #[test]
    fn test_change_event_creation() {
        let event = ChangeEvent {
            event_id: UpdateId::new(),
            event_type: ChangeEventType::EntityUpserted,
            entity_id: Some(EntityId::new("entity1".to_string())),
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        };

        assert!(matches!(event.event_type, ChangeEventType::EntityUpserted));
        assert!(event.entity_id.is_some());
    }
}