1use crate::{
2 phlow::PipelineMap,
3 pipeline::Pipeline,
4 step_worker::{StepReference, StepWorker, StepWorkerError},
5};
6use phlow_sdk::{prelude::*, valu3};
7use rhai::Engine;
8use std::collections::HashMap;
9use std::sync::Arc;
10use valu3::{traits::ToValueBehavior, value::Value};
11
12#[derive(Debug)]
13pub enum TransformError {
14 InnerStepError(StepWorkerError),
15 Parser(valu3::Error),
16}
17
18pub(crate) fn value_to_pipelines(
19 engine: Arc<Engine>,
20 modules: Arc<Modules>,
21 input: &Value,
22) -> Result<PipelineMap, TransformError> {
23 let mut map = Vec::new();
24
25 process_raw_steps(input, &mut map);
26 value_to_structs(engine, modules, &map)
27}
28
29pub(crate) fn process_raw_steps(input: &Value, map: &mut Vec<Value>) -> Value {
30 if let Value::Object(pipeline) = input {
31 let mut new_pipeline = pipeline.clone();
32
33 new_pipeline.remove(&"steps");
34
35 if let Some(then) = pipeline.get("then") {
37 let then_value = process_raw_steps(then, map);
38 new_pipeline.insert("then".to_string(), then_value);
39 }
40
41 if let Some(els) = pipeline.get("else") {
43 let else_value = process_raw_steps(els, map);
44 new_pipeline.insert("else".to_string(), else_value);
45 }
46
47 let mut new_steps = if new_pipeline.is_empty() {
48 vec![]
49 } else {
50 vec![new_pipeline.to_value()]
51 };
52
53 if let Some(steps) = pipeline.get("steps") {
54 if let Value::Array(steps) = steps {
55 for step in steps {
56 let mut new_step = step.clone();
57
58 if let Some(then) = step.get("then") {
59 new_step.insert("then".to_string(), process_raw_steps(then, map));
60 }
61
62 if let Some(els) = step.get("else") {
63 new_step.insert("else".to_string(), process_raw_steps(els, map));
64 }
65
66 new_steps.push(new_step);
67 }
68 }
69 }
70
71 map.push(new_steps.to_value());
72 } else if let Value::Array(pipeline) = input {
73 let mut new_steps = Vec::new();
74
75 for step in pipeline {
76 if let Value::Object(step) = step {
77 let mut new_step = step.clone();
78
79 if let Some(then) = step.get("then") {
80 new_step.insert("then".to_string(), process_raw_steps(then, map));
81 }
82
83 if let Some(els) = step.get("else") {
84 new_step.insert("else".to_string(), process_raw_steps(els, map));
85 }
86
87 new_steps.push(new_step);
88 }
89 }
90
91 map.push(new_steps.to_value());
92 }
93
94 (map.len() - 1).to_value()
95}
96
97fn value_to_structs(
103 engine: Arc<Engine>,
104 modules: Arc<Modules>,
105 pipelines_raw: &Vec<Value>,
106) -> Result<PipelineMap, TransformError> {
107 let (parents, go_to_step_id) = map_parents(pipelines_raw);
108 let mut pipelines = HashMap::new();
109
110 for (pipeline_index, pipeline_value) in pipelines_raw.iter().enumerate() {
111 if let Value::Array(arr) = pipeline_value {
112 let mut steps = Vec::new();
113
114 for (step_index, step_value) in arr.into_iter().enumerate() {
115 if let Value::Object(step) = step_value {
116 let mut new_step = step.clone();
117
118 if let Some(to) = step.get("to") {
119 if let Some(go_to_step) = go_to_step_id.get(to.to_string().as_str()) {
120 new_step.insert("to".to_string(), go_to_step.to_value());
121 }
122 } else {
123 if step.get("then").is_none()
124 && step.get("else").is_none()
125 && step.get("return").is_none()
126 {
127 if let Some(target) = parents.get(&StepReference {
128 pipeline: pipeline_index,
129 step: step_index,
130 }) {
131 let next_step = get_next_step(&pipelines_raw, target);
132 new_step.insert("to".to_string(), next_step.to_value());
133 }
134 }
135 }
136
137 let step_worker = StepWorker::try_from_value(
138 engine.clone(),
139 modules.clone(),
140 &new_step.to_value(),
141 )
142 .map_err(TransformError::InnerStepError)?;
143 steps.push(step_worker);
144 }
145 }
146
147 pipelines.insert(pipeline_index, Pipeline { steps });
148 }
149 }
150
151 Ok(pipelines)
152}
153
154fn get_next_step(pipelines: &Vec<Value>, target: &StepReference) -> StepReference {
157 if let Value::Array(arr) = &pipelines[target.pipeline] {
158 let next_step_index = target.step + 1;
159 if arr.get(next_step_index).is_some() {
160 return StepReference {
161 pipeline: target.pipeline,
162 step: next_step_index,
163 };
164 }
165 }
166
167 return StepReference {
168 pipeline: target.pipeline,
169 step: target.step,
170 };
171}
172
173fn map_parents(
176 pipelines: &Vec<Value>,
177) -> (
178 HashMap<StepReference, StepReference>,
179 HashMap<String, StepReference>,
180) {
181 let (parents, go_to_step_references) = build_parent_map(pipelines);
182 (resolve_final_parents(parents), go_to_step_references)
183}
184
185fn build_parent_map(
189 pipelines: &Vec<Value>,
190) -> (
191 HashMap<StepReference, StepReference>,
192 HashMap<String, StepReference>,
193) {
194 let mut parents = HashMap::new();
195 let mut go_to_step_id = HashMap::new();
196
197 for (pipeline_index, steps) in pipelines.iter().enumerate() {
198 if let Value::Array(arr) = steps {
199 for (step_index, step) in arr.into_iter().enumerate() {
200 if let Value::Object(step) = step {
201 if let Some(id) = step.get("id") {
202 go_to_step_id.insert(
203 id.to_string(),
204 StepReference {
205 pipeline: pipeline_index,
206 step: step_index,
207 },
208 );
209 }
210
211 if let Some(then_value) = step.get("then").and_then(|v| v.to_u64()) {
213 parents.insert(
214 StepReference {
215 pipeline: then_value as usize,
216 step: 0,
217 },
218 StepReference {
219 pipeline: pipeline_index,
220 step: step_index,
221 },
222 );
223 }
224
225 if let Some(else_value) = step.get("else").and_then(|v| v.to_u64()) {
226 parents.insert(
227 StepReference {
228 pipeline: else_value as usize,
229 step: 0,
230 },
231 StepReference {
232 pipeline: pipeline_index,
233 step: step_index,
234 },
235 );
236 }
237 }
238 }
239 }
240 }
241
242 (parents, go_to_step_id)
243}
244
245fn resolve_final_parents(
248 parents: HashMap<StepReference, StepReference>,
249) -> HashMap<StepReference, StepReference> {
250 let mut final_parents = HashMap::new();
251
252 for (child, mut parent) in parents.iter() {
253 while let Some(grandparent) = parents.get(parent) {
255 parent = grandparent;
256 }
257 final_parents.insert(child.clone(), parent.clone());
258 }
259
260 final_parents
261}
262
#[cfg(test)]
mod test {
    use super::*;
    use valu3::{json, traits::ToValueBehavior};

    /// Runs `process_raw_steps` on `source` with a fresh list and returns the
    /// collected pipelines as a single value.
    fn flatten(source: &Value) -> Value {
        let mut pipelines = Vec::new();
        process_raw_steps(source, &mut pipelines);
        pipelines.to_value()
    }

    /// Flattened form both tests expect: branches come before the pipeline
    /// that refers to them, and `then` / `else` hold pipeline indices.
    fn expected_pipelines() -> Value {
        json!([[{"payload": "payload.requested"}],[{"return": "payload.requested"}],[{"return": "steps.approved.total"}],[{"condition": {"left": "payload.score","operator": "greater_than","right": 0.5}},{"id": "approved","payload": {"total": "(payload.requested * 0.3) + payload.pre-approved"}},{"else": 2,"condition": {"operator": "greater_than","right": "payload.requested","left": "steps.approved.total"},"then": 1}],[{"condition": {"right": "payload.pre-approved","left": "payload.requested","operator": "less_than"},"else": 3,"then": 0}]])
    }

    /// The `else` branch is an object that carries its own `steps` array.
    #[test]
    fn test_transform_value() {
        let original = json!({
            "steps": [
                {
                    "condition": {
                        "left": "payload.requested",
                        "right": "payload.pre-approved",
                        "operator": "less_than"
                    },
                    "then": {
                        "payload": "payload.requested"
                    },
                    "else": {
                        "steps": [
                            {
                                "condition": {
                                    "left": "payload.score",
                                    "right": 0.5,
                                    "operator": "greater_than"
                                }
                            },
                            {
                                "id": "approved",
                                "payload": {
                                    "total": "(payload.requested * 0.3) + payload.pre-approved"
                                }
                            },
                            {
                                "condition": {
                                    "left": "steps.approved.total",
                                    "right": "payload.requested",
                                    "operator": "greater_than"
                                },
                                "then": {
                                    "return": "payload.requested"
                                },
                                "else": {
                                    "return": "steps.approved.total"
                                }
                            }
                        ]
                    }
                }
            ]
        });

        assert_eq!(flatten(&original), expected_pipelines());
    }

    /// The `else` branch is given directly as an array of steps.
    #[test]
    fn test_transform_value_array() {
        let original = json!({
            "steps": [
                {
                    "condition": {
                        "left": "payload.requested",
                        "right": "payload.pre-approved",
                        "operator": "less_than"
                    },
                    "then": {
                        "payload": "payload.requested"
                    },
                    "else": [
                        {
                            "condition": {
                                "left": "payload.score",
                                "right": 0.5,
                                "operator": "greater_than"
                            }
                        },
                        {
                            "id": "approved",
                            "payload": {
                                "total": "(payload.requested * 0.3) + payload.pre-approved"
                            }
                        },
                        {
                            "condition": {
                                "left": "steps.approved.total",
                                "right": "payload.requested",
                                "operator": "greater_than"
                            },
                            "then": {
                                "return": "payload.requested"
                            },
                            "else": {
                                "return": "steps.approved.total"
                            }
                        }
                    ]
                }
            ]
        });

        assert_eq!(flatten(&original), expected_pipelines());
    }
}