use tracing::info;
use crate::error::Result;
use crate::llm::LlmClient;
use super::super::PipelineOptions;
use super::super::stages::{
BuildStage, EnhanceStage, EnrichStage, IndexStage, OptimizeStage, ParseStage,
ReasoningIndexStage, SplitStage, ValidateStage,
};
use super::context::{IndexInput, PipelineResult};
use super::orchestrator::PipelineOrchestrator;
/// Entry point for configuring and running the index pipeline.
///
/// This type is a thin wrapper around a [`PipelineOrchestrator`]: stage
/// registration, stage inspection, and execution are all forwarded to it.
pub struct PipelineExecutor {
/// The orchestrator that every call on this executor is delegated to.
orchestrator: PipelineOrchestrator,
}
impl PipelineExecutor {
    /// Creates an executor with the default stage set and no LLM client.
    ///
    /// Stages are registered with the following priorities: parse (10),
    /// build (20), validate (22), split (25), enrich (40), reasoning index (45)
    /// and optimize (60). Unlike [`PipelineExecutor::with_llm`], no
    /// `EnhanceStage` is registered.
    ///
    /// NOTE(review): how the priority numbers translate into execution order is
    /// decided by `PipelineOrchestrator`; presumably lower runs first — confirm.
    pub fn new() -> Self {
        Self::from_orchestrator(
            PipelineOrchestrator::new()
                .stage_with_priority(ParseStage::new(), 10)
                .stage_with_priority(BuildStage::new(), 20)
                .stage_with_priority(ValidateStage::new(), 22)
                .stage_with_priority(SplitStage::new(), 25)
                .stage_with_priority(EnrichStage::new(), 40)
                .stage_with_priority(ReasoningIndexStage::new(), 45)
                .stage_with_priority(OptimizeStage::new(), 60),
        )
    }

    /// Creates an executor whose pipeline is wired to the given LLM client.
    ///
    /// The client is cloned for the orchestrator itself and for the parse
    /// stage, and the original is moved into the enhance stage. Compared with
    /// [`PipelineExecutor::new`], the stage set additionally contains
    /// `EnhanceStage` at priority 30.
    pub fn with_llm(client: LlmClient) -> Self {
        info!(
            "PipelineExecutor::with_llm — cloning client to ParseStage + EnhanceStage + context"
        );

        // Hand the orchestrator its own copy of the client before any stage is registered.
        let llm_aware = PipelineOrchestrator::new().with_llm_client(client.clone());

        // `client` itself is consumed by the enhance stage, so the parse stage gets a clone.
        let pipeline = llm_aware
            .stage_with_priority(ParseStage::with_llm_client(client.clone()), 10)
            .stage_with_priority(BuildStage::new(), 20)
            .stage_with_priority(ValidateStage::new(), 22)
            .stage_with_priority(SplitStage::new(), 25)
            .stage_with_priority(EnhanceStage::with_llm_client(client), 30)
            .stage_with_priority(EnrichStage::new(), 40)
            .stage_with_priority(ReasoningIndexStage::new(), 45)
            .stage_with_priority(OptimizeStage::new(), 60);

        Self::from_orchestrator(pipeline)
    }

    /// Wraps an already configured orchestrator without registering any stages.
    pub fn from_orchestrator(orchestrator: PipelineOrchestrator) -> Self {
        Self { orchestrator }
    }

    /// Registers `stage` through the orchestrator's `stage` method and returns
    /// the updated executor.
    pub fn add_stage(self, stage: impl IndexStage + 'static) -> Self {
        let Self { orchestrator } = self;
        Self {
            orchestrator: orchestrator.stage(stage),
        }
    }

    /// Registers `stage` with an explicit `priority` and returns the updated
    /// executor.
    pub fn add_stage_with_priority(
        self,
        stage: impl IndexStage + 'static,
        priority: i32,
    ) -> Self {
        let Self { orchestrator } = self;
        Self {
            orchestrator: orchestrator.stage_with_priority(stage, priority),
        }
    }

    /// Registers `stage` with an explicit `priority` and a dependency list, and
    /// returns the updated executor.
    ///
    /// `depends_on` is forwarded unchanged to the orchestrator; presumably it
    /// holds the names of stages that must run first — confirm against
    /// `PipelineOrchestrator::stage_with_deps`.
    pub fn add_stage_with_deps(
        self,
        stage: impl IndexStage + 'static,
        priority: i32,
        depends_on: &[&str],
    ) -> Self {
        let Self { orchestrator } = self;
        Self {
            orchestrator: orchestrator.stage_with_deps(stage, priority, depends_on),
        }
    }

    /// Returns the stage names as reported by the orchestrator.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the orchestrator's `stage_names`.
    pub fn stage_names(&self) -> Result<Vec<&str>> {
        let Self { orchestrator } = self;
        orchestrator.stage_names()
    }

    /// Returns the number of stages as reported by the orchestrator.
    pub fn stage_count(&self) -> usize {
        let Self { orchestrator } = self;
        orchestrator.stage_count()
    }

    /// Logs the current stage count and runs the pipeline on `input` with
    /// `options`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the orchestrator's `execute`.
    pub async fn execute(
        &mut self,
        input: IndexInput,
        options: PipelineOptions,
    ) -> Result<PipelineResult> {
        let Self { orchestrator } = self;
        info!(
            "Starting index pipeline with {} stages",
            orchestrator.stage_count()
        );
        orchestrator.execute(input, options).await
    }
}
impl Default for PipelineExecutor {
fn default() -> Self {
Self::new()
}
}