phlow_engine/
step_worker.rs

1use crate::{
2    condition::{Condition, ConditionError},
3    context::Context,
4    id::ID,
5    script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::prelude::*;
9use rhai::Engine;
10use serde::Serialize;
11use std::{fmt::Display, sync::Arc};
12
13static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
14    Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
15        Ok(value) => value.parse::<usize>().unwrap_or(100),
16        Err(_) => 100,
17    });
18
/// Errors produced while building or executing a [`StepWorker`].
#[derive(Debug)]
pub enum StepWorkerError {
    /// The step's `condition`/`assert` could not be built, or failed to evaluate.
    ConditionError(ConditionError),
    /// The `payload` or `return` script could not be built, or failed to evaluate.
    PayloadError(phs::ScriptError),
    /// Executing the step's module failed, or the module response carried an error.
    ModulesError(ModulesError),
    /// The `input` script could not be built, or failed to evaluate.
    InputError(phs::ScriptError),
}
26
27impl Display for StepWorkerError {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        match self {
30            StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
31            StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
32            StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
33            StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
34        }
35    }
36}
37
/// Control-flow outcome reported by [`StepWorker::execute`].
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum NextStep {
    /// Produced when a condition selects the step's `then`/`else` value; carries
    /// that value (presumably a pipeline index — confirm against the caller).
    Pipeline(usize),
    /// Produced when the step declares a `to` target.
    GoToStep(StepReference),
    /// Produced when the step's `return` script yields a value.
    Stop,
    /// Produced in every other case.
    Next,
}
45
/// Target of a step's `to` entry, parsed from its `pipeline` and `step` keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
pub struct StepReference {
    /// Value of `to.pipeline` (presumably the index of the target pipeline — confirm).
    pub pipeline: usize,
    /// Value of `to.step` (presumably the index of the step inside that pipeline — confirm).
    pub step: usize,
}
51
/// Result of executing a single step.
#[derive(Debug)]
pub struct StepOutput {
    /// Where execution should continue.
    pub next_step: NextStep,
    /// Value produced by the step, if any.
    pub output: Option<Value>,
}
57
/// A single executable step, built from its declarative description by
/// [`StepWorker::try_from_value`].
#[derive(Debug, Clone, Default)]
pub struct StepWorker {
    /// Taken from the `id` key, or freshly generated when the key is absent.
    pub(crate) id: ID,
    /// Optional `label`; used as the span name when present.
    pub(crate) label: Option<String>,
    /// Name of the module to execute, taken from the `use` key.
    pub(crate) module: Option<String>,
    /// Condition built from the `condition` key or, failing that, from `assert`.
    pub(crate) condition: Option<Condition>,
    /// Script from the `input` key; its result is handed to the module as input.
    pub(crate) input: Option<Script>,
    /// Script from the `payload` key; its result becomes the step output.
    pub(crate) payload: Option<Script>,
    /// Value of the `then` key; reported as `NextStep::Pipeline` when the condition holds.
    pub(crate) then_case: Option<usize>,
    /// Value of the `else` key; reported as `NextStep::Pipeline` when the condition fails.
    pub(crate) else_case: Option<usize>,
    /// Shared modules handle used to execute `module`.
    pub(crate) modules: Arc<Modules>,
    /// Script from the `return` key; when it yields a value the step stops the flow.
    pub(crate) return_case: Option<Script>,
    /// Jump target from the `to` key.
    pub(crate) to: Option<StepReference>,
}
72
73impl StepWorker {
74    pub fn try_from_value(
75        engine: Arc<Engine>,
76        modules: Arc<Modules>,
77        value: &Value,
78    ) -> Result<Self, StepWorkerError> {
79        let id = match value.get("id") {
80            Some(id) => ID::from(id),
81            None => ID::new(),
82        };
83        let label: Option<String> = match value.get("label") {
84            Some(label) => Some(label.as_string()),
85            None => None,
86        };
87        let condition = {
88            if let Some(condition) = value
89                .get("condition")
90                .map(|condition| Condition::try_from_value(engine.clone(), condition))
91            {
92                Some(condition.map_err(StepWorkerError::ConditionError)?)
93            } else {
94                if let Some(condition) = value.get("assert").map(|assert| {
95                    Condition::try_build_with_assert(engine.clone(), assert.to_string())
96                }) {
97                    Some(condition.map_err(StepWorkerError::ConditionError)?)
98                } else {
99                    None
100                }
101            }
102        };
103        let payload = match value.get("payload") {
104            Some(payload) => match Script::try_build(engine.clone(), payload) {
105                Ok(payload) => Some(payload),
106                Err(err) => return Err(StepWorkerError::PayloadError(err)),
107            },
108            None => None,
109        };
110        let input = match value.get("input") {
111            Some(input) => match Script::try_build(engine.clone(), input) {
112                Ok(input) => Some(input),
113                Err(err) => return Err(StepWorkerError::InputError(err)),
114            },
115            None => None,
116        };
117        let then_case = match value.get("then") {
118            Some(then_case) => match then_case.to_u64() {
119                Some(then_case) => Some(then_case as usize),
120                None => None,
121            },
122            None => None,
123        };
124        let else_case = match value.get("else") {
125            Some(else_case) => match else_case.to_u64() {
126                Some(else_case) => Some(else_case as usize),
127                None => None,
128            },
129            None => None,
130        };
131        let return_case = match value.get("return") {
132            Some(return_case) => match Script::try_build(engine, return_case) {
133                Ok(return_case) => Some(return_case),
134                Err(err) => return Err(StepWorkerError::PayloadError(err)),
135            },
136            None => None,
137        };
138        let module = match value.get("use") {
139            Some(module) => Some(module.to_string()),
140            None => None,
141        };
142
143        let to = match value.get("to") {
144            Some(to_step) => match to_step.as_object() {
145                Some(to_step) => {
146                    let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
147                    let step = to_step.get("step").and_then(|v| v.to_u64());
148
149                    if pipeline.is_some() && step.is_some() {
150                        Some(StepReference {
151                            pipeline: pipeline.unwrap() as usize,
152                            step: step.unwrap() as usize,
153                        })
154                    } else {
155                        None
156                    }
157                }
158                None => None,
159            },
160            None => None,
161        };
162
163        Ok(Self {
164            id,
165            label,
166            module,
167            input,
168            condition,
169            payload,
170            then_case,
171            else_case,
172            modules,
173            return_case,
174            to,
175        })
176    }
177
    /// Returns a reference to this step's identifier.
    pub fn get_id(&self) -> &ID {
        &self.id
    }
181
182    fn evaluate_payload(
183        &self,
184        context: &Context,
185        default: Option<Value>,
186    ) -> Result<Option<Value>, StepWorkerError> {
187        if let Some(ref payload) = self.payload {
188            let value = Some(
189                payload
190                    .evaluate(context)
191                    .map_err(StepWorkerError::PayloadError)?,
192            );
193            Ok(value)
194        } else {
195            Ok(default)
196        }
197    }
198
199    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
200        if let Some(ref input) = self.input {
201            let value = Some(
202                input
203                    .evaluate(context)
204                    .map_err(StepWorkerError::InputError)?,
205            );
206            Ok(value)
207        } else {
208            Ok(None)
209        }
210    }
211
212    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
213        if let Some(ref return_case) = self.return_case {
214            let value = Some(
215                return_case
216                    .evaluate(context)
217                    .map_err(StepWorkerError::PayloadError)?,
218            );
219            Ok(value)
220        } else {
221            Ok(None)
222        }
223    }
224
225    async fn evaluate_module(
226        &self,
227        context: &Context,
228    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
229        if let Some(ref module) = self.module {
230            let input = self.evaluate_input(context)?;
231
232            let context = if let Some(input) = &input {
233                context.add_module_input(input.clone())
234            } else {
235                context.clone()
236            };
237
238            match self.modules.execute(module, &context.input).await {
239                Ok(response) => {
240                    if let Some(err) = response.error {
241                        return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
242                            err,
243                        )));
244                    }
245
246                    Ok(Some((Some(module.clone()), Some(response.data), context)))
247                }
248                Err(err) => Err(StepWorkerError::ModulesError(err)),
249            }
250        } else {
251            Ok(None)
252        }
253    }
254
255    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
256        let span = tracing::info_span!(
257            "step",
258            otel.name = field::Empty,
259            context.main = field::Empty,
260            context.params = field::Empty,
261            context.payload = field::Empty,
262            context.input = field::Empty,
263            step.id = field::Empty,
264            step.label = field::Empty,
265            step.module = field::Empty,
266            step.condition = field::Empty,
267            step.payload = field::Empty,
268            step.return = field::Empty,
269        );
270        let _guard = span.enter();
271
272        {
273            let step_name = self.label.clone().unwrap_or(self.id.to_string());
274            span.record("otel.name", format!("step {}", step_name));
275
276            if let Some(ref input) = context.input {
277                span.record("context.input", input.to_string());
278            }
279
280            if let Some(ref payload) = context.payload {
281                span.record("context.payload", truncate_string(&payload));
282            }
283
284            if let Some(ref main) = context.main {
285                span.record("context.main", truncate_string(&main));
286            }
287
288            span.record("step.id", self.id.to_string());
289
290            if let Some(ref label) = self.label {
291                span.record("step.label", label.to_string());
292            }
293        }
294
295        if let Some(output) = self.evaluate_return(context)? {
296            {
297                span.record("step.return", output.to_string());
298            }
299
300            return Ok(StepOutput {
301                next_step: NextStep::Stop,
302                output: Some(output),
303            });
304        }
305
306        if let Some((module, output, context)) = self.evaluate_module(context).await? {
307            {
308                span.record("step.module", module.clone());
309
310                if let Some(ref output) = output {
311                    span.record("context.payload", truncate_string(output));
312                }
313            }
314
315            let context = if let Some(output) = output.clone() {
316                context.add_module_output(output)
317            } else {
318                context.clone()
319            };
320
321            return Ok(StepOutput {
322                next_step: NextStep::Next,
323                output: self.evaluate_payload(&context, output)?,
324            });
325        }
326
327        if let Some(condition) = &self.condition {
328            let (next_step, output) = if condition
329                .evaluate(context)
330                .map_err(StepWorkerError::ConditionError)?
331            {
332                let next_step = if let Some(ref then_case) = self.then_case {
333                    NextStep::Pipeline(*then_case)
334                } else {
335                    NextStep::Next
336                };
337
338                (next_step, self.evaluate_payload(context, None)?)
339            } else {
340                let next_step = if let Some(ref else_case) = self.else_case {
341                    NextStep::Pipeline(*else_case)
342                } else {
343                    NextStep::Next
344                };
345
346                (next_step, None)
347            };
348
349            {
350                span.record("step.condition", condition.raw.to_string());
351
352                if let Some(ref output) = output {
353                    span.record("context.payload", truncate_string(output));
354                }
355            }
356
357            return Ok(StepOutput { next_step, output });
358        }
359
360        let output = self.evaluate_payload(context, None)?;
361
362        {
363            if let Some(ref output) = output {
364                span.record("context.payload", truncate_string(output));
365            }
366        }
367
368        if let Some(to) = &self.to {
369            return Ok(StepOutput {
370                next_step: NextStep::GoToStep(to.clone()),
371                output,
372            });
373        }
374
375        return Ok(StepOutput {
376            next_step: NextStep::Next,
377            output,
378        });
379    }
380}
381
382fn truncate_string(string: &Value) -> String {
383    let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
384    let string = string.to_string();
385    if string.len() > limit {
386        format!("{}...", &string[..limit])
387    } else {
388        string.to_string()
389    }
390}
391
#[cfg(test)]
mod test {
    use super::*;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// Compiles `source` into a [`Script`] on the given engine.
    fn script(engine: &Arc<Engine>, source: &'static str) -> Script {
        Script::try_build(engine.clone(), &source.to_value()).unwrap()
    }

    /// Builds the condition `10 <operator> 20` on the given engine.
    fn ten_versus_twenty(
        engine: &Arc<Engine>,
        operator: crate::condition::Operator,
    ) -> Condition {
        Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            operator,
        )
        .unwrap()
    }

    /// Executes `step` against a fresh context and unwraps the result.
    async fn run(step: &StepWorker) -> StepOutput {
        let context = Context::new();

        step.execute(&context).await.unwrap()
    }

    #[tokio::test]
    async fn test_step_get_reference_id() {
        let step = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(step.get_id(), &ID::from("id"));
    }

    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let step = StepWorker {
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            then_case: Some(0),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(0));
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            payload: Some(script(&engine, "10")),
            else_case: Some(1),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(1));
        assert_eq!(result.output, None);
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            payload: Some(script(&engine, "10")),
            return_case: Some(script(&engine, "20")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(20i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            then_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from("Ok")));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            else_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from("Ok")));
    }
}