Skip to main content

graphrag_core/graph/incremental/
manager.rs

1#![allow(unused_imports)]
2
3use crate::core::{
4    DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship, Result, TextChunk,
5};
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::time::{Duration, Instant};
10
11#[cfg(feature = "incremental")]
12use std::sync::Arc;
13
14#[cfg(feature = "incremental")]
15use {
16    dashmap::DashMap,
17    parking_lot::{Mutex, RwLock},
18    tokio::sync::{broadcast, Semaphore},
19    uuid::Uuid,
20};
21
22use super::*;
23
24// ============================================================================
25// Main Incremental Graph Manager
26// ============================================================================
27
/// Incremental graph manager used when the `incremental` feature is enabled.
///
/// Bundles the knowledge graph (behind a shared read/write lock) with a
/// concurrent change log, a delta store and the helper components consulted
/// by the update and statistics paths.
#[cfg(feature = "incremental")]
pub struct IncrementalGraphManager {
    /// Knowledge graph, reference-counted and guarded by a read/write lock.
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// Recorded changes, keyed by each record's `change_id`.
    change_log: DashMap<UpdateId, ChangeRecord>,
    /// Graph deltas keyed by update id; in this file only their count is reported.
    deltas: DashMap<UpdateId, GraphDelta>,
    /// Cache invalidation component; queried for invalidation statistics.
    cache_invalidation: Arc<SelectiveInvalidation>,
    /// Conflict resolver, built from `config.conflict_strategy` unless replaced
    /// through `with_conflict_resolver`.
    conflict_resolver: Arc<ConflictResolver>,
    /// Monitor that tracks the start/completion of operations and supplies
    /// performance statistics.
    monitor: Arc<UpdateMonitor>,
    /// Configuration supplied at construction time.
    config: IncrementalConfig,
}
39
#[cfg(not(feature = "incremental"))]
/// Incremental graph manager (simplified version without incremental feature)
///
/// Owns the graph directly and keeps the change log in a plain `Vec`; there is
/// no locking, delta store, conflict resolver or monitor in this build.
pub struct IncrementalGraphManager {
    /// Knowledge graph owned by the manager.
    graph: KnowledgeGraph,
    /// Recorded changes in insertion order.
    change_log: Vec<ChangeRecord>,
    /// Configuration supplied at construction time.
    config: IncrementalConfig,
}
47
/// Configuration for incremental operations
///
/// `Default` supplies a ready-to-use set of values.
// NOTE(review): within the visible part of this file only `conflict_strategy`
// is read (by `IncrementalGraphManager::new`); the size and concurrency limits
// are presumably enforced elsewhere - confirm before relying on them.
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Maximum number of changes to keep in the log
    pub max_change_log_size: usize,
    /// Maximum number of changes in a single delta
    pub max_delta_size: usize,
    /// Default conflict resolution strategy
    pub conflict_strategy: ConflictStrategy,
    /// Whether to enable performance monitoring
    pub enable_monitoring: bool,
    /// Cache invalidation strategy name
    pub cache_invalidation_strategy: String,
    /// Default batch size for batch operations
    pub batch_size: usize,
    /// Maximum number of concurrent operations
    pub max_concurrent_operations: usize,
}
66
67impl Default for IncrementalConfig {
68    fn default() -> Self {
69        Self {
70            max_change_log_size: 10000,
71            max_delta_size: 1000,
72            conflict_strategy: ConflictStrategy::Merge,
73            enable_monitoring: true,
74            cache_invalidation_strategy: "selective".to_string(),
75            batch_size: 100,
76            max_concurrent_operations: 10,
77        }
78    }
79}
80
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Builds a manager around `graph`.
    ///
    /// The change log and delta store start empty, and the conflict resolver
    /// is created from `config.conflict_strategy`.
    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
        // Components are created in field order, then assembled.
        let graph = Arc::new(RwLock::new(graph));
        let change_log = DashMap::new();
        let deltas = DashMap::new();
        let cache_invalidation = Arc::new(SelectiveInvalidation::new());
        let conflict_resolver =
            Arc::new(ConflictResolver::new(config.conflict_strategy.clone()));
        let monitor = Arc::new(UpdateMonitor::new());

        Self {
            graph,
            change_log,
            deltas,
            cache_invalidation,
            conflict_resolver,
            monitor,
            config,
        }
    }

    /// Replaces the conflict resolver (builder style) and returns the manager.
    pub fn with_conflict_resolver(mut self, resolver: ConflictResolver) -> Self {
        let shared_resolver = Arc::new(resolver);
        self.conflict_resolver = shared_resolver;
        self
    }

    /// Returns a shared handle to the lock-protected knowledge graph.
    ///
    /// The handle exposes the `RwLock` itself, so holders can take either a
    /// read or a write guard.
    pub fn graph(&self) -> Arc<RwLock<KnowledgeGraph>> {
        let handle = Arc::clone(&self.graph);
        handle
    }

    /// Returns a shared handle to the conflict resolver.
    pub fn conflict_resolver(&self) -> Arc<ConflictResolver> {
        let handle = Arc::clone(&self.conflict_resolver);
        handle
    }

    /// Returns a shared handle to the update monitor.
    pub fn monitor(&self) -> Arc<UpdateMonitor> {
        let handle = Arc::clone(&self.monitor);
        handle
    }
}
117
118#[cfg(not(feature = "incremental"))]
119impl IncrementalGraphManager {
120    /// Creates a new incremental graph manager without advanced features
121    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
122        Self {
123            graph,
124            change_log: Vec::new(),
125            config,
126        }
127    }
128
129    /// Gets a reference to the knowledge graph
130    pub fn graph(&self) -> &KnowledgeGraph {
131        &self.graph
132    }
133
134    /// Gets a mutable reference to the knowledge graph
135    pub fn graph_mut(&mut self) -> &mut KnowledgeGraph {
136        &mut self.graph
137    }
138}
139
140// Common implementation for both feature-gated and non-feature-gated versions
141impl IncrementalGraphManager {
142    /// Create a new change record
143    pub fn create_change_record(
144        &self,
145        change_type: ChangeType,
146        operation: Operation,
147        change_data: ChangeData,
148        entity_id: Option<EntityId>,
149        document_id: Option<DocumentId>,
150    ) -> ChangeRecord {
151        ChangeRecord {
152            change_id: UpdateId::new(),
153            timestamp: Utc::now(),
154            change_type,
155            entity_id,
156            document_id,
157            operation,
158            data: change_data,
159            metadata: HashMap::new(),
160        }
161    }
162
163    /// Get configuration
164    pub fn config(&self) -> &IncrementalConfig {
165        &self.config
166    }
167
168    /// Basic entity upsert (works without incremental feature)
169    pub fn basic_upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
170        let update_id = UpdateId::new();
171
172        #[cfg(feature = "incremental")]
173        {
174            let operation_id = self.monitor.start_operation("upsert_entity");
175            let mut graph = self.graph.write();
176
177            match graph.add_entity(entity.clone()) {
178                Ok(_) => {
179                    let ent_id = entity.id.clone();
180                    let change = self.create_change_record(
181                        ChangeType::EntityAdded,
182                        Operation::Upsert,
183                        ChangeData::Entity(entity),
184                        Some(ent_id),
185                        None,
186                    );
187                    self.change_log.insert(change.change_id.clone(), change);
188                    self.monitor
189                        .complete_operation(&operation_id, true, None, 1, 0);
190                    Ok(update_id)
191                },
192                Err(e) => {
193                    self.monitor.complete_operation(
194                        &operation_id,
195                        false,
196                        Some(e.to_string()),
197                        0,
198                        0,
199                    );
200                    Err(e)
201                },
202            }
203        }
204
205        #[cfg(not(feature = "incremental"))]
206        {
207            self.graph.add_entity(entity.clone())?;
208            // Capture ID before moving `entity` into ChangeData
209            let ent_id = entity.id.clone();
210            let change = self.create_change_record(
211                ChangeType::EntityAdded,
212                Operation::Upsert,
213                ChangeData::Entity(entity),
214                Some(ent_id),
215                None,
216            );
217            self.change_log.push(change);
218            Ok(update_id)
219        }
220    }
221}
222
223// ============================================================================
224// Statistics and Monitoring
225// ============================================================================
226
/// Comprehensive statistics for incremental operations
///
/// A plain snapshot of counters and timings; `IncrementalStatistics::empty`
/// yields an all-zero instance and `IncrementalGraphManager::get_statistics`
/// fills one in from the manager's current state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalStatistics {
    /// Total number of update operations
    pub total_updates: usize,
    /// Number of successful updates
    pub successful_updates: usize,
    /// Number of failed updates
    pub failed_updates: usize,
    /// Number of entities added
    pub entities_added: usize,
    /// Number of entities updated
    pub entities_updated: usize,
    /// Number of entities removed
    pub entities_removed: usize,
    /// Number of relationships added
    pub relationships_added: usize,
    /// Number of relationships updated
    pub relationships_updated: usize,
    /// Number of relationships removed
    pub relationships_removed: usize,
    /// Number of conflicts resolved
    pub conflicts_resolved: usize,
    /// Number of cache invalidations performed
    pub cache_invalidations: usize,
    /// Average update time in milliseconds
    pub average_update_time_ms: f64,
    /// Peak updates per second achieved
    pub peak_updates_per_second: f64,
    /// Current size of the change log
    pub current_change_log_size: usize,
    /// Current number of active deltas
    pub current_delta_count: usize,
}
261
262impl IncrementalStatistics {
263    /// Creates an empty statistics instance
264    pub fn empty() -> Self {
265        Self {
266            total_updates: 0,
267            successful_updates: 0,
268            failed_updates: 0,
269            entities_added: 0,
270            entities_updated: 0,
271            entities_removed: 0,
272            relationships_added: 0,
273            relationships_updated: 0,
274            relationships_removed: 0,
275            conflicts_resolved: 0,
276            cache_invalidations: 0,
277            average_update_time_ms: 0.0,
278            peak_updates_per_second: 0.0,
279            current_change_log_size: 0,
280            current_delta_count: 0,
281        }
282    }
283
284    /// Prints statistics to stdout in a formatted way
285    pub fn print(&self) {
286        println!("🔄 Incremental Updates Statistics");
287        println!("  Total updates: {}", self.total_updates);
288        println!(
289            "  Successful: {} ({:.1}%)",
290            self.successful_updates,
291            if self.total_updates > 0 {
292                (self.successful_updates as f64 / self.total_updates as f64) * 100.0
293            } else {
294                0.0
295            }
296        );
297        println!("  Failed: {}", self.failed_updates);
298        println!(
299            "  Entities: +{} ~{} -{}",
300            self.entities_added, self.entities_updated, self.entities_removed
301        );
302        println!(
303            "  Relationships: +{} ~{} -{}",
304            self.relationships_added, self.relationships_updated, self.relationships_removed
305        );
306        println!("  Conflicts resolved: {}", self.conflicts_resolved);
307        println!("  Cache invalidations: {}", self.cache_invalidations);
308        println!("  Avg update time: {:.2}ms", self.average_update_time_ms);
309        println!("  Peak updates/sec: {:.1}", self.peak_updates_per_second);
310        println!("  Change log size: {}", self.current_change_log_size);
311        println!("  Active deltas: {}", self.current_delta_count);
312    }
313}
314
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Gets comprehensive statistics about incremental operations.
    ///
    /// Operation counts and timings come from the update monitor, the
    /// invalidation count from the cache invalidation component, and the
    /// entity/relationship counters are tallied from the change log as it
    /// stands at the time of the call.
    ///
    /// The average update time is converted with sub-millisecond precision, so
    /// averages below one millisecond are no longer reported as `0.0`.
    pub fn get_statistics(&self) -> IncrementalStatistics {
        let perf_stats = self.monitor.get_performance_stats();
        let invalidation_stats = self.cache_invalidation.get_invalidation_stats();

        // Tally entity/relationship changes from the change log.
        let mut entities_added = 0usize;
        let mut entities_updated = 0usize;
        let mut entities_removed = 0usize;
        let mut relationships_added = 0usize;
        let mut relationships_updated = 0usize;
        let mut relationships_removed = 0usize;

        // NOTE(review): nothing in this file counts resolved conflicts, so the
        // field is always reported as zero - confirm whether the conflict
        // resolver exposes a counter that should be used here.
        let conflicts_resolved = 0;

        for change in self.change_log.iter() {
            match change.value().change_type {
                ChangeType::EntityAdded => entities_added += 1,
                ChangeType::EntityUpdated => entities_updated += 1,
                ChangeType::EntityRemoved => entities_removed += 1,
                ChangeType::RelationshipAdded => relationships_added += 1,
                ChangeType::RelationshipUpdated => relationships_updated += 1,
                ChangeType::RelationshipRemoved => relationships_removed += 1,
                _ => {},
            }
        }

        IncrementalStatistics {
            total_updates: perf_stats.total_operations as usize,
            successful_updates: perf_stats.successful_operations as usize,
            failed_updates: perf_stats.failed_operations as usize,
            entities_added,
            entities_updated,
            entities_removed,
            relationships_added,
            relationships_updated,
            relationships_removed,
            conflicts_resolved,
            cache_invalidations: invalidation_stats.total_invalidations,
            // `as_millis()` would truncate to whole milliseconds.
            average_update_time_ms: perf_stats.average_operation_time.as_secs_f64() * 1000.0,
            peak_updates_per_second: perf_stats.peak_operations_per_second,
            current_change_log_size: self.change_log.len(),
            current_delta_count: self.deltas.len(),
        }
    }
}
358
359#[cfg(not(feature = "incremental"))]
360impl IncrementalGraphManager {
361    /// Gets basic statistics about incremental operations (non-feature version)
362    pub fn get_statistics(&self) -> IncrementalStatistics {
363        let mut stats = IncrementalStatistics::empty();
364        stats.current_change_log_size = self.change_log.len();
365
366        for change in &self.change_log {
367            match change.change_type {
368                ChangeType::EntityAdded => stats.entities_added += 1,
369                ChangeType::EntityUpdated => stats.entities_updated += 1,
370                ChangeType::EntityRemoved => stats.entities_removed += 1,
371                ChangeType::RelationshipAdded => stats.relationships_added += 1,
372                ChangeType::RelationshipUpdated => stats.relationships_updated += 1,
373                ChangeType::RelationshipRemoved => stats.relationships_removed += 1,
374                _ => {},
375            }
376        }
377
378        stats.total_updates = self.change_log.len();
379        stats.successful_updates = self.change_log.len(); // Assume all succeeded in basic mode
380        stats
381    }
382}
383
384// ============================================================================
385// Incremental PageRank Implementation
386// ============================================================================
387
/// Incremental PageRank calculator for efficient updates
///
/// Keeps the latest score per entity and a counter of changes recorded since
/// the counter was last reset; that counter decides between a localized update
/// and a full recomputation.
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct IncrementalPageRank {
    /// Latest PageRank score per entity.
    pub(super) scores: DashMap<EntityId, f64>,
    /// Initialized empty by `new`; not read or written elsewhere in this file.
    adjacency_changes: DashMap<EntityId, Vec<(EntityId, f64)>>, // Node -> [(neighbor, weight)]
    /// Damping factor used in the score formula.
    damping_factor: f64,
    /// Iteration stops once the largest per-entity score change drops below this.
    tolerance: f64,
    /// Upper bound on the number of power-iteration rounds.
    max_iterations: usize,
    /// Set to the creation time by `new`; not updated elsewhere in this file.
    last_full_computation: DateTime<Utc>,
    /// `new` sets this to 1000.
    incremental_threshold: usize, // Number of changes before full recomputation
    /// Number of changes recorded through `record_change` since the last reset.
    pending_changes: RwLock<usize>,
}
401
#[cfg(feature = "incremental")]
impl IncrementalPageRank {
    /// Creates a new incremental PageRank calculator.
    ///
    /// Scores start empty, the pending-change counter starts at zero and the
    /// full-recomputation threshold is fixed at 1000 pending changes.
    pub fn new(damping_factor: f64, tolerance: f64, max_iterations: usize) -> Self {
        Self {
            scores: DashMap::new(),
            adjacency_changes: DashMap::new(),
            damping_factor,
            tolerance,
            max_iterations,
            last_full_computation: Utc::now(),
            incremental_threshold: 1000,
            pending_changes: RwLock::new(0),
        }
    }

    /// Updates PageRank scores after `changed_entities` were modified.
    ///
    /// If more changes than `incremental_threshold` have been recorded since
    /// the last reset, all scores are recomputed from scratch. Otherwise only
    /// the changed entities and their 2-hop neighbourhood are re-scored.
    ///
    /// In both cases the pending-change counter is reset once the computation
    /// succeeds. (Previously the full-recomputation path returned without
    /// resetting it, so every later call repeated the full recomputation.)
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying computation; the counter is
    /// left untouched in that case.
    pub async fn update_incremental(
        &self,
        changed_entities: &[EntityId],
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let start = Instant::now();

        // Copy the counter out so no lock guard is alive across an `.await`.
        let pending = *self.pending_changes.read();

        // If too many changes accumulated, do full recomputation
        if pending > self.incremental_threshold {
            self.full_recomputation(graph).await?;
            *self.pending_changes.write() = 0;
            return Ok(());
        }

        // Changed entities plus their direct and second-hop neighbours.
        let mut affected_entities = HashSet::new();

        for entity_id in changed_entities {
            affected_entities.insert(entity_id.clone());

            // Add direct neighbors
            for (neighbor, _) in graph.get_neighbors(entity_id) {
                affected_entities.insert(neighbor.id.clone());

                // Add second-hop neighbors
                for (second_hop, _) in graph.get_neighbors(&neighbor.id) {
                    affected_entities.insert(second_hop.id.clone());
                }
            }
        }

        // Perform localized PageRank computation
        self.localized_pagerank(&affected_entities, graph).await?;

        // Reset pending changes counter
        *self.pending_changes.write() = 0;

        let duration = start.elapsed();
        println!(
            "🔄 Incremental PageRank update completed in {:?} for {} entities",
            duration,
            affected_entities.len()
        );

        Ok(())
    }

    /// Recomputes the scores of every entity in `graph` by power iteration.
    ///
    /// Scores of entities that are no longer part of the graph are dropped
    /// first, so look-ups and top-k queries never return stale entries. Each
    /// round reads the previous round's scores and writes the new ones only
    /// after all entities have been processed. Iteration stops after
    /// `max_iterations` rounds or once the largest change falls below
    /// `tolerance`.
    async fn full_recomputation(&self, graph: &KnowledgeGraph) -> Result<()> {
        let start = Instant::now();

        let entities: Vec<EntityId> = graph.entities().map(|e| e.id.clone()).collect();
        let n = entities.len();

        // Remove scores that belong to entities missing from the current graph.
        {
            let live: HashSet<&EntityId> = entities.iter().collect();
            self.scores.retain(|id, _| live.contains(id));
        }

        if n == 0 {
            return Ok(());
        }

        // Initialize scores
        let initial_score = 1.0 / n as f64;
        for entity_id in &entities {
            self.scores.insert(entity_id.clone(), initial_score);
        }

        // Out-degrees depend only on the graph, so compute them once instead
        // of once per (source, target) pair in every round.
        let out_degrees: Vec<f64> = entities
            .iter()
            .map(|id| self.get_out_degree(id, graph))
            .collect();
        let base_score = (1.0 - self.damping_factor) / n as f64;

        // Power iteration
        for iteration in 0..self.max_iterations {
            let mut new_scores = HashMap::with_capacity(n);
            let mut max_diff: f64 = 0.0;

            for entity_id in &entities {
                let mut score = base_score;

                // Sum contributions from incoming links
                for (other_entity, &out_degree) in entities.iter().zip(&out_degrees) {
                    if out_degree > 0.0 {
                        if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph)
                        {
                            let other_score = self
                                .scores
                                .get(other_entity)
                                .map(|s| *s.value())
                                .unwrap_or(initial_score);
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(initial_score);
                max_diff = max_diff.max((score - old_score).abs());

                new_scores.insert(entity_id.clone(), score);
            }

            // Publish this round's scores only after every entity was processed.
            for (entity_id, score) in new_scores {
                self.scores.insert(entity_id, score);
            }

            // Check convergence
            if max_diff < self.tolerance {
                println!(
                    "🎯 PageRank converged after {} iterations (diff: {:.6})",
                    iteration + 1,
                    max_diff
                );
                break;
            }
        }

        let duration = start.elapsed();
        println!("🔄 Full PageRank recomputation completed in {duration:?} for {n} entities");

        Ok(())
    }

    /// Re-scores only the entities in `entities`, treating them as a closed
    /// subgraph.
    ///
    /// Only links between members of the subset are considered, and scores are
    /// written back immediately, so later entities in the same round already
    /// see the updated values of earlier ones.
    // NOTE(review): the base term is divided by the subset size rather than the
    // size of the whole graph, so localized scores are on a different scale
    // than those from `full_recomputation` - confirm this is intended.
    async fn localized_pagerank(
        &self,
        entities: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let entity_vec: Vec<EntityId> = entities.iter().cloned().collect();
        let n = entity_vec.len();

        if n == 0 {
            return Ok(());
        }

        // Loop-invariant values, computed once up front.
        let fallback_score = 1.0 / n as f64;
        let base_score = (1.0 - self.damping_factor) / n as f64;
        let local_out_degrees: Vec<f64> = entity_vec
            .iter()
            .map(|id| self.get_localized_out_degree(id, entities, graph))
            .collect();

        // Localized power iteration
        for _iteration in 0..self.max_iterations {
            let mut max_diff: f64 = 0.0;

            for entity_id in &entity_vec {
                let mut score = base_score;

                // Only consider links within the subset for localized computation
                for (other_entity, &out_degree) in entity_vec.iter().zip(&local_out_degrees) {
                    if out_degree > 0.0 {
                        if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph)
                        {
                            let other_score = self
                                .scores
                                .get(other_entity)
                                .map(|s| *s.value())
                                .unwrap_or(fallback_score);
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self
                    .scores
                    .get(entity_id)
                    .map(|s| *s.value())
                    .unwrap_or(fallback_score);
                max_diff = max_diff.max((score - old_score).abs());

                self.scores.insert(entity_id.clone(), score);
            }

            // Check convergence
            if max_diff < self.tolerance {
                break;
            }
        }

        Ok(())
    }

    /// Returns the confidence of the first relationship from `from` to `to`
    /// that `get_neighbors(from)` yields, or `None` if there is none.
    fn get_edge_weight(
        &self,
        from: &EntityId,
        to: &EntityId,
        graph: &KnowledgeGraph,
    ) -> Option<f64> {
        for (neighbor, relationship) in graph.get_neighbors(from) {
            if neighbor.id == *to {
                return Some(relationship.confidence as f64);
            }
        }
        None
    }

    /// Sums the confidences of all relationships returned by
    /// `get_neighbors(entity_id)`.
    fn get_out_degree(&self, entity_id: &EntityId, graph: &KnowledgeGraph) -> f64 {
        graph
            .get_neighbors(entity_id)
            .iter()
            .map(|(_, rel)| rel.confidence as f64)
            .sum()
    }

    /// Like `get_out_degree`, but only counts relationships whose neighbour is
    /// a member of `subset`.
    fn get_localized_out_degree(
        &self,
        entity_id: &EntityId,
        subset: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> f64 {
        graph
            .get_neighbors(entity_id)
            .iter()
            .filter(|(neighbor, _)| subset.contains(&neighbor.id))
            .map(|(_, rel)| rel.confidence as f64)
            .sum()
    }

    /// Get PageRank score for an entity, or `None` if it has no score yet.
    pub fn get_score(&self, entity_id: &EntityId) -> Option<f64> {
        self.scores.get(entity_id).map(|s| *s.value())
    }

    /// Get the `k` highest-scoring entities, best first.
    ///
    /// `NaN` scores sort after every real score. The comparator is a total
    /// order, which `sort_by` requires; treating `NaN` as "equal to anything"
    /// is not one and may make the sort panic or return an arbitrary order.
    pub fn get_top_entities(&self, k: usize) -> Vec<(EntityId, f64)> {
        let mut entities: Vec<(EntityId, f64)> = self
            .scores
            .iter()
            .map(|entry| (entry.key().clone(), *entry.value()))
            .collect();

        entities.sort_by(|a, b| {
            b.1.partial_cmp(&a.1)
                // Only reached when at least one side is NaN: NaN goes last.
                .unwrap_or_else(|| a.1.is_nan().cmp(&b.1.is_nan()))
        });
        entities.truncate(k);
        entities
    }

    /// Record a graph change for incremental updates.
    ///
    /// Only the pending-change counter is incremented; the entity id itself is
    /// not stored.
    pub fn record_change(&self, _entity_id: EntityId) {
        *self.pending_changes.write() += 1;
    }
}
660
661// ============================================================================
662// Batch Processing System
663// ============================================================================
664
665/// High-throughput batch processor for incremental updates
666#[cfg(feature = "incremental")]
667pub struct BatchProcessor {
668    batch_size: usize,
669    max_wait_time: Duration,
670    pending_batches: DashMap<String, PendingBatch>,
671    processing_semaphore: Semaphore,
672    metrics: RwLock<BatchMetrics>,
673}
674
675#[derive(Debug, Clone)]
676#[allow(dead_code)]
677struct PendingBatch {
678    changes: Vec<ChangeRecord>,
679    created_at: Instant,
680    batch_id: String,
681}
682
683/// Batch metrics for monitoring
684#[derive(Debug, Clone)]
685pub struct BatchMetrics {
686    /// Total number of batches processed
687    pub total_batches_processed: u64,
688    /// Total number of changes processed across all batches
689    pub total_changes_processed: u64,
690    /// Average size of batches
691    pub average_batch_size: f64,
692    /// Average time to process a batch
693    pub average_processing_time: Duration,
694    /// Throughput in changes per second
695    pub throughput_per_second: f64,
696    /// Timestamp of last batch processed
697    pub last_batch_processed: Option<DateTime<Utc>>,
698}
699
700#[cfg(feature = "incremental")]
701impl BatchProcessor {
702    /// Creates a new batch processor with specified configuration
703    pub fn new(batch_size: usize, max_wait_time: Duration, max_concurrent_batches: usize) -> Self {
704        Self {
705            batch_size,
706            max_wait_time,
707            pending_batches: DashMap::new(),
708            processing_semaphore: Semaphore::new(max_concurrent_batches),
709            metrics: RwLock::new(BatchMetrics {
710                total_batches_processed: 0,
711                total_changes_processed: 0,
712                average_batch_size: 0.0,
713                average_processing_time: Duration::from_millis(0),
714                throughput_per_second: 0.0,
715                last_batch_processed: None,
716            }),
717        }
718    }
719
720    /// Adds a change to be processed in batches
721    pub async fn add_change(&self, change: ChangeRecord) -> Result<String> {
722        let batch_key = self.get_batch_key(&change);
723
724        let batch_id = {
725            let mut entry = self
726                .pending_batches
727                .entry(batch_key.clone())
728                .or_insert_with(|| PendingBatch {
729                    changes: Vec::new(),
730                    created_at: Instant::now(),
731                    batch_id: format!("batch_{}", Uuid::new_v4()),
732                });
733
734            entry.changes.push(change);
735            let should_process = entry.changes.len() >= self.batch_size
736                || entry.created_at.elapsed() > self.max_wait_time;
737
738            let batch_id = entry.batch_id.clone();
739
740            if should_process {
741                // Move batch out for processing
742                let batch = entry.clone();
743                self.pending_batches.remove(&batch_key);
744
745                // Process batch asynchronously
746                let processor = Arc::new(self.clone());
747                tokio::spawn(async move {
748                    if let Err(e) = processor.process_batch(batch).await {
749                        eprintln!("Batch processing error: {e}");
750                    }
751                });
752            }
753
754            batch_id
755        };
756
757        Ok(batch_id)
758    }
759
760    async fn process_batch(&self, batch: PendingBatch) -> Result<()> {
761        let _permit = self.processing_semaphore.acquire().await.map_err(|_| {
762            GraphRAGError::IncrementalUpdate {
763                message: "Failed to acquire processing permit".to_string(),
764            }
765        })?;
766
767        let start = Instant::now();
768
769        // Group changes by type for optimized processing
770        let mut entity_changes = Vec::new();
771        let mut relationship_changes = Vec::new();
772        let mut embedding_changes = Vec::new();
773
774        for change in &batch.changes {
775            match &change.change_type {
776                ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
777                    entity_changes.push(change);
778                },
779                ChangeType::RelationshipAdded
780                | ChangeType::RelationshipUpdated
781                | ChangeType::RelationshipRemoved => {
782                    relationship_changes.push(change);
783                },
784                ChangeType::EmbeddingAdded
785                | ChangeType::EmbeddingUpdated
786                | ChangeType::EmbeddingRemoved => {
787                    embedding_changes.push(change);
788                },
789                _ => {},
790            }
791        }
792
793        // Process each type of change optimally
794        self.process_entity_changes(&entity_changes).await?;
795        self.process_relationship_changes(&relationship_changes)
796            .await?;
797        self.process_embedding_changes(&embedding_changes).await?;
798
799        let processing_time = start.elapsed();
800
801        // Update metrics
802        self.update_metrics(&batch, processing_time).await;
803
804        println!(
805            "🚀 Processed batch {} with {} changes in {:?}",
806            batch.batch_id,
807            batch.changes.len(),
808            processing_time
809        );
810
811        Ok(())
812    }
813
814    async fn process_entity_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
815        // Implementation would go here - process entity changes efficiently
816        Ok(())
817    }
818
819    async fn process_relationship_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
820        // Implementation would go here - process relationship changes efficiently
821        Ok(())
822    }
823
824    async fn process_embedding_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
825        // Implementation would go here - process embedding changes efficiently
826        Ok(())
827    }
828
829    fn get_batch_key(&self, change: &ChangeRecord) -> String {
830        // Group changes by entity or document for batching efficiency
831        match (&change.entity_id, &change.document_id) {
832            (Some(entity_id), _) => format!("entity:{entity_id}"),
833            (None, Some(doc_id)) => format!("document:{doc_id}"),
834            _ => "global".to_string(),
835        }
836    }
837
838    async fn update_metrics(&self, batch: &PendingBatch, processing_time: Duration) {
839        let mut metrics = self.metrics.write();
840
841        metrics.total_batches_processed += 1;
842        metrics.total_changes_processed += batch.changes.len() as u64;
843
844        // Update running averages
845        let total_batches = metrics.total_batches_processed as f64;
846        metrics.average_batch_size = (metrics.average_batch_size * (total_batches - 1.0)
847            + batch.changes.len() as f64)
848            / total_batches;
849
850        let prev_avg_ms = metrics.average_processing_time.as_millis() as f64;
851        let new_avg_ms = (prev_avg_ms * (total_batches - 1.0) + processing_time.as_millis() as f64)
852            / total_batches;
853        metrics.average_processing_time = Duration::from_millis(new_avg_ms as u64);
854
855        // Calculate throughput
856        if processing_time.as_secs_f64() > 0.0 {
857            metrics.throughput_per_second =
858                batch.changes.len() as f64 / processing_time.as_secs_f64();
859        }
860
861        metrics.last_batch_processed = Some(Utc::now());
862    }
863
864    /// Gets the current batch processing metrics
865    pub fn get_metrics(&self) -> BatchMetrics {
866        self.metrics.read().clone()
867    }
868}
869
870// Clone impl for BatchProcessor (required for Arc usage)
871#[cfg(feature = "incremental")]
872impl Clone for BatchProcessor {
873    fn clone(&self) -> Self {
874        Self {
875            batch_size: self.batch_size,
876            max_wait_time: self.max_wait_time,
877            pending_batches: DashMap::new(), // New instance starts empty
878            processing_semaphore: Semaphore::new(self.processing_semaphore.available_permits()),
879            metrics: RwLock::new(self.get_metrics()),
880        }
881    }
882}
883
884// ============================================================================
885// Error Extensions
886// ============================================================================
887
888impl GraphRAGError {
889    /// Creates a conflict resolution error
890    pub fn conflict_resolution(message: String) -> Self {
891        GraphRAGError::GraphConstruction { message }
892    }
893
894    /// Creates an incremental update error
895    pub fn incremental_update(message: String) -> Self {
896        GraphRAGError::GraphConstruction { message }
897    }
898}