phlow_engine/
transform.rs

1use crate::{
2    phlow::PipelineMap,
3    pipeline::Pipeline,
4    step_worker::{StepReference, StepWorker, StepWorkerError},
5};
6use phlow_sdk::{
7    prelude::{log::debug, *},
8    valu3,
9};
10use rhai::Engine;
11use std::sync::Arc;
12use std::{collections::HashMap, fmt::Display};
13use valu3::{traits::ToValueBehavior, value::Value};
14
/// Errors that can occur while turning a raw pipeline description into
/// executable pipelines.
#[derive(Debug)]
pub enum TransformError {
    /// Building a `StepWorker` from a step value failed; wraps the underlying
    /// `StepWorkerError`.
    InnerStepError(StepWorkerError),
    /// Wraps a `valu3::Error`.
    // NOTE(review): this variant is not constructed anywhere in the visible
    // part of this file — confirm where (or whether) it is produced.
    Parser(valu3::Error),
}
20
21impl Display for TransformError {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        match self {
24            TransformError::InnerStepError(err) => write!(f, "Inner step error: {}", err),
25            TransformError::Parser(_) => write!(f, "Parser error: Non parseable"),
26        }
27    }
28}
29
30impl std::error::Error for TransformError {
31    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
32        match self {
33            TransformError::InnerStepError(err) => Some(err),
34            TransformError::Parser(_) => None, // valu3::Error doesn't implement std::error::Error
35        }
36    }
37}
38
39pub(crate) fn value_to_pipelines(
40    engine: Arc<Engine>,
41    modules: Arc<Modules>,
42    input: &Value,
43) -> Result<PipelineMap, TransformError> {
44    let mut map = Vec::new();
45
46    process_raw_steps(input, &mut map);
47    debug!("{}", map.to_value().to_json(JsonMode::Indented));
48    value_to_structs(engine, modules, &map)
49}
50
51pub(crate) fn process_raw_steps(input: &Value, map: &mut Vec<Value>) -> Value {
52    if let Value::Object(pipeline) = input {
53        let mut new_pipeline = pipeline.clone();
54
55        new_pipeline.remove(&"steps");
56
57        // Tratamento para THEN
58        if let Some(then) = pipeline.get("then") {
59            let then_value = process_raw_steps(then, map);
60            new_pipeline.insert("then".to_string(), then_value);
61        }
62
63        // Tratamento para ELSE
64        if let Some(els) = pipeline.get("else") {
65            let else_value = process_raw_steps(els, map);
66            new_pipeline.insert("else".to_string(), else_value);
67        }
68
69        let mut new_steps = if new_pipeline.is_empty() {
70            vec![]
71        } else {
72            vec![new_pipeline.to_value()]
73        };
74
75        if let Some(steps) = pipeline.get("steps") {
76            if let Value::Array(steps) = steps {
77                for step in steps {
78                    let mut new_step = step.clone();
79
80                    if let Some(then) = step.get("then") {
81                        new_step.insert("then".to_string(), process_raw_steps(then, map));
82                    }
83
84                    if let Some(els) = step.get("else") {
85                        new_step.insert("else".to_string(), process_raw_steps(els, map));
86                    }
87
88                    new_steps.push(new_step);
89                }
90            }
91        }
92
93        map.push(new_steps.to_value());
94    } else if let Value::Array(pipeline) = input {
95        let mut new_steps = Vec::new();
96
97        for step in pipeline {
98            if let Value::Object(step) = step {
99                let mut new_step = step.clone();
100
101                if let Some(then) = step.get("then") {
102                    new_step.insert("then".to_string(), process_raw_steps(then, map));
103                }
104
105                if let Some(els) = step.get("else") {
106                    new_step.insert("else".to_string(), process_raw_steps(els, map));
107                }
108
109                new_steps.push(new_step);
110            }
111        }
112
113        map.push(new_steps.to_value());
114    }
115
116    (map.len() - 1).to_value()
117}
118
119/// Function to transform a value into a pipeline map
120/// This function takes a value and transforms it into a pipeline map.
121/// It uses the `value_to_structs` function to convert the value into a pipeline map.
122/// It also uses the `resolve_go_to_step` function to resolve the "go to" step.
123/// The function returns a `Result` with the pipeline map or an error.
124fn value_to_structs(
125    engine: Arc<Engine>,
126    modules: Arc<Modules>,
127    pipelines_raw: &Vec<Value>,
128) -> Result<PipelineMap, TransformError> {
129    let (parents, go_to_step_id) = map_parents(pipelines_raw);
130    log::debug!("Parent mappings: {:?}", parents);
131    log::debug!(
132        "Pipeline structure: {}",
133        pipelines_raw.to_value().to_json(JsonMode::Indented)
134    );
135    let mut pipelines = HashMap::new();
136
137    for (pipeline_index, pipeline_value) in pipelines_raw.iter().enumerate() {
138        if let Value::Array(arr) = pipeline_value {
139            let mut steps = Vec::new();
140
141            for (step_index, step_value) in arr.into_iter().enumerate() {
142                if let Value::Object(step) = step_value {
143                    let mut new_step = step.clone();
144                    #[cfg(debug_assertions)]
145                    {
146                        log::debug!("new_step {:?}", new_step.to_value().to_string());
147                    }
148
149                    if let Some(to) = step.get("to") {
150                        if let Some(go_to_step) = go_to_step_id.get(to.to_string().as_str()) {
151                            new_step.insert("to".to_string(), go_to_step.to_value());
152                        }
153                    } else {
154                        if step.get("return").is_none() && step_index == arr.len() - 1
155                        {
156                            if let Some(target) = parents.get(&StepReference {
157                                pipeline: pipeline_index,
158                                step: 0,
159                            }) {
160                                if let Some(next_step_ref) =
161                                    next_step_if_exists(&pipelines_raw, target)
162                                {
163                                    log::debug!("Setting up parent return: pipeline {} → pipeline {} step {}", pipeline_index, next_step_ref.pipeline, next_step_ref.step);
164                                    new_step.insert("to".to_string(), next_step_ref.to_value());
165                                } else if let Some(valid_step) =
166                                    find_valid_continuation(&pipelines_raw, &parents, target)
167                                {
168                                    log::debug!("Found valid continuation: pipeline {} → pipeline {} step {}", pipeline_index, valid_step.pipeline, valid_step.step);
169                                    new_step.insert("to".to_string(), valid_step.to_value());
170                                } else {
171                                    log::warn!(
172                                        "No valid continuation found for pipeline {}",
173                                        pipeline_index
174                                    );
175                                }
176                            } else {
177                                // BUGFIX: Se não tem parent e não é a pipeline principal,
178                                // esta pipeline pode ser órfã e deve retornar ao pipeline principal
179                                let main_pipeline = pipelines_raw.len() - 1;
180                                if pipeline_index != main_pipeline {
181                                    // Check if this pipeline is referenced as a then/else branch
182                                    let mut found_parent = false;
183
184                                    // Search through all pipelines to find if this one is referenced as a then/else branch
185                                    for (parent_pipeline_idx, parent_pipeline) in
186                                        pipelines_raw.iter().enumerate()
187                                    {
188                                        if let Value::Array(parent_steps) = parent_pipeline {
189                                            for (parent_step_idx, parent_step) in
190                                                parent_steps.values.iter().enumerate()
191                                            {
192                                                if let Value::Object(step_obj) = parent_step {
193                                                    // Check if this step references our pipeline as a then branch
194                                                    if let Some(then_val) = step_obj
195                                                        .get("then")
196                                                        .and_then(|v| v.to_u64())
197                                                    {
198                                                        if then_val as usize == pipeline_index {
199                                                            // Find the next available step in the parent pipeline
200                                                            let next_step_idx = parent_step_idx + 1;
201                                                            if next_step_idx
202                                                                < parent_steps.values.len()
203                                                            {
204                                                                let next_step = StepReference {
205                                                                    pipeline: parent_pipeline_idx,
206                                                                    step: next_step_idx,
207                                                                };
208                                                                log::debug!("Setting up then branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
209                                                                new_step.insert(
210                                                                    "to".to_string(),
211                                                                    next_step.to_value(),
212                                                                );
213                                                                found_parent = true;
214                                                                break;
215                                                            } else {
216                                                                // No more steps in parent pipeline, need to find its parent
217                                                                log::debug!("Then branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
218                                                                // For now, let's see if this parent pipeline has a parent
219                                                                if let Some(parent_target) = parents
220                                                                    .get(&StepReference {
221                                                                        pipeline:
222                                                                            parent_pipeline_idx,
223                                                                        step: 0,
224                                                                    })
225                                                                {
226                                                                    if let Some(next_step) =
227                                                                        find_valid_continuation(
228                                                                            &pipelines_raw,
229                                                                            &parents,
230                                                                            parent_target,
231                                                                        )
232                                                                    {
233                                                                        log::debug!("Setting up then branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
234                                                                        new_step.insert(
235                                                                            "to".to_string(),
236                                                                            next_step.to_value(),
237                                                                        );
238                                                                        found_parent = true;
239                                                                        break;
240                                                                    }
241                                                                }
242                                                            }
243                                                        }
244                                                    }
245                                                    // Check if this step references our pipeline as an else branch
246                                                    if let Some(else_val) = step_obj
247                                                        .get("else")
248                                                        .and_then(|v| v.to_u64())
249                                                    {
250                                                        if else_val as usize == pipeline_index {
251                                                            // Find the next available step in the parent pipeline
252                                                            let next_step_idx = parent_step_idx + 1;
253                                                            if next_step_idx
254                                                                < parent_steps.values.len()
255                                                            {
256                                                                let next_step = StepReference {
257                                                                    pipeline: parent_pipeline_idx,
258                                                                    step: next_step_idx,
259                                                                };
260                                                                log::debug!("Setting up else branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
261                                                                new_step.insert(
262                                                                    "to".to_string(),
263                                                                    next_step.to_value(),
264                                                                );
265                                                                found_parent = true;
266                                                                break;
267                                                            } else {
268                                                                // No more steps in parent pipeline, need to find its parent
269                                                                log::debug!("Else branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
270                                                                // For now, let's see if this parent pipeline has a parent
271                                                                if let Some(parent_target) = parents
272                                                                    .get(&StepReference {
273                                                                        pipeline:
274                                                                            parent_pipeline_idx,
275                                                                        step: 0,
276                                                                    })
277                                                                {
278                                                                    if let Some(next_step) =
279                                                                        find_valid_continuation(
280                                                                            &pipelines_raw,
281                                                                            &parents,
282                                                                            parent_target,
283                                                                        )
284                                                                    {
285                                                                        log::debug!("Setting up else branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
286                                                                        new_step.insert(
287                                                                            "to".to_string(),
288                                                                            next_step.to_value(),
289                                                                        );
290                                                                        found_parent = true;
291                                                                        break;
292                                                                    }
293                                                                }
294                                                            }
295                                                        }
296                                                    }
297                                                }
298                                            }
299                                            if found_parent {
300                                                break;
301                                            }
302                                        }
303                                    }
304
305                                    if !found_parent {
306                                        // Esta é uma sub-pipeline órfã - deve retornar ao pipeline principal
307                                        // Return to the next step after the first step (which is the assert/conditional)
308                                        let next_step = StepReference {
309                                            pipeline: main_pipeline,
310                                            step: 1, // Continue from step 1 in main pipeline
311                                        };
312                                        log::debug!("Setting up orphan pipeline return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
313                                        new_step.insert("to".to_string(), next_step.to_value());
314                                    }
315                                }
316                            }
317                        }
318                    }
319
320                    // Use para debugar a saida do step
321                    // println!("{}", new_step.to_value().to_json(JsonMode::Indented));
322
323                    let step_worker = StepWorker::try_from_value(
324                        engine.clone(),
325                        modules.clone(),
326                        &new_step.to_value(),
327                    )
328                    .map_err(TransformError::InnerStepError)?;
329
330                    steps.push(step_worker);
331                }
332            }
333
334            pipelines.insert(
335                pipeline_index,
336                Pipeline {
337                    steps,
338                    id: pipeline_index,
339                },
340            );
341        }
342    }
343
344    Ok(pipelines)
345}
346
347/// Function to check if a step reference is valid
348fn is_valid_step(pipelines: &Vec<Value>, step_ref: &StepReference) -> bool {
349    if step_ref.pipeline >= pipelines.len() {
350        return false;
351    }
352
353    if let Value::Array(arr) = &pipelines[step_ref.pipeline] {
354        return step_ref.step < arr.len();
355    }
356
357    false
358}
359
360fn next_step_if_exists(pipelines: &Vec<Value>, target: &StepReference) -> Option<StepReference> {
361    if let Value::Array(arr) = &pipelines[target.pipeline] {
362        let next_step_index = target.step + 1;
363        if next_step_index < arr.len() {
364            return Some(StepReference {
365                pipeline: target.pipeline,
366                step: next_step_index,
367            });
368        }
369    }
370
371    None
372}
373
374/// Function to find a valid continuation point when the direct next step is invalid
375fn find_valid_continuation(
376    pipelines: &Vec<Value>,
377    parents: &HashMap<StepReference, StepReference>,
378    target: &StepReference,
379) -> Option<StepReference> {
380    let mut current = target.clone();
381    let mut depth = 0usize;
382
383    loop {
384        let next_step_ref = StepReference {
385            pipeline: current.pipeline,
386            step: current.step + 1,
387        };
388
389        if is_valid_step(pipelines, &next_step_ref) {
390            return Some(next_step_ref);
391        }
392
393        let parent_key = StepReference {
394            pipeline: current.pipeline,
395            step: 0,
396        };
397        let Some(parent) = parents.get(&parent_key) else {
398            return None;
399        };
400
401        if parent.pipeline == current.pipeline && parent.step == current.step {
402            return None;
403        }
404
405        current = parent.clone();
406        depth += 1;
407        if depth > pipelines.len() {
408            return None;
409        }
410    }
411}
412
413/// Function to map parents
414/// This function takes a vector of pipelines and builds a parent map.
415fn map_parents(
416    pipelines: &Vec<Value>,
417) -> (
418    HashMap<StepReference, StepReference>,
419    HashMap<String, StepReference>,
420) {
421    let (parents, go_to_step_references) = build_parent_map(pipelines);
422    (parents, go_to_step_references)
423}
424
425/// Function to build the parent map
426/// This function takes a vector of pipelines and builds a parent map.
427/// It uses a hashmap to store the step references.
428fn build_parent_map(
429    pipelines: &Vec<Value>,
430) -> (
431    HashMap<StepReference, StepReference>,
432    HashMap<String, StepReference>,
433) {
434    let mut parents = HashMap::new();
435    let mut go_to_step_id = HashMap::new();
436
437    for (pipeline_index, steps) in pipelines.iter().enumerate() {
438        if let Value::Array(arr) = steps {
439            for (step_index, step) in arr.into_iter().enumerate() {
440                if let Value::Object(step) = step {
441                    if let Some(id) = step.get("id") {
442                        go_to_step_id.insert(
443                            id.to_string(),
444                            StepReference {
445                                pipeline: pipeline_index,
446                                step: step_index,
447                            },
448                        );
449                    }
450
451                    // Adiciona relações de "then" e "else" ao mapa de pais
452                    if let Some(then_value) = step.get("then").and_then(|v| v.to_u64()) {
453                        parents.insert(
454                            StepReference {
455                                pipeline: then_value as usize,
456                                step: 0,
457                            },
458                            StepReference {
459                                pipeline: pipeline_index,
460                                step: step_index,
461                            },
462                        );
463                    }
464
465                    if let Some(else_value) = step.get("else").and_then(|v| v.to_u64()) {
466                        parents.insert(
467                            StepReference {
468                                pipeline: else_value as usize,
469                                step: 0,
470                            },
471                            StepReference {
472                                pipeline: pipeline_index,
473                                step: step_index,
474                            },
475                        );
476                    }
477                }
478            }
479        }
480    }
481
482    (parents, go_to_step_id)
483}