1use crate::{
2 context::Context,
3 step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
4};
5
6#[derive(Debug)]
7pub enum PipelineError {
8 StepWorkerError(StepWorkerError),
9}
10
11#[derive(Debug, Clone)]
12pub struct Pipeline {
13 pub(crate) steps: Vec<StepWorker>,
14}
15
16impl Pipeline {
17 pub async fn execute(
18 &self,
19 context: &mut Context,
20 ) -> Result<Option<StepOutput>, PipelineError> {
21 for step in self.steps.iter() {
22 match step.execute(&context).await {
23 Ok(step_output) => {
24 if step.get_id().is_some() {
25 if let Some(payload) = &step_output.output {
26 context.add_step_output(step.get_id().clone(), payload.clone());
27 }
28 }
29
30 if let NextStep::Pipeline(_) | NextStep::Stop = step_output.next_step {
31 return Ok(Some(step_output));
32 }
33 }
34 Err(err) => {
35 return Err(PipelineError::StepWorkerError(err));
36 }
37 }
38 }
39
40 Ok(None)
41 }
42}