1#![allow(unused_imports)]
2
3use crate::config::Config;
4use crate::core::{
5 ChunkId, Document, DocumentId, Entity, EntityId, GraphRAGError, KnowledgeGraph, Relationship,
6 Result, TextChunk,
7};
8use crate::{critic, ollama, persistence, query, retrieval};
9
10#[cfg(feature = "parallel-processing")]
11#[allow(unused_imports)]
12use crate::parallel;
13
14use super::GraphRAG;
15
16impl GraphRAG {
/// Builds the knowledge graph by extracting entities and relationships from
/// every chunk currently stored in the graph.
///
/// The extraction strategy is chosen from the configuration, in this order:
///
/// 1. `entities.use_gleaning && ollama.enabled`: LLM extraction with gleaning.
///    When `entities.enable_triple_reflection` is set, each extracted
///    relationship is validated before insertion. When
///    `entities.use_atomic_facts` is set, a second pass extracts atomic facts.
/// 2. `ollama.enabled`: single-pass LLM extraction.
/// 3. `gliner.enabled`: GLiNER extraction dispatched through
///    `tokio::task::spawn_blocking` (requires the `gliner` feature).
/// 4. Otherwise: pattern-based extraction, followed by relationship
///    extraction when `graph.extract_relationships` is set.
///
/// After extraction, `save_to_workspace` is called.
///
/// # Errors
///
/// - `GraphRAGError::Config` if the knowledge graph is not initialized, or if
///   GLiNER is enabled in the config but the crate was built without the
///   `gliner` feature.
/// - Extractor construction failures and, on the gleaning and pattern-based
///   paths, per-chunk extraction or entity-insertion failures are propagated.
/// - `GraphRAGError::EntityExtraction` if a GLiNER blocking task cannot be
///   joined.
/// - Any error returned by `save_to_workspace`.
///
/// Per-chunk failures on the single-pass, atomic-fact and GLiNER paths, and
/// relationship-insertion failures on every path, are logged (with the
/// `tracing` feature) and skipped.
///
/// # Panics
///
/// Panics if one of the built-in progress bar templates is rejected.
#[cfg(feature = "async")]
pub async fn build_graph(&mut self) -> Result<()> {
    use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};

    // Entity types requested from the LLM extractors when the config lists none.
    const DEFAULT_ENTITY_TYPES: [&str; 3] = ["PERSON", "ORGANIZATION", "LOCATION"];

    // Returns the configured entity types, or the defaults when none are configured.
    fn resolve_entity_types(configured: &[String]) -> Vec<String> {
        if configured.is_empty() {
            DEFAULT_ENTITY_TYPES.iter().map(|&t| t.to_string()).collect()
        } else {
            configured.to_vec()
        }
    }

    let suppress = self.config.suppress_progress_bars;
    // Builds a progress bar from a template; the bar is hidden when progress
    // output is suppressed in the config. Templates must contain `{msg}`,
    // otherwise text passed to `set_message`/`finish_with_message` is never drawn.
    let make_pb = move |total: u64, template: &str| -> ProgressBar {
        let style = ProgressStyle::default_bar()
            .template(template)
            .expect("Invalid progress bar template")
            .progress_chars("=>-");
        let pb = ProgressBar::new(total).with_style(style);
        if suppress {
            pb.set_draw_target(ProgressDrawTarget::hidden());
        }
        pb
    };

    let graph = self
        .knowledge_graph
        .as_mut()
        .ok_or_else(|| GraphRAGError::Config {
            message: "Knowledge graph not initialized".to_string(),
        })?;

    // Snapshot the chunks so the graph can be mutated while iterating over them.
    let chunks: Vec<_> = graph.chunks().cloned().collect();
    let total_chunks = chunks.len();

    #[cfg(feature = "tracing")]
    tracing::info!(
        "build_graph() - Config state: approach='{}', use_gleaning={}, ollama.enabled={}",
        self.config.approach,
        self.config.entities.use_gleaning,
        self.config.ollama.enabled
    );

    if self.config.entities.use_gleaning && self.config.ollama.enabled {
        use crate::entity::GleaningEntityExtractor;
        use crate::ollama::OllamaClient;

        #[cfg(feature = "tracing")]
        tracing::info!(
            "Using LLM-based entity extraction with gleaning (max_rounds: {})",
            self.config.entities.max_gleaning_rounds
        );

        let client = OllamaClient::new(self.config.ollama.clone());

        let gleaning_config = crate::entity::GleaningConfig {
            max_gleaning_rounds: self.config.entities.max_gleaning_rounds,
            completion_threshold: 0.8,
            entity_confidence_threshold: self.config.entities.min_confidence as f64,
            use_llm_completion_check: true,
            entity_types: resolve_entity_types(&self.config.entities.entity_types),
            temperature: 0.1,
            max_tokens: 1500,
        };

        let extractor = GleaningEntityExtractor::new(client.clone(), gleaning_config);

        // Optional validator used to filter relationships (triple reflection).
        let rel_extractor = if self.config.entities.enable_triple_reflection {
            Some(crate::entity::LLMRelationshipExtractor::new(Some(
                &self.config.ollama,
            ))?)
        } else {
            None
        };

        let pb = make_pb(
            total_chunks as u64,
            " [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} chunks ({eta}) {msg}",
        );
        pb.set_message("Extracting entities with LLM");

        for (idx, chunk) in chunks.iter().enumerate() {
            pb.set_message(format!(
                "Chunk {}/{} (gleaning with {} rounds)",
                idx + 1,
                total_chunks,
                self.config.entities.max_gleaning_rounds
            ));

            #[cfg(feature = "tracing")]
            tracing::info!("Processing chunk {}/{} (LLM)", idx + 1, total_chunks);

            let (entities, relationships) = extractor.extract_with_gleaning(chunk).await?;

            // id -> name lookup for this chunk's entities, used to give the
            // validator readable names instead of raw ids.
            let entity_map: std::collections::HashMap<_, _> = entities
                .iter()
                .map(|e| (e.id.clone(), e.name.clone()))
                .collect();

            for entity in entities {
                graph.add_entity(entity)?;
            }

            if let Some(ref validator) = rel_extractor {
                #[cfg(feature = "tracing")]
                tracing::info!(
                    "Triple reflection enabled: validating {} relationships",
                    relationships.len()
                );

                let mut validated_count = 0;
                let mut filtered_count = 0;

                for relationship in relationships {
                    // Resolve names from this chunk's entities first, then from
                    // the graph, and finally fall back to the raw entity id.
                    let source_name = entity_map
                        .get(&relationship.source)
                        .or_else(|| {
                            graph
                                .entities()
                                .find(|e| e.id == relationship.source)
                                .map(|e| &e.name)
                        })
                        .map(|s| s.as_str())
                        .unwrap_or(relationship.source.0.as_str());
                    let target_name = entity_map
                        .get(&relationship.target)
                        .or_else(|| {
                            graph
                                .entities()
                                .find(|e| e.id == relationship.target)
                                .map(|e| &e.name)
                        })
                        .map(|s| s.as_str())
                        .unwrap_or(relationship.target.0.as_str());

                    match validator
                        .validate_triple(
                            source_name,
                            &relationship.relation_type,
                            target_name,
                            &chunk.content,
                        )
                        .await
                    {
                        Ok(validation) => {
                            if validation.is_valid
                                && validation.confidence
                                    >= self.config.entities.validation_min_confidence
                            {
                                if let Err(_e) = graph.add_relationship(relationship) {
                                    #[cfg(feature = "tracing")]
                                    tracing::debug!(
                                        "Failed to add validated relationship: {}",
                                        _e
                                    );
                                } else {
                                    validated_count += 1;
                                }
                            } else {
                                filtered_count += 1;
                                #[cfg(feature = "tracing")]
                                tracing::debug!(
                                    "Filtered relationship {} --[{}]--> {} (valid={}, conf={:.2}): {}",
                                    source_name,
                                    relationship.relation_type,
                                    target_name,
                                    validation.is_valid,
                                    validation.confidence,
                                    validation.reason
                                );
                            }
                        },
                        Err(_e) => {
                            // A validator failure must not drop data: keep the
                            // relationship on a best-effort basis.
                            #[cfg(feature = "tracing")]
                            tracing::warn!(
                                "Validation error, adding relationship anyway: {}",
                                _e
                            );
                            let _ = graph.add_relationship(relationship);
                        },
                    }
                }

                #[cfg(feature = "tracing")]
                tracing::info!(
                    "Triple reflection complete: {} validated, {} filtered",
                    validated_count,
                    filtered_count
                );
            } else {
                for relationship in relationships {
                    // Capture the endpoints before the relationship is moved
                    // into the graph so a failure can be reported accurately.
                    #[cfg(feature = "tracing")]
                    let (source_id, target_id, relation_type) = (
                        relationship.source.clone(),
                        relationship.target.clone(),
                        relationship.relation_type.clone(),
                    );

                    if let Err(_e) = graph.add_relationship(relationship) {
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            "Failed to add relationship: {} -> {} ({}). Error: {}",
                            source_id,
                            target_id,
                            relation_type,
                            _e
                        );
                    }
                }
            }

            pb.inc(1);
        }

        pb.finish_with_message("Entity extraction complete");

        if self.config.entities.use_atomic_facts {
            use crate::entity::AtomicFactExtractor;

            #[cfg(feature = "tracing")]
            tracing::info!("Starting atomic fact extraction (ATOM methodology)");

            let atomic_extractor = AtomicFactExtractor::new(client.clone())
                .with_max_tokens(self.config.entities.max_fact_tokens);

            let pb_atomic = make_pb(
                total_chunks as u64,
                " [{elapsed_precise}] [{bar:40.magenta/blue}] {pos}/{len} atomic facts ({eta}) {msg}",
            );
            pb_atomic.set_message("Extracting atomic facts");

            let mut total_facts = 0;
            let mut total_atomic_entities = 0;
            let mut total_atomic_relationships = 0;

            for (idx, chunk) in chunks.iter().enumerate() {
                pb_atomic.set_message(format!(
                    "Chunk {}/{} (extracting atomic facts)",
                    idx + 1,
                    total_chunks
                ));

                #[cfg(feature = "tracing")]
                tracing::info!("Processing chunk {}/{} (Atomic)", idx + 1, total_chunks);

                match atomic_extractor.extract_atomic_facts(chunk).await {
                    Ok(facts) => {
                        total_facts += facts.len();

                        let (atomic_entities, atomic_relationships) =
                            atomic_extractor.atomics_to_graph_elements(facts, &chunk.id);

                        total_atomic_entities += atomic_entities.len();
                        total_atomic_relationships += atomic_relationships.len();

                        for entity in atomic_entities {
                            if let Err(_e) = graph.add_entity(entity) {
                                #[cfg(feature = "tracing")]
                                tracing::debug!("Failed to add atomic entity: {}", _e);
                            }
                        }

                        for relationship in atomic_relationships {
                            if let Err(_e) = graph.add_relationship(relationship) {
                                #[cfg(feature = "tracing")]
                                tracing::debug!("Failed to add atomic relationship: {}", _e);
                            }
                        }
                    },
                    Err(_e) => {
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            chunk_id = %chunk.id,
                            error = %_e,
                            "Atomic fact extraction failed for chunk"
                        );
                    },
                }

                pb_atomic.inc(1);
            }

            pb_atomic.finish_with_message(format!(
                "Atomic extraction complete: {} facts → {} entities, {} relationships",
                total_facts, total_atomic_entities, total_atomic_relationships
            ));

            #[cfg(feature = "tracing")]
            tracing::info!(
                facts_extracted = total_facts,
                atomic_entities = total_atomic_entities,
                atomic_relationships = total_atomic_relationships,
                "ATOM atomic fact extraction complete"
            );
        }
    } else if self.config.ollama.enabled {
        use crate::entity::llm_extractor::LLMEntityExtractor;
        use crate::ollama::OllamaClient;

        #[cfg(feature = "tracing")]
        tracing::info!(
            "Using LLM single-pass entity extraction (no gleaning, keep_alive={:?})",
            self.config.ollama.keep_alive,
        );

        let client = OllamaClient::new(self.config.ollama.clone());
        let entity_types = resolve_entity_types(&self.config.entities.entity_types);

        let extractor = LLMEntityExtractor::new(client, entity_types)
            .with_temperature(self.config.ollama.temperature.unwrap_or(0.1))
            .with_max_tokens(self.config.ollama.max_tokens.unwrap_or(1500) as usize)
            .with_keep_alive(self.config.ollama.keep_alive.clone());

        let pb = make_pb(
            total_chunks as u64,
            " [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} chunks ({eta}) {msg}",
        );
        pb.set_message("Extracting entities with LLM (single-pass)");

        for (idx, chunk) in chunks.iter().enumerate() {
            pb.set_message(format!(
                "Chunk {}/{} (LLM single-pass)",
                idx + 1,
                total_chunks
            ));

            #[cfg(feature = "tracing")]
            tracing::info!(
                "Processing chunk {}/{} (LLM single-pass)",
                idx + 1,
                total_chunks
            );

            match extractor.extract_from_chunk(chunk).await {
                Ok((entities, relationships)) => {
                    for entity in entities {
                        if let Err(_e) = graph.add_entity(entity) {
                            #[cfg(feature = "tracing")]
                            tracing::debug!("Failed to add entity: {}", _e);
                        }
                    }
                    for relationship in relationships {
                        if let Err(_e) = graph.add_relationship(relationship) {
                            #[cfg(feature = "tracing")]
                            tracing::debug!("Failed to add relationship: {}", _e);
                        }
                    }
                },
                Err(_e) => {
                    #[cfg(feature = "tracing")]
                    tracing::warn!(
                        chunk_id = %chunk.id,
                        error = %_e,
                        "LLM extraction failed for chunk, skipping"
                    );
                },
            }

            pb.inc(1);
        }

        pb.finish_with_message("LLM single-pass extraction complete");
    } else if self.config.gliner.enabled {
        #[cfg(feature = "gliner")]
        {
            use crate::entity::GLiNERExtractor;
            use futures::stream::{self, StreamExt};
            use std::sync::Arc;

            let extractor = Arc::new(
                GLiNERExtractor::new(self.config.gliner.clone()).map_err(|e| {
                    crate::core::error::GraphRAGError::EntityExtraction {
                        message: format!("GLiNER init failed: {e}"),
                    }
                })?,
            );

            let pb = make_pb(
                total_chunks as u64,
                " [{elapsed_precise}] [{bar:40.magenta/blue}] {pos}/{len} chunks ({eta}) {msg}",
            );
            pb.set_message("Extracting entities with GLiNER-Relex");

            // At least one chunk is always in flight, even if the config says 0.
            let parallelism = self.config.gliner.max_concurrent_chunks.unwrap_or(4).max(1);

            // Each chunk is extracted via `spawn_blocking`; at most `parallelism`
            // extractions are in flight and results arrive in completion order.
            let mut results = stream::iter(chunks.iter().cloned())
                .map(|chunk| {
                    let ext = Arc::clone(&extractor);
                    let chunk_id = chunk.id.clone();
                    async move {
                        let r =
                            tokio::task::spawn_blocking(move || ext.extract_from_chunk(&chunk))
                                .await;
                        (chunk_id, r)
                    }
                })
                .buffer_unordered(parallelism);

            while let Some((_chunk_id, join_result)) = results.next().await {
                let result = join_result.map_err(|e| {
                    crate::core::error::GraphRAGError::EntityExtraction {
                        message: format!("spawn_blocking join error: {e}"),
                    }
                })?;
                match result {
                    Ok((entities, relationships)) => {
                        for entity in entities {
                            if let Err(_e) = graph.add_entity(entity) {
                                #[cfg(feature = "tracing")]
                                tracing::debug!("GLiNER: failed to add entity: {}", _e);
                            }
                        }
                        for rel in relationships {
                            if let Err(_e) = graph.add_relationship(rel) {
                                #[cfg(feature = "tracing")]
                                tracing::debug!("GLiNER: failed to add relationship: {}", _e);
                            }
                        }
                    },
                    Err(_e) => {
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            chunk_id = %_chunk_id,
                            error = %_e,
                            "GLiNER extraction failed for chunk, skipping"
                        );
                    },
                }
                pb.inc(1);
            }

            pb.finish_with_message("GLiNER-Relex extraction complete");
        }
        #[cfg(not(feature = "gliner"))]
        return Err(crate::core::error::GraphRAGError::Config {
            message: "GLiNER enabled in config but crate compiled without --features gliner"
                .into(),
        });
    } else {
        use crate::entity::EntityExtractor;

        #[cfg(feature = "tracing")]
        tracing::info!("Using pattern-based entity extraction");

        let extractor = EntityExtractor::new(self.config.entities.min_confidence)?;

        let pb = make_pb(
            total_chunks as u64,
            " [{elapsed_precise}] [{bar:40.green/blue}] {pos}/{len} chunks ({eta}) {msg}",
        );
        pb.set_message("Extracting entities (pattern-based)");

        for (idx, chunk) in chunks.iter().enumerate() {
            pb.set_message(format!(
                "Chunk {}/{} (pattern-based)",
                idx + 1,
                total_chunks
            ));

            #[cfg(feature = "tracing")]
            tracing::info!("Processing chunk {}/{} (Pattern)", idx + 1, total_chunks);

            let entities = extractor.extract_from_chunk(chunk)?;
            for entity in entities {
                graph.add_entity(entity)?;
            }

            pb.inc(1);
        }

        pb.finish_with_message("Entity extraction complete");

        if self.config.graph.extract_relationships {
            // Snapshot the entities so the graph can be mutated while relating them.
            let all_entities: Vec<_> = graph.entities().cloned().collect();

            let rel_pb = make_pb(
                total_chunks as u64,
                " [{elapsed_precise}] [{bar:40.yellow/blue}] {pos}/{len} chunks ({eta}) {msg}",
            );
            rel_pb.set_message("Extracting relationships");

            for (idx, chunk) in chunks.iter().enumerate() {
                rel_pb.set_message(format!(
                    "Chunk {}/{} (relationships)",
                    idx + 1,
                    total_chunks
                ));

                // Only entities mentioned in this chunk are candidates.
                let chunk_entities: Vec<_> = all_entities
                    .iter()
                    .filter(|e| e.mentions.iter().any(|m| m.chunk_id == chunk.id))
                    .cloned()
                    .collect();

                // A relationship needs two endpoints.
                if chunk_entities.len() < 2 {
                    rel_pb.inc(1);
                    continue;
                }

                let relationships = extractor.extract_relationships(&chunk_entities, chunk)?;

                for (source_id, target_id, relation_type) in relationships {
                    let relationship = Relationship {
                        source: source_id.clone(),
                        target: target_id.clone(),
                        relation_type: relation_type.clone(),
                        confidence: self.config.graph.relationship_confidence_threshold,
                        context: vec![chunk.id.clone()],
                        embedding: None,
                        temporal_type: None,
                        temporal_range: None,
                        causal_strength: None,
                    };

                    if let Err(_e) = graph.add_relationship(relationship) {
                        #[cfg(feature = "tracing")]
                        tracing::debug!(
                            "Failed to add relationship: {} -> {} ({}). Error: {}",
                            source_id,
                            target_id,
                            relation_type,
                            _e
                        );
                    }
                }

                rel_pb.inc(1);
            }

            rel_pb.finish_with_message("Relationship extraction complete");
        }
    }

    self.save_to_workspace()?;

    Ok(())
}
632
633 #[cfg(not(feature = "async"))]
638 pub fn build_graph(&mut self) -> Result<()> {
639 use crate::entity::EntityExtractor;
640
641 let graph = self
642 .knowledge_graph
643 .as_mut()
644 .ok_or_else(|| GraphRAGError::Config {
645 message: "Knowledge graph not initialized".to_string(),
646 })?;
647
648 let chunks: Vec<_> = graph.chunks().cloned().collect();
649
650 #[cfg(feature = "tracing")]
651 tracing::info!("Using pattern-based entity extraction (sync mode)");
652
653 let extractor = EntityExtractor::new(self.config.entities.min_confidence)?;
654
655 for chunk in &chunks {
656 let entities = extractor.extract_from_chunk(chunk)?;
657 for entity in entities {
658 graph.add_entity(entity)?;
659 }
660 }
661
662 if self.config.graph.extract_relationships {
664 let all_entities: Vec<_> = graph.entities().cloned().collect();
665
666 for chunk in &chunks {
667 let chunk_entities: Vec<_> = all_entities
668 .iter()
669 .filter(|e| e.mentions.iter().any(|m| m.chunk_id == chunk.id))
670 .cloned()
671 .collect();
672
673 if chunk_entities.len() < 2 {
674 continue;
675 }
676
677 let relationships = extractor.extract_relationships(&chunk_entities, chunk)?;
678
679 for (source_id, target_id, relation_type) in relationships {
680 let relationship = Relationship {
681 source: source_id.clone(),
682 target: target_id.clone(),
683 relation_type: relation_type.clone(),
684 confidence: self.config.graph.relationship_confidence_threshold,
685 context: vec![chunk.id.clone()],
686 embedding: None,
687 temporal_type: None,
688 temporal_range: None,
689 causal_strength: None,
690 };
691
692 if let Err(_e) = graph.add_relationship(relationship) {
693 #[cfg(feature = "tracing")]
694 tracing::debug!(
695 "Failed to add relationship: {} -> {} ({}). Error: {}",
696 source_id,
697 target_id,
698 relation_type,
699 _e
700 );
701 }
702 }
703 }
704 }
705
706 Ok(())
707 }
708}