1use crate::{
2 context::Context,
3 debug::debug_controller,
4 pipeline::{Pipeline, PipelineError},
5 step_worker::NextStep,
6 transform::{TransformError, value_to_pipelines},
7};
8use phlow_sdk::prelude::{log::error, *};
9use phs::build_engine;
10use std::{collections::HashMap, fmt::Display, sync::Arc};
11use uuid::Uuid;
12
13#[derive(Debug)]
14pub enum PhlowError {
15 TransformError(TransformError),
16 PipelineError(PipelineError),
17 PipelineNotFound,
18 ParentError,
19}
20
21impl Display for PhlowError {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 match self {
24 PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
25 PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
26 PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
27 PhlowError::ParentError => write!(f, "Parent error"),
28 }
29 }
30}
31
32impl std::error::Error for PhlowError {
33 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
34 match self {
35 PhlowError::TransformError(err) => Some(err),
36 PhlowError::PipelineError(err) => Some(err),
37 PhlowError::PipelineNotFound => None,
38 PhlowError::ParentError => None,
39 }
40 }
41}
42
43pub type PipelineMap = HashMap<usize, Pipeline>;
44
45#[derive(Debug, Default)]
46pub struct Phlow {
47 pipelines: PipelineMap,
48 script: Value,
49}
50
51impl Phlow {
52 pub fn try_from_value(
53 value: &Value,
54 modules: Option<Arc<Modules>>,
55 ) -> Result<Self, PhlowError> {
56 let engine = match &modules {
57 Some(modules) => {
58 let repositories = modules.extract_repositories();
59 build_engine(Some(repositories))
60 }
61 None => build_engine(None),
62 };
63
64 let modules = if let Some(modules) = modules {
65 modules
66 } else {
67 Arc::new(Modules::default())
68 };
69
70 let script = if should_add_uuid() {
71 let in_steps = value.is_array();
72 add_uuids(value, in_steps)
73 } else {
74 value.clone()
75 };
76
77 let pipelines =
78 value_to_pipelines(engine, modules, &script).map_err(PhlowError::TransformError)?;
79
80 Ok(Self {
81 pipelines,
82 script,
83 })
84 }
85
86 pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
87 if self.pipelines.is_empty() {
88 return Ok(None);
89 }
90
91 let mut current_pipeline = self.pipelines.len() - 1;
92 let mut current_step = 0;
93
94 loop {
95 log::debug!(
96 "Executing pipeline {} step {}",
97 current_pipeline,
98 current_step
99 );
100 let pipeline = self
101 .pipelines
102 .get(¤t_pipeline)
103 .ok_or(PhlowError::PipelineNotFound)?;
104
105 match pipeline.execute(context, current_step).await {
106 Ok(step_output) => match step_output {
107 Some(step_output) => {
108 log::debug!(
109 "Next step decision: {:?}, payload: {:?}",
110 step_output.next_step,
111 step_output.output
112 );
113 match step_output.next_step {
114 NextStep::Stop => {
115 log::debug!("NextStep::Stop - terminating execution");
116 return Ok(step_output.output);
117 }
118 NextStep::Next => {
119 log::debug!(
120 "NextStep::Next - checking if sub-pipeline needs to return to parent"
121 );
122 let main_pipeline = self.pipelines.len() - 1;
124 if current_pipeline == main_pipeline {
125 log::debug!(
126 "NextStep::Next - terminating execution (main pipeline completed)"
127 );
128 return Ok(step_output.output);
129 } else {
130 log::debug!(
131 "NextStep::Next - sub-pipeline completed, checking for parent return"
132 );
133 return Ok(step_output.output);
136 }
137 }
138 NextStep::Pipeline(id) => {
139 log::debug!("NextStep::Pipeline({}) - jumping to pipeline", id);
140 current_pipeline = id;
141 current_step = 0;
142 }
143 NextStep::GoToStep(to) => {
144 log::debug!("NextStep::GoToStep({:?}) - jumping to step", to);
145 current_pipeline = to.pipeline;
146 current_step = to.step;
147 }
148 }
149 }
150 None => {
151 return Ok(None);
152 }
153 },
154 Err(err) => {
155 error!("Error executing step: {:?}", err);
156 return Err(PhlowError::PipelineError(err));
157 }
158 }
159 }
160 }
161
162 pub fn script(&self) -> Value {
163 self.script.clone()
164 }
165}
166
167fn should_add_uuid() -> bool {
168 if debug_controller().is_some() {
169 return true;
170 }
171 std::env::var("PHLOW_DEBUG")
172 .map(|value| value.eq_ignore_ascii_case("true"))
173 .unwrap_or(false)
174}
175
176fn add_uuids(value: &Value, in_steps: bool) -> Value {
177 match value {
178 Value::Object(map) => {
179 let mut new_map = HashMap::new();
180 for (key, value) in map.iter() {
181 let key_str = key.to_string();
182 let is_pipeline = matches!(key_str.as_str(), "then" | "else")
183 && (value.is_object() || value.is_array());
184 let next_in_steps = key_str == "steps" || is_pipeline;
185 new_map.insert(key_str, add_uuids(value, next_in_steps));
186 }
187 if in_steps && !map.contains_key(&"#uuid".to_string()) {
188 new_map.insert(
189 "#uuid".to_string(),
190 Uuid::new_v4().to_string().to_value(),
191 );
192 }
193 Value::from(new_map)
194 }
195 Value::Array(array) => {
196 let mut new_array = Vec::new();
197 for value in array.values.iter() {
198 new_array.push(add_uuids(value, in_steps));
199 }
200 Value::from(new_array)
201 }
202 _ => value.clone(),
203 }
204}