phlow_engine/
pipeline.rs

1use std::fmt::Display;
2
3use crate::{
4    context::Context,
5    step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
6};
7
8#[derive(Debug)]
9pub enum PipelineError {
10    StepWorkerError(StepWorkerError),
11}
12
13impl Display for PipelineError {
14    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15        match self {
16            PipelineError::StepWorkerError(err) => write!(f, "Step worker error: {}", err),
17        }
18    }
19}
20
21#[derive(Debug, Clone)]
22pub struct Pipeline {
23    pub(crate) steps: Vec<StepWorker>,
24}
25
26impl Pipeline {
27    pub async fn execute(
28        &self,
29        context: &mut Context,
30        skip: usize,
31    ) -> Result<Option<StepOutput>, PipelineError> {
32        for (i, step) in self.steps.iter().enumerate().skip(skip) {
33            match step.execute(&context).await {
34                Ok(step_output) => {
35                    context.add_step_payload(step_output.output.clone());
36
37                    if step.get_id().is_some() {
38                        if let Some(payload) = &step_output.output {
39                            context.add_step_id_output(step.get_id().clone(), payload.clone());
40                        }
41                    }
42
43                    match step_output.next_step {
44                        NextStep::Pipeline(_) | NextStep::Stop => return Ok(Some(step_output)),
45                        NextStep::GoToStep(to) => {
46                            return Ok(Some(StepOutput {
47                                output: step_output.output,
48                                next_step: NextStep::GoToStep(to),
49                            }));
50                        }
51                        NextStep::Next => {
52                            if i == self.steps.len() - 1 {
53                                return Ok(Some(step_output));
54                            }
55                        }
56                    }
57                }
58                Err(err) => {
59                    return Err(PipelineError::StepWorkerError(err));
60                }
61            }
62        }
63
64        Ok(None)
65    }
66}