1use crate::{
2 build_engine_async,
3 collector::ContextSender,
4 context::Context,
5 modules::Modules,
6 pipeline::{Pipeline, PipelineError},
7 step_worker::NextStep,
8 transform::{value_to_pipelines, TransformError},
9};
10use std::{collections::HashMap, sync::Arc};
11use valu3::prelude::*;
12
13#[derive(Debug)]
14pub enum PhlowError {
15 TransformError(TransformError),
16 PipelineError(PipelineError),
17 PipelineNotFound,
18}
19
/// Pipelines keyed by the `usize` value used to look them up during execution.
pub type PipelineMap = HashMap<usize, Pipeline>;
21
/// A set of pipelines built from a flow definition, ready to be executed.
#[derive(Debug, Default)]
pub struct Phlow {
    /// Pipelines available to `execute`, keyed by `usize`.
    pipelines: PipelineMap,
}
26
27impl Phlow {
28 pub fn try_from_value(
29 value: &Value,
30 modules: Option<Arc<Modules>>,
31 trace_sender: Option<ContextSender>,
32 ) -> Result<Self, PhlowError> {
33 let engine = build_engine_async(None);
34
35 let modules = if let Some(modules) = modules {
36 modules
37 } else {
38 Arc::new(Modules::default())
39 };
40 let pipelines = value_to_pipelines(engine, modules, trace_sender, value)
41 .map_err(PhlowError::TransformError)?;
42
43 Ok(Self { pipelines })
44 }
45
46 pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
47 if self.pipelines.is_empty() {
48 return Ok(None);
49 }
50
51 let mut current = self.pipelines.len() - 1;
52
53 loop {
54 let pipeline = self
55 .pipelines
56 .get(¤t)
57 .ok_or(PhlowError::PipelineNotFound)?;
58
59 match pipeline.execute(context).await {
60 Ok(step_output) => match step_output {
61 Some(step_output) => match step_output.next_step {
62 NextStep::Next | NextStep::Stop => {
63 return Ok(step_output.output);
64 }
65 NextStep::Pipeline(id) => {
66 current = id;
67 }
68 },
69 None => {
70 return Ok(None);
71 }
72 },
73 Err(err) => {
74 return Err(PhlowError::PipelineError(err));
75 }
76 }
77 }
78 }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use valu3::json;

    /// Flow definition shared by every test: a nested conditional that compares
    /// `params.requested`, `params.pre_approved` and `params.score`.
    fn get_original() -> Value {
        json!({
            "steps": [
                {
                    "condition": {
                        "left": "{{params.requested}}",
                        "right": "{{params.pre_approved}}",
                        "operator": "less_than_or_equal"
                    },
                    "then": {
                        "return": "{{params.requested}}"
                    },
                    "else": {
                        "steps": [
                            {
                                "condition": {
                                    "left": "{{params.score}}",
                                    "right": 0.5,
                                    "operator": "greater_than_or_equal"
                                },
                                "then": [
                                    {
                                        "id": "approved",
                                        "payload": {
                                            "total": "{{(params.requested * 0.3) + params.pre_approved}}"
                                        }
                                    },
                                    {
                                        "condition": {
                                            "left": "{{steps.approved.total}}",
                                            "right": "{{params.requested}}",
                                            "operator": "greater_than_or_equal"
                                        },
                                        "then": {
                                            "return": "{{params.requested}}"
                                        },
                                        "else": {
                                            "return": "{{steps.approved.total}}"
                                        }
                                    }
                                ]
                            }
                        ]
                    }
                }
            ]
        })
    }

    /// Builds the shared flow, executes it with `params` as the context input and
    /// returns the flow's output. Panics if building or executing fails.
    async fn run_original(params: Value) -> Option<Value> {
        let phlow = Phlow::try_from_value(&get_original(), None, None).unwrap();
        let mut context = Context::new(Some(params));

        phlow.execute(&mut context).await.unwrap()
    }

    #[tokio::test]
    async fn test_phlow_original_1() {
        let params = json!({
            "requested": 10000.00,
            "pre_approved": 10000.00,
            "score": 0.6
        });

        assert_eq!(run_original(params).await, Some(json!(10000.0)));
    }

    #[tokio::test]
    async fn test_phlow_original_2() {
        let params = json!({
            "requested": 10000.00,
            "pre_approved": 500.00,
            "score": 0.6
        });

        assert_eq!(run_original(params).await, Some(json!(3500.0)));
    }

    #[tokio::test]
    async fn test_phlow_original_3() {
        let params = json!({
            "requested": 10000.00,
            "pre_approved": 500.00,
            "score": 0.2
        });

        assert_eq!(run_original(params).await, None);
    }

    #[tokio::test]
    async fn test_phlow_original_4() {
        let params = json!({
            "requested": 10000.00,
            "pre_approved": 9999.00,
            "score": 0.6
        });

        assert_eq!(run_original(params).await, Some(json!(10000.0)));
    }
}