Skip to main content

graphrag_core/
pipeline_executor.rs

1//! Composable pipeline executor for GraphRAG operations
2//!
3//! Provides a step-by-step interface for building knowledge graphs,
4//! giving callers control over individual pipeline phases.
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! use graphrag_core::pipeline_executor::PipelineExecutor;
10//! use graphrag_core::GraphRAG;
11//!
12//! # async fn example() -> graphrag_core::Result<()> {
13//! let mut graphrag = GraphRAG::builder()
14//!     .with_output_dir("./output")
15//!     .with_hash_embeddings()
16//!     .build()?;
17//!
//! let mut executor = PipelineExecutor::new(&mut graphrag);
19//! let report = executor.run_full_pipeline().await?;
20//!
21//! println!("Pipeline complete: {} entities, {} relationships",
22//!     report.entity_count, report.relationship_count);
23//! # Ok(())
24//! # }
25//! ```
26
27use crate::core::GraphRAGError;
28use crate::{GraphRAG, Result};
29
/// Report produced after a pipeline run, or as a snapshot of the current graph state.
///
/// This is a plain value type: it can be cloned, printed with `Debug`/`Display`,
/// and compared for equality (for example with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineReport {
    /// Number of entities extracted
    pub entity_count: usize,
    /// Number of relationships built
    pub relationship_count: usize,
    /// Number of text chunks processed
    pub chunks_processed: usize,
    /// Number of documents loaded
    pub document_count: usize,
    /// Pipeline approach used ("semantic", "algorithmic", "hybrid")
    pub approach: String,
    /// Total elapsed time in milliseconds (0 for snapshots taken without running the pipeline)
    pub elapsed_ms: u128,
}
46
47/// Composable pipeline executor
48///
49/// Wraps a `GraphRAG` instance and provides fine-grained control over
50/// the build pipeline phases.
51pub struct PipelineExecutor<'a> {
52    graphrag: &'a mut GraphRAG,
53}
54
55impl<'a> PipelineExecutor<'a> {
56    /// Create a new pipeline executor wrapping the given GraphRAG instance
57    pub fn new(graphrag: &'a mut GraphRAG) -> Self {
58        Self { graphrag }
59    }
60
61    /// Run the full pipeline (entity extraction → relationships → communities → summarization)
62    ///
63    /// This is equivalent to calling `GraphRAG::build_graph()` but returns a detailed report.
64    #[cfg(feature = "async")]
65    pub async fn run_full_pipeline(&mut self) -> Result<PipelineReport> {
66        let start = std::time::Instant::now();
67
68        // Verify we have documents
69        if !self.graphrag.has_documents() {
70            return Err(GraphRAGError::Config {
71                message: "No documents loaded. Add documents before running the pipeline."
72                    .to_string(),
73            });
74        }
75
76        // Delegate to GraphRAG's build_graph which handles all phases
77        self.graphrag.build_graph().await?;
78
79        let elapsed = start.elapsed().as_millis();
80
81        Ok(self.build_report(elapsed))
82    }
83
84    /// Run the full pipeline (synchronous version)
85    #[cfg(not(feature = "async"))]
86    pub fn run_full_pipeline(&mut self) -> Result<PipelineReport> {
87        let start = std::time::Instant::now();
88
89        if !self.graphrag.has_documents() {
90            return Err(GraphRAGError::Config {
91                message: "No documents loaded. Add documents before running the pipeline."
92                    .to_string(),
93            });
94        }
95
96        self.graphrag.build_graph()?;
97
98        let elapsed = start.elapsed().as_millis();
99
100        Ok(self.build_report(elapsed))
101    }
102
103    /// Add a document from text and run the full pipeline in one call
104    #[cfg(feature = "async")]
105    pub async fn ingest_and_build(&mut self, text: &str) -> Result<PipelineReport> {
106        self.graphrag.add_document_from_text(text)?;
107        self.run_full_pipeline().await
108    }
109
110    /// Add a document from text and run the full pipeline in one call (sync)
111    #[cfg(not(feature = "async"))]
112    pub fn ingest_and_build(&mut self, text: &str) -> Result<PipelineReport> {
113        self.graphrag.add_document_from_text(text)?;
114        self.run_full_pipeline()
115    }
116
117    /// Get a snapshot report of the current graph state without running any pipeline
118    pub fn current_state(&self) -> PipelineReport {
119        self.build_report(0)
120    }
121
122    /// Build report from current graph state
123    fn build_report(&self, elapsed_ms: u128) -> PipelineReport {
124        let kg = self.graphrag.knowledge_graph();
125        let (entity_count, relationship_count, chunks_processed, document_count) = match kg {
126            Some(kg) => (
127                kg.entities().count(),
128                kg.relationships().count(),
129                kg.chunks().count(),
130                kg.documents().count(),
131            ),
132            None => (0, 0, 0, 0),
133        };
134
135        PipelineReport {
136            entity_count,
137            relationship_count,
138            chunks_processed,
139            document_count,
140            approach: self.graphrag.config().approach.clone(),
141            elapsed_ms,
142        }
143    }
144}
145
146impl std::fmt::Display for PipelineReport {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        write!(
149            f,
150            "Pipeline Report ({}): {} entities, {} relationships, {} chunks from {} docs [{}ms]",
151            self.approach,
152            self.entity_count,
153            self.relationship_count,
154            self.chunks_processed,
155            self.document_count,
156            self.elapsed_ms,
157        )
158    }
159}