phlow_engine/transform.rs
1use crate::{
2 phlow::PipelineMap,
3 pipeline::Pipeline,
4 step_worker::{StepReference, StepWorker, StepWorkerError},
5};
6use phlow_sdk::{
7 prelude::{log::debug, *},
8 valu3,
9};
10use rhai::Engine;
11use std::sync::Arc;
12use std::{collections::HashMap, fmt::Display};
13use valu3::{traits::ToValueBehavior, value::Value};
14
15#[derive(Debug)]
16pub enum TransformError {
17 InnerStepError(StepWorkerError),
18 Parser(valu3::Error),
19}
20
21impl Display for TransformError {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 match self {
24 TransformError::InnerStepError(err) => write!(f, "Inner step error: {}", err),
25 TransformError::Parser(_) => write!(f, "Parser error: Non parseable"),
26 }
27 }
28}
29
30impl std::error::Error for TransformError {
31 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
32 match self {
33 TransformError::InnerStepError(err) => Some(err),
34 TransformError::Parser(_) => None, // valu3::Error doesn't implement std::error::Error
35 }
36 }
37}
38
39pub(crate) fn value_to_pipelines(
40 engine: Arc<Engine>,
41 modules: Arc<Modules>,
42 input: &Value,
43) -> Result<PipelineMap, TransformError> {
44 let mut map = Vec::new();
45
46 process_raw_steps(input, &mut map);
47 debug!("{}", map.to_value().to_json(JsonMode::Indented));
48 value_to_structs(engine, modules, &map)
49}
50
51pub(crate) fn process_raw_steps(input: &Value, map: &mut Vec<Value>) -> Value {
52 if let Value::Object(pipeline) = input {
53 let mut new_pipeline = pipeline.clone();
54
55 new_pipeline.remove(&"steps");
56
57 // Tratamento para THEN
58 if let Some(then) = pipeline.get("then") {
59 let then_value = process_raw_steps(then, map);
60 new_pipeline.insert("then".to_string(), then_value);
61 }
62
63 // Tratamento para ELSE
64 if let Some(els) = pipeline.get("else") {
65 let else_value = process_raw_steps(els, map);
66 new_pipeline.insert("else".to_string(), else_value);
67 }
68
69 let mut new_steps = if new_pipeline.is_empty() {
70 vec![]
71 } else {
72 vec![new_pipeline.to_value()]
73 };
74
75 if let Some(steps) = pipeline.get("steps") {
76 if let Value::Array(steps) = steps {
77 for step in steps {
78 let mut new_step = step.clone();
79
80 if let Some(then) = step.get("then") {
81 new_step.insert("then".to_string(), process_raw_steps(then, map));
82 }
83
84 if let Some(els) = step.get("else") {
85 new_step.insert("else".to_string(), process_raw_steps(els, map));
86 }
87
88 new_steps.push(new_step);
89 }
90 }
91 }
92
93 map.push(new_steps.to_value());
94 } else if let Value::Array(pipeline) = input {
95 let mut new_steps = Vec::new();
96
97 for step in pipeline {
98 if let Value::Object(step) = step {
99 let mut new_step = step.clone();
100
101 if let Some(then) = step.get("then") {
102 new_step.insert("then".to_string(), process_raw_steps(then, map));
103 }
104
105 if let Some(els) = step.get("else") {
106 new_step.insert("else".to_string(), process_raw_steps(els, map));
107 }
108
109 new_steps.push(new_step);
110 }
111 }
112
113 map.push(new_steps.to_value());
114 }
115
116 (map.len() - 1).to_value()
117}
118
119/// Function to transform a value into a pipeline map
120/// This function takes a value and transforms it into a pipeline map.
121/// It uses the `value_to_structs` function to convert the value into a pipeline map.
122/// It also uses the `resolve_go_to_step` function to resolve the "go to" step.
123/// The function returns a `Result` with the pipeline map or an error.
124fn value_to_structs(
125 engine: Arc<Engine>,
126 modules: Arc<Modules>,
127 pipelines_raw: &Vec<Value>,
128) -> Result<PipelineMap, TransformError> {
129 let (parents, go_to_step_id) = map_parents(pipelines_raw);
130 log::debug!("Parent mappings: {:?}", parents);
131 log::debug!(
132 "Pipeline structure: {}",
133 pipelines_raw.to_value().to_json(JsonMode::Indented)
134 );
135 let mut pipelines = HashMap::new();
136
137 for (pipeline_index, pipeline_value) in pipelines_raw.iter().enumerate() {
138 if let Value::Array(arr) = pipeline_value {
139 let mut steps = Vec::new();
140
141 for (step_index, step_value) in arr.into_iter().enumerate() {
142 if let Value::Object(step) = step_value {
143 let mut new_step = step.clone();
144 #[cfg(debug_assertions)]
145 {
146 log::debug!("new_step {:?}", new_step.to_value().to_string());
147 }
148
149 if let Some(to) = step.get("to") {
150 if let Some(go_to_step) = go_to_step_id.get(to.to_string().as_str()) {
151 new_step.insert("to".to_string(), go_to_step.to_value());
152 }
153 } else {
154 if step.get("then").is_none()
155 && step.get("else").is_none()
156 && step.get("return").is_none()
157 && step_index == arr.len() - 1
158 {
159 if let Some(target) = parents.get(&StepReference {
160 pipeline: pipeline_index,
161 step: 0,
162 }) {
163 let next_step_ref = get_next_step(&pipelines_raw, target);
164 // Validate that the next step actually exists
165 if is_valid_step(&pipelines_raw, &next_step_ref) {
166 log::debug!("Setting up parent return: pipeline {} → pipeline {} step {}", pipeline_index, next_step_ref.pipeline, next_step_ref.step);
167 new_step.insert("to".to_string(), next_step_ref.to_value());
168 } else {
169 log::warn!("Invalid next step reference: pipeline {} step {} - looking for alternative", next_step_ref.pipeline, next_step_ref.step);
170 // Try to find a valid continuation point
171 if let Some(valid_step) =
172 find_valid_continuation(&pipelines_raw, target)
173 {
174 log::debug!("Found valid continuation: pipeline {} → pipeline {} step {}", pipeline_index, valid_step.pipeline, valid_step.step);
175 new_step.insert("to".to_string(), valid_step.to_value());
176 } else {
177 log::warn!(
178 "No valid continuation found for pipeline {}",
179 pipeline_index
180 );
181 }
182 }
183 } else {
184 // BUGFIX: Se não tem parent e não é a pipeline principal,
185 // esta pipeline pode ser órfã e deve retornar ao pipeline principal
186 let main_pipeline = pipelines_raw.len() - 1;
187 if pipeline_index != main_pipeline {
188 // Check if this pipeline is referenced as a then/else branch
189 let mut found_parent = false;
190
191 // Search through all pipelines to find if this one is referenced as a then/else branch
192 for (parent_pipeline_idx, parent_pipeline) in
193 pipelines_raw.iter().enumerate()
194 {
195 if let Value::Array(parent_steps) = parent_pipeline {
196 for (parent_step_idx, parent_step) in
197 parent_steps.values.iter().enumerate()
198 {
199 if let Value::Object(step_obj) = parent_step {
200 // Check if this step references our pipeline as a then branch
201 if let Some(then_val) = step_obj
202 .get("then")
203 .and_then(|v| v.to_u64())
204 {
205 if then_val as usize == pipeline_index {
206 // Find the next available step in the parent pipeline
207 let next_step_idx = parent_step_idx + 1;
208 if next_step_idx
209 < parent_steps.values.len()
210 {
211 let next_step = StepReference {
212 pipeline: parent_pipeline_idx,
213 step: next_step_idx,
214 };
215 log::debug!("Setting up then branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
216 new_step.insert(
217 "to".to_string(),
218 next_step.to_value(),
219 );
220 found_parent = true;
221 break;
222 } else {
223 // No more steps in parent pipeline, need to find its parent
224 log::debug!("Then branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
225 // For now, let's see if this parent pipeline has a parent
226 if let Some(parent_target) = parents
227 .get(&StepReference {
228 pipeline:
229 parent_pipeline_idx,
230 step: 0,
231 })
232 {
233 let next_step = get_next_step(
234 &pipelines_raw,
235 parent_target,
236 );
237 log::debug!("Setting up then branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
238 new_step.insert(
239 "to".to_string(),
240 next_step.to_value(),
241 );
242 found_parent = true;
243 break;
244 }
245 }
246 }
247 }
248 // Check if this step references our pipeline as an else branch
249 if let Some(else_val) = step_obj
250 .get("else")
251 .and_then(|v| v.to_u64())
252 {
253 if else_val as usize == pipeline_index {
254 // Find the next available step in the parent pipeline
255 let next_step_idx = parent_step_idx + 1;
256 if next_step_idx
257 < parent_steps.values.len()
258 {
259 let next_step = StepReference {
260 pipeline: parent_pipeline_idx,
261 step: next_step_idx,
262 };
263 log::debug!("Setting up else branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
264 new_step.insert(
265 "to".to_string(),
266 next_step.to_value(),
267 );
268 found_parent = true;
269 break;
270 } else {
271 // No more steps in parent pipeline, need to find its parent
272 log::debug!("Else branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
273 // For now, let's see if this parent pipeline has a parent
274 if let Some(parent_target) = parents
275 .get(&StepReference {
276 pipeline:
277 parent_pipeline_idx,
278 step: 0,
279 })
280 {
281 let next_step = get_next_step(
282 &pipelines_raw,
283 parent_target,
284 );
285 log::debug!("Setting up else branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
286 new_step.insert(
287 "to".to_string(),
288 next_step.to_value(),
289 );
290 found_parent = true;
291 break;
292 }
293 }
294 }
295 }
296 }
297 }
298 if found_parent {
299 break;
300 }
301 }
302 }
303
304 if !found_parent {
305 // Esta é uma sub-pipeline órfã - deve retornar ao pipeline principal
306 // Return to the next step after the first step (which is the assert/conditional)
307 let next_step = StepReference {
308 pipeline: main_pipeline,
309 step: 1, // Continue from step 1 in main pipeline
310 };
311 log::debug!("Setting up orphan pipeline return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
312 new_step.insert("to".to_string(), next_step.to_value());
313 }
314 }
315 }
316 }
317 }
318
319 // Use para debugar a saida do step
320 // println!("{}", new_step.to_value().to_json(JsonMode::Indented));
321
322 let step_worker = StepWorker::try_from_value(
323 engine.clone(),
324 modules.clone(),
325 &new_step.to_value(),
326 )
327 .map_err(TransformError::InnerStepError)?;
328
329 steps.push(step_worker);
330 }
331 }
332
333 pipelines.insert(
334 pipeline_index,
335 Pipeline {
336 steps,
337 id: pipeline_index,
338 },
339 );
340 }
341 }
342
343 Ok(pipelines)
344}
345
346/// Function to check if a step reference is valid
347fn is_valid_step(pipelines: &Vec<Value>, step_ref: &StepReference) -> bool {
348 if step_ref.pipeline >= pipelines.len() {
349 return false;
350 }
351
352 if let Value::Array(arr) = &pipelines[step_ref.pipeline] {
353 return step_ref.step < arr.len();
354 }
355
356 false
357}
358
359/// Function to find a valid continuation point when the direct next step is invalid
360fn find_valid_continuation(
361 pipelines: &Vec<Value>,
362 target: &StepReference,
363) -> Option<StepReference> {
364 let main_pipeline = pipelines.len() - 1;
365
366 // If we're looking for continuation and the target is in the main pipeline,
367 // we should go to the main pipeline's next available step
368 if target.pipeline == main_pipeline {
369 // Try the next step in main pipeline
370 let next_step_ref = StepReference {
371 pipeline: main_pipeline,
372 step: target.step + 1,
373 };
374
375 if is_valid_step(pipelines, &next_step_ref) {
376 return Some(next_step_ref);
377 } else {
378 // No more steps in main pipeline - execution should end
379 return None;
380 }
381 }
382
383 // For non-main pipelines, try to continue from the main pipeline
384 // starting from the step after the conditional
385 let main_continuation = StepReference {
386 pipeline: main_pipeline,
387 step: 1, // Step after the conditional
388 };
389
390 if is_valid_step(pipelines, &main_continuation) {
391 return Some(main_continuation);
392 }
393
394 None
395}
396
397/// Function to get the next step
398/// This function takes a vector of pipelines and a target step reference.
399fn get_next_step(pipelines: &Vec<Value>, target: &StepReference) -> StepReference {
400 if let Value::Array(arr) = &pipelines[target.pipeline] {
401 let next_step_index = target.step + 1;
402 if arr.get(next_step_index).is_some() {
403 return StepReference {
404 pipeline: target.pipeline,
405 step: next_step_index,
406 };
407 } else {
408 // No more steps in this pipeline, need to find where to go next
409 // This should handle end-of-pipeline scenarios more gracefully
410 log::warn!(
411 "get_next_step: No next step found for pipeline {} step {}",
412 target.pipeline,
413 target.step
414 );
415 }
416 }
417
418 // Fallback - should not reach here in normal execution
419 return StepReference {
420 pipeline: target.pipeline,
421 step: target.step + 1,
422 };
423}
424
425/// Function to map parents
426/// This function takes a vector of pipelines and builds a parent map.
427fn map_parents(
428 pipelines: &Vec<Value>,
429) -> (
430 HashMap<StepReference, StepReference>,
431 HashMap<String, StepReference>,
432) {
433 let (parents, go_to_step_references) = build_parent_map(pipelines);
434 (resolve_final_parents(parents), go_to_step_references)
435}
436
437/// Function to build the parent map
438/// This function takes a vector of pipelines and builds a parent map.
439/// It uses a hashmap to store the step references.
440fn build_parent_map(
441 pipelines: &Vec<Value>,
442) -> (
443 HashMap<StepReference, StepReference>,
444 HashMap<String, StepReference>,
445) {
446 let mut parents = HashMap::new();
447 let mut go_to_step_id = HashMap::new();
448
449 for (pipeline_index, steps) in pipelines.iter().enumerate() {
450 if let Value::Array(arr) = steps {
451 for (step_index, step) in arr.into_iter().enumerate() {
452 if let Value::Object(step) = step {
453 if let Some(id) = step.get("id") {
454 go_to_step_id.insert(
455 id.to_string(),
456 StepReference {
457 pipeline: pipeline_index,
458 step: step_index,
459 },
460 );
461 }
462
463 // Adiciona relações de "then" e "else" ao mapa de pais
464 if let Some(then_value) = step.get("then").and_then(|v| v.to_u64()) {
465 parents.insert(
466 StepReference {
467 pipeline: then_value as usize,
468 step: 0,
469 },
470 StepReference {
471 pipeline: pipeline_index,
472 step: step_index,
473 },
474 );
475 }
476
477 if let Some(else_value) = step.get("else").and_then(|v| v.to_u64()) {
478 parents.insert(
479 StepReference {
480 pipeline: else_value as usize,
481 step: 0,
482 },
483 StepReference {
484 pipeline: pipeline_index,
485 step: step_index,
486 },
487 );
488 }
489 }
490 }
491 }
492 }
493
494 (parents, go_to_step_id)
495}
496
497/// Function to resolve final parents
498/// This function takes a parent map and resolves the final parents.
499fn resolve_final_parents(
500 parents: HashMap<StepReference, StepReference>,
501) -> HashMap<StepReference, StepReference> {
502 let mut final_parents = HashMap::new();
503
504 for (child, mut parent) in parents.iter() {
505 // Resolve o pai final seguindo a cadeia de ancestrais
506 while let Some(grandparent) = parents.get(parent) {
507 parent = grandparent;
508 }
509 final_parents.insert(child.clone(), parent.clone());
510 }
511
512 final_parents
513}