phlow_engine/transform.rs
1use crate::{
2 phlow::PipelineMap,
3 pipeline::Pipeline,
4 step_worker::{StepReference, StepWorker, StepWorkerError},
5};
6use phlow_sdk::{
7 prelude::{log::debug, *},
8 valu3,
9};
10use rhai::Engine;
11use std::sync::Arc;
12use std::{collections::HashMap, fmt::Display};
13use valu3::{traits::ToValueBehavior, value::Value};
14
15#[derive(Debug)]
16pub enum TransformError {
17 InnerStepError(StepWorkerError),
18 Parser(valu3::Error),
19}
20
21impl Display for TransformError {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 match self {
24 TransformError::InnerStepError(err) => write!(f, "Inner step error: {}", err),
25 TransformError::Parser(_) => write!(f, "Parser error: Non parseable"),
26 }
27 }
28}
29
30impl std::error::Error for TransformError {
31 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
32 match self {
33 TransformError::InnerStepError(err) => Some(err),
34 TransformError::Parser(_) => None, // valu3::Error doesn't implement std::error::Error
35 }
36 }
37}
38
39pub(crate) fn value_to_pipelines(
40 engine: Arc<Engine>,
41 modules: Arc<Modules>,
42 input: &Value,
43) -> Result<PipelineMap, TransformError> {
44 let mut map = Vec::new();
45
46 process_raw_steps(input, &mut map);
47 debug!("{}", map.to_value().to_json(JsonMode::Indented));
48 value_to_structs(engine, modules, &map)
49}
50
51pub(crate) fn process_raw_steps(input: &Value, map: &mut Vec<Value>) -> Value {
52 if let Value::Object(pipeline) = input {
53 let mut new_pipeline = pipeline.clone();
54
55 new_pipeline.remove(&"steps");
56
57 // Tratamento para THEN
58 if let Some(then) = pipeline.get("then") {
59 let then_value = process_raw_steps(then, map);
60 new_pipeline.insert("then".to_string(), then_value);
61 }
62
63 // Tratamento para ELSE
64 if let Some(els) = pipeline.get("else") {
65 let else_value = process_raw_steps(els, map);
66 new_pipeline.insert("else".to_string(), else_value);
67 }
68
69 let mut new_steps = if new_pipeline.is_empty() {
70 vec![]
71 } else {
72 vec![new_pipeline.to_value()]
73 };
74
75 if let Some(steps) = pipeline.get("steps") {
76 if let Value::Array(steps) = steps {
77 for step in steps {
78 let mut new_step = step.clone();
79
80 if let Some(then) = step.get("then") {
81 new_step.insert("then".to_string(), process_raw_steps(then, map));
82 }
83
84 if let Some(els) = step.get("else") {
85 new_step.insert("else".to_string(), process_raw_steps(els, map));
86 }
87
88 new_steps.push(new_step);
89 }
90 }
91 }
92
93 map.push(new_steps.to_value());
94 } else if let Value::Array(pipeline) = input {
95 let mut new_steps = Vec::new();
96
97 for step in pipeline {
98 if let Value::Object(step) = step {
99 let mut new_step = step.clone();
100
101 if let Some(then) = step.get("then") {
102 new_step.insert("then".to_string(), process_raw_steps(then, map));
103 }
104
105 if let Some(els) = step.get("else") {
106 new_step.insert("else".to_string(), process_raw_steps(els, map));
107 }
108
109 new_steps.push(new_step);
110 }
111 }
112
113 map.push(new_steps.to_value());
114 }
115
116 (map.len() - 1).to_value()
117}
118
119/// Function to transform a value into a pipeline map
120/// This function takes a value and transforms it into a pipeline map.
121/// It uses the `value_to_structs` function to convert the value into a pipeline map.
122/// It also uses the `resolve_go_to_step` function to resolve the "go to" step.
123/// The function returns a `Result` with the pipeline map or an error.
124fn value_to_structs(
125 engine: Arc<Engine>,
126 modules: Arc<Modules>,
127 pipelines_raw: &Vec<Value>,
128) -> Result<PipelineMap, TransformError> {
129 let (parents, go_to_step_id) = map_parents(pipelines_raw);
130 log::debug!("Parent mappings: {:?}", parents);
131 log::debug!(
132 "Pipeline structure: {}",
133 pipelines_raw.to_value().to_json(JsonMode::Indented)
134 );
135 let mut pipelines = HashMap::new();
136
137 for (pipeline_index, pipeline_value) in pipelines_raw.iter().enumerate() {
138 if let Value::Array(arr) = pipeline_value {
139 let mut steps = Vec::new();
140
141 for (step_index, step_value) in arr.into_iter().enumerate() {
142 if let Value::Object(step) = step_value {
143 let mut new_step = step.clone();
144 #[cfg(debug_assertions)]
145 {
146 log::debug!("new_step {:?}", new_step.to_value().to_string());
147 }
148
149 if let Some(to) = step.get("to") {
150 if let Some(go_to_step) = go_to_step_id.get(to.to_string().as_str()) {
151 new_step.insert("to".to_string(), go_to_step.to_value());
152 }
153 } else {
154 if step.get("return").is_none() && step_index == arr.len() - 1
155 {
156 if let Some(target) = parents.get(&StepReference {
157 pipeline: pipeline_index,
158 step: 0,
159 }) {
160 if let Some(next_step_ref) =
161 next_step_if_exists(&pipelines_raw, target)
162 {
163 log::debug!("Setting up parent return: pipeline {} → pipeline {} step {}", pipeline_index, next_step_ref.pipeline, next_step_ref.step);
164 new_step.insert("to".to_string(), next_step_ref.to_value());
165 } else if let Some(valid_step) =
166 find_valid_continuation(&pipelines_raw, &parents, target)
167 {
168 log::debug!("Found valid continuation: pipeline {} → pipeline {} step {}", pipeline_index, valid_step.pipeline, valid_step.step);
169 new_step.insert("to".to_string(), valid_step.to_value());
170 } else {
171 log::warn!(
172 "No valid continuation found for pipeline {}",
173 pipeline_index
174 );
175 }
176 } else {
177 // BUGFIX: Se não tem parent e não é a pipeline principal,
178 // esta pipeline pode ser órfã e deve retornar ao pipeline principal
179 let main_pipeline = pipelines_raw.len() - 1;
180 if pipeline_index != main_pipeline {
181 // Check if this pipeline is referenced as a then/else branch
182 let mut found_parent = false;
183
184 // Search through all pipelines to find if this one is referenced as a then/else branch
185 for (parent_pipeline_idx, parent_pipeline) in
186 pipelines_raw.iter().enumerate()
187 {
188 if let Value::Array(parent_steps) = parent_pipeline {
189 for (parent_step_idx, parent_step) in
190 parent_steps.values.iter().enumerate()
191 {
192 if let Value::Object(step_obj) = parent_step {
193 // Check if this step references our pipeline as a then branch
194 if let Some(then_val) = step_obj
195 .get("then")
196 .and_then(|v| v.to_u64())
197 {
198 if then_val as usize == pipeline_index {
199 // Find the next available step in the parent pipeline
200 let next_step_idx = parent_step_idx + 1;
201 if next_step_idx
202 < parent_steps.values.len()
203 {
204 let next_step = StepReference {
205 pipeline: parent_pipeline_idx,
206 step: next_step_idx,
207 };
208 log::debug!("Setting up then branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
209 new_step.insert(
210 "to".to_string(),
211 next_step.to_value(),
212 );
213 found_parent = true;
214 break;
215 } else {
216 // No more steps in parent pipeline, need to find its parent
217 log::debug!("Then branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
218 // For now, let's see if this parent pipeline has a parent
219 if let Some(parent_target) = parents
220 .get(&StepReference {
221 pipeline:
222 parent_pipeline_idx,
223 step: 0,
224 })
225 {
226 if let Some(next_step) =
227 find_valid_continuation(
228 &pipelines_raw,
229 &parents,
230 parent_target,
231 )
232 {
233 log::debug!("Setting up then branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
234 new_step.insert(
235 "to".to_string(),
236 next_step.to_value(),
237 );
238 found_parent = true;
239 break;
240 }
241 }
242 }
243 }
244 }
245 // Check if this step references our pipeline as an else branch
246 if let Some(else_val) = step_obj
247 .get("else")
248 .and_then(|v| v.to_u64())
249 {
250 if else_val as usize == pipeline_index {
251 // Find the next available step in the parent pipeline
252 let next_step_idx = parent_step_idx + 1;
253 if next_step_idx
254 < parent_steps.values.len()
255 {
256 let next_step = StepReference {
257 pipeline: parent_pipeline_idx,
258 step: next_step_idx,
259 };
260 log::debug!("Setting up else branch return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
261 new_step.insert(
262 "to".to_string(),
263 next_step.to_value(),
264 );
265 found_parent = true;
266 break;
267 } else {
268 // No more steps in parent pipeline, need to find its parent
269 log::debug!("Else branch pipeline {} has no next step in parent pipeline {}", pipeline_index, parent_pipeline_idx);
270 // For now, let's see if this parent pipeline has a parent
271 if let Some(parent_target) = parents
272 .get(&StepReference {
273 pipeline:
274 parent_pipeline_idx,
275 step: 0,
276 })
277 {
278 if let Some(next_step) =
279 find_valid_continuation(
280 &pipelines_raw,
281 &parents,
282 parent_target,
283 )
284 {
285 log::debug!("Setting up else branch return via grandparent: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
286 new_step.insert(
287 "to".to_string(),
288 next_step.to_value(),
289 );
290 found_parent = true;
291 break;
292 }
293 }
294 }
295 }
296 }
297 }
298 }
299 if found_parent {
300 break;
301 }
302 }
303 }
304
305 if !found_parent {
306 // Esta é uma sub-pipeline órfã - deve retornar ao pipeline principal
307 // Return to the next step after the first step (which is the assert/conditional)
308 let next_step = StepReference {
309 pipeline: main_pipeline,
310 step: 1, // Continue from step 1 in main pipeline
311 };
312 log::debug!("Setting up orphan pipeline return: pipeline {} → pipeline {} step {}", pipeline_index, next_step.pipeline, next_step.step);
313 new_step.insert("to".to_string(), next_step.to_value());
314 }
315 }
316 }
317 }
318 }
319
320 // Use para debugar a saida do step
321 // println!("{}", new_step.to_value().to_json(JsonMode::Indented));
322
323 let step_worker = StepWorker::try_from_value(
324 engine.clone(),
325 modules.clone(),
326 &new_step.to_value(),
327 )
328 .map_err(TransformError::InnerStepError)?;
329
330 steps.push(step_worker);
331 }
332 }
333
334 pipelines.insert(
335 pipeline_index,
336 Pipeline {
337 steps,
338 id: pipeline_index,
339 },
340 );
341 }
342 }
343
344 Ok(pipelines)
345}
346
347/// Function to check if a step reference is valid
348fn is_valid_step(pipelines: &Vec<Value>, step_ref: &StepReference) -> bool {
349 if step_ref.pipeline >= pipelines.len() {
350 return false;
351 }
352
353 if let Value::Array(arr) = &pipelines[step_ref.pipeline] {
354 return step_ref.step < arr.len();
355 }
356
357 false
358}
359
360fn next_step_if_exists(pipelines: &Vec<Value>, target: &StepReference) -> Option<StepReference> {
361 if let Value::Array(arr) = &pipelines[target.pipeline] {
362 let next_step_index = target.step + 1;
363 if next_step_index < arr.len() {
364 return Some(StepReference {
365 pipeline: target.pipeline,
366 step: next_step_index,
367 });
368 }
369 }
370
371 None
372}
373
374/// Function to find a valid continuation point when the direct next step is invalid
375fn find_valid_continuation(
376 pipelines: &Vec<Value>,
377 parents: &HashMap<StepReference, StepReference>,
378 target: &StepReference,
379) -> Option<StepReference> {
380 let mut current = target.clone();
381 let mut depth = 0usize;
382
383 loop {
384 let next_step_ref = StepReference {
385 pipeline: current.pipeline,
386 step: current.step + 1,
387 };
388
389 if is_valid_step(pipelines, &next_step_ref) {
390 return Some(next_step_ref);
391 }
392
393 let parent_key = StepReference {
394 pipeline: current.pipeline,
395 step: 0,
396 };
397 let Some(parent) = parents.get(&parent_key) else {
398 return None;
399 };
400
401 if parent.pipeline == current.pipeline && parent.step == current.step {
402 return None;
403 }
404
405 current = parent.clone();
406 depth += 1;
407 if depth > pipelines.len() {
408 return None;
409 }
410 }
411}
412
413/// Function to map parents
414/// This function takes a vector of pipelines and builds a parent map.
415fn map_parents(
416 pipelines: &Vec<Value>,
417) -> (
418 HashMap<StepReference, StepReference>,
419 HashMap<String, StepReference>,
420) {
421 let (parents, go_to_step_references) = build_parent_map(pipelines);
422 (parents, go_to_step_references)
423}
424
425/// Function to build the parent map
426/// This function takes a vector of pipelines and builds a parent map.
427/// It uses a hashmap to store the step references.
428fn build_parent_map(
429 pipelines: &Vec<Value>,
430) -> (
431 HashMap<StepReference, StepReference>,
432 HashMap<String, StepReference>,
433) {
434 let mut parents = HashMap::new();
435 let mut go_to_step_id = HashMap::new();
436
437 for (pipeline_index, steps) in pipelines.iter().enumerate() {
438 if let Value::Array(arr) = steps {
439 for (step_index, step) in arr.into_iter().enumerate() {
440 if let Value::Object(step) = step {
441 if let Some(id) = step.get("id") {
442 go_to_step_id.insert(
443 id.to_string(),
444 StepReference {
445 pipeline: pipeline_index,
446 step: step_index,
447 },
448 );
449 }
450
451 // Adiciona relações de "then" e "else" ao mapa de pais
452 if let Some(then_value) = step.get("then").and_then(|v| v.to_u64()) {
453 parents.insert(
454 StepReference {
455 pipeline: then_value as usize,
456 step: 0,
457 },
458 StepReference {
459 pipeline: pipeline_index,
460 step: step_index,
461 },
462 );
463 }
464
465 if let Some(else_value) = step.get("else").and_then(|v| v.to_u64()) {
466 parents.insert(
467 StepReference {
468 pipeline: else_value as usize,
469 step: 0,
470 },
471 StepReference {
472 pipeline: pipeline_index,
473 step: step_index,
474 },
475 );
476 }
477 }
478 }
479 }
480 }
481
482 (parents, go_to_step_id)
483}