phlow_engine/
pipeline.rs

1use crate::{
2    context::Context,
3    step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
4};
5use std::fmt::Display;
6
7#[derive(Debug)]
8pub enum PipelineError {
9    StepWorkerError(StepWorkerError),
10}
11
12impl Display for PipelineError {
13    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14        match self {
15            PipelineError::StepWorkerError(err) => write!(f, "Step worker error: {}", err),
16        }
17    }
18}
19
20impl std::error::Error for PipelineError {
21    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
22        match self {
23            PipelineError::StepWorkerError(err) => Some(err),
24        }
25    }
26}
27
28#[derive(Debug, Clone)]
29pub struct Pipeline {
30    pub(crate) steps: Vec<StepWorker>,
31    pub(crate) id: usize,
32}
33
34impl Pipeline {
35    pub fn get_id(&self) -> usize {
36        self.id
37    }
38
39    pub async fn execute(
40        &self,
41        context: &mut Context,
42        skip: usize,
43    ) -> Result<Option<StepOutput>, PipelineError> {
44        for (step_index, step) in self.steps.iter().enumerate().skip(skip) {
45            match step.execute(&context).await {
46                Ok(step_output) => {
47                    context.add_step_payload(step_output.output.clone());
48
49                    if step.get_id().is_some() {
50                        if let Some(payload) = &step_output.output {
51                            context.add_step_id_output(step.get_id().clone(), payload.clone());
52                        }
53                    }
54
55                    match step_output.next_step {
56                        NextStep::Pipeline(_) | NextStep::Stop => {
57                            return Ok(Some(step_output));
58                        }
59                        NextStep::GoToStep(to) => {
60                            return Ok(Some(StepOutput {
61                                output: step_output.output,
62                                next_step: NextStep::GoToStep(to),
63                            }));
64                        }
65                        NextStep::Next => {
66                            if step_index == self.steps.len() - 1 {
67                                return Ok(Some(step_output));
68                            }
69                        }
70                    }
71                }
72                Err(err) => {
73                    return Err(PipelineError::StepWorkerError(err));
74                }
75            }
76        }
77
78        Ok(Some(StepOutput {
79            output: context.get_payload().clone(),
80            next_step: NextStep::Next,
81        }))
82    }
83}