1use crate::{
2 context::Context,
3 debug::{DebugContext, DebugSnapshot, debug_controller},
4 step_worker::{NextStep, StepOutput, StepWorker, StepWorkerError},
5};
6use phlow_sdk::prelude::Value;
7use std::fmt::Display;
8
9#[derive(Debug)]
10pub enum PipelineError {
11 StepWorkerError(StepWorkerError),
12}
13
14impl Display for PipelineError {
15 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16 match self {
17 PipelineError::StepWorkerError(err) => write!(f, "Step worker error: {}", err),
18 }
19 }
20}
21
22impl std::error::Error for PipelineError {
23 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
24 match self {
25 PipelineError::StepWorkerError(err) => Some(err),
26 }
27 }
28}
29
30#[derive(Debug, Clone)]
31pub struct Pipeline {
32 pub(crate) steps: Vec<StepWorker>,
33 pub(crate) id: usize,
34}
35
36impl Pipeline {
37 pub fn get_id(&self) -> usize {
38 self.id
39 }
40
41 pub async fn execute(
42 &self,
43 context: &mut Context,
44 skip: usize,
45 ) -> Result<Option<StepOutput>, PipelineError> {
46 for (step_index, step) in self.steps.iter().enumerate().skip(skip) {
47 let controller = debug_controller().cloned();
48 if let Some(controller) = &controller {
49 let snapshot = DebugSnapshot {
50 context: DebugContext {
51 payload: context.get_payload(),
52 main: context.get_main(),
53 },
54 step: step.step_value.clone().unwrap_or(Value::Null),
55 pipeline: self.id + 1,
56 compiled: step.compiled_debug(),
57 };
58 controller.before_step(snapshot).await;
59 }
60
61 let result = step.execute(&context).await;
62 if let Some(controller) = &controller {
63 controller.finish_step().await;
64 }
65
66 match result {
67 Ok(step_output) => {
68 context.add_step_payload(step_output.output.clone());
69
70 if step.get_id().is_some() {
71 if let Some(payload) = &step_output.output {
72 context.add_step_id_output(step.get_id().clone(), payload.clone());
73 }
74 }
75
76 match step_output.next_step {
77 NextStep::Pipeline(_) | NextStep::Stop => {
78 return Ok(Some(step_output));
79 }
80 NextStep::GoToStep(to) => {
81 return Ok(Some(StepOutput {
82 output: step_output.output,
83 next_step: NextStep::GoToStep(to),
84 }));
85 }
86 NextStep::Next => {
87 if step_index == self.steps.len() - 1 {
88 return Ok(Some(step_output));
89 }
90 }
91 }
92 }
93 Err(err) => {
94 return Err(PipelineError::StepWorkerError(err));
95 }
96 }
97 }
98
99 Ok(Some(StepOutput {
100 output: context.get_payload().clone(),
101 next_step: NextStep::Next,
102 }))
103 }
104}