phlow_engine/
phlow.rs

1use crate::{
2    build_engine_async,
3    context::Context,
4    pipeline::{Pipeline, PipelineError},
5    step_worker::NextStep,
6    transform::{value_to_pipelines, TransformError},
7};
8use phlow_sdk::prelude::*;
9use std::{collections::HashMap, sync::Arc};
10
11#[derive(Debug)]
12pub enum PhlowError {
13    TransformError(TransformError),
14    PipelineError(PipelineError),
15    PipelineNotFound,
16    ParentError,
17}
18
19pub type PipelineMap = HashMap<usize, Pipeline>;
20
21#[derive(Debug, Default)]
22pub struct Phlow {
23    pipelines: PipelineMap,
24}
25
26impl Phlow {
27    pub fn try_from_value(
28        value: &Value,
29        modules: Option<Arc<Modules>>,
30    ) -> Result<Self, PhlowError> {
31        let engine = build_engine_async(None);
32
33        let modules = if let Some(modules) = modules {
34            modules
35        } else {
36            Arc::new(Modules::default())
37        };
38        let pipelines =
39            value_to_pipelines(engine, modules, value).map_err(PhlowError::TransformError)?;
40
41        Ok(Self { pipelines })
42    }
43
44    pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
45        if self.pipelines.is_empty() {
46            return Ok(None);
47        }
48
49        let mut current_pipeline = self.pipelines.len() - 1;
50        let mut current_step = 0;
51
52        loop {
53            let pipeline = self
54                .pipelines
55                .get(&current_pipeline)
56                .ok_or(PhlowError::PipelineNotFound)?;
57
58            match pipeline.execute(context, current_step).await {
59                Ok(step_output) => match step_output {
60                    Some(step_output) => match step_output.next_step {
61                        NextStep::Next | NextStep::Stop => {
62                            return Ok(step_output.output);
63                        }
64                        NextStep::Pipeline(id) => {
65                            current_pipeline = id;
66                            current_step = 0;
67                        }
68                        NextStep::GoToStep(to) => {
69                            current_pipeline = to.pipeline;
70                            current_step = to.step;
71                        }
72                    },
73                    None => {
74                        return Ok(None);
75                    }
76                },
77                Err(err) => {
78                    return Err(PhlowError::PipelineError(err));
79                }
80            }
81        }
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use phlow_sdk::valu3;
89    use valu3::json;
90
91    fn get_original() -> Value {
92        json!({
93          "steps": [
94            {
95              "condition": {
96                "left": "{{payload.requested}}",
97                "right": "{{payload.pre_approved}}",
98                "operator": "less_than_or_equal"
99              },
100              "then": {
101                "return": "{{payload.requested}}"
102              },
103              "else": {
104                "steps": [
105                  {
106                    "condition": {
107                      "left": "{{payload.score}}",
108                      "right": 0.5,
109                      "operator": "greater_than_or_equal"
110                    },
111                    "then": [
112                        {
113                            "id": "approved",
114                            "payload": {
115                                "total": "{{(payload.requested * 0.3) + payload.pre_approved}}"
116                            }
117                            },
118                            {
119                            "condition": {
120                                "left": "{{steps.approved.total}}",
121                                "right": "{{payload.requested}}",
122                                "operator": "greater_than_or_equal"
123                            },
124                            "then": {
125                                "return": "{{payload.requested}}"
126                            },
127                            "else": {
128                                "return": "{{steps.approved.total}}"
129                            }
130                        }
131                    ]
132                  }
133                ]
134              }
135            }
136          ]
137        })
138    }
139
140    #[tokio::test]
141    async fn test_phlow_original_1() {
142        let original = get_original();
143        let phlow = Phlow::try_from_value(&original, None).unwrap();
144        let mut context = Context::from_payload(json!({
145            "requested": 10000.00,
146            "pre_approved": 10000.00,
147            "score": 0.6
148        }));
149
150        let result = phlow.execute(&mut context).await.unwrap();
151
152        assert_eq!(result, Some(json!(10000.0)));
153    }
154
155    #[tokio::test]
156    async fn test_phlow_original_2() {
157        let original = get_original();
158        let phlow = Phlow::try_from_value(&original, None).unwrap();
159        let mut context = Context::from_payload(json!({
160            "requested": 10000.00,
161            "pre_approved": 500.00,
162            "score": 0.6
163        }));
164
165        let result = phlow.execute(&mut context).await.unwrap();
166
167        assert_eq!(result, Some(json!(3500.0)));
168    }
169
170    #[tokio::test]
171    async fn test_phlow_original_3() {
172        let original = get_original();
173        let phlow = Phlow::try_from_value(&original, None).unwrap();
174        let mut context = Context::from_payload(json!({
175            "requested": 10000.00,
176            "pre_approved": 500.00,
177            "score": 0.2
178        }));
179
180        let result = phlow.execute(&mut context).await.unwrap();
181
182        assert_eq!(result, None);
183    }
184
185    #[tokio::test]
186    async fn test_phlow_original_4() {
187        let original = get_original();
188        let phlow = Phlow::try_from_value(&original, None).unwrap();
189        let mut context = Context::from_payload(json!({
190            "requested": 10000.00,
191            "pre_approved": 9999.00,
192            "score": 0.6
193        }));
194
195        let result = phlow.execute(&mut context).await.unwrap();
196
197        assert_eq!(result, Some(json!(10000.0)));
198    }
199}