phlow_engine/
phlow.rs

1use crate::{
2    build_engine_async,
3    collector::ContextSender,
4    context::Context,
5    modules::Modules,
6    pipeline::{Pipeline, PipelineError},
7    step_worker::NextStep,
8    transform::{value_to_pipelines, TransformError},
9};
10use std::{collections::HashMap, sync::Arc};
11use valu3::prelude::*;
12
13#[derive(Debug)]
14pub enum PhlowError {
15    TransformError(TransformError),
16    PipelineError(PipelineError),
17    PipelineNotFound,
18}
19
20pub type PipelineMap = HashMap<usize, Pipeline>;
21
22#[derive(Debug, Default)]
23pub struct Phlow {
24    pipelines: PipelineMap,
25}
26
27impl Phlow {
28    pub fn try_from_value(
29        value: &Value,
30        modules: Option<Arc<Modules>>,
31        trace_sender: Option<ContextSender>,
32    ) -> Result<Self, PhlowError> {
33        let engine = build_engine_async(None);
34
35        let modules = if let Some(modules) = modules {
36            modules
37        } else {
38            Arc::new(Modules::default())
39        };
40        let pipelines = value_to_pipelines(engine, modules, trace_sender, value)
41            .map_err(PhlowError::TransformError)?;
42
43        Ok(Self { pipelines })
44    }
45
46    pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
47        if self.pipelines.is_empty() {
48            return Ok(None);
49        }
50
51        let mut current = self.pipelines.len() - 1;
52
53        loop {
54            let pipeline = self
55                .pipelines
56                .get(&current)
57                .ok_or(PhlowError::PipelineNotFound)?;
58
59            match pipeline.execute(context).await {
60                Ok(step_output) => match step_output {
61                    Some(step_output) => match step_output.next_step {
62                        NextStep::Next | NextStep::Stop => {
63                            return Ok(step_output.output);
64                        }
65                        NextStep::Pipeline(id) => {
66                            current = id;
67                        }
68                    },
69                    None => {
70                        return Ok(None);
71                    }
72                },
73                Err(err) => {
74                    return Err(PhlowError::PipelineError(err));
75                }
76            }
77        }
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use super::*;
84    use valu3::json;
85
86    fn get_original() -> Value {
87        json!({
88          "steps": [
89            {
90              "condition": {
91                "left": "{{params.requested}}",
92                "right": "{{params.pre_approved}}",
93                "operator": "less_than_or_equal"
94              },
95              "then": {
96                "return": "{{params.requested}}"
97              },
98              "else": {
99                "steps": [
100                  {
101                    "condition": {
102                      "left": "{{params.score}}",
103                      "right": 0.5,
104                      "operator": "greater_than_or_equal"
105                    },
106                    "then": [
107                        {
108                            "id": "approved",
109                            "payload": {
110                                "total": "{{(params.requested * 0.3) + params.pre_approved}}"
111                            }
112                            },
113                            {
114                            "condition": {
115                                "left": "{{steps.approved.total}}",
116                                "right": "{{params.requested}}",
117                                "operator": "greater_than_or_equal"
118                            },
119                            "then": {
120                                "return": "{{params.requested}}"
121                            },
122                            "else": {
123                                "return": "{{steps.approved.total}}"
124                            }
125                            }
126                    ]
127                  }
128                ]
129              }
130            }
131          ]
132        })
133    }
134
135    #[tokio::test]
136    async fn test_phlow_original_1() {
137        let original = get_original();
138        let phlow = Phlow::try_from_value(&original, None, None).unwrap();
139        let mut context = Context::new(Some(json!({
140            "requested": 10000.00,
141            "pre_approved": 10000.00,
142            "score": 0.6
143        })));
144
145        let result = phlow.execute(&mut context).await.unwrap();
146
147        assert_eq!(result, Some(json!(10000.0)));
148    }
149
150    #[tokio::test]
151    async fn test_phlow_original_2() {
152        let original = get_original();
153        let phlow = Phlow::try_from_value(&original, None, None).unwrap();
154        let mut context = Context::new(Some(json!({
155            "requested": 10000.00,
156            "pre_approved": 500.00,
157            "score": 0.6
158        })));
159
160        let result = phlow.execute(&mut context).await.unwrap();
161
162        assert_eq!(result, Some(json!(3500.0)));
163    }
164
165    #[tokio::test]
166    async fn test_phlow_original_3() {
167        let original = get_original();
168        let phlow = Phlow::try_from_value(&original, None, None).unwrap();
169        let mut context = Context::new(Some(json!({
170            "requested": 10000.00,
171            "pre_approved": 500.00,
172            "score": 0.2
173        })));
174
175        let result = phlow.execute(&mut context).await.unwrap();
176
177        assert_eq!(result, None);
178    }
179
180    #[tokio::test]
181    async fn test_phlow_original_4() {
182        let original = get_original();
183        let phlow = Phlow::try_from_value(&original, None, None).unwrap();
184        let mut context = Context::new(Some(json!({
185            "requested": 10000.00,
186            "pre_approved": 9999.00,
187            "score": 0.6
188        })));
189
190        let result = phlow.execute(&mut context).await.unwrap();
191
192        assert_eq!(result, Some(json!(10000.0)));
193    }
194}