phlow_engine/
transform.rs

1use phlow_sdk::tracing::error;
2use rhai::Engine;
3use std::{collections::HashMap, sync::Arc};
4use valu3::{prelude::*, traits::ToValueBehavior, value::Value};
5
6use crate::{
7    collector::ContextSender,
8    modules::Modules,
9    phlow::PipelineMap,
10    pipeline::Pipeline,
11    step_worker::{StepWorker, StepWorkerError},
12};
13
14#[derive(Debug)]
15pub enum TransformError {
16    InnerStepError(StepWorkerError),
17    Parser(valu3::Error),
18}
19
20pub(crate) fn value_to_pipelines(
21    engine: Arc<Engine>,
22    modules: Arc<Modules>,
23    trace_sender: Option<ContextSender>,
24    input: &Value,
25) -> Result<PipelineMap, TransformError> {
26    let mut map = Vec::new();
27
28    process_raw_steps(input, &mut map);
29    value_to_structs(engine, modules, &trace_sender, &map)
30}
31
32pub(crate) fn process_raw_steps(input: &Value, map: &mut Vec<Value>) -> Value {
33    if let Value::Object(pipeline) = input {
34        let mut new_pipeline = pipeline.clone();
35
36        new_pipeline.remove(&"steps");
37
38        // Tratamento para THEN
39        if let Some(then) = pipeline.get("then") {
40            let then_value = process_raw_steps(then, map);
41            new_pipeline.insert("then".to_string(), then_value);
42        }
43
44        // Tratamento para ELSE
45        if let Some(els) = pipeline.get("else") {
46            let else_value = process_raw_steps(els, map);
47            new_pipeline.insert("else".to_string(), else_value);
48        }
49
50        let mut new_steps = if new_pipeline.is_empty() {
51            vec![]
52        } else {
53            vec![new_pipeline.to_value()]
54        };
55
56        if let Some(steps) = pipeline.get("steps") {
57            if let Value::Array(steps) = steps {
58                for step in steps {
59                    let mut new_step = step.clone();
60
61                    if let Some(then) = step.get("then") {
62                        new_step.insert("then".to_string(), process_raw_steps(then, map));
63                    }
64
65                    if let Some(els) = step.get("else") {
66                        new_step.insert("else".to_string(), process_raw_steps(els, map));
67                    }
68
69                    new_steps.push(new_step);
70                }
71            }
72        }
73
74        map.push(new_steps.to_value());
75    } else if let Value::Array(pipeline) = input {
76        let mut new_steps = Vec::new();
77
78        for step in pipeline {
79            if let Value::Object(step) = step {
80                let mut new_step = step.clone();
81
82                if let Some(then) = step.get("then") {
83                    new_step.insert("then".to_string(), process_raw_steps(then, map));
84                }
85
86                if let Some(els) = step.get("else") {
87                    new_step.insert("else".to_string(), process_raw_steps(els, map));
88                }
89
90                new_steps.push(new_step);
91            }
92        }
93
94        map.push(new_steps.to_value());
95    }
96
97    let json = (map.len() - 1).to_value().to_json(JsonMode::Inline);
98    match Value::json_to_value(&json) {
99        Ok(value) => value,
100        Err(err) => {
101            error!("Error parsing json: {:?}", err);
102            Value::Null
103        }
104    }
105}
106
107fn value_to_structs(
108    engine: Arc<Engine>,
109    modules: Arc<Modules>,
110    trace_sender: &Option<ContextSender>,
111    map: &Vec<Value>,
112) -> Result<PipelineMap, TransformError> {
113    let mut pipelines = HashMap::new();
114
115    for (pipeline_id, steps) in map.iter().enumerate() {
116        if let Value::Array(arr) = steps {
117            let mut steps = Vec::new();
118
119            for step in arr.into_iter() {
120                let step_worker = StepWorker::try_from_value(
121                    engine.clone(),
122                    modules.clone(),
123                    trace_sender.clone(),
124                    step,
125                )
126                .map_err(TransformError::InnerStepError)?;
127                steps.push(step_worker);
128            }
129
130            pipelines.insert(pipeline_id, Pipeline { steps });
131        }
132    }
133
134    Ok(pipelines)
135}
136
137#[cfg(test)]
138mod test {
139    use super::*;
140    use valu3::{json, traits::ToValueBehavior};
141
142    #[test]
143    fn test_transform_value() {
144        let mut map = Vec::new();
145        let original = json!({
146          "steps": [
147            {
148              "condition": {
149                "left": "params.requested",
150                "right": "params.pre-approved",
151                "operator": "less_than"
152              },
153              "then": {
154                "payload": "params.requested"
155              },
156              "else": {
157                "steps": [
158                  {
159                    "condition": {
160                      "left": "params.score",
161                      "right": 0.5,
162                      "operator": "greater_than"
163                    }
164                  },
165                  {
166                    "id": "approved",
167                    "payload": {
168                      "total": "(params.requested * 0.3) + params.pre-approved"
169                    }
170                  },
171                  {
172                    "condition": {
173                      "left": "steps.approved.total",
174                      "right": "params.requested",
175                      "operator": "greater_than"
176                    },
177                    "then": {
178                      "return": "params.requested"
179                    },
180                    "else": {
181                      "return": "steps.approved.total"
182                    }
183                  }
184                ]
185              }
186            }
187          ]
188        });
189        let target = json!([[{"payload": "params.requested"}],[{"return": "params.requested"}],[{"return": "steps.approved.total"}],[{"condition": {"left": "params.score","operator": "greater_than","right": 0.5}},{"id": "approved","payload": {"total": "(params.requested * 0.3) + params.pre-approved"}},{"else": 2,"condition": {"operator": "greater_than","right": "params.requested","left": "steps.approved.total"},"then": 1}],[{"condition": {"right": "params.pre-approved","left": "params.requested","operator": "less_than"},"else": 3,"then": 0}]]);
190
191        process_raw_steps(&original, &mut map);
192
193        assert_eq!(map.to_value(), target);
194    }
195
196    #[test]
197    fn test_transform_value_array() {
198        let mut map = Vec::new();
199        let original = json!({
200          "steps": [
201            {
202              "condition": {
203                "left": "params.requested",
204                "right": "params.pre-approved",
205                "operator": "less_than"
206              },
207              "then": {
208                "payload": "params.requested"
209              },
210              "else": [
211                {
212                  "condition": {
213                    "left": "params.score",
214                    "right": 0.5,
215                    "operator": "greater_than"
216                  }
217                },
218                {
219                  "id": "approved",
220                  "payload": {
221                    "total": "(params.requested * 0.3) + params.pre-approved"
222                  }
223                },
224                {
225                  "condition": {
226                    "left": "steps.approved.total",
227                    "right": "params.requested",
228                    "operator": "greater_than"
229                  },
230                  "then": {
231                    "return": "params.requested"
232                  },
233                  "else": {
234                    "return": "steps.approved.total"
235                  }
236                }
237              ]
238            }
239          ]
240        });
241        let target = json!([[{"payload": "params.requested"}],[{"return": "params.requested"}],[{"return": "steps.approved.total"}],[{"condition": {"left": "params.score","operator": "greater_than","right": 0.5}},{"id": "approved","payload": {"total": "(params.requested * 0.3) + params.pre-approved"}},{"else": 2,"condition": {"operator": "greater_than","right": "params.requested","left": "steps.approved.total"},"then": 1}],[{"condition": {"right": "params.pre-approved","left": "params.requested","operator": "less_than"},"else": 3,"then": 0}]]);
242
243        process_raw_steps(&original, &mut map);
244
245        assert_eq!(map.to_value(), target);
246    }
247}