pipeline-service 0.1.2

Internal: Core pipeline execution service for roxid - not intended for direct use
Documentation
use crate::pipeline::models::{ExecutionContext, Pipeline, Step, StepResult, StepStatus};
use crate::pipeline::runners::shell::ShellRunner;
use std::time::Instant;
use tokio::sync::mpsc;

pub type ProgressSender = mpsc::UnboundedSender<ExecutionEvent>;
pub type ProgressReceiver = mpsc::UnboundedReceiver<ExecutionEvent>;

#[derive(Debug, Clone)]
pub enum ExecutionEvent {
    PipelineStarted {
        name: String,
    },
    StepStarted {
        step_name: String,
        step_index: usize,
    },
    StepOutput {
        step_name: String,
        output: String,
    },
    StepCompleted {
        result: StepResult,
        step_index: usize,
    },
    PipelineCompleted {
        success: bool,
        total_steps: usize,
        failed_steps: usize,
    },
}

pub struct PipelineExecutor {
    context: ExecutionContext,
}

impl PipelineExecutor {
    pub fn new(context: ExecutionContext) -> Self {
        Self { context }
    }

    pub async fn execute(
        &self,
        pipeline: Pipeline,
        progress_tx: Option<ProgressSender>,
    ) -> Vec<StepResult> {
        let mut results = Vec::new();
        let mut context = self.context.clone();

        context.env.extend(pipeline.env.clone());

        if let Some(tx) = &progress_tx {
            let _ = tx.send(ExecutionEvent::PipelineStarted {
                name: pipeline.name.clone(),
            });
        }

        for (index, step) in pipeline.steps.iter().enumerate() {
            if let Some(tx) = &progress_tx {
                let _ = tx.send(ExecutionEvent::StepStarted {
                    step_name: step.name.clone(),
                    step_index: index,
                });
            }

            let result = self
                .execute_step(step, &context, progress_tx.as_ref())
                .await;

            if let Some(tx) = &progress_tx {
                let _ = tx.send(ExecutionEvent::StepCompleted {
                    result: result.clone(),
                    step_index: index,
                });
            }

            let should_continue = result.status == StepStatus::Success || step.continue_on_error;
            results.push(result);

            if !should_continue {
                break;
            }
        }

        if let Some(tx) = &progress_tx {
            let failed_count = results
                .iter()
                .filter(|r| r.status == StepStatus::Failed)
                .count();
            let _ = tx.send(ExecutionEvent::PipelineCompleted {
                success: failed_count == 0,
                total_steps: results.len(),
                failed_steps: failed_count,
            });
        }

        results
    }

    async fn execute_step(
        &self,
        step: &Step,
        context: &ExecutionContext,
        progress_tx: Option<&ProgressSender>,
    ) -> StepResult {
        let start = Instant::now();
        let mut step_context = context.clone();
        step_context.env.extend(step.env.clone());

        let runner = ShellRunner::new(step_context);
        let result = runner.run(step, progress_tx).await;

        StepResult {
            step_name: step.name.clone(),
            status: result.status,
            output: result.output,
            error: result.error,
            duration: start.elapsed(),
            exit_code: result.exit_code,
        }
    }
}