phlow_engine/
step_worker.rs

1use crate::{
2    collector::{ContextSender, Step},
3    condition::{Condition, ConditionError},
4    context::Context,
5    id::ID,
6    modules::{Modules, ModulesError},
7    script::{Script, ScriptError},
8};
9use phlow_sdk::{sender_safe, tracing};
10use rhai::Engine;
11use serde::Serialize;
12use std::sync::Arc;
13use valu3::prelude::NumberBehavior;
14use valu3::{prelude::StringBehavior, value::Value};
15
16#[derive(Debug)]
17pub enum StepWorkerError {
18    ConditionError(ConditionError),
19    PayloadError(ScriptError),
20    ModulesError(ModulesError),
21    InputError(ScriptError),
22}
23
24#[derive(Debug, Clone, PartialEq, Serialize)]
25pub enum NextStep {
26    Pipeline(usize),
27    Stop,
28    Next,
29}
30
31#[derive(Debug)]
32pub struct StepOutput {
33    pub next_step: NextStep,
34    pub output: Option<Value>,
35}
36
37#[derive(Debug, Clone, Default)]
38pub struct StepWorker {
39    pub(crate) id: ID,
40    pub(crate) label: Option<String>,
41    pub(crate) module: Option<String>,
42    pub(crate) condition: Option<Condition>,
43    pub(crate) input: Option<Script>,
44    pub(crate) payload: Option<Script>,
45    pub(crate) then_case: Option<usize>,
46    pub(crate) else_case: Option<usize>,
47    pub(crate) modules: Arc<Modules>,
48    pub(crate) return_case: Option<Script>,
49    pub(crate) trace_sender: Option<ContextSender>,
50}
51
52impl StepWorker {
53    pub fn try_from_value(
54        engine: Arc<Engine>,
55        modules: Arc<Modules>,
56        trace_sender: Option<ContextSender>,
57        value: &Value,
58    ) -> Result<Self, StepWorkerError> {
59        let id = match value.get("id") {
60            Some(id) => ID::from(id),
61            None => ID::new(),
62        };
63        let label: Option<String> = match value.get("label") {
64            Some(label) => Some(label.as_string()),
65            None => None,
66        };
67        let condition = {
68            if let Some(condition) = value
69                .get("condition")
70                .map(|condition| Condition::try_from_value(engine.clone(), condition))
71            {
72                Some(condition.map_err(StepWorkerError::ConditionError)?)
73            } else {
74                None
75            }
76        };
77        let payload = match value.get("payload") {
78            Some(payload) => match Script::try_build(engine.clone(), payload) {
79                Ok(payload) => Some(payload),
80                Err(err) => return Err(StepWorkerError::PayloadError(err)),
81            },
82            None => None,
83        };
84        let input = match value.get("input") {
85            Some(input) => match Script::try_build(engine.clone(), input) {
86                Ok(input) => Some(input),
87                Err(err) => return Err(StepWorkerError::InputError(err)),
88            },
89            None => None,
90        };
91        let then_case = match value.get("then") {
92            Some(then_case) => match then_case.to_u64() {
93                Some(then_case) => Some(then_case as usize),
94                None => None,
95            },
96            None => None,
97        };
98        let else_case = match value.get("else") {
99            Some(else_case) => match else_case.to_u64() {
100                Some(else_case) => Some(else_case as usize),
101                None => None,
102            },
103            None => None,
104        };
105        let return_case = match value.get("return") {
106            Some(return_case) => match Script::try_build(engine, return_case) {
107                Ok(return_case) => Some(return_case),
108                Err(err) => return Err(StepWorkerError::PayloadError(err)),
109            },
110            None => None,
111        };
112        let module = match value.get("use") {
113            Some(module) => Some(module.to_string()),
114            None => None,
115        };
116
117        Ok(Self {
118            id,
119            label,
120            module,
121            input,
122            condition,
123            payload,
124            then_case,
125            else_case,
126            modules,
127            return_case,
128            trace_sender,
129        })
130    }
131
132    pub fn get_id(&self) -> &ID {
133        &self.id
134    }
135
136    fn evaluate_payload(
137        &self,
138        context: &Context,
139        default: Option<Value>,
140    ) -> Result<Option<Value>, StepWorkerError> {
141        if let Some(ref payload) = self.payload {
142            let value = Some(
143                payload
144                    .evaluate(context)
145                    .map_err(StepWorkerError::PayloadError)?,
146            );
147            Ok(value)
148        } else {
149            Ok(default)
150        }
151    }
152
153    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
154        if let Some(ref input) = self.input {
155            let value = Some(
156                input
157                    .evaluate(context)
158                    .map_err(StepWorkerError::InputError)?,
159            );
160            Ok(value)
161        } else {
162            Ok(None)
163        }
164    }
165
166    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
167        if let Some(ref return_case) = self.return_case {
168            let value = Some(
169                return_case
170                    .evaluate(context)
171                    .map_err(StepWorkerError::PayloadError)?,
172            );
173            Ok(value)
174        } else {
175            Ok(None)
176        }
177    }
178
179    async fn evaluate_module(
180        &self,
181        context: &Context,
182    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
183        if let Some(ref module) = self.module {
184            let input = self.evaluate_input(context)?;
185
186            let context = if let Some(input) = &input {
187                context.add_module_input(input.clone())
188            } else {
189                context.clone()
190            };
191
192            match self.modules.execute(module, &context).await {
193                Ok(value) => Ok(Some((Some(module.clone()), Some(value), context))),
194                Err(err) => Err(StepWorkerError::ModulesError(err)),
195            }
196        } else {
197            Ok(None)
198        }
199    }
200
201    #[tracing::instrument(skip(context))]
202    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
203        if let Some(output) = self.evaluate_return(context)? {
204            if let Some(sender) = &self.trace_sender {
205                sender_safe!(
206                    sender,
207                    Step {
208                        id: self.id.clone(),
209                        label: self.label.clone(),
210                        input: None,
211                        module: None,
212                        condition: None,
213                        payload: None,
214                        return_case: Some(output.clone()),
215                    }
216                );
217            }
218
219            return Ok(StepOutput {
220                next_step: NextStep::Stop,
221                output: Some(output),
222            });
223        }
224
225        if let Ok(Some((module, output, context))) = self.evaluate_module(context).await {
226            if let Some(sender) = &self.trace_sender {
227                sender_safe!(
228                    sender,
229                    Step {
230                        id: self.id.clone(),
231                        label: self.label.clone(),
232                        input: context.input.clone(),
233                        module,
234                        condition: None,
235                        payload: output.clone(),
236                        return_case: None,
237                    }
238                );
239            }
240
241            let context = if let Some(output) = output.clone() {
242                context.add_module_output(output)
243            } else {
244                context.clone()
245            };
246
247            return Ok(StepOutput {
248                next_step: NextStep::Next,
249                output: self.evaluate_payload(&context, output)?,
250            });
251        }
252
253        if let Some(condition) = &self.condition {
254            let (next_step, output) = if condition
255                .evaluate(context)
256                .map_err(StepWorkerError::ConditionError)?
257            {
258                let next_step = if let Some(ref then_case) = self.then_case {
259                    NextStep::Pipeline(*then_case)
260                } else {
261                    NextStep::Next
262                };
263
264                (next_step, self.evaluate_payload(context, None)?)
265            } else {
266                let next_step = if let Some(ref else_case) = self.else_case {
267                    NextStep::Pipeline(*else_case)
268                } else {
269                    NextStep::Next
270                };
271
272                (next_step, None)
273            };
274
275            if let Some(sender) = &self.trace_sender {
276                sender_safe!(
277                    sender,
278                    Step {
279                        id: self.id.clone(),
280                        label: self.label.clone(),
281                        module: None,
282                        input: None,
283                        condition: Some(condition.raw.clone()),
284                        payload: output.clone(),
285                        return_case: None,
286                    }
287                );
288            }
289
290            return Ok(StepOutput { next_step, output });
291        }
292
293        let output = self.evaluate_payload(context, None)?;
294
295        if let Some(sender) = &self.trace_sender {
296            sender_safe!(
297                sender,
298                Step {
299                    id: self.id.clone(),
300                    label: self.label.clone(),
301                    module: None,
302                    input: None,
303                    condition: None,
304                    payload: output.clone(),
305                    return_case: None,
306                }
307            );
308        }
309
310        return Ok(StepOutput {
311            next_step: NextStep::Next,
312            output,
313        });
314    }
315}
316
#[cfg(test)]
mod test {
    use crate::condition::Operator;
    use crate::engine::build_engine_async;

    use super::*;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// Executes `step` against a context created with `Context::new(None)`
    /// and unwraps the result.
    async fn run(step: &StepWorker) -> StepOutput {
        let context = Context::new(None);
        step.execute(&context).await.unwrap()
    }

    #[tokio::test]
    async fn test_step_get_reference_id() {
        let expected = ID::from("id");
        let step = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(step.get_id(), &expected);
    }

    // A bare payload is evaluated and execution moves on to the next step.
    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine_async(None);
        let payload = Script::try_build(engine, &"10".to_value()).unwrap();
        let step = StepWorker {
            payload: Some(payload),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Next);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    // Condition holds, no `then`: payload is produced and the next step follows.
    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::NotEqual,
        )
        .unwrap();
        let payload = Script::try_build(engine, &"10".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(payload),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Next);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    // Condition holds with `then`: execution jumps to the `then` index.
    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::NotEqual,
        )
        .unwrap();
        let payload = Script::try_build(engine, &"10".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(payload),
            then_case: Some(0),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Pipeline(0));
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    // Condition fails with `else`: no output, execution jumps to the `else` index.
    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let payload = Script::try_build(engine, &"10".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(payload),
            else_case: Some(1),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Pipeline(1));
        assert_eq!(outcome.output, None);
    }

    // A `return` script stops execution with its value.
    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine_async(None);
        let return_case = Script::try_build(engine, &"10".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            return_case: Some(return_case),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    // `return` takes precedence over `payload`.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine_async(None);
        let payload = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let return_case = Script::try_build(engine, &"20".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            payload: Some(payload),
            return_case: Some(return_case),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(20i64)));
    }

    // `return` takes precedence over a (failing) condition.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let return_case = Script::try_build(engine, &"10".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            return_case: Some(return_case),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    // `return` takes precedence even when a `then` index is configured.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let return_case = Script::try_build(engine, &"Ok".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            then_case: Some(0),
            return_case: Some(return_case),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from("Ok")));
    }

    // `return` takes precedence even when an `else` index is configured.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let return_case = Script::try_build(engine, &"Ok".to_value()).unwrap();
        let step = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            else_case: Some(0),
            return_case: Some(return_case),
            ..Default::default()
        };

        let outcome = run(&step).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from("Ok")));
    }
}
544}