1use std::fmt::Display;
2
3use crate::{
4 context::Context,
5 step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
6};
7
8#[derive(Debug)]
9pub enum PipelineError {
10 StepWorkerError(StepWorkerError),
11}
12
13impl Display for PipelineError {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 match self {
16 PipelineError::StepWorkerError(err) => write!(f, "Step worker error: {}", err),
17 }
18 }
19}
20
21#[derive(Debug, Clone)]
22pub struct Pipeline {
23 pub(crate) steps: Vec<StepWorker>,
24}
25
26impl Pipeline {
27 pub async fn execute(
28 &self,
29 context: &mut Context,
30 skip: usize,
31 ) -> Result<Option<StepOutput>, PipelineError> {
32 for (i, step) in self.steps.iter().enumerate().skip(skip) {
33 match step.execute(&context).await {
34 Ok(step_output) => {
35 context.add_step_payload(step_output.output.clone());
36
37 if step.get_id().is_some() {
38 if let Some(payload) = &step_output.output {
39 context.add_step_id_output(step.get_id().clone(), payload.clone());
40 }
41 }
42
43 match step_output.next_step {
44 NextStep::Pipeline(_) | NextStep::Stop => return Ok(Some(step_output)),
45 NextStep::GoToStep(to) => {
46 return Ok(Some(StepOutput {
47 output: step_output.output,
48 next_step: NextStep::GoToStep(to),
49 }));
50 }
51 NextStep::Next => {
52 if i == self.steps.len() - 1 {
53 return Ok(Some(step_output));
54 }
55 }
56 }
57 }
58 Err(err) => {
59 return Err(PipelineError::StepWorkerError(err));
60 }
61 }
62 }
63
64 Ok(None)
65 }
66}