1use crate::{
2 context::Context,
3 debug::debug_controller,
4 pipeline::{Pipeline, PipelineError},
5 step_worker::{NextStep, StepReference},
6 transform::{TransformError, value_to_pipelines},
7};
8use phlow_sdk::prelude::{log::error, *};
9use phs::build_engine;
10use std::{collections::HashMap, fmt::Display, sync::Arc};
11use uuid::Uuid;
12
/// Errors produced while building a [`Phlow`] from a script or while executing it.
#[derive(Debug)]
pub enum PhlowError {
    /// Wraps the [`TransformError`] returned by `value_to_pipelines` in
    /// `Phlow::try_from_value`.
    TransformError(TransformError),
    /// Wraps the [`PipelineError`] returned by `Pipeline::execute` during execution.
    PipelineError(PipelineError),
    /// The pipeline id being executed is not present in the pipeline map.
    PipelineNotFound,
    /// The requested start step index is not smaller than the number of steps
    /// in the requested pipeline.
    InvalidStartStep { pipeline: usize, step: usize },
    /// NOTE(review): never constructed in this file — presumably raised by
    /// callers elsewhere in the crate; confirm before relying on it.
    ParentError,
}
21
22impl Display for PhlowError {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
26 PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
27 PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
28 PhlowError::InvalidStartStep { pipeline, step } => {
29 write!(f, "Invalid start step: pipeline {} step {}", pipeline, step)
30 }
31 PhlowError::ParentError => write!(f, "Parent error"),
32 }
33 }
34}
35
36impl std::error::Error for PhlowError {
37 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
38 match self {
39 PhlowError::TransformError(err) => Some(err),
40 PhlowError::PipelineError(err) => Some(err),
41 PhlowError::PipelineNotFound => None,
42 PhlowError::InvalidStartStep { .. } => None,
43 PhlowError::ParentError => None,
44 }
45 }
46}
47
/// Map from a `usize` pipeline key to its [`Pipeline`]; `Phlow` looks pipelines
/// up in it by the `pipeline` field of a `StepReference`.
pub type PipelineMap = HashMap<usize, Pipeline>;
49
/// An executable flow: the pipelines built from a script plus the script itself.
#[derive(Debug, Default)]
pub struct Phlow {
    // Pipelines produced by `value_to_pipelines` in `try_from_value`.
    pipelines: PipelineMap,
    // The script the pipelines were built from: either a clone of the input or,
    // when `should_add_uuid()` is true, the copy annotated by `add_uuids`.
    script: Value,
}
55
56impl Phlow {
57 pub fn try_from_value(
58 value: &Value,
59 modules: Option<Arc<Modules>>,
60 ) -> Result<Self, PhlowError> {
61 let engine = match &modules {
62 Some(modules) => {
63 let repositories = modules.extract_repositories();
64 build_engine(Some(repositories))
65 }
66 None => build_engine(None),
67 };
68
69 let modules = if let Some(modules) = modules {
70 modules
71 } else {
72 Arc::new(Modules::default())
73 };
74
75 let script = if should_add_uuid() {
76 let in_steps = value.is_array();
77 add_uuids(value, in_steps)
78 } else {
79 value.clone()
80 };
81
82 let pipelines =
83 value_to_pipelines(engine, modules, &script).map_err(PhlowError::TransformError)?;
84
85 Ok(Self {
86 pipelines,
87 script,
88 })
89 }
90
91 pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
92 if self.pipelines.is_empty() {
93 return Ok(None);
94 }
95
96 let main_pipeline = self.pipelines.len() - 1;
97 let start = StepReference {
98 pipeline: main_pipeline,
99 step: 0,
100 };
101
102 self.execute_from(context, start).await
103 }
104
105 pub fn find_step_reference(&self, id: &str) -> Option<StepReference> {
106 for pipeline in self.pipelines.values() {
107 let pipeline_id = pipeline.get_id();
108 for (step_index, step) in pipeline.steps.iter().enumerate() {
109 let step_id = step.get_id();
110 if step_id.is_some() && step_id.to_string() == id {
111 return Some(StepReference {
112 pipeline: pipeline_id,
113 step: step_index,
114 });
115 }
116 }
117 }
118
119 None
120 }
121
122 pub async fn execute_from(
123 &self,
124 context: &mut Context,
125 start: StepReference,
126 ) -> Result<Option<Value>, PhlowError> {
127 if self.pipelines.is_empty() {
128 return Ok(None);
129 }
130
131 let mut current_pipeline = start.pipeline;
132 let mut current_step = start.step;
133 let main_pipeline = self.pipelines.len() - 1;
134
135 {
136 let pipeline = self
137 .pipelines
138 .get(¤t_pipeline)
139 .ok_or(PhlowError::PipelineNotFound)?;
140 if current_step >= pipeline.steps.len() {
141 return Err(PhlowError::InvalidStartStep {
142 pipeline: current_pipeline,
143 step: current_step,
144 });
145 }
146 }
147
148 loop {
149 log::debug!(
150 "Executing pipeline {} step {}",
151 current_pipeline,
152 current_step
153 );
154 let pipeline = self
155 .pipelines
156 .get(¤t_pipeline)
157 .ok_or(PhlowError::PipelineNotFound)?;
158
159 match pipeline.execute(context, current_step).await {
160 Ok(step_output) => match step_output {
161 Some(step_output) => {
162 log::debug!(
163 "Next step decision: {:?}, payload: {:?}",
164 step_output.next_step,
165 step_output.output
166 );
167 match step_output.next_step {
168 NextStep::Stop => {
169 log::debug!("NextStep::Stop - terminating execution");
170 return Ok(step_output.output);
171 }
172 NextStep::Next => {
173 log::debug!(
174 "NextStep::Next - checking if sub-pipeline needs to return to parent"
175 );
176 if current_pipeline == main_pipeline {
177 log::debug!(
178 "NextStep::Next - terminating execution (main pipeline completed)"
179 );
180 return Ok(step_output.output);
181 } else {
182 log::debug!(
183 "NextStep::Next - sub-pipeline completed, checking for parent return"
184 );
185 return Ok(step_output.output);
188 }
189 }
190 NextStep::Pipeline(id) => {
191 log::debug!("NextStep::Pipeline({}) - jumping to pipeline", id);
192 current_pipeline = id;
193 current_step = 0;
194 }
195 NextStep::GoToStep(to) => {
196 log::debug!("NextStep::GoToStep({:?}) - jumping to step", to);
197 current_pipeline = to.pipeline;
198 current_step = to.step;
199 }
200 }
201 }
202 None => {
203 return Ok(None);
204 }
205 },
206 Err(err) => {
207 error!("Error executing step: {:?}", err);
208 return Err(PhlowError::PipelineError(err));
209 }
210 }
211 }
212 }
213
214 pub fn script(&self) -> Value {
215 self.script.clone()
216 }
217}
218
219fn should_add_uuid() -> bool {
220 if debug_controller().is_some() {
221 return true;
222 }
223 std::env::var("PHLOW_DEBUG")
224 .map(|value| value.eq_ignore_ascii_case("true"))
225 .unwrap_or(false)
226}
227
228fn add_uuids(value: &Value, in_steps: bool) -> Value {
229 match value {
230 Value::Object(map) => {
231 let mut new_map = HashMap::new();
232 for (key, value) in map.iter() {
233 let key_str = key.to_string();
234 let is_pipeline = matches!(key_str.as_str(), "then" | "else")
235 && (value.is_object() || value.is_array());
236 let next_in_steps = key_str == "steps" || is_pipeline;
237 new_map.insert(key_str, add_uuids(value, next_in_steps));
238 }
239 if in_steps && !map.contains_key(&"#uuid".to_string()) {
240 new_map.insert(
241 "#uuid".to_string(),
242 Uuid::new_v4().to_string().to_value(),
243 );
244 }
245 Value::from(new_map)
246 }
247 Value::Array(array) => {
248 let mut new_array = Vec::new();
249 for value in array.values.iter() {
250 new_array.push(add_uuids(value, in_steps));
251 }
252 Value::from(new_array)
253 }
254 _ => value.clone(),
255 }
256}