1use crate::{
2 context::Context,
3 step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
4};
5use std::fmt::Display;
6
7#[derive(Debug)]
8pub enum PipelineError {
9 StepWorkerError(StepWorkerError),
10}
11
12impl Display for PipelineError {
13 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14 match self {
15 PipelineError::StepWorkerError(err) => write!(f, "Step worker error: {}", err),
16 }
17 }
18}
19
20impl std::error::Error for PipelineError {
21 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
22 match self {
23 PipelineError::StepWorkerError(err) => Some(err),
24 }
25 }
26}
27
28#[derive(Debug, Clone)]
29pub struct Pipeline {
30 pub(crate) steps: Vec<StepWorker>,
31 pub(crate) id: usize,
32}
33
34impl Pipeline {
35 pub fn get_id(&self) -> usize {
36 self.id
37 }
38
39 pub async fn execute(
40 &self,
41 context: &mut Context,
42 skip: usize,
43 ) -> Result<Option<StepOutput>, PipelineError> {
44 for (step_index, step) in self.steps.iter().enumerate().skip(skip) {
45 match step.execute(&context).await {
46 Ok(step_output) => {
47 context.add_step_payload(step_output.output.clone());
48
49 if step.get_id().is_some() {
50 if let Some(payload) = &step_output.output {
51 context.add_step_id_output(step.get_id().clone(), payload.clone());
52 }
53 }
54
55 match step_output.next_step {
56 NextStep::Pipeline(_) | NextStep::Stop => {
57 return Ok(Some(step_output));
58 }
59 NextStep::GoToStep(to) => {
60 return Ok(Some(StepOutput {
61 output: step_output.output,
62 next_step: NextStep::GoToStep(to),
63 }));
64 }
65 NextStep::Next => {
66 if step_index == self.steps.len() - 1 {
67 return Ok(Some(step_output));
68 }
69 }
70 }
71 }
72 Err(err) => {
73 return Err(PipelineError::StepWorkerError(err));
74 }
75 }
76 }
77
78 Ok(Some(StepOutput {
79 output: context.get_payload().clone(),
80 next_step: NextStep::Next,
81 }))
82 }
83}