phlow_engine/
pipeline.rs

1use crate::{
2    context::Context,
3    step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
4};
5
6#[derive(Debug)]
7pub enum PipelineError {
8    StepWorkerError(StepWorkerError),
9}
10
11#[derive(Debug, Clone)]
12pub struct Pipeline {
13    pub(crate) steps: Vec<StepWorker>,
14}
15
16impl Pipeline {
17    pub async fn execute(
18        &self,
19        context: &mut Context,
20        skip: usize,
21    ) -> Result<Option<StepOutput>, PipelineError> {
22        for (i, step) in self.steps.iter().enumerate().skip(skip) {
23            match step.execute(&context).await {
24                Ok(step_output) => {
25                    context.add_step_payload(step_output.output.clone());
26
27                    if step.get_id().is_some() {
28                        if let Some(payload) = &step_output.output {
29                            context.add_step_id_output(step.get_id().clone(), payload.clone());
30                        }
31                    }
32
33                    match step_output.next_step {
34                        NextStep::Pipeline(_) | NextStep::Stop => return Ok(Some(step_output)),
35                        NextStep::GoToStep(to) => {
36                            return Ok(Some(StepOutput {
37                                output: step_output.output,
38                                next_step: NextStep::GoToStep(to),
39                            }));
40                        }
41                        NextStep::Next => {
42                            if i == self.steps.len() - 1 {
43                                return Ok(Some(step_output));
44                            }
45                        }
46                    }
47                }
48                Err(err) => {
49                    return Err(PipelineError::StepWorkerError(err));
50                }
51            }
52        }
53
54        Ok(None)
55    }
56}