vectorless/index/pipeline/executor.rs
1// Copyright (c) 2026 vectorless developers
2// SPDX-License-Identifier: Apache-2.0
3
4//! Pipeline executor for running index stages.
5//!
6//! The executor uses [`PipelineOrchestrator`] internally for flexible
7//! stage management with priority-based ordering and dependency resolution.
8
9use tracing::info;
10
11use crate::error::Result;
12use crate::llm::LlmClient;
13
14use super::super::PipelineOptions;
15use super::super::stages::{
16 BuildStage, EnhanceStage, EnrichStage, IndexStage, OptimizeStage, ParseStage, PersistStage,
17};
18use super::context::{IndexInput, IndexResult};
19use super::orchestrator::PipelineOrchestrator;
20
21/// Pipeline executor for document indexing.
22///
23/// Uses [`PipelineOrchestrator`] internally for stage management.
24/// Supports both preset configurations and custom stage pipelines.
25///
26/// # Example
27///
28/// ```rust,ignore
29/// // Default pipeline
30/// let executor = PipelineExecutor::new();
31/// let result = executor.execute(input, options).await?;
32///
33/// // With LLM enhancement
34/// let executor = PipelineExecutor::with_llm(client);
35///
36/// // Custom pipeline using orchestrator
37/// let orchestrator = PipelineOrchestrator::new()
38/// .stage(ParseStage::new())
39/// .stage_with_priority(MyCustomStage::new(), 50)
40/// .stage(BuildStage::new());
41/// let executor = PipelineExecutor::from_orchestrator(orchestrator);
42/// ```
43pub struct PipelineExecutor {
44 orchestrator: PipelineOrchestrator,
45}
46
47impl PipelineExecutor {
48 /// Create a new pipeline executor with default stages.
49 ///
50 /// Default stages (in order):
51 /// 1. `parse` - Parse document into raw nodes
52 /// 2. `build` - Build tree structure
53 /// 3. `enrich` - Add metadata and cross-references
54 /// 4. `optimize` - Optimize tree structure
55 pub fn new() -> Self {
56 let orchestrator = PipelineOrchestrator::new()
57 .stage_with_priority(ParseStage::new(), 10)
58 .stage_with_priority(BuildStage::new(), 20)
59 .stage_with_priority(EnrichStage::new(), 40)
60 .stage_with_priority(OptimizeStage::new(), 60);
61
62 Self { orchestrator }
63 }
64
65 /// Create a pipeline with LLM enhancement.
66 ///
67 /// Stages (in order):
68 /// 1. `parse` - Parse document
69 /// 2. `build` - Build tree
70 /// 3. `enhance` - LLM-based enhancement (summaries)
71 /// 4. `enrich` - Add metadata
72 /// 5. `optimize` - Optimize tree
73 pub fn with_llm(client: LlmClient) -> Self {
74 let orchestrator = PipelineOrchestrator::new()
75 .stage_with_priority(ParseStage::new(), 10)
76 .stage_with_priority(BuildStage::new(), 20)
77 .stage_with_priority(EnhanceStage::with_llm_client(client), 30)
78 .stage_with_priority(EnrichStage::new(), 40)
79 .stage_with_priority(OptimizeStage::new(), 60);
80
81 Self { orchestrator }
82 }
83
84 /// Create from a custom orchestrator.
85 ///
86 /// Use this for full control over stage ordering and dependencies.
87 ///
88 /// # Example
89 ///
90 /// ```rust,ignore
91 /// let orchestrator = PipelineOrchestrator::new()
92 /// .stage_with_priority(ParseStage::new(), 10)
93 /// .stage_with_priority(MyAnalysisStage::new(), 25)
94 /// .stage_with_priority(BuildStage::new(), 20)
95 /// .stage_with_deps(MyValidationStage::new(), 50, &["build"]);
96 ///
97 /// let executor = PipelineExecutor::from_orchestrator(orchestrator);
98 /// ```
99 pub fn from_orchestrator(orchestrator: PipelineOrchestrator) -> Self {
100 Self { orchestrator }
101 }
102
103 /// Add a stage with default priority.
104 ///
105 /// The stage will be added after existing stages with the same priority.
106 pub fn add_stage(mut self, stage: impl IndexStage + 'static) -> Self {
107 self.orchestrator = self.orchestrator.stage(stage);
108 self
109 }
110
111 /// Add a stage with custom priority.
112 ///
113 /// Lower priority = earlier execution.
114 pub fn add_stage_with_priority(
115 mut self,
116 stage: impl IndexStage + 'static,
117 priority: i32,
118 ) -> Self {
119 self.orchestrator = self.orchestrator.stage_with_priority(stage, priority);
120 self
121 }
122
123 /// Add a stage with priority and dependencies.
124 ///
125 /// The stage will run after all specified dependencies.
126 pub fn add_stage_with_deps(
127 mut self,
128 stage: impl IndexStage + 'static,
129 priority: i32,
130 depends_on: &[&str],
131 ) -> Self {
132 self.orchestrator = self
133 .orchestrator
134 .stage_with_deps(stage, priority, depends_on);
135 self
136 }
137
138 /// Add persistence stage with async workspace.
139 pub fn with_persistence(mut self, workspace: crate::storage::Workspace) -> Self {
140 self.orchestrator = self
141 .orchestrator
142 .stage_with_priority(PersistStage::with_workspace(workspace), 80);
143 self
144 }
145
146 /// Get the list of stage names in execution order.
147 pub fn stage_names(&self) -> Result<Vec<&str>> {
148 self.orchestrator.stage_names()
149 }
150
151 /// Get the number of stages.
152 pub fn stage_count(&self) -> usize {
153 self.orchestrator.stage_count()
154 }
155
156 /// Execute the pipeline.
157 ///
158 /// Stages are executed in dependency-resolved order.
159 pub async fn execute(
160 &mut self,
161 input: IndexInput,
162 options: PipelineOptions,
163 ) -> Result<IndexResult> {
164 info!(
165 "Starting index pipeline with {} stages",
166 self.orchestrator.stage_count()
167 );
168 self.orchestrator.execute(input, options).await
169 }
170}
171
172impl Default for PipelineExecutor {
173 fn default() -> Self {
174 Self::new()
175 }
176}