Skip to main content

vectorless/index/pipeline/
executor.rs

1// Copyright (c) 2026 vectorless developers
2// SPDX-License-Identifier: Apache-2.0
3
4//! Pipeline executor for running index stages.
5//!
6//! The executor uses [`PipelineOrchestrator`] internally for flexible
7//! stage management with priority-based ordering and dependency resolution.
8
9use tracing::info;
10
11use crate::error::Result;
12use crate::llm::LlmClient;
13
14use super::super::PipelineOptions;
15use super::super::stages::{
16    BuildStage, EnhanceStage, EnrichStage, IndexStage, OptimizeStage, ParseStage, PersistStage,
17};
18use super::context::{IndexInput, IndexResult};
19use super::orchestrator::PipelineOrchestrator;
20
21/// Pipeline executor for document indexing.
22///
23/// Uses [`PipelineOrchestrator`] internally for stage management.
24/// Supports both preset configurations and custom stage pipelines.
25///
26/// # Example
27///
28/// ```rust,ignore
29/// // Default pipeline
30/// let executor = PipelineExecutor::new();
31/// let result = executor.execute(input, options).await?;
32///
33/// // With LLM enhancement
34/// let executor = PipelineExecutor::with_llm(client);
35///
36/// // Custom pipeline using orchestrator
37/// let orchestrator = PipelineOrchestrator::new()
38///     .stage(ParseStage::new())
39///     .stage_with_priority(MyCustomStage::new(), 50)
40///     .stage(BuildStage::new());
41/// let executor = PipelineExecutor::from_orchestrator(orchestrator);
42/// ```
43pub struct PipelineExecutor {
44    orchestrator: PipelineOrchestrator,
45}
46
47impl PipelineExecutor {
48    /// Create a new pipeline executor with default stages.
49    ///
50    /// Default stages (in order):
51    /// 1. `parse` - Parse document into raw nodes
52    /// 2. `build` - Build tree structure
53    /// 3. `enrich` - Add metadata and cross-references
54    /// 4. `optimize` - Optimize tree structure
55    pub fn new() -> Self {
56        let orchestrator = PipelineOrchestrator::new()
57            .stage_with_priority(ParseStage::new(), 10)
58            .stage_with_priority(BuildStage::new(), 20)
59            .stage_with_priority(EnrichStage::new(), 40)
60            .stage_with_priority(OptimizeStage::new(), 60);
61
62        Self { orchestrator }
63    }
64
65    /// Create a pipeline with LLM enhancement.
66    ///
67    /// Stages (in order):
68    /// 1. `parse` - Parse document
69    /// 2. `build` - Build tree
70    /// 3. `enhance` - LLM-based enhancement (summaries)
71    /// 4. `enrich` - Add metadata
72    /// 5. `optimize` - Optimize tree
73    pub fn with_llm(client: LlmClient) -> Self {
74        let orchestrator = PipelineOrchestrator::new()
75            .stage_with_priority(ParseStage::new(), 10)
76            .stage_with_priority(BuildStage::new(), 20)
77            .stage_with_priority(EnhanceStage::with_llm_client(client), 30)
78            .stage_with_priority(EnrichStage::new(), 40)
79            .stage_with_priority(OptimizeStage::new(), 60);
80
81        Self { orchestrator }
82    }
83
84    /// Create from a custom orchestrator.
85    ///
86    /// Use this for full control over stage ordering and dependencies.
87    ///
88    /// # Example
89    ///
90    /// ```rust,ignore
91    /// let orchestrator = PipelineOrchestrator::new()
92    ///     .stage_with_priority(ParseStage::new(), 10)
93    ///     .stage_with_priority(MyAnalysisStage::new(), 25)
94    ///     .stage_with_priority(BuildStage::new(), 20)
95    ///     .stage_with_deps(MyValidationStage::new(), 50, &["build"]);
96    ///
97    /// let executor = PipelineExecutor::from_orchestrator(orchestrator);
98    /// ```
99    pub fn from_orchestrator(orchestrator: PipelineOrchestrator) -> Self {
100        Self { orchestrator }
101    }
102
103    /// Add a stage with default priority.
104    ///
105    /// The stage will be added after existing stages with the same priority.
106    pub fn add_stage(mut self, stage: impl IndexStage + 'static) -> Self {
107        self.orchestrator = self.orchestrator.stage(stage);
108        self
109    }
110
111    /// Add a stage with custom priority.
112    ///
113    /// Lower priority = earlier execution.
114    pub fn add_stage_with_priority(
115        mut self,
116        stage: impl IndexStage + 'static,
117        priority: i32,
118    ) -> Self {
119        self.orchestrator = self.orchestrator.stage_with_priority(stage, priority);
120        self
121    }
122
123    /// Add a stage with priority and dependencies.
124    ///
125    /// The stage will run after all specified dependencies.
126    pub fn add_stage_with_deps(
127        mut self,
128        stage: impl IndexStage + 'static,
129        priority: i32,
130        depends_on: &[&str],
131    ) -> Self {
132        self.orchestrator = self
133            .orchestrator
134            .stage_with_deps(stage, priority, depends_on);
135        self
136    }
137
138    /// Add persistence stage with async workspace.
139    pub fn with_persistence(mut self, workspace: crate::storage::Workspace) -> Self {
140        self.orchestrator = self
141            .orchestrator
142            .stage_with_priority(PersistStage::with_workspace(workspace), 80);
143        self
144    }
145
146    /// Get the list of stage names in execution order.
147    pub fn stage_names(&self) -> Result<Vec<&str>> {
148        self.orchestrator.stage_names()
149    }
150
151    /// Get the number of stages.
152    pub fn stage_count(&self) -> usize {
153        self.orchestrator.stage_count()
154    }
155
156    /// Execute the pipeline.
157    ///
158    /// Stages are executed in dependency-resolved order.
159    pub async fn execute(
160        &mut self,
161        input: IndexInput,
162        options: PipelineOptions,
163    ) -> Result<IndexResult> {
164        info!(
165            "Starting index pipeline with {} stages",
166            self.orchestrator.stage_count()
167        );
168        self.orchestrator.execute(input, options).await
169    }
170}
171
172impl Default for PipelineExecutor {
173    fn default() -> Self {
174        Self::new()
175    }
176}