batuta/pipeline/
execution.rs1use anyhow::{Context as AnyhowContext, Result};
4use std::path::Path;
5
6#[cfg(feature = "native")]
7use tracing::{debug, info};
8
9#[cfg(not(feature = "native"))]
11macro_rules! info {
12 ($($arg:tt)*) => {{}};
13}
14
15#[cfg(not(feature = "native"))]
16macro_rules! debug {
17 ($($arg:tt)*) => {{}};
18}
19
20use super::types::{PipelineContext, PipelineOutput, PipelineStage, ValidationStrategy};
21
22pub struct TranspilationPipeline {
24 pub(crate) stages: Vec<Box<dyn PipelineStage>>,
25 pub(crate) validation: ValidationStrategy,
26}
27
28impl TranspilationPipeline {
29 pub fn new(validation: ValidationStrategy) -> Self {
30 Self { stages: Vec::new(), validation }
31 }
32
33 pub fn add_stage(mut self, stage: Box<dyn PipelineStage>) -> Self {
35 self.stages.push(stage);
36 self
37 }
38
39 #[allow(clippy::cognitive_complexity)]
41 pub async fn run(&self, input: &Path, output: &Path) -> Result<PipelineOutput> {
42 info!("Starting pipeline with {} stages", self.stages.len());
43
44 let mut ctx = PipelineContext::new(input.to_path_buf(), output.to_path_buf());
45
46 for (idx, stage) in self.stages.iter().enumerate() {
47 info!("Running stage {}/{}: {}", idx + 1, self.stages.len(), stage.name());
48
49 ctx = stage
51 .execute(ctx)
52 .await
53 .with_context(|| format!("Stage '{}' failed", stage.name()))?;
54
55 if self.validation != ValidationStrategy::None {
57 debug!("Validating stage: {}", stage.name());
58 let validation_result = stage.validate(&ctx)?;
59 ctx.validation_results.push(validation_result.clone());
60
61 if !validation_result.passed && self.validation == ValidationStrategy::StopOnError {
62 anyhow::bail!(
63 "Validation failed for stage '{}': {}",
64 stage.name(),
65 validation_result.message
66 );
67 }
68 }
69 }
70
71 info!("Pipeline completed successfully");
72 Ok(ctx.output())
73 }
74}