graphrag_core/
pipeline_executor.rs1use crate::core::GraphRAGError;
28use crate::{GraphRAG, Result};
29
/// Summary of a pipeline run (or of the current graph state).
///
/// The four counters are taken from the knowledge graph held by the
/// `GraphRAG` instance at the time the report is built; all of them are
/// zero when no knowledge graph exists yet.
///
/// `PartialEq`/`Eq` are derived so that reports can be compared directly,
/// e.g. with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineReport {
    /// Number of entities in the knowledge graph.
    pub entity_count: usize,
    /// Number of relationships in the knowledge graph.
    pub relationship_count: usize,
    /// Number of chunks in the knowledge graph.
    pub chunks_processed: usize,
    /// Number of documents in the knowledge graph.
    pub document_count: usize,
    /// Copy of the `approach` setting from the `GraphRAG` configuration.
    pub approach: String,
    /// Wall-clock duration of the pipeline run in milliseconds
    /// (`0` for a plain state snapshot that did not run anything).
    pub elapsed_ms: u128,
}
46
47pub struct PipelineExecutor<'a> {
52 graphrag: &'a mut GraphRAG,
53}
54
55impl<'a> PipelineExecutor<'a> {
56 pub fn new(graphrag: &'a mut GraphRAG) -> Self {
58 Self { graphrag }
59 }
60
61 #[cfg(feature = "async")]
65 pub async fn run_full_pipeline(&mut self) -> Result<PipelineReport> {
66 let start = std::time::Instant::now();
67
68 if !self.graphrag.has_documents() {
70 return Err(GraphRAGError::Config {
71 message: "No documents loaded. Add documents before running the pipeline."
72 .to_string(),
73 });
74 }
75
76 self.graphrag.build_graph().await?;
78
79 let elapsed = start.elapsed().as_millis();
80
81 Ok(self.build_report(elapsed))
82 }
83
84 #[cfg(not(feature = "async"))]
86 pub fn run_full_pipeline(&mut self) -> Result<PipelineReport> {
87 let start = std::time::Instant::now();
88
89 if !self.graphrag.has_documents() {
90 return Err(GraphRAGError::Config {
91 message: "No documents loaded. Add documents before running the pipeline."
92 .to_string(),
93 });
94 }
95
96 self.graphrag.build_graph()?;
97
98 let elapsed = start.elapsed().as_millis();
99
100 Ok(self.build_report(elapsed))
101 }
102
103 #[cfg(feature = "async")]
105 pub async fn ingest_and_build(&mut self, text: &str) -> Result<PipelineReport> {
106 self.graphrag.add_document_from_text(text)?;
107 self.run_full_pipeline().await
108 }
109
110 #[cfg(not(feature = "async"))]
112 pub fn ingest_and_build(&mut self, text: &str) -> Result<PipelineReport> {
113 self.graphrag.add_document_from_text(text)?;
114 self.run_full_pipeline()
115 }
116
117 pub fn current_state(&self) -> PipelineReport {
119 self.build_report(0)
120 }
121
122 fn build_report(&self, elapsed_ms: u128) -> PipelineReport {
124 let kg = self.graphrag.knowledge_graph();
125 let (entity_count, relationship_count, chunks_processed, document_count) = match kg {
126 Some(kg) => (
127 kg.entities().count(),
128 kg.relationships().count(),
129 kg.chunks().count(),
130 kg.documents().count(),
131 ),
132 None => (0, 0, 0, 0),
133 };
134
135 PipelineReport {
136 entity_count,
137 relationship_count,
138 chunks_processed,
139 document_count,
140 approach: self.graphrag.config().approach.clone(),
141 elapsed_ms,
142 }
143 }
144}
145
146impl std::fmt::Display for PipelineReport {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 write!(
149 f,
150 "Pipeline Report ({}): {} entities, {} relationships, {} chunks from {} docs [{}ms]",
151 self.approach,
152 self.entity_count,
153 self.relationship_count,
154 self.chunks_processed,
155 self.document_count,
156 self.elapsed_ms,
157 )
158 }
159}