phlow_engine/
phlow.rs

1use crate::{
2    context::Context,
3    debug::debug_controller,
4    pipeline::{Pipeline, PipelineError},
5    step_worker::{NextStep, StepReference},
6    transform::{TransformError, value_to_pipelines},
7};
8use phlow_sdk::prelude::{log::error, *};
9use phs::build_engine;
10use std::{collections::HashMap, fmt::Display, sync::Arc};
11use uuid::Uuid;
12
/// Errors reported while building or executing a [`Phlow`].
#[derive(Debug)]
pub enum PhlowError {
    /// The script could not be turned into pipelines; wraps the error
    /// returned by `value_to_pipelines`.
    TransformError(TransformError),
    /// A step failed while running; wraps the error returned by
    /// `Pipeline::execute`.
    PipelineError(PipelineError),
    /// The pipeline id being executed has no entry in the pipeline map.
    PipelineNotFound,
    /// The requested start step index is not smaller than the number of
    /// steps in the requested start pipeline.
    InvalidStartStep { pipeline: usize, step: usize },
    /// NOTE(review): never constructed in this file; presumably reserved for
    /// failures while returning to a parent pipeline — confirm with callers.
    ParentError,
}
21
22impl Display for PhlowError {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
26            PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
27            PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
28            PhlowError::InvalidStartStep { pipeline, step } => {
29                write!(f, "Invalid start step: pipeline {} step {}", pipeline, step)
30            }
31            PhlowError::ParentError => write!(f, "Parent error"),
32        }
33    }
34}
35
36impl std::error::Error for PhlowError {
37    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
38        match self {
39            PhlowError::TransformError(err) => Some(err),
40            PhlowError::PipelineError(err) => Some(err),
41            PhlowError::PipelineNotFound => None,
42            PhlowError::InvalidStartStep { .. } => None,
43            PhlowError::ParentError => None,
44        }
45    }
46}
47
/// Pipelines keyed by the `usize` pipeline id used in `StepReference::pipeline`.
pub type PipelineMap = HashMap<usize, Pipeline>;
49
/// A script together with the pipelines built from it.
#[derive(Debug, Default)]
pub struct Phlow {
    /// Pipelines produced by `value_to_pipelines`, looked up by pipeline id
    /// during execution.
    pipelines: PipelineMap,
    /// The script the pipelines were built from: a copy of the input value,
    /// with `#uuid` entries added when UUID injection is enabled.
    script: Value,
}
55
56impl Phlow {
57    pub fn try_from_value(
58        value: &Value,
59        modules: Option<Arc<Modules>>,
60    ) -> Result<Self, PhlowError> {
61        let engine = match &modules {
62            Some(modules) => {
63                let repositories = modules.extract_repositories();
64                build_engine(Some(repositories))
65            }
66            None => build_engine(None),
67        };
68
69        let modules = if let Some(modules) = modules {
70            modules
71        } else {
72            Arc::new(Modules::default())
73        };
74
75        let script = if should_add_uuid() {
76            let in_steps = value.is_array();
77            add_uuids(value, in_steps)
78        } else {
79            value.clone()
80        };
81
82        let pipelines =
83            value_to_pipelines(engine, modules, &script).map_err(PhlowError::TransformError)?;
84
85        Ok(Self {
86            pipelines,
87            script,
88        })
89    }
90
91    pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
92        if self.pipelines.is_empty() {
93            return Ok(None);
94        }
95
96        let main_pipeline = self.pipelines.len() - 1;
97        let start = StepReference {
98            pipeline: main_pipeline,
99            step: 0,
100        };
101
102        self.execute_from(context, start).await
103    }
104
105    pub fn find_step_reference(&self, id: &str) -> Option<StepReference> {
106        for pipeline in self.pipelines.values() {
107            let pipeline_id = pipeline.get_id();
108            for (step_index, step) in pipeline.steps.iter().enumerate() {
109                let step_id = step.get_id();
110                if step_id.is_some() && step_id.to_string() == id {
111                    return Some(StepReference {
112                        pipeline: pipeline_id,
113                        step: step_index,
114                    });
115                }
116            }
117        }
118
119        None
120    }
121
122    pub async fn execute_from(
123        &self,
124        context: &mut Context,
125        start: StepReference,
126    ) -> Result<Option<Value>, PhlowError> {
127        if self.pipelines.is_empty() {
128            return Ok(None);
129        }
130
131        let mut current_pipeline = start.pipeline;
132        let mut current_step = start.step;
133        let main_pipeline = self.pipelines.len() - 1;
134
135        {
136            let pipeline = self
137                .pipelines
138                .get(&current_pipeline)
139                .ok_or(PhlowError::PipelineNotFound)?;
140            if current_step >= pipeline.steps.len() {
141                return Err(PhlowError::InvalidStartStep {
142                    pipeline: current_pipeline,
143                    step: current_step,
144                });
145            }
146        }
147
148        loop {
149            log::debug!(
150                "Executing pipeline {} step {}",
151                current_pipeline,
152                current_step
153            );
154            let pipeline = self
155                .pipelines
156                .get(&current_pipeline)
157                .ok_or(PhlowError::PipelineNotFound)?;
158
159            match pipeline.execute(context, current_step).await {
160                Ok(step_output) => match step_output {
161                    Some(step_output) => {
162                        log::debug!(
163                            "Next step decision: {:?}, payload: {:?}",
164                            step_output.next_step,
165                            step_output.output
166                        );
167                        match step_output.next_step {
168                            NextStep::Stop => {
169                                log::debug!("NextStep::Stop - terminating execution");
170                                return Ok(step_output.output);
171                            }
172                            NextStep::Next => {
173                                log::debug!(
174                                    "NextStep::Next - checking if sub-pipeline needs to return to parent"
175                                );
176                                if current_pipeline == main_pipeline {
177                                    log::debug!(
178                                        "NextStep::Next - terminating execution (main pipeline completed)"
179                                    );
180                                    return Ok(step_output.output);
181                                } else {
182                                    log::debug!(
183                                        "NextStep::Next - sub-pipeline completed, checking for parent return"
184                                    );
185                                    // This is a sub-pipeline that completed - we should return to parent
186                                    // For now, terminate execution but this needs proper parent tracking
187                                    return Ok(step_output.output);
188                                }
189                            }
190                            NextStep::Pipeline(id) => {
191                                log::debug!("NextStep::Pipeline({}) - jumping to pipeline", id);
192                                current_pipeline = id;
193                                current_step = 0;
194                            }
195                            NextStep::GoToStep(to) => {
196                                log::debug!("NextStep::GoToStep({:?}) - jumping to step", to);
197                                current_pipeline = to.pipeline;
198                                current_step = to.step;
199                            }
200                        }
201                    }
202                    None => {
203                        return Ok(None);
204                    }
205                },
206                Err(err) => {
207                    error!("Error executing step: {:?}", err);
208                    return Err(PhlowError::PipelineError(err));
209                }
210            }
211        }
212    }
213
214    pub fn script(&self) -> Value {
215        self.script.clone()
216    }
217}
218
219fn should_add_uuid() -> bool {
220    if debug_controller().is_some() {
221        return true;
222    }
223    std::env::var("PHLOW_DEBUG")
224        .map(|value| value.eq_ignore_ascii_case("true"))
225        .unwrap_or(false)
226}
227
228fn add_uuids(value: &Value, in_steps: bool) -> Value {
229    match value {
230        Value::Object(map) => {
231            let mut new_map = HashMap::new();
232            for (key, value) in map.iter() {
233                let key_str = key.to_string();
234                let is_pipeline = matches!(key_str.as_str(), "then" | "else")
235                    && (value.is_object() || value.is_array());
236                let next_in_steps = key_str == "steps" || is_pipeline;
237                new_map.insert(key_str, add_uuids(value, next_in_steps));
238            }
239            if in_steps && !map.contains_key(&"#uuid".to_string()) {
240                new_map.insert(
241                    "#uuid".to_string(),
242                    Uuid::new_v4().to_string().to_value(),
243                );
244            }
245            Value::from(new_map)
246        }
247        Value::Array(array) => {
248            let mut new_array = Vec::new();
249            for value in array.values.iter() {
250                new_array.push(add_uuids(value, in_steps));
251            }
252            Value::from(new_array)
253        }
254        _ => value.clone(),
255    }
256}