1#![allow(unused_imports)]
2
3use crate::core::{
4 DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship, Result, TextChunk,
5};
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::time::{Duration, Instant};
10
11#[cfg(feature = "incremental")]
12use std::sync::Arc;
13
14#[cfg(feature = "incremental")]
15use {
16 dashmap::DashMap,
17 parking_lot::{Mutex, RwLock},
18 tokio::sync::{broadcast, Semaphore},
19 uuid::Uuid,
20};
21
22use super::*;
23
/// Manager for incremental updates to a [`KnowledgeGraph`] (concurrent variant).
///
/// This definition is compiled only when the `incremental` feature is enabled.
/// The graph, conflict resolver and monitor are reference-counted so that the
/// accessor methods can hand out cloned handles to them.
#[cfg(feature = "incremental")]
pub struct IncrementalGraphManager {
    /// The managed graph; `basic_upsert_entity` mutates it under the write lock.
    graph: Arc<RwLock<KnowledgeGraph>>,
    /// Recorded changes, keyed by each record's `change_id`.
    change_log: DashMap<UpdateId, ChangeRecord>,
    /// Graph deltas keyed by update id. In this file only its length is read
    /// (for statistics). NOTE(review): presumably populated elsewhere — confirm.
    deltas: DashMap<UpdateId, GraphDelta>,
    /// Cache invalidation component; queried for invalidation statistics.
    cache_invalidation: Arc<SelectiveInvalidation>,
    /// Conflict resolver. Built from `config.conflict_strategy` in `new` and
    /// replaceable through `with_conflict_resolver`.
    conflict_resolver: Arc<ConflictResolver>,
    /// Operation monitor; records the start and completion of each update and
    /// supplies the performance figures used by the statistics.
    monitor: Arc<UpdateMonitor>,
    /// Configuration supplied at construction time.
    config: IncrementalConfig,
}
39
/// Manager for incremental updates to a [`KnowledgeGraph`] (basic variant).
///
/// This definition is compiled only when the `incremental` feature is
/// disabled. It owns the graph directly and keeps the change log in a `Vec`.
#[cfg(not(feature = "incremental"))]
pub struct IncrementalGraphManager {
    /// The managed graph, owned by value.
    graph: KnowledgeGraph,
    /// Recorded changes in insertion order.
    change_log: Vec<ChangeRecord>,
    /// Configuration supplied at construction time.
    config: IncrementalConfig,
}
47
/// Tunable settings for [`IncrementalGraphManager`].
///
/// NOTE(review): apart from `conflict_strategy`, none of these fields is read
/// by the code visible in this file; the per-field descriptions below are
/// inferred from the field names and should be confirmed against their users.
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Presumably the maximum number of records kept in the change log.
    pub max_change_log_size: usize,
    /// Presumably the maximum number of changes in a single delta.
    pub max_delta_size: usize,
    /// Strategy handed to the default `ConflictResolver` built by the manager.
    pub conflict_strategy: ConflictStrategy,
    /// Presumably toggles performance monitoring.
    pub enable_monitoring: bool,
    /// Name of the cache invalidation strategy (the default is `"selective"`).
    pub cache_invalidation_strategy: String,
    /// Presumably the number of changes grouped into one batch.
    pub batch_size: usize,
    /// Presumably the upper bound on concurrently running operations.
    pub max_concurrent_operations: usize,
}
66
67impl Default for IncrementalConfig {
68 fn default() -> Self {
69 Self {
70 max_change_log_size: 10000,
71 max_delta_size: 1000,
72 conflict_strategy: ConflictStrategy::Merge,
73 enable_monitoring: true,
74 cache_invalidation_strategy: "selective".to_string(),
75 batch_size: 100,
76 max_concurrent_operations: 10,
77 }
78 }
79}
80
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Builds a manager around `graph`.
    ///
    /// The change log and delta map start empty, and the conflict resolver is
    /// created from `config.conflict_strategy`.
    pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
        let graph = Arc::new(RwLock::new(graph));
        let change_log = DashMap::new();
        let deltas = DashMap::new();
        let cache_invalidation = Arc::new(SelectiveInvalidation::new());
        let conflict_resolver = Arc::new(ConflictResolver::new(config.conflict_strategy.clone()));
        let monitor = Arc::new(UpdateMonitor::new());

        IncrementalGraphManager {
            graph,
            change_log,
            deltas,
            cache_invalidation,
            conflict_resolver,
            monitor,
            config,
        }
    }

    /// Builder-style setter that swaps in a caller-provided conflict resolver.
    pub fn with_conflict_resolver(mut self, resolver: ConflictResolver) -> Self {
        let shared_resolver = Arc::new(resolver);
        self.conflict_resolver = shared_resolver;
        self
    }

    /// Returns a new shared handle to the lock-protected graph.
    pub fn graph(&self) -> Arc<RwLock<KnowledgeGraph>> {
        self.graph.clone()
    }

    /// Returns a new shared handle to the conflict resolver.
    pub fn conflict_resolver(&self) -> Arc<ConflictResolver> {
        self.conflict_resolver.clone()
    }

    /// Returns a new shared handle to the update monitor.
    pub fn monitor(&self) -> Arc<UpdateMonitor> {
        self.monitor.clone()
    }
}
117
118#[cfg(not(feature = "incremental"))]
119impl IncrementalGraphManager {
120 pub fn new(graph: KnowledgeGraph, config: IncrementalConfig) -> Self {
122 Self {
123 graph,
124 change_log: Vec::new(),
125 config,
126 }
127 }
128
129 pub fn graph(&self) -> &KnowledgeGraph {
131 &self.graph
132 }
133
134 pub fn graph_mut(&mut self) -> &mut KnowledgeGraph {
136 &mut self.graph
137 }
138}
139
140impl IncrementalGraphManager {
142 pub fn create_change_record(
144 &self,
145 change_type: ChangeType,
146 operation: Operation,
147 change_data: ChangeData,
148 entity_id: Option<EntityId>,
149 document_id: Option<DocumentId>,
150 ) -> ChangeRecord {
151 ChangeRecord {
152 change_id: UpdateId::new(),
153 timestamp: Utc::now(),
154 change_type,
155 entity_id,
156 document_id,
157 operation,
158 data: change_data,
159 metadata: HashMap::new(),
160 }
161 }
162
163 pub fn config(&self) -> &IncrementalConfig {
165 &self.config
166 }
167
168 pub fn basic_upsert_entity(&mut self, entity: Entity) -> Result<UpdateId> {
170 let update_id = UpdateId::new();
171
172 #[cfg(feature = "incremental")]
173 {
174 let operation_id = self.monitor.start_operation("upsert_entity");
175 let mut graph = self.graph.write();
176
177 match graph.add_entity(entity.clone()) {
178 Ok(_) => {
179 let ent_id = entity.id.clone();
180 let change = self.create_change_record(
181 ChangeType::EntityAdded,
182 Operation::Upsert,
183 ChangeData::Entity(entity),
184 Some(ent_id),
185 None,
186 );
187 self.change_log.insert(change.change_id.clone(), change);
188 self.monitor
189 .complete_operation(&operation_id, true, None, 1, 0);
190 Ok(update_id)
191 },
192 Err(e) => {
193 self.monitor.complete_operation(
194 &operation_id,
195 false,
196 Some(e.to_string()),
197 0,
198 0,
199 );
200 Err(e)
201 },
202 }
203 }
204
205 #[cfg(not(feature = "incremental"))]
206 {
207 self.graph.add_entity(entity.clone())?;
208 let ent_id = entity.id.clone();
210 let change = self.create_change_record(
211 ChangeType::EntityAdded,
212 Operation::Upsert,
213 ChangeData::Entity(entity),
214 Some(ent_id),
215 None,
216 );
217 self.change_log.push(change);
218 Ok(update_id)
219 }
220 }
221}
222
/// Snapshot of counters describing incremental update activity.
///
/// Produced by `IncrementalGraphManager::get_statistics`; how each field is
/// filled depends on whether the `incremental` feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalStatistics {
    /// Total number of update operations.
    pub total_updates: usize,
    /// Number of update operations that succeeded.
    pub successful_updates: usize,
    /// Number of update operations that failed.
    pub failed_updates: usize,
    /// Change-log records of type `EntityAdded`.
    pub entities_added: usize,
    /// Change-log records of type `EntityUpdated`.
    pub entities_updated: usize,
    /// Change-log records of type `EntityRemoved`.
    pub entities_removed: usize,
    /// Change-log records of type `RelationshipAdded`.
    pub relationships_added: usize,
    /// Change-log records of type `RelationshipUpdated`.
    pub relationships_updated: usize,
    /// Change-log records of type `RelationshipRemoved`.
    pub relationships_removed: usize,
    /// Number of conflicts resolved.
    pub conflicts_resolved: usize,
    /// Number of cache invalidations performed.
    pub cache_invalidations: usize,
    /// Average duration of an update, in milliseconds.
    pub average_update_time_ms: f64,
    /// Highest observed update rate, in updates per second.
    pub peak_updates_per_second: f64,
    /// Number of records currently held in the change log.
    pub current_change_log_size: usize,
    /// Number of deltas currently held.
    pub current_delta_count: usize,
}
261
262impl IncrementalStatistics {
263 pub fn empty() -> Self {
265 Self {
266 total_updates: 0,
267 successful_updates: 0,
268 failed_updates: 0,
269 entities_added: 0,
270 entities_updated: 0,
271 entities_removed: 0,
272 relationships_added: 0,
273 relationships_updated: 0,
274 relationships_removed: 0,
275 conflicts_resolved: 0,
276 cache_invalidations: 0,
277 average_update_time_ms: 0.0,
278 peak_updates_per_second: 0.0,
279 current_change_log_size: 0,
280 current_delta_count: 0,
281 }
282 }
283
284 pub fn print(&self) {
286 println!("🔄 Incremental Updates Statistics");
287 println!(" Total updates: {}", self.total_updates);
288 println!(
289 " Successful: {} ({:.1}%)",
290 self.successful_updates,
291 if self.total_updates > 0 {
292 (self.successful_updates as f64 / self.total_updates as f64) * 100.0
293 } else {
294 0.0
295 }
296 );
297 println!(" Failed: {}", self.failed_updates);
298 println!(
299 " Entities: +{} ~{} -{}",
300 self.entities_added, self.entities_updated, self.entities_removed
301 );
302 println!(
303 " Relationships: +{} ~{} -{}",
304 self.relationships_added, self.relationships_updated, self.relationships_removed
305 );
306 println!(" Conflicts resolved: {}", self.conflicts_resolved);
307 println!(" Cache invalidations: {}", self.cache_invalidations);
308 println!(" Avg update time: {:.2}ms", self.average_update_time_ms);
309 println!(" Peak updates/sec: {:.1}", self.peak_updates_per_second);
310 println!(" Change log size: {}", self.current_change_log_size);
311 println!(" Active deltas: {}", self.current_delta_count);
312 }
313}
314
#[cfg(feature = "incremental")]
impl IncrementalGraphManager {
    /// Collects a statistics snapshot for this manager.
    ///
    /// Operation totals and timings come from the update monitor, the
    /// invalidation count from the cache invalidation component, and the
    /// per-kind entity/relationship counts from a scan of the change log.
    /// Change types other than the six entity/relationship kinds are not
    /// counted.
    pub fn get_statistics(&self) -> IncrementalStatistics {
        let perf_stats = self.monitor.get_performance_stats();
        let invalidation_stats = self.cache_invalidation.get_invalidation_stats();

        let (mut entities_added, mut entities_updated, mut entities_removed) =
            (0usize, 0usize, 0usize);
        let (mut relationships_added, mut relationships_updated, mut relationships_removed) =
            (0usize, 0usize, 0usize);
        // NOTE(review): no conflict counter is available in this scope, so
        // the value is reported as zero.
        let conflicts_resolved = 0;

        for change in self.change_log.iter() {
            match change.value().change_type {
                ChangeType::EntityAdded => entities_added += 1,
                ChangeType::EntityUpdated => entities_updated += 1,
                ChangeType::EntityRemoved => entities_removed += 1,
                ChangeType::RelationshipAdded => relationships_added += 1,
                ChangeType::RelationshipUpdated => relationships_updated += 1,
                ChangeType::RelationshipRemoved => relationships_removed += 1,
                _ => {},
            }
        }

        IncrementalStatistics {
            total_updates: perf_stats.total_operations as usize,
            successful_updates: perf_stats.successful_operations as usize,
            failed_updates: perf_stats.failed_operations as usize,
            entities_added,
            entities_updated,
            entities_removed,
            relationships_added,
            relationships_updated,
            relationships_removed,
            conflicts_resolved,
            cache_invalidations: invalidation_stats.total_invalidations,
            // Convert through fractional seconds: `as_millis()` would truncate
            // every sub-millisecond average to zero.
            average_update_time_ms: perf_stats.average_operation_time.as_secs_f64() * 1000.0,
            peak_updates_per_second: perf_stats.peak_operations_per_second,
            current_change_log_size: self.change_log.len(),
            current_delta_count: self.deltas.len(),
        }
    }
}
358
359#[cfg(not(feature = "incremental"))]
360impl IncrementalGraphManager {
361 pub fn get_statistics(&self) -> IncrementalStatistics {
363 let mut stats = IncrementalStatistics::empty();
364 stats.current_change_log_size = self.change_log.len();
365
366 for change in &self.change_log {
367 match change.change_type {
368 ChangeType::EntityAdded => stats.entities_added += 1,
369 ChangeType::EntityUpdated => stats.entities_updated += 1,
370 ChangeType::EntityRemoved => stats.entities_removed += 1,
371 ChangeType::RelationshipAdded => stats.relationships_added += 1,
372 ChangeType::RelationshipUpdated => stats.relationships_updated += 1,
373 ChangeType::RelationshipRemoved => stats.relationships_removed += 1,
374 _ => {},
375 }
376 }
377
378 stats.total_updates = self.change_log.len();
379 stats.successful_updates = self.change_log.len(); stats
381 }
382}
383
/// PageRank scores that can be refreshed either for the whole graph or only
/// for the neighbourhood of recently changed entities.
///
/// Compiled only when the `incremental` feature is enabled.
#[cfg(feature = "incremental")]
#[allow(dead_code)]
pub struct IncrementalPageRank {
    /// Current score per entity.
    pub(super) scores: DashMap<EntityId, f64>,
    /// Per-entity list of `(EntityId, f64)` pairs. Not read or written by the
    /// code in this file. NOTE(review): presumably pending edge-weight changes — confirm.
    adjacency_changes: DashMap<EntityId, Vec<(EntityId, f64)>>,
    /// Damping factor used in the score update formula.
    damping_factor: f64,
    /// Iteration stops once the largest per-entity score change in a pass
    /// falls below this value.
    tolerance: f64,
    /// Upper bound on the number of passes per computation.
    max_iterations: usize,
    /// Set to the construction time in `new`; not updated afterwards in this file.
    last_full_computation: DateTime<Utc>,
    /// When more than this many changes are pending, an update performs a
    /// full recomputation instead of a localized one.
    incremental_threshold: usize,
    /// Number of changes recorded since the counter was last reset.
    pending_changes: RwLock<usize>,
}
401
#[cfg(feature = "incremental")]
impl IncrementalPageRank {
    /// Creates an empty score table with the given iteration parameters.
    ///
    /// The threshold above which an update switches to a full recomputation
    /// is fixed at 1000 pending changes.
    pub fn new(damping_factor: f64, tolerance: f64, max_iterations: usize) -> Self {
        Self {
            scores: DashMap::new(),
            adjacency_changes: DashMap::new(),
            damping_factor,
            tolerance,
            max_iterations,
            last_full_computation: Utc::now(),
            incremental_threshold: 1000,
            pending_changes: RwLock::new(0),
        }
    }

    /// Refreshes scores after `changed_entities` were modified in `graph`.
    ///
    /// If more changes are pending than the incremental threshold, all scores
    /// are recomputed. Otherwise only the changed entities and everything
    /// within two hops of them are re-scored. In both cases the pending-change
    /// counter is reset to zero on success.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying computation.
    pub async fn update_incremental(
        &self,
        changed_entities: &[EntityId],
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let start = Instant::now();

        // Copy the counter out so no lock guard is alive across an `.await`.
        let pending = *self.pending_changes.read();
        if pending > self.incremental_threshold {
            self.full_recomputation(graph).await?;
            // The backlog has been absorbed by the full pass. Without this
            // reset the counter would stay above the threshold and every
            // later call would recompute the whole graph again.
            *self.pending_changes.write() = 0;
            return Ok(());
        }

        // Affected set: the changed entities plus their 1- and 2-hop neighbours.
        let mut affected_entities = HashSet::new();
        for entity_id in changed_entities {
            affected_entities.insert(entity_id.clone());

            for (neighbor, _) in graph.get_neighbors(entity_id) {
                affected_entities.insert(neighbor.id.clone());

                for (second_hop, _) in graph.get_neighbors(&neighbor.id) {
                    affected_entities.insert(second_hop.id.clone());
                }
            }
        }

        self.localized_pagerank(&affected_entities, graph).await?;

        *self.pending_changes.write() = 0;

        let duration = start.elapsed();
        println!(
            "🔄 Incremental PageRank update completed in {:?} for {} entities",
            duration,
            affected_entities.len()
        );

        Ok(())
    }

    /// Recomputes the scores of every entity currently in `graph`.
    ///
    /// All entities start at `1 / n`. Each pass computes new scores from the
    /// scores of the previous pass and then publishes them together. The loop
    /// ends after `max_iterations` passes or once the largest change in a
    /// pass drops below `tolerance`. Does nothing for an empty graph.
    async fn full_recomputation(&self, graph: &KnowledgeGraph) -> Result<()> {
        let start = Instant::now();

        let entities: Vec<EntityId> = graph.entities().map(|e| e.id.clone()).collect();
        let n = entities.len();

        if n == 0 {
            return Ok(());
        }

        let initial_score = 1.0 / n as f64;
        for entity_id in &entities {
            self.scores.insert(entity_id.clone(), initial_score);
        }

        // Both values are invariant for the whole computation (the graph is
        // borrowed immutably), so compute them once rather than per pair.
        let base_score = (1.0 - self.damping_factor) / n as f64;
        let out_degrees: Vec<f64> = entities
            .iter()
            .map(|id| self.get_out_degree(id, graph))
            .collect();

        for iteration in 0..self.max_iterations {
            let mut new_scores = HashMap::with_capacity(n);
            let mut max_diff: f64 = 0.0;

            for entity_id in &entities {
                let mut score = base_score;

                // Sum the contributions of every entity that links to `entity_id`.
                for (other_entity, &out_degree) in entities.iter().zip(&out_degrees) {
                    if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph) {
                        if out_degree > 0.0 {
                            let other_score =
                                self.get_score(other_entity).unwrap_or(initial_score);
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self.get_score(entity_id).unwrap_or(initial_score);
                max_diff = max_diff.max((score - old_score).abs());

                new_scores.insert(entity_id.clone(), score);
            }

            // Publish only after the pass, so every score in a pass is derived
            // from the previous pass's values.
            for (entity_id, score) in new_scores {
                self.scores.insert(entity_id, score);
            }

            if max_diff < self.tolerance {
                println!(
                    "🎯 PageRank converged after {} iterations (diff: {:.6})",
                    iteration + 1,
                    max_diff
                );
                break;
            }
        }

        let duration = start.elapsed();
        println!("🔄 Full PageRank recomputation completed in {duration:?} for {n} entities");

        Ok(())
    }

    /// Re-scores only the entities in `entities`, treating them as a
    /// self-contained subgraph.
    ///
    /// Unlike the full recomputation, scores are written back immediately, so
    /// later entities in the same pass see the updated values. Entities
    /// without a stored score are read as `1 / n`, where `n` is the subset
    /// size. Does nothing for an empty subset.
    async fn localized_pagerank(
        &self,
        entities: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> Result<()> {
        let entity_vec: Vec<EntityId> = entities.iter().cloned().collect();
        let n = entity_vec.len();

        if n == 0 {
            return Ok(());
        }

        // Invariant for the whole computation; hoisted out of the loops.
        let fallback_score = 1.0 / n as f64;
        let base_score = (1.0 - self.damping_factor) / n as f64;
        let out_degrees: Vec<f64> = entity_vec
            .iter()
            .map(|id| self.get_localized_out_degree(id, entities, graph))
            .collect();

        for _iteration in 0..self.max_iterations {
            let mut max_diff: f64 = 0.0;

            for entity_id in &entity_vec {
                let mut score = base_score;

                for (other_entity, &out_degree) in entity_vec.iter().zip(&out_degrees) {
                    if let Some(weight) = self.get_edge_weight(other_entity, entity_id, graph) {
                        if out_degree > 0.0 {
                            let other_score =
                                self.get_score(other_entity).unwrap_or(fallback_score);
                            score += self.damping_factor * other_score * weight / out_degree;
                        }
                    }
                }

                let old_score = self.get_score(entity_id).unwrap_or(fallback_score);
                max_diff = max_diff.max((score - old_score).abs());

                self.scores.insert(entity_id.clone(), score);
            }

            if max_diff < self.tolerance {
                break;
            }
        }

        Ok(())
    }

    /// Returns the confidence of the first relationship found from `from` to
    /// `to`, or `None` if `to` is not among `from`'s neighbours.
    fn get_edge_weight(
        &self,
        from: &EntityId,
        to: &EntityId,
        graph: &KnowledgeGraph,
    ) -> Option<f64> {
        graph
            .get_neighbors(from)
            .into_iter()
            .find(|(neighbor, _)| neighbor.id == *to)
            .map(|(_, relationship)| relationship.confidence as f64)
    }

    /// Sums the confidences of all relationships to `entity_id`'s neighbours.
    fn get_out_degree(&self, entity_id: &EntityId, graph: &KnowledgeGraph) -> f64 {
        graph
            .get_neighbors(entity_id)
            .iter()
            .map(|(_, rel)| rel.confidence as f64)
            .sum()
    }

    /// Like `get_out_degree`, but counts only neighbours contained in `subset`.
    fn get_localized_out_degree(
        &self,
        entity_id: &EntityId,
        subset: &HashSet<EntityId>,
        graph: &KnowledgeGraph,
    ) -> f64 {
        graph
            .get_neighbors(entity_id)
            .iter()
            .filter(|(neighbor, _)| subset.contains(&neighbor.id))
            .map(|(_, rel)| rel.confidence as f64)
            .sum()
    }

    /// Returns the stored score of `entity_id`, if one exists.
    pub fn get_score(&self, entity_id: &EntityId) -> Option<f64> {
        self.scores.get(entity_id).map(|s| *s.value())
    }

    /// Returns up to `k` entities with the highest scores, best first.
    ///
    /// Scores that cannot be ordered (NaN) compare as equal to their
    /// neighbours during sorting.
    pub fn get_top_entities(&self, k: usize) -> Vec<(EntityId, f64)> {
        let mut entities: Vec<(EntityId, f64)> = self
            .scores
            .iter()
            .map(|entry| (entry.key().clone(), *entry.value()))
            .collect();

        entities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        entities.truncate(k);
        entities
    }

    /// Increments the pending-change counter.
    ///
    /// Only the number of changes is tracked; the entity id itself is unused.
    pub fn record_change(&self, _entity_id: EntityId) {
        *self.pending_changes.write() += 1;
    }
}
660
661#[cfg(feature = "incremental")]
667pub struct BatchProcessor {
668 batch_size: usize,
669 max_wait_time: Duration,
670 pending_batches: DashMap<String, PendingBatch>,
671 processing_semaphore: Semaphore,
672 metrics: RwLock<BatchMetrics>,
673}
674
675#[derive(Debug, Clone)]
676#[allow(dead_code)]
677struct PendingBatch {
678 changes: Vec<ChangeRecord>,
679 created_at: Instant,
680 batch_id: String,
681}
682
683#[derive(Debug, Clone)]
685pub struct BatchMetrics {
686 pub total_batches_processed: u64,
688 pub total_changes_processed: u64,
690 pub average_batch_size: f64,
692 pub average_processing_time: Duration,
694 pub throughput_per_second: f64,
696 pub last_batch_processed: Option<DateTime<Utc>>,
698}
699
700#[cfg(feature = "incremental")]
701impl BatchProcessor {
702 pub fn new(batch_size: usize, max_wait_time: Duration, max_concurrent_batches: usize) -> Self {
704 Self {
705 batch_size,
706 max_wait_time,
707 pending_batches: DashMap::new(),
708 processing_semaphore: Semaphore::new(max_concurrent_batches),
709 metrics: RwLock::new(BatchMetrics {
710 total_batches_processed: 0,
711 total_changes_processed: 0,
712 average_batch_size: 0.0,
713 average_processing_time: Duration::from_millis(0),
714 throughput_per_second: 0.0,
715 last_batch_processed: None,
716 }),
717 }
718 }
719
720 pub async fn add_change(&self, change: ChangeRecord) -> Result<String> {
722 let batch_key = self.get_batch_key(&change);
723
724 let batch_id = {
725 let mut entry = self
726 .pending_batches
727 .entry(batch_key.clone())
728 .or_insert_with(|| PendingBatch {
729 changes: Vec::new(),
730 created_at: Instant::now(),
731 batch_id: format!("batch_{}", Uuid::new_v4()),
732 });
733
734 entry.changes.push(change);
735 let should_process = entry.changes.len() >= self.batch_size
736 || entry.created_at.elapsed() > self.max_wait_time;
737
738 let batch_id = entry.batch_id.clone();
739
740 if should_process {
741 let batch = entry.clone();
743 self.pending_batches.remove(&batch_key);
744
745 let processor = Arc::new(self.clone());
747 tokio::spawn(async move {
748 if let Err(e) = processor.process_batch(batch).await {
749 eprintln!("Batch processing error: {e}");
750 }
751 });
752 }
753
754 batch_id
755 };
756
757 Ok(batch_id)
758 }
759
760 async fn process_batch(&self, batch: PendingBatch) -> Result<()> {
761 let _permit = self.processing_semaphore.acquire().await.map_err(|_| {
762 GraphRAGError::IncrementalUpdate {
763 message: "Failed to acquire processing permit".to_string(),
764 }
765 })?;
766
767 let start = Instant::now();
768
769 let mut entity_changes = Vec::new();
771 let mut relationship_changes = Vec::new();
772 let mut embedding_changes = Vec::new();
773
774 for change in &batch.changes {
775 match &change.change_type {
776 ChangeType::EntityAdded | ChangeType::EntityUpdated | ChangeType::EntityRemoved => {
777 entity_changes.push(change);
778 },
779 ChangeType::RelationshipAdded
780 | ChangeType::RelationshipUpdated
781 | ChangeType::RelationshipRemoved => {
782 relationship_changes.push(change);
783 },
784 ChangeType::EmbeddingAdded
785 | ChangeType::EmbeddingUpdated
786 | ChangeType::EmbeddingRemoved => {
787 embedding_changes.push(change);
788 },
789 _ => {},
790 }
791 }
792
793 self.process_entity_changes(&entity_changes).await?;
795 self.process_relationship_changes(&relationship_changes)
796 .await?;
797 self.process_embedding_changes(&embedding_changes).await?;
798
799 let processing_time = start.elapsed();
800
801 self.update_metrics(&batch, processing_time).await;
803
804 println!(
805 "🚀 Processed batch {} with {} changes in {:?}",
806 batch.batch_id,
807 batch.changes.len(),
808 processing_time
809 );
810
811 Ok(())
812 }
813
814 async fn process_entity_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
815 Ok(())
817 }
818
819 async fn process_relationship_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
820 Ok(())
822 }
823
824 async fn process_embedding_changes(&self, _changes: &[&ChangeRecord]) -> Result<()> {
825 Ok(())
827 }
828
829 fn get_batch_key(&self, change: &ChangeRecord) -> String {
830 match (&change.entity_id, &change.document_id) {
832 (Some(entity_id), _) => format!("entity:{entity_id}"),
833 (None, Some(doc_id)) => format!("document:{doc_id}"),
834 _ => "global".to_string(),
835 }
836 }
837
838 async fn update_metrics(&self, batch: &PendingBatch, processing_time: Duration) {
839 let mut metrics = self.metrics.write();
840
841 metrics.total_batches_processed += 1;
842 metrics.total_changes_processed += batch.changes.len() as u64;
843
844 let total_batches = metrics.total_batches_processed as f64;
846 metrics.average_batch_size = (metrics.average_batch_size * (total_batches - 1.0)
847 + batch.changes.len() as f64)
848 / total_batches;
849
850 let prev_avg_ms = metrics.average_processing_time.as_millis() as f64;
851 let new_avg_ms = (prev_avg_ms * (total_batches - 1.0) + processing_time.as_millis() as f64)
852 / total_batches;
853 metrics.average_processing_time = Duration::from_millis(new_avg_ms as u64);
854
855 if processing_time.as_secs_f64() > 0.0 {
857 metrics.throughput_per_second =
858 batch.changes.len() as f64 / processing_time.as_secs_f64();
859 }
860
861 metrics.last_batch_processed = Some(Utc::now());
862 }
863
864 pub fn get_metrics(&self) -> BatchMetrics {
866 self.metrics.read().clone()
867 }
868}
869
870#[cfg(feature = "incremental")]
872impl Clone for BatchProcessor {
873 fn clone(&self) -> Self {
874 Self {
875 batch_size: self.batch_size,
876 max_wait_time: self.max_wait_time,
877 pending_batches: DashMap::new(), processing_semaphore: Semaphore::new(self.processing_semaphore.available_permits()),
879 metrics: RwLock::new(self.get_metrics()),
880 }
881 }
882}
883
884impl GraphRAGError {
889 pub fn conflict_resolution(message: String) -> Self {
891 GraphRAGError::GraphConstruction { message }
892 }
893
894 pub fn incremental_update(message: String) -> Self {
896 GraphRAGError::GraphConstruction { message }
897 }
898}