phlow_engine/
transform.rs

1use crate::{
2    phlow::PipelineMap,
3    pipeline::Pipeline,
4    step_worker::{StepReference, StepWorker, StepWorkerError},
5};
6use phlow_sdk::{
7    prelude::{log::debug, *},
8    valu3,
9};
10use rhai::Engine;
11use std::sync::Arc;
12use std::{collections::HashMap, fmt::Display};
13use valu3::{traits::ToValueBehavior, value::Value};
14
/// Errors that can occur while transforming raw pipeline values into
/// executable pipelines.
#[derive(Debug)]
pub enum TransformError {
    /// A step value was rejected while building its `StepWorker`; carries the
    /// underlying `StepWorkerError`.
    InnerStepError(StepWorkerError),
    /// Wraps a `valu3::Error`.
    /// NOTE(review): this variant is not constructed anywhere in the code
    /// visible here — confirm it is still needed by other modules.
    Parser(valu3::Error),
}
20
21impl Display for TransformError {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        match self {
24            TransformError::InnerStepError(err) => write!(f, "Inner step error: {}", err),
25            TransformError::Parser(_) => write!(f, "Parser error: Non parseable"),
26        }
27    }
28}
29
30impl std::error::Error for TransformError {
31    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
32        match self {
33            TransformError::InnerStepError(err) => Some(err),
34            TransformError::Parser(_) => None, // valu3::Error doesn't implement std::error::Error
35        }
36    }
37}
38
39pub(crate) fn value_to_pipelines(
40    engine: Arc<Engine>,
41    modules: Arc<Modules>,
42    input: &Value,
43) -> Result<PipelineMap, TransformError> {
44    let mut map = Vec::new();
45
46    process_raw_steps(input, &mut map);
47    debug!("{}", map.to_value().to_json(JsonMode::Indented));
48    value_to_structs(engine, modules, &map)
49}
50
51pub(crate) fn process_raw_steps(input: &Value, map: &mut Vec<Value>) -> Value {
52    if let Value::Object(pipeline) = input {
53        let mut new_pipeline = pipeline.clone();
54
55        new_pipeline.remove(&"steps");
56
57        // Tratamento para THEN
58        if let Some(then) = pipeline.get("then") {
59            let then_value = process_raw_steps(then, map);
60            new_pipeline.insert("then".to_string(), then_value);
61        }
62
63        // Tratamento para ELSE
64        if let Some(els) = pipeline.get("else") {
65            let else_value = process_raw_steps(els, map);
66            new_pipeline.insert("else".to_string(), else_value);
67        }
68
69        let mut new_steps = if new_pipeline.is_empty() {
70            vec![]
71        } else {
72            vec![new_pipeline.to_value()]
73        };
74
75        if let Some(steps) = pipeline.get("steps") {
76            if let Value::Array(steps) = steps {
77                for step in steps {
78                    let mut new_step = step.clone();
79
80                    if let Some(then) = step.get("then") {
81                        new_step.insert("then".to_string(), process_raw_steps(then, map));
82                    }
83
84                    if let Some(els) = step.get("else") {
85                        new_step.insert("else".to_string(), process_raw_steps(els, map));
86                    }
87
88                    new_steps.push(new_step);
89                }
90            }
91        }
92
93        map.push(new_steps.to_value());
94    } else if let Value::Array(pipeline) = input {
95        let mut new_steps = Vec::new();
96
97        for step in pipeline {
98            if let Value::Object(step) = step {
99                let mut new_step = step.clone();
100
101                if let Some(then) = step.get("then") {
102                    new_step.insert("then".to_string(), process_raw_steps(then, map));
103                }
104
105                if let Some(els) = step.get("else") {
106                    new_step.insert("else".to_string(), process_raw_steps(els, map));
107                }
108
109                new_steps.push(new_step);
110            }
111        }
112
113        map.push(new_steps.to_value());
114    }
115
116    (map.len() - 1).to_value()
117}
118
119/// Function to transform a value into a pipeline map
120/// This function takes a value and transforms it into a pipeline map.
121/// It uses the `value_to_structs` function to convert the value into a pipeline map.
122/// It also uses the `resolve_go_to_step` function to resolve the "go to" step.
123/// The function returns a `Result` with the pipeline map or an error.
124fn value_to_structs(
125    engine: Arc<Engine>,
126    modules: Arc<Modules>,
127    pipelines_raw: &Vec<Value>,
128) -> Result<PipelineMap, TransformError> {
129    let (parents, go_to_step_id) = map_parents(pipelines_raw);
130    log::debug!("Parent mappings: {:?}", parents);
131    log::debug!(
132        "Pipeline structure: {}",
133        pipelines_raw.to_value().to_json(JsonMode::Indented)
134    );
135    let mut pipelines = HashMap::new();
136
137    for (pipeline_index, pipeline_value) in pipelines_raw.iter().enumerate() {
138        if let Value::Array(arr) = pipeline_value {
139            let mut steps = Vec::new();
140
141            for (step_index, step_value) in arr.into_iter().enumerate() {
142                if let Value::Object(step) = step_value {
143                    let mut new_step = step.clone();
144                    #[cfg(debug_assertions)]
145                    {
146                        log::debug!("new_step {:?}", new_step.to_value().to_string());
147                    }
148
149                    if let Some(to) = step.get("to") {
150                        if let Some(go_to_step) = go_to_step_id.get(to.to_string().as_str()) {
151                            new_step.insert("to".to_string(), go_to_step.to_value());
152                        }
153                    } else {
154                        if step.get("then").is_none()
155                            && step.get("else").is_none()
156                            && step.get("return").is_none()
157                            && step_index == arr.len() - 1
158                        {
159                            if let Some(target) = parents.get(&StepReference {
160                                pipeline: pipeline_index,
161                                step: 0,
162                            }) {
163                                let next_step_ref = get_next_step(&pipelines_raw, target);
164                                // Validate that the next step actually exists
165                                if is_valid_step(&pipelines_raw, &next_step_ref) {
166                                    log::debug!("Setting up parent return: pipeline {} → pipeline {} step {}", pipeline_index, next_step_ref.pipeline, next_step_ref.step);
167                                    new_step.insert("to".to_string(), next_step_ref.to_value());
168                                } else {
169                                    log::warn!("Invalid next step reference: pipeline {} step {} - looking for alternative", next_step_ref.pipeline, next_step_ref.step);
170                                    // Try to find a valid continuation point
171                                    if let Some(valid_step) =
172                                        find_valid_continuation(&pipelines_raw, target)
173                                    {
174                                        log::debug!("Found valid continuation: pipeline {} → pipeline {} step {}", pipeline_index, valid_step.pipeline, valid_step.step);
175                                        new_step.insert("to".to_string(), valid_step.to_value());
176                                    } else {
177                                        log::warn!(
178                                            "No valid continuation found for pipeline {}",
179                                            pipeline_index
180                                        );
181                                    }
182                                }
183                            } else {
184                                // BUGFIX: Se não tem parent e não é a pipeline principal,
185                                // esta pipeline pode ser órfã e deve retornar ao pipeline principal
186                                let main_pipeline = pipelines_raw.len() - 1;
187                                if pipeline_index != main_pipeline {
188                                    // Check if this pipeline is referenced as a then/else branch
189                                    let mut found_parent = false;
190
191                                    // Search through all pipelines to find if this one is referenced as a then/else branch
192                                    for (parent_pipeline_idx, parent_pipeline) in
193                                        pipelines_raw.iter().enumerate()
194                                    {
195                                        if let Value::Array(parent_steps) = parent_pipeline {
196                                            for (parent_step_idx, parent_step) in
197                                                parent_steps.values.iter().enumerate()
198                                            {
199                                                if let Value::Object(step_obj) = parent_step {
200                                                    // Check if this step references our pipeline as a then branch
201                                                    if let Some(then_val) = step_obj
202                                                        .get("then")
203                                                        .and_then(|v| v.to_u64())
204                                                    {
205                                                        if then_val as usize == pipeline_index {
206                                                            // Find the next available step in the parent pipeline
207                                                            let next_step_idx = parent_step_idx + 1;
208                                                            if next_step_idx
209                                                                < parent_steps.values.len()
210                                                            {
211                                                                let next_step = StepReference {
212                                                                    pipeline: parent_pipeline_idx,
213                                                                    step: next_step_idx,
214                                                                };
215                                                                log::debug!("Setting up then branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
216                                                                new_step.insert(
217                                                                    "to".to_string(),
218                                                                    next_step.to_value(),
219                                                                );
220                                                                found_parent = true;
221                                                                break;
222                                                            } else {
223                                                                // No more steps in parent pipeline, need to find its parent
224                                                                log::debug!("Then branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
225                                                                // For now, let's see if this parent pipeline has a parent
226                                                                if let Some(parent_target) = parents
227                                                                    .get(&StepReference {
228                                                                        pipeline:
229                                                                            parent_pipeline_idx,
230                                                                        step: 0,
231                                                                    })
232                                                                {
233                                                                    let next_step = get_next_step(
234                                                                        &pipelines_raw,
235                                                                        parent_target,
236                                                                    );
237                                                                    log::debug!("Setting up then branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
238                                                                    new_step.insert(
239                                                                        "to".to_string(),
240                                                                        next_step.to_value(),
241                                                                    );
242                                                                    found_parent = true;
243                                                                    break;
244                                                                }
245                                                            }
246                                                        }
247                                                    }
248                                                    // Check if this step references our pipeline as an else branch
249                                                    if let Some(else_val) = step_obj
250                                                        .get("else")
251                                                        .and_then(|v| v.to_u64())
252                                                    {
253                                                        if else_val as usize == pipeline_index {
254                                                            // Find the next available step in the parent pipeline
255                                                            let next_step_idx = parent_step_idx + 1;
256                                                            if next_step_idx
257                                                                < parent_steps.values.len()
258                                                            {
259                                                                let next_step = StepReference {
260                                                                    pipeline: parent_pipeline_idx,
261                                                                    step: next_step_idx,
262                                                                };
263                                                                log::debug!("Setting up else branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
264                                                                new_step.insert(
265                                                                    "to".to_string(),
266                                                                    next_step.to_value(),
267                                                                );
268                                                                found_parent = true;
269                                                                break;
270                                                            } else {
271                                                                // No more steps in parent pipeline, need to find its parent
272                                                                log::debug!("Else branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
273                                                                // For now, let's see if this parent pipeline has a parent
274                                                                if let Some(parent_target) = parents
275                                                                    .get(&StepReference {
276                                                                        pipeline:
277                                                                            parent_pipeline_idx,
278                                                                        step: 0,
279                                                                    })
280                                                                {
281                                                                    let next_step = get_next_step(
282                                                                        &pipelines_raw,
283                                                                        parent_target,
284                                                                    );
285                                                                    log::debug!("Setting up else branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
286                                                                    new_step.insert(
287                                                                        "to".to_string(),
288                                                                        next_step.to_value(),
289                                                                    );
290                                                                    found_parent = true;
291                                                                    break;
292                                                                }
293                                                            }
294                                                        }
295                                                    }
296                                                }
297                                            }
298                                            if found_parent {
299                                                break;
300                                            }
301                                        }
302                                    }
303
304                                    if !found_parent {
305                                        // Esta é uma sub-pipeline órfã - deve retornar ao pipeline principal
306                                        // Return to the next step after the first step (which is the assert/conditional)
307                                        let next_step = StepReference {
308                                            pipeline: main_pipeline,
309                                            step: 1, // Continue from step 1 in main pipeline
310                                        };
311                                        log::debug!("Setting up orphan pipeline return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
312                                        new_step.insert("to".to_string(), next_step.to_value());
313                                    }
314                                }
315                            }
316                        }
317                    }
318
319                    // Use para debugar a saida do step
320                    // println!("{}", new_step.to_value().to_json(JsonMode::Indented));
321
322                    let step_worker = StepWorker::try_from_value(
323                        engine.clone(),
324                        modules.clone(),
325                        &new_step.to_value(),
326                    )
327                    .map_err(TransformError::InnerStepError)?;
328
329                    steps.push(step_worker);
330                }
331            }
332
333            pipelines.insert(
334                pipeline_index,
335                Pipeline {
336                    steps,
337                    id: pipeline_index,
338                },
339            );
340        }
341    }
342
343    Ok(pipelines)
344}
345
346/// Function to check if a step reference is valid
347fn is_valid_step(pipelines: &Vec<Value>, step_ref: &StepReference) -> bool {
348    if step_ref.pipeline >= pipelines.len() {
349        return false;
350    }
351
352    if let Value::Array(arr) = &pipelines[step_ref.pipeline] {
353        return step_ref.step < arr.len();
354    }
355
356    false
357}
358
359/// Function to find a valid continuation point when the direct next step is invalid
360fn find_valid_continuation(
361    pipelines: &Vec<Value>,
362    target: &StepReference,
363) -> Option<StepReference> {
364    let main_pipeline = pipelines.len() - 1;
365
366    // If we're looking for continuation and the target is in the main pipeline,
367    // we should go to the main pipeline's next available step
368    if target.pipeline == main_pipeline {
369        // Try the next step in main pipeline
370        let next_step_ref = StepReference {
371            pipeline: main_pipeline,
372            step: target.step + 1,
373        };
374
375        if is_valid_step(pipelines, &next_step_ref) {
376            return Some(next_step_ref);
377        } else {
378            // No more steps in main pipeline - execution should end
379            return None;
380        }
381    }
382
383    // For non-main pipelines, try to continue from the main pipeline
384    // starting from the step after the conditional
385    let main_continuation = StepReference {
386        pipeline: main_pipeline,
387        step: 1, // Step after the conditional
388    };
389
390    if is_valid_step(pipelines, &main_continuation) {
391        return Some(main_continuation);
392    }
393
394    None
395}
396
397/// Function to get the next step
398/// This function takes a vector of pipelines and a target step reference.
399fn get_next_step(pipelines: &Vec<Value>, target: &StepReference) -> StepReference {
400    if let Value::Array(arr) = &pipelines[target.pipeline] {
401        let next_step_index = target.step + 1;
402        if arr.get(next_step_index).is_some() {
403            return StepReference {
404                pipeline: target.pipeline,
405                step: next_step_index,
406            };
407        } else {
408            // No more steps in this pipeline, need to find where to go next
409            // This should handle end-of-pipeline scenarios more gracefully
410            log::warn!(
411                "get_next_step: No next step found for pipeline {} step {}",
412                target.pipeline,
413                target.step
414            );
415        }
416    }
417
418    // Fallback - should not reach here in normal execution
419    return StepReference {
420        pipeline: target.pipeline,
421        step: target.step + 1,
422    };
423}
424
425/// Function to map parents
426/// This function takes a vector of pipelines and builds a parent map.
427fn map_parents(
428    pipelines: &Vec<Value>,
429) -> (
430    HashMap<StepReference, StepReference>,
431    HashMap<String, StepReference>,
432) {
433    let (parents, go_to_step_references) = build_parent_map(pipelines);
434    (resolve_final_parents(parents), go_to_step_references)
435}
436
437/// Function to build the parent map
438/// This function takes a vector of pipelines and builds a parent map.
439/// It uses a hashmap to store the step references.
440fn build_parent_map(
441    pipelines: &Vec<Value>,
442) -> (
443    HashMap<StepReference, StepReference>,
444    HashMap<String, StepReference>,
445) {
446    let mut parents = HashMap::new();
447    let mut go_to_step_id = HashMap::new();
448
449    for (pipeline_index, steps) in pipelines.iter().enumerate() {
450        if let Value::Array(arr) = steps {
451            for (step_index, step) in arr.into_iter().enumerate() {
452                if let Value::Object(step) = step {
453                    if let Some(id) = step.get("id") {
454                        go_to_step_id.insert(
455                            id.to_string(),
456                            StepReference {
457                                pipeline: pipeline_index,
458                                step: step_index,
459                            },
460                        );
461                    }
462
463                    // Adiciona relações de "then" e "else" ao mapa de pais
464                    if let Some(then_value) = step.get("then").and_then(|v| v.to_u64()) {
465                        parents.insert(
466                            StepReference {
467                                pipeline: then_value as usize,
468                                step: 0,
469                            },
470                            StepReference {
471                                pipeline: pipeline_index,
472                                step: step_index,
473                            },
474                        );
475                    }
476
477                    if let Some(else_value) = step.get("else").and_then(|v| v.to_u64()) {
478                        parents.insert(
479                            StepReference {
480                                pipeline: else_value as usize,
481                                step: 0,
482                            },
483                            StepReference {
484                                pipeline: pipeline_index,
485                                step: step_index,
486                            },
487                        );
488                    }
489                }
490            }
491        }
492    }
493
494    (parents, go_to_step_id)
495}
496
497/// Function to resolve final parents
498/// This function takes a parent map and resolves the final parents.
499fn resolve_final_parents(
500    parents: HashMap<StepReference, StepReference>,
501) -> HashMap<StepReference, StepReference> {
502    let mut final_parents = HashMap::new();
503
504    for (child, mut parent) in parents.iter() {
505        // Resolve o pai final seguindo a cadeia de ancestrais
506        while let Some(grandparent) = parents.get(parent) {
507            parent = grandparent;
508        }
509        final_parents.insert(child.clone(), parent.clone());
510    }
511
512    final_parents
513}