1use crate::{
2 context::Context,
3 step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
4};
5
6#[derive(Debug)]
7pub enum PipelineError {
8 StepWorkerError(StepWorkerError),
9}
10
11#[derive(Debug, Clone)]
12pub struct Pipeline {
13 pub(crate) steps: Vec<StepWorker>,
14}
15
16impl Pipeline {
17 pub async fn execute(
18 &self,
19 context: &mut Context,
20 skip: usize,
21 ) -> Result<Option<StepOutput>, PipelineError> {
22 for (i, step) in self.steps.iter().enumerate().skip(skip) {
23 match step.execute(&context).await {
24 Ok(step_output) => {
25 context.add_step_payload(step_output.output.clone());
26
27 if step.get_id().is_some() {
28 if let Some(payload) = &step_output.output {
29 context.add_step_id_output(step.get_id().clone(), payload.clone());
30 }
31 }
32
33 match step_output.next_step {
34 NextStep::Pipeline(_) | NextStep::Stop => return Ok(Some(step_output)),
35 NextStep::GoToStep(to) => {
36 return Ok(Some(StepOutput {
37 output: step_output.output,
38 next_step: NextStep::GoToStep(to),
39 }));
40 }
41 NextStep::Next => {
42 if i == self.steps.len() - 1 {
43 return Ok(Some(step_output));
44 }
45 }
46 }
47 }
48 Err(err) => {
49 return Err(PipelineError::StepWorkerError(err));
50 }
51 }
52 }
53
54 Ok(None)
55 }
56}