1use phlow_sdk::tracing::error;
2use rhai::Engine;
3use std::{collections::HashMap, sync::Arc};
4use valu3::{prelude::*, traits::ToValueBehavior, value::Value};
5
6use crate::{
7 collector::ContextSender,
8 modules::Modules,
9 phlow::PipelineMap,
10 pipeline::Pipeline,
11 step_worker::{StepWorker, StepWorkerError},
12};
13
14#[derive(Debug)]
15pub enum TransformError {
16 InnerStepError(StepWorkerError),
17 Parser(valu3::Error),
18}
19
20pub(crate) fn value_to_pipelines(
21 engine: Arc<Engine>,
22 modules: Arc<Modules>,
23 trace_sender: Option<ContextSender>,
24 input: &Value,
25) -> Result<PipelineMap, TransformError> {
26 let mut map = Vec::new();
27
28 process_raw_steps(input, &mut map);
29 value_to_structs(engine, modules, &trace_sender, &map)
30}
31
32pub(crate) fn process_raw_steps(input: &Value, map: &mut Vec<Value>) -> Value {
33 if let Value::Object(pipeline) = input {
34 let mut new_pipeline = pipeline.clone();
35
36 new_pipeline.remove(&"steps");
37
38 if let Some(then) = pipeline.get("then") {
40 let then_value = process_raw_steps(then, map);
41 new_pipeline.insert("then".to_string(), then_value);
42 }
43
44 if let Some(els) = pipeline.get("else") {
46 let else_value = process_raw_steps(els, map);
47 new_pipeline.insert("else".to_string(), else_value);
48 }
49
50 let mut new_steps = if new_pipeline.is_empty() {
51 vec![]
52 } else {
53 vec![new_pipeline.to_value()]
54 };
55
56 if let Some(steps) = pipeline.get("steps") {
57 if let Value::Array(steps) = steps {
58 for step in steps {
59 let mut new_step = step.clone();
60
61 if let Some(then) = step.get("then") {
62 new_step.insert("then".to_string(), process_raw_steps(then, map));
63 }
64
65 if let Some(els) = step.get("else") {
66 new_step.insert("else".to_string(), process_raw_steps(els, map));
67 }
68
69 new_steps.push(new_step);
70 }
71 }
72 }
73
74 map.push(new_steps.to_value());
75 } else if let Value::Array(pipeline) = input {
76 let mut new_steps = Vec::new();
77
78 for step in pipeline {
79 if let Value::Object(step) = step {
80 let mut new_step = step.clone();
81
82 if let Some(then) = step.get("then") {
83 new_step.insert("then".to_string(), process_raw_steps(then, map));
84 }
85
86 if let Some(els) = step.get("else") {
87 new_step.insert("else".to_string(), process_raw_steps(els, map));
88 }
89
90 new_steps.push(new_step);
91 }
92 }
93
94 map.push(new_steps.to_value());
95 }
96
97 let json = (map.len() - 1).to_value().to_json(JsonMode::Inline);
98 match Value::json_to_value(&json) {
99 Ok(value) => value,
100 Err(err) => {
101 error!("Error parsing json: {:?}", err);
102 Value::Null
103 }
104 }
105}
106
107fn value_to_structs(
108 engine: Arc<Engine>,
109 modules: Arc<Modules>,
110 trace_sender: &Option<ContextSender>,
111 map: &Vec<Value>,
112) -> Result<PipelineMap, TransformError> {
113 let mut pipelines = HashMap::new();
114
115 for (pipeline_id, steps) in map.iter().enumerate() {
116 if let Value::Array(arr) = steps {
117 let mut steps = Vec::new();
118
119 for step in arr.into_iter() {
120 let step_worker = StepWorker::try_from_value(
121 engine.clone(),
122 modules.clone(),
123 trace_sender.clone(),
124 step,
125 )
126 .map_err(TransformError::InnerStepError)?;
127 steps.push(step_worker);
128 }
129
130 pipelines.insert(pipeline_id, Pipeline { steps });
131 }
132 }
133
134 Ok(pipelines)
135}
136
#[cfg(test)]
mod test {
    use super::*;
    use valu3::{json, traits::ToValueBehavior};

    /// Flattened pipelines both fixtures below are expected to produce.
    ///
    /// Branch pipelines come first (indexes 0 to 3) and the root pipeline is
    /// last, with its `then` / `else` replaced by the branch indexes.
    fn expected_pipelines() -> Value {
        json!([
            [{"payload": "params.requested"}],
            [{"return": "params.requested"}],
            [{"return": "steps.approved.total"}],
            [
                {"condition": {"left": "params.score","operator": "greater_than","right": 0.5}},
                {"id": "approved","payload": {"total": "(params.requested * 0.3) + params.pre-approved"}},
                {"else": 2,"condition": {"operator": "greater_than","right": "params.requested","left": "steps.approved.total"},"then": 1}
            ],
            [{"condition": {"right": "params.pre-approved","left": "params.requested","operator": "less_than"},"else": 3,"then": 0}]
        ])
    }

    /// Runs `process_raw_steps` on `definition` and returns everything it
    /// registered as a single value.
    fn flatten(definition: &Value) -> Value {
        let mut collected = Vec::new();
        process_raw_steps(definition, &mut collected);
        collected.to_value()
    }

    /// The `else` branch is an object carrying its own `steps` list.
    #[test]
    fn test_transform_value() {
        let definition = json!({
            "steps": [
                {
                    "condition": {
                        "left": "params.requested",
                        "right": "params.pre-approved",
                        "operator": "less_than"
                    },
                    "then": {
                        "payload": "params.requested"
                    },
                    "else": {
                        "steps": [
                            {
                                "condition": {
                                    "left": "params.score",
                                    "right": 0.5,
                                    "operator": "greater_than"
                                }
                            },
                            {
                                "id": "approved",
                                "payload": {
                                    "total": "(params.requested * 0.3) + params.pre-approved"
                                }
                            },
                            {
                                "condition": {
                                    "left": "steps.approved.total",
                                    "right": "params.requested",
                                    "operator": "greater_than"
                                },
                                "then": {
                                    "return": "params.requested"
                                },
                                "else": {
                                    "return": "steps.approved.total"
                                }
                            }
                        ]
                    }
                }
            ]
        });

        assert_eq!(flatten(&definition), expected_pipelines());
    }

    /// The `else` branch is given directly as an array of steps.
    #[test]
    fn test_transform_value_array() {
        let definition = json!({
            "steps": [
                {
                    "condition": {
                        "left": "params.requested",
                        "right": "params.pre-approved",
                        "operator": "less_than"
                    },
                    "then": {
                        "payload": "params.requested"
                    },
                    "else": [
                        {
                            "condition": {
                                "left": "params.score",
                                "right": 0.5,
                                "operator": "greater_than"
                            }
                        },
                        {
                            "id": "approved",
                            "payload": {
                                "total": "(params.requested * 0.3) + params.pre-approved"
                            }
                        },
                        {
                            "condition": {
                                "left": "steps.approved.total",
                                "right": "params.requested",
                                "operator": "greater_than"
                            },
                            "then": {
                                "return": "params.requested"
                            },
                            "else": {
                                "return": "steps.approved.total"
                            }
                        }
                    ]
                }
            ]
        });

        assert_eq!(flatten(&definition), expected_pipelines());
    }
}