phlow_engine/
phlow.rs

1use crate::{
2    context::Context,
3    debug::debug_controller,
4    pipeline::{Pipeline, PipelineError},
5    step_worker::NextStep,
6    transform::{TransformError, value_to_pipelines},
7};
8use phlow_sdk::prelude::{log::error, *};
9use phs::build_engine;
10use std::{collections::HashMap, fmt::Display, sync::Arc};
11use uuid::Uuid;
12
13#[derive(Debug)]
14pub enum PhlowError {
15    TransformError(TransformError),
16    PipelineError(PipelineError),
17    PipelineNotFound,
18    ParentError,
19}
20
21impl Display for PhlowError {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        match self {
24            PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
25            PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
26            PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
27            PhlowError::ParentError => write!(f, "Parent error"),
28        }
29    }
30}
31
32impl std::error::Error for PhlowError {
33    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
34        match self {
35            PhlowError::TransformError(err) => Some(err),
36            PhlowError::PipelineError(err) => Some(err),
37            PhlowError::PipelineNotFound => None,
38            PhlowError::ParentError => None,
39        }
40    }
41}
42
43pub type PipelineMap = HashMap<usize, Pipeline>;
44
45#[derive(Debug, Default)]
46pub struct Phlow {
47    pipelines: PipelineMap,
48    script: Value,
49}
50
51impl Phlow {
52    pub fn try_from_value(
53        value: &Value,
54        modules: Option<Arc<Modules>>,
55    ) -> Result<Self, PhlowError> {
56        let engine = match &modules {
57            Some(modules) => {
58                let repositories = modules.extract_repositories();
59                build_engine(Some(repositories))
60            }
61            None => build_engine(None),
62        };
63
64        let modules = if let Some(modules) = modules {
65            modules
66        } else {
67            Arc::new(Modules::default())
68        };
69
70        let script = if should_add_uuid() {
71            let in_steps = value.is_array();
72            add_uuids(value, in_steps)
73        } else {
74            value.clone()
75        };
76
77        let pipelines =
78            value_to_pipelines(engine, modules, &script).map_err(PhlowError::TransformError)?;
79
80        Ok(Self {
81            pipelines,
82            script,
83        })
84    }
85
86    pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
87        if self.pipelines.is_empty() {
88            return Ok(None);
89        }
90
91        let mut current_pipeline = self.pipelines.len() - 1;
92        let mut current_step = 0;
93
94        loop {
95            log::debug!(
96                "Executing pipeline {} step {}",
97                current_pipeline,
98                current_step
99            );
100            let pipeline = self
101                .pipelines
102                .get(&current_pipeline)
103                .ok_or(PhlowError::PipelineNotFound)?;
104
105            match pipeline.execute(context, current_step).await {
106                Ok(step_output) => match step_output {
107                    Some(step_output) => {
108                        log::debug!(
109                            "Next step decision: {:?}, payload: {:?}",
110                            step_output.next_step,
111                            step_output.output
112                        );
113                        match step_output.next_step {
114                            NextStep::Stop => {
115                                log::debug!("NextStep::Stop - terminating execution");
116                                return Ok(step_output.output);
117                            }
118                            NextStep::Next => {
119                                log::debug!(
120                                    "NextStep::Next - checking if sub-pipeline needs to return to parent"
121                                );
122                                // Check if this is the main pipeline (highest index)
123                                let main_pipeline = self.pipelines.len() - 1;
124                                if current_pipeline == main_pipeline {
125                                    log::debug!(
126                                        "NextStep::Next - terminating execution (main pipeline completed)"
127                                    );
128                                    return Ok(step_output.output);
129                                } else {
130                                    log::debug!(
131                                        "NextStep::Next - sub-pipeline completed, checking for parent return"
132                                    );
133                                    // This is a sub-pipeline that completed - we should return to parent
134                                    // For now, terminate execution but this needs proper parent tracking
135                                    return Ok(step_output.output);
136                                }
137                            }
138                            NextStep::Pipeline(id) => {
139                                log::debug!("NextStep::Pipeline({}) - jumping to pipeline", id);
140                                current_pipeline = id;
141                                current_step = 0;
142                            }
143                            NextStep::GoToStep(to) => {
144                                log::debug!("NextStep::GoToStep({:?}) - jumping to step", to);
145                                current_pipeline = to.pipeline;
146                                current_step = to.step;
147                            }
148                        }
149                    }
150                    None => {
151                        return Ok(None);
152                    }
153                },
154                Err(err) => {
155                    error!("Error executing step: {:?}", err);
156                    return Err(PhlowError::PipelineError(err));
157                }
158            }
159        }
160    }
161
162    pub fn script(&self) -> Value {
163        self.script.clone()
164    }
165}
166
167fn should_add_uuid() -> bool {
168    if debug_controller().is_some() {
169        return true;
170    }
171    std::env::var("PHLOW_DEBUG")
172        .map(|value| value.eq_ignore_ascii_case("true"))
173        .unwrap_or(false)
174}
175
176fn add_uuids(value: &Value, in_steps: bool) -> Value {
177    match value {
178        Value::Object(map) => {
179            let mut new_map = HashMap::new();
180            for (key, value) in map.iter() {
181                let key_str = key.to_string();
182                let is_pipeline = matches!(key_str.as_str(), "then" | "else")
183                    && (value.is_object() || value.is_array());
184                let next_in_steps = key_str == "steps" || is_pipeline;
185                new_map.insert(key_str, add_uuids(value, next_in_steps));
186            }
187            if in_steps && !map.contains_key(&"#uuid".to_string()) {
188                new_map.insert(
189                    "#uuid".to_string(),
190                    Uuid::new_v4().to_string().to_value(),
191                );
192            }
193            Value::from(new_map)
194        }
195        Value::Array(array) => {
196            let mut new_array = Vec::new();
197            for value in array.values.iter() {
198                new_array.push(add_uuids(value, in_steps));
199            }
200            Value::from(new_array)
201        }
202        _ => value.clone(),
203    }
204}