phlow_engine/
pipeline.rs

1use crate::{
2    context::Context,
3    step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
4};
5
6#[derive(Debug)]
7pub enum PipelineError {
8    StepWorkerError(StepWorkerError),
9}
10
11#[derive(Debug, Clone)]
12pub struct Pipeline {
13    pub(crate) steps: Vec<StepWorker>,
14}
15
16impl Pipeline {
17    pub async fn execute(
18        &self,
19        context: &mut Context,
20    ) -> Result<Option<StepOutput>, PipelineError> {
21        for step in self.steps.iter() {
22            match step.execute(&context).await {
23                Ok(step_output) => {
24                    if step.get_id().is_some() {
25                        if let Some(payload) = &step_output.output {
26                            context.add_step_output(step.get_id().clone(), payload.clone());
27                        }
28                    }
29
30                    if let NextStep::Pipeline(_) | NextStep::Stop = step_output.next_step {
31                        return Ok(Some(step_output));
32                    }
33                }
34                Err(err) => {
35                    return Err(PipelineError::StepWorkerError(err));
36                }
37            }
38        }
39
40        Ok(None)
41    }
42}