phlow_engine/
step_worker.rs

1use crate::{
2    condition::{Condition, ConditionError},
3    context::Context,
4    id::ID,
5    script::{Script, ScriptError},
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::prelude::*;
9use rhai::Engine;
10use serde::Serialize;
11use std::sync::Arc;
12
13static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
14    Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
15        Ok(value) => value.parse::<usize>().unwrap_or(100),
16        Err(_) => 100,
17    });
18
/// Errors produced while building or executing a [`StepWorker`].
#[derive(Debug)]
pub enum StepWorkerError {
    /// The step's `condition` could not be built or evaluated.
    ConditionError(ConditionError),
    /// The `payload` script could not be built or evaluated. Failures of the
    /// `return` script are reported through this variant as well.
    PayloadError(ScriptError),
    /// Executing the module failed, or the module response carried an error.
    ModulesError(ModulesError),
    /// The `input` script could not be built or evaluated.
    InputError(ScriptError),
}
26
/// Where the flow should continue after a step has been executed.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum NextStep {
    /// Carries the `then` or `else` value of a conditional step
    /// (presumably a pipeline index; confirm against the pipeline runner).
    Pipeline(usize),
    /// Carries the step's `to` target.
    GoToStep(StepReference),
    /// Produced when the step's `return` script is present and yields the output.
    Stop,
    /// Produced when no other target was selected by the step.
    Next,
}
34
/// Target of a `to` jump, parsed from an object with `pipeline` and `step` keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
pub struct StepReference {
    /// Value of the `pipeline` key (presumably the pipeline index; confirm with the runner).
    pub pipeline: usize,
    /// Value of the `step` key (presumably the step index inside that pipeline; confirm with the runner).
    pub step: usize,
}
40
/// Result of executing a single step.
#[derive(Debug)]
pub struct StepOutput {
    /// Where the flow should continue.
    pub next_step: NextStep,
    /// Value produced by the step, if any.
    pub output: Option<Value>,
}
46
/// A single executable step, built from its declarative description by
/// [`StepWorker::try_from_value`] and run with [`StepWorker::execute`].
#[derive(Debug, Clone, Default)]
pub struct StepWorker {
    /// Identifier read from the `id` key, or `ID::new()` when the key is absent.
    pub(crate) id: ID,
    /// Optional `label`; preferred over the id when naming the tracing span.
    pub(crate) label: Option<String>,
    /// Module name read from the `use` key; when set, the step executes that module.
    pub(crate) module: Option<String>,
    /// Optional `condition` that selects between the `then` and `else` targets.
    pub(crate) condition: Option<Condition>,
    /// Script read from `input`; evaluated only when a module is executed.
    pub(crate) input: Option<Script>,
    /// Script read from `payload`; its value becomes the step output.
    pub(crate) payload: Option<Script>,
    /// `then` value, used as the next pipeline when the condition holds.
    pub(crate) then_case: Option<usize>,
    /// `else` value, used as the next pipeline when the condition does not hold.
    pub(crate) else_case: Option<usize>,
    /// Shared handle used to execute the module named by `module`.
    pub(crate) modules: Arc<Modules>,
    /// Script read from `return`; when present its value is the output and the
    /// step answers with `NextStep::Stop`.
    pub(crate) return_case: Option<Script>,
    /// `to` target; only consulted when the step has no `return`, module or condition.
    pub(crate) to: Option<StepReference>,
}
61
62impl StepWorker {
63    pub fn try_from_value(
64        engine: Arc<Engine>,
65        modules: Arc<Modules>,
66        value: &Value,
67    ) -> Result<Self, StepWorkerError> {
68        let id = match value.get("id") {
69            Some(id) => ID::from(id),
70            None => ID::new(),
71        };
72        let label: Option<String> = match value.get("label") {
73            Some(label) => Some(label.as_string()),
74            None => None,
75        };
76        let condition = {
77            if let Some(condition) = value
78                .get("condition")
79                .map(|condition| Condition::try_from_value(engine.clone(), condition))
80            {
81                Some(condition.map_err(StepWorkerError::ConditionError)?)
82            } else {
83                None
84            }
85        };
86        let payload = match value.get("payload") {
87            Some(payload) => match Script::try_build(engine.clone(), payload) {
88                Ok(payload) => Some(payload),
89                Err(err) => return Err(StepWorkerError::PayloadError(err)),
90            },
91            None => None,
92        };
93        let input = match value.get("input") {
94            Some(input) => match Script::try_build(engine.clone(), input) {
95                Ok(input) => Some(input),
96                Err(err) => return Err(StepWorkerError::InputError(err)),
97            },
98            None => None,
99        };
100        let then_case = match value.get("then") {
101            Some(then_case) => match then_case.to_u64() {
102                Some(then_case) => Some(then_case as usize),
103                None => None,
104            },
105            None => None,
106        };
107        let else_case = match value.get("else") {
108            Some(else_case) => match else_case.to_u64() {
109                Some(else_case) => Some(else_case as usize),
110                None => None,
111            },
112            None => None,
113        };
114        let return_case = match value.get("return") {
115            Some(return_case) => match Script::try_build(engine, return_case) {
116                Ok(return_case) => Some(return_case),
117                Err(err) => return Err(StepWorkerError::PayloadError(err)),
118            },
119            None => None,
120        };
121        let module = match value.get("use") {
122            Some(module) => Some(module.to_string()),
123            None => None,
124        };
125
126        let to = match value.get("to") {
127            Some(to_step) => match to_step.as_object() {
128                Some(to_step) => {
129                    let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
130                    let step = to_step.get("step").and_then(|v| v.to_u64());
131
132                    if pipeline.is_some() && step.is_some() {
133                        Some(StepReference {
134                            pipeline: pipeline.unwrap() as usize,
135                            step: step.unwrap() as usize,
136                        })
137                    } else {
138                        None
139                    }
140                }
141                None => None,
142            },
143            None => None,
144        };
145
146        Ok(Self {
147            id,
148            label,
149            module,
150            input,
151            condition,
152            payload,
153            then_case,
154            else_case,
155            modules,
156            return_case,
157            to,
158        })
159    }
160
    /// Returns a reference to this step's identifier.
    pub fn get_id(&self) -> &ID {
        &self.id
    }
164
165    fn evaluate_payload(
166        &self,
167        context: &Context,
168        default: Option<Value>,
169    ) -> Result<Option<Value>, StepWorkerError> {
170        if let Some(ref payload) = self.payload {
171            let value = Some(
172                payload
173                    .evaluate(context)
174                    .map_err(StepWorkerError::PayloadError)?,
175            );
176            Ok(value)
177        } else {
178            Ok(default)
179        }
180    }
181
182    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
183        if let Some(ref input) = self.input {
184            let value = Some(
185                input
186                    .evaluate(context)
187                    .map_err(StepWorkerError::InputError)?,
188            );
189            Ok(value)
190        } else {
191            Ok(None)
192        }
193    }
194
195    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
196        if let Some(ref return_case) = self.return_case {
197            let value = Some(
198                return_case
199                    .evaluate(context)
200                    .map_err(StepWorkerError::PayloadError)?,
201            );
202            Ok(value)
203        } else {
204            Ok(None)
205        }
206    }
207
208    async fn evaluate_module(
209        &self,
210        context: &Context,
211    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
212        if let Some(ref module) = self.module {
213            let input = self.evaluate_input(context)?;
214
215            let context = if let Some(input) = &input {
216                context.add_module_input(input.clone())
217            } else {
218                context.clone()
219            };
220
221            match self.modules.execute(module, &context.input).await {
222                Ok(response) => {
223                    if let Some(err) = response.error {
224                        return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
225                            err,
226                        )));
227                    }
228
229                    Ok(Some((Some(module.clone()), Some(response.data), context)))
230                }
231                Err(err) => Err(StepWorkerError::ModulesError(err)),
232            }
233        } else {
234            Ok(None)
235        }
236    }
237
238    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
239        let span = tracing::info_span!(
240            "step",
241            otel.name = field::Empty,
242            context.main = field::Empty,
243            context.params = field::Empty,
244            context.payload = field::Empty,
245            context.input = field::Empty,
246            step.id = field::Empty,
247            step.label = field::Empty,
248            step.module = field::Empty,
249            step.condition = field::Empty,
250            step.payload = field::Empty,
251            step.return = field::Empty,
252        );
253        let _guard = span.enter();
254
255        {
256            let step_name = self.label.clone().unwrap_or(self.id.to_string());
257            span.record("otel.name", format!("step {}", step_name));
258
259            if let Some(ref input) = context.input {
260                span.record("context.input", input.to_string());
261            }
262
263            if let Some(ref payload) = context.payload {
264                span.record("context.payload", truncate_string(&payload));
265            }
266
267            if let Some(ref main) = context.main {
268                span.record("context.main", truncate_string(&main));
269            }
270
271            span.record("step.id", self.id.to_string());
272
273            if let Some(ref label) = self.label {
274                span.record("step.label", label.to_string());
275            }
276        }
277
278        if let Some(output) = self.evaluate_return(context)? {
279            {
280                span.record("step.return", output.to_string());
281            }
282
283            return Ok(StepOutput {
284                next_step: NextStep::Stop,
285                output: Some(output),
286            });
287        }
288
289        if let Some((module, output, context)) = self.evaluate_module(context).await? {
290            {
291                span.record("step.module", module.clone());
292
293                if let Some(ref output) = output {
294                    span.record("context.payload", truncate_string(output));
295                }
296            }
297
298            let context = if let Some(output) = output.clone() {
299                context.add_module_output(output)
300            } else {
301                context.clone()
302            };
303
304            return Ok(StepOutput {
305                next_step: NextStep::Next,
306                output: self.evaluate_payload(&context, output)?,
307            });
308        }
309
310        if let Some(condition) = &self.condition {
311            let (next_step, output) = if condition
312                .evaluate(context)
313                .map_err(StepWorkerError::ConditionError)?
314            {
315                let next_step = if let Some(ref then_case) = self.then_case {
316                    NextStep::Pipeline(*then_case)
317                } else {
318                    NextStep::Next
319                };
320
321                (next_step, self.evaluate_payload(context, None)?)
322            } else {
323                let next_step = if let Some(ref else_case) = self.else_case {
324                    NextStep::Pipeline(*else_case)
325                } else {
326                    NextStep::Next
327                };
328
329                (next_step, None)
330            };
331
332            {
333                span.record("step.condition", condition.raw.to_string());
334
335                if let Some(ref output) = output {
336                    span.record("context.payload", truncate_string(output));
337                }
338            }
339
340            return Ok(StepOutput { next_step, output });
341        }
342
343        let output = self.evaluate_payload(context, None)?;
344
345        {
346            if let Some(ref output) = output {
347                span.record("context.payload", truncate_string(output));
348            }
349        }
350
351        if let Some(to) = &self.to {
352            return Ok(StepOutput {
353                next_step: NextStep::GoToStep(to.clone()),
354                output,
355            });
356        }
357
358        return Ok(StepOutput {
359            next_step: NextStep::Next,
360            output,
361        });
362    }
363}
364
365fn truncate_string(string: &Value) -> String {
366    let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
367    let string = string.to_string();
368    if string.len() > limit {
369        format!("{}...", &string[..limit])
370    } else {
371        string.to_string()
372    }
373}
374
#[cfg(test)]
mod test {
    use super::*;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// Compiles `source` into a [`Script`] on `engine`, panicking on failure.
    fn script(engine: &Arc<Engine>, source: &'static str) -> Script {
        Script::try_build(engine.clone(), &source.to_value()).unwrap()
    }

    /// Builds the condition `10 <operator> 20` on `engine`.
    fn ten_vs_twenty(engine: &Arc<Engine>, operator: crate::condition::Operator) -> Condition {
        Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            operator,
        )
        .unwrap()
    }

    /// Executes `step` against a fresh context and unwraps the outcome.
    async fn run(step: &StepWorker) -> StepOutput {
        let context = Context::new();
        step.execute(&context).await.unwrap()
    }

    #[tokio::test]
    async fn test_step_get_reference_id() {
        let worker = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(worker.get_id(), &ID::from("id"));
    }

    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let worker = StepWorker {
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Next);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Next);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            then_case: Some(0),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Pipeline(0));
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            payload: Some(script(&engine, "10")),
            else_case: Some(1),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Pipeline(1));
        assert_eq!(outcome.output, None);
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            payload: Some(script(&engine, "10")),
            return_case: Some(script(&engine, "20")),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(20i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            then_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from("Ok")));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine(None);
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            else_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let outcome = run(&worker).await;

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from("Ok")));
    }
}