1use crate::{
2 context::Context,
3 pipeline::{Pipeline, PipelineError},
4 step_worker::NextStep,
5 transform::{value_to_pipelines, TransformError},
6};
7use phlow_sdk::prelude::{log::error, *};
8use phs::build_engine;
9use std::{collections::HashMap, fmt::Display, sync::Arc};
10
11#[derive(Debug)]
12pub enum PhlowError {
13 TransformError(TransformError),
14 PipelineError(PipelineError),
15 PipelineNotFound,
16 ParentError,
17}
18
19impl Display for PhlowError {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 match self {
22 PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
23 PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
24 PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
25 PhlowError::ParentError => write!(f, "Parent error"),
26 }
27 }
28}
29
30impl std::error::Error for PhlowError {
31 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
32 match self {
33 PhlowError::TransformError(err) => Some(err),
34 PhlowError::PipelineError(err) => Some(err),
35 PhlowError::PipelineNotFound => None,
36 PhlowError::ParentError => None,
37 }
38 }
39}
40
41pub type PipelineMap = HashMap<usize, Pipeline>;
42
43#[derive(Debug, Default)]
44pub struct Phlow {
45 pipelines: PipelineMap,
46}
47
48impl Phlow {
49 pub fn try_from_value(
50 value: &Value,
51 modules: Option<Arc<Modules>>,
52 ) -> Result<Self, PhlowError> {
53 let engine = match &modules {
54 Some(modules) => {
55 let repositories = modules.extract_repositories();
56 build_engine(Some(repositories))
57 }
58 None => build_engine(None),
59 };
60
61 let modules = if let Some(modules) = modules {
62 modules
63 } else {
64 Arc::new(Modules::default())
65 };
66
67 let pipelines =
68 value_to_pipelines(engine, modules, value).map_err(PhlowError::TransformError)?;
69
70 Ok(Self { pipelines })
71 }
72
73 pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
74 if self.pipelines.is_empty() {
75 return Ok(None);
76 }
77
78 let mut current_pipeline = self.pipelines.len() - 1;
79 let mut current_step = 0;
80
81 loop {
82 log::debug!(
83 "Executing pipeline {} step {}",
84 current_pipeline,
85 current_step
86 );
87 let pipeline = self
88 .pipelines
89 .get(¤t_pipeline)
90 .ok_or(PhlowError::PipelineNotFound)?;
91
92 match pipeline.execute(context, current_step).await {
93 Ok(step_output) => match step_output {
94 Some(step_output) => {
95 log::debug!(
96 "Next step decision: {:?}, payload: {:?}",
97 step_output.next_step,
98 step_output.output
99 );
100 match step_output.next_step {
101 NextStep::Stop => {
102 log::debug!("NextStep::Stop - terminating execution");
103 return Ok(step_output.output);
104 }
105 NextStep::Next => {
106 log::debug!("NextStep::Next - checking if sub-pipeline needs to return to parent");
107 let main_pipeline = self.pipelines.len() - 1;
109 if current_pipeline == main_pipeline {
110 log::debug!("NextStep::Next - terminating execution (main pipeline completed)");
111 return Ok(step_output.output);
112 } else {
113 log::debug!("NextStep::Next - sub-pipeline completed, checking for parent return");
114 return Ok(step_output.output);
117 }
118 }
119 NextStep::Pipeline(id) => {
120 log::debug!("NextStep::Pipeline({}) - jumping to pipeline", id);
121 current_pipeline = id;
122 current_step = 0;
123 }
124 NextStep::GoToStep(to) => {
125 log::debug!("NextStep::GoToStep({:?}) - jumping to step", to);
126 current_pipeline = to.pipeline;
127 current_step = to.step;
128 }
129 }
130 }
131 None => {
132 return Ok(None);
133 }
134 },
135 Err(err) => {
136 error!("Error executing step: {:?}", err);
137 return Err(PhlowError::PipelineError(err));
138 }
139 }
140 }
141 }
142}