Skip to main content

phlow_engine/
pipeline.rs

1use crate::{
2    context::Context,
3    debug::{DebugContext, DebugSnapshot, debug_controller},
4    step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
5};
6use phlow_sdk::prelude::Value;
7use std::fmt::Display;
8
9#[derive(Debug)]
10pub enum PipelineError {
11    StepWorkerError(StepWorkerError),
12}
13
14impl Display for PipelineError {
15    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16        match self {
17            PipelineError::StepWorkerError(err) => write!(f, "Step worker error: {}", err),
18        }
19    }
20}
21
22impl std::error::Error for PipelineError {
23    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
24        match self {
25            PipelineError::StepWorkerError(err) => Some(err),
26        }
27    }
28}
29
30#[derive(Debug, Clone)]
31pub struct Pipeline {
32    pub(crate) steps: Vec<StepWorker>,
33    pub(crate) id: usize,
34}
35
36impl Pipeline {
37    pub fn get_id(&self) -> usize {
38        self.id
39    }
40
41    pub async fn execute(
42        &self,
43        context: &mut Context,
44        skip: usize,
45    ) -> Result<Option<StepOutput>, PipelineError> {
46        for (step_index, step) in self.steps.iter().enumerate().skip(skip) {
47            let controller = debug_controller().cloned();
48            if let Some(controller) = &controller {
49                let snapshot = DebugSnapshot {
50                    context: DebugContext {
51                        payload: context.get_payload(),
52                        main: context.get_main(),
53                    },
54                    step: step.step_value.clone().unwrap_or(Value::Null),
55                    pipeline: self.id + 1,
56                    compiled: step.compiled_debug(),
57                };
58                controller.before_step(snapshot).await;
59            }
60
61            let result = step.execute(&context).await;
62            if let Some(controller) = &controller {
63                controller.finish_step().await;
64            }
65
66            match result {
67                Ok(step_output) => {
68                    context.add_step_payload(step_output.output.clone());
69
70                    if step.get_id().is_some() {
71                        if let Some(payload) = &step_output.output {
72                            context.add_step_id_output(step.get_id().clone(), payload.clone());
73                        }
74                    }
75
76                    match step_output.next_step {
77                        NextStep::Pipeline(_) | NextStep::Stop => {
78                            return Ok(Some(step_output));
79                        }
80                        NextStep::GoToStep(to) => {
81                            return Ok(Some(StepOutput {
82                                output: step_output.output,
83                                next_step: NextStep::GoToStep(to),
84                            }));
85                        }
86                        NextStep::Next => {
87                            if step_index == self.steps.len() - 1 {
88                                return Ok(Some(step_output));
89                            }
90                        }
91                    }
92                }
93                Err(err) => {
94                    return Err(PipelineError::StepWorkerError(err));
95                }
96            }
97        }
98
99        Ok(Some(StepOutput {
100            output: context.get_payload().clone(),
101            next_step: NextStep::Next,
102        }))
103    }
104}