// phlow_engine/step_worker.rs

1use crate::{
2    condition::{Condition, ConditionError},
3    context::Context,
4    id::ID,
5    script::{Script, ScriptError},
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::prelude::*;
9use rhai::Engine;
10use serde::Serialize;
11use std::sync::Arc;
12
13static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
14    Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
15        Ok(value) => value.parse::<usize>().unwrap_or(100),
16        Err(_) => 100,
17    });
18
19#[derive(Debug)]
20pub enum StepWorkerError {
21    ConditionError(ConditionError),
22    PayloadError(ScriptError),
23    ModulesError(ModulesError),
24    InputError(ScriptError),
25}
26
27#[derive(Debug, Clone, PartialEq, Serialize)]
28pub enum NextStep {
29    Pipeline(usize),
30    GoToStep(StepReference),
31    Stop,
32    Next,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
36pub struct StepReference {
37    pub pipeline: usize,
38    pub step: usize,
39}
40
41#[derive(Debug)]
42pub struct StepOutput {
43    pub next_step: NextStep,
44    pub output: Option<Value>,
45}
46
47#[derive(Debug, Clone, Default)]
48pub struct StepWorker {
49    pub(crate) id: ID,
50    pub(crate) label: Option<String>,
51    pub(crate) module: Option<String>,
52    pub(crate) condition: Option<Condition>,
53    pub(crate) input: Option<Script>,
54    pub(crate) payload: Option<Script>,
55    pub(crate) then_case: Option<usize>,
56    pub(crate) else_case: Option<usize>,
57    pub(crate) modules: Arc<Modules>,
58    pub(crate) return_case: Option<Script>,
59    pub(crate) to: Option<StepReference>,
60}
61
62impl StepWorker {
63    pub fn try_from_value(
64        engine: Arc<Engine>,
65        modules: Arc<Modules>,
66        value: &Value,
67    ) -> Result<Self, StepWorkerError> {
68        let id = match value.get("id") {
69            Some(id) => ID::from(id),
70            None => ID::new(),
71        };
72        let label: Option<String> = match value.get("label") {
73            Some(label) => Some(label.as_string()),
74            None => None,
75        };
76        let condition = {
77            if let Some(condition) = value
78                .get("condition")
79                .map(|condition| Condition::try_from_value(engine.clone(), condition))
80            {
81                Some(condition.map_err(StepWorkerError::ConditionError)?)
82            } else {
83                None
84            }
85        };
86        let payload = match value.get("payload") {
87            Some(payload) => match Script::try_build(engine.clone(), payload) {
88                Ok(payload) => Some(payload),
89                Err(err) => return Err(StepWorkerError::PayloadError(err)),
90            },
91            None => None,
92        };
93        let input = match value.get("input") {
94            Some(input) => match Script::try_build(engine.clone(), input) {
95                Ok(input) => Some(input),
96                Err(err) => return Err(StepWorkerError::InputError(err)),
97            },
98            None => None,
99        };
100        let then_case = match value.get("then") {
101            Some(then_case) => match then_case.to_u64() {
102                Some(then_case) => Some(then_case as usize),
103                None => None,
104            },
105            None => None,
106        };
107        let else_case = match value.get("else") {
108            Some(else_case) => match else_case.to_u64() {
109                Some(else_case) => Some(else_case as usize),
110                None => None,
111            },
112            None => None,
113        };
114        let return_case = match value.get("return") {
115            Some(return_case) => match Script::try_build(engine, return_case) {
116                Ok(return_case) => Some(return_case),
117                Err(err) => return Err(StepWorkerError::PayloadError(err)),
118            },
119            None => None,
120        };
121        let module = match value.get("use") {
122            Some(module) => Some(module.to_string()),
123            None => None,
124        };
125
126        let to = match value.get("to") {
127            Some(to_step) => match to_step.as_object() {
128                Some(to_step) => {
129                    let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
130                    let step = to_step.get("step").and_then(|v| v.to_u64());
131
132                    if pipeline.is_some() && step.is_some() {
133                        Some(StepReference {
134                            pipeline: pipeline.unwrap() as usize,
135                            step: step.unwrap() as usize,
136                        })
137                    } else {
138                        None
139                    }
140                }
141                None => None,
142            },
143            None => None,
144        };
145
146        Ok(Self {
147            id,
148            label,
149            module,
150            input,
151            condition,
152            payload,
153            then_case,
154            else_case,
155            modules,
156            return_case,
157            to,
158        })
159    }
160
161    pub fn get_id(&self) -> &ID {
162        &self.id
163    }
164
165    fn evaluate_payload(
166        &self,
167        context: &Context,
168        default: Option<Value>,
169    ) -> Result<Option<Value>, StepWorkerError> {
170        if let Some(ref payload) = self.payload {
171            let value = Some(
172                payload
173                    .evaluate(context)
174                    .map_err(StepWorkerError::PayloadError)?,
175            );
176            Ok(value)
177        } else {
178            Ok(default)
179        }
180    }
181
182    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
183        if let Some(ref input) = self.input {
184            let value = Some(
185                input
186                    .evaluate(context)
187                    .map_err(StepWorkerError::InputError)?,
188            );
189            Ok(value)
190        } else {
191            Ok(None)
192        }
193    }
194
195    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
196        if let Some(ref return_case) = self.return_case {
197            let value = Some(
198                return_case
199                    .evaluate(context)
200                    .map_err(StepWorkerError::PayloadError)?,
201            );
202            Ok(value)
203        } else {
204            Ok(None)
205        }
206    }
207
208    async fn evaluate_module(
209        &self,
210        context: &Context,
211    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
212        if let Some(ref module) = self.module {
213            let input = self.evaluate_input(context)?;
214
215            let context = if let Some(input) = &input {
216                context.add_module_input(input.clone())
217            } else {
218                context.clone()
219            };
220
221            match self.modules.execute(module, &context).await {
222                Ok(response) => {
223                    if let Some(err) = response.error {
224                        return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
225                            err,
226                        )));
227                    }
228
229                    Ok(Some((Some(module.clone()), Some(response.data), context)))
230                }
231                Err(err) => Err(StepWorkerError::ModulesError(err)),
232            }
233        } else {
234            Ok(None)
235        }
236    }
237
238    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
239        let span = tracing::info_span!(
240            "step",
241            otel.name = field::Empty,
242            context.main = field::Empty,
243            context.params = field::Empty,
244            context.payload = field::Empty,
245            context.input = field::Empty,
246            step.id = field::Empty,
247            step.label = field::Empty,
248            step.module = field::Empty,
249            step.condition = field::Empty,
250            step.payload = field::Empty,
251            step.return = field::Empty,
252        );
253        let _guard = span.enter();
254
255        {
256            let step_name = self.label.clone().unwrap_or(self.id.to_string());
257            span.record("otel.name", format!("step {}", step_name));
258
259            if let Some(ref input) = context.input {
260                span.record("context.input", input.to_string());
261            }
262
263            if let Some(ref payload) = context.payload {
264                span.record("context.payload", truncate_string(&payload));
265            }
266
267            if let Some(ref main) = context.main {
268                span.record("context.main", truncate_string(&main));
269            }
270
271            span.record("step.id", self.id.to_string());
272
273            if let Some(ref label) = self.label {
274                span.record("step.label", label.to_string());
275            }
276        }
277
278        if let Some(output) = self.evaluate_return(context)? {
279            {
280                span.record("step.return", output.to_string());
281            }
282
283            return Ok(StepOutput {
284                next_step: NextStep::Stop,
285                output: Some(output),
286            });
287        }
288
289        if let Some((module, output, context)) = self.evaluate_module(context).await? {
290            {
291                span.record("step.module", module.clone());
292
293                if let Some(ref output) = output {
294                    span.record("context.payload", truncate_string(output));
295                }
296            }
297
298            let context = if let Some(output) = output.clone() {
299                context.add_module_output(output)
300            } else {
301                context.clone()
302            };
303
304            return Ok(StepOutput {
305                next_step: NextStep::Next,
306                output: self.evaluate_payload(&context, output)?,
307            });
308        }
309
310        if let Some(condition) = &self.condition {
311            let (next_step, output) = if condition
312                .evaluate(context)
313                .map_err(StepWorkerError::ConditionError)?
314            {
315                let next_step = if let Some(ref then_case) = self.then_case {
316                    NextStep::Pipeline(*then_case)
317                } else {
318                    NextStep::Next
319                };
320
321                (next_step, self.evaluate_payload(context, None)?)
322            } else {
323                let next_step = if let Some(ref else_case) = self.else_case {
324                    NextStep::Pipeline(*else_case)
325                } else {
326                    NextStep::Next
327                };
328
329                (next_step, None)
330            };
331
332            {
333                span.record("step.condition", condition.raw.to_string());
334
335                if let Some(ref output) = output {
336                    span.record("context.payload", truncate_string(output));
337                }
338            }
339
340            return Ok(StepOutput { next_step, output });
341        }
342
343        let output = self.evaluate_payload(context, None)?;
344
345        {
346            if let Some(ref output) = output {
347                span.record("context.payload", truncate_string(output));
348            }
349        }
350
351        if let Some(to) = &self.to {
352            return Ok(StepOutput {
353                next_step: NextStep::GoToStep(to.clone()),
354                output,
355            });
356        }
357
358        return Ok(StepOutput {
359            next_step: NextStep::Next,
360            output,
361        });
362    }
363}
364
365fn truncate_string(string: &Value) -> String {
366    let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
367    let string = string.to_string();
368    if string.len() > limit {
369        format!("{}...", &string[..limit])
370    } else {
371        string.to_string()
372    }
373}
374
#[cfg(test)]
mod test {
    use crate::engine::build_engine_async;

    use super::*;
    use phlow_sdk::valu3;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// `get_id` hands back the id the worker was created with.
    #[tokio::test]
    async fn test_step_get_reference_id() {
        let worker = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(*worker.get_id(), ID::from("id"));
    }

    /// A payload-only step yields the payload and moves on.
    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine_async(None);
        let payload = Script::try_build(engine, &"10".to_value()).unwrap();
        let worker = StepWorker {
            payload: Some(payload),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Next);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// A satisfied condition without `then` evaluates the payload and moves on.
    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            String::from("10"),
            String::from("20"),
            crate::condition::Operator::NotEqual,
        )
        .unwrap();
        let payload = Script::try_build(engine, &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(payload),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Next);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// A satisfied condition with `then` routes to that pipeline.
    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            String::from("10"),
            String::from("20"),
            crate::condition::Operator::NotEqual,
        )
        .unwrap();
        let payload = Script::try_build(engine, &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(payload),
            then_case: Some(0),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Pipeline(0));
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// A failed condition with `else` routes there and skips the payload.
    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            String::from("10"),
            String::from("20"),
            crate::condition::Operator::Equal,
        )
        .unwrap();
        let payload = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(payload),
            else_case: Some(1),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Pipeline(1));
        assert_eq!(output, None);
    }

    /// A `return` script stops execution with its value.
    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine_async(None);
        let return_case = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            return_case: Some(return_case),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// `return` takes precedence over `payload`.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine_async(None);
        let payload = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let return_case = Script::try_build(engine.clone(), &"20".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            payload: Some(payload),
            return_case: Some(return_case),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from(20i64)));
    }

    /// `return` takes precedence over a (failing) condition.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            String::from("10"),
            String::from("20"),
            crate::condition::Operator::Equal,
        )
        .unwrap();
        let return_case = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            return_case: Some(return_case),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// `return` takes precedence over a condition with a `then` target.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            String::from("10"),
            String::from("20"),
            crate::condition::Operator::Equal,
        )
        .unwrap();
        let return_case = Script::try_build(engine.clone(), &"Ok".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            then_case: Some(0),
            return_case: Some(return_case),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from("Ok")));
    }

    /// `return` takes precedence over a condition with an `else` target.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine_async(None);
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            String::from("10"),
            String::from("20"),
            crate::condition::Operator::Equal,
        )
        .unwrap();
        let return_case = Script::try_build(engine.clone(), &"Ok".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            else_case: Some(0),
            return_case: Some(return_case),
            ..Default::default()
        };

        let ctx = Context::new();
        let StepOutput { next_step, output } = worker.execute(&ctx).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from("Ok")));
    }
}