phlow_engine/
step_worker.rs

1use crate::{
2    condition::{Condition, ConditionError},
3    context::Context,
4    id::ID,
5    script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::{
9    prelude::{log::debug, *},
10    tracing::field,
11};
12use rhai::Engine;
13use serde::Serialize;
14use std::{fmt::Display, sync::Arc};
15
16static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
17    Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
18        Ok(value) => value.parse::<usize>().unwrap_or(100),
19        Err(_) => 100,
20    });
21
22#[derive(Debug)]
23pub enum StepWorkerError {
24    ConditionError(ConditionError),
25    PayloadError(phs::ScriptError),
26    ModulesError(ModulesError),
27    InputError(phs::ScriptError),
28}
29
30impl Display for StepWorkerError {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
34            StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
35            StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
36            StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
37        }
38    }
39}
40
41impl std::error::Error for StepWorkerError {
42    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
43        match self {
44            StepWorkerError::ConditionError(err) => Some(err),
45            StepWorkerError::PayloadError(_) => None, // ScriptError doesn't implement std::error::Error
46            StepWorkerError::ModulesError(_) => None, // ModulesError doesn't implement std::error::Error
47            StepWorkerError::InputError(_) => None, // ScriptError doesn't implement std::error::Error
48        }
49    }
50}
51
52#[derive(Debug, Clone, PartialEq, Serialize)]
53pub enum NextStep {
54    Pipeline(usize),
55    GoToStep(StepReference),
56    Stop,
57    Next,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
61pub struct StepReference {
62    pub pipeline: usize,
63    pub step: usize,
64}
65
66#[derive(Debug)]
67pub struct StepOutput {
68    pub next_step: NextStep,
69    pub output: Option<Value>,
70}
71
72#[derive(Debug, Clone, Default)]
73pub struct StepWorker {
74    pub(crate) id: ID,
75    pub(crate) label: Option<String>,
76    pub(crate) module: Option<String>,
77    pub(crate) condition: Option<Condition>,
78    pub(crate) input: Option<Script>,
79    pub(crate) payload: Option<Script>,
80    pub(crate) then_case: Option<usize>,
81    pub(crate) else_case: Option<usize>,
82    pub(crate) modules: Arc<Modules>,
83    pub(crate) return_case: Option<Script>,
84    pub(crate) to: Option<StepReference>,
85    #[cfg(debug_assertions)]
86    pub(crate) step_raw: String,
87}
88
89impl StepWorker {
90    pub fn try_from_value(
91        engine: Arc<Engine>,
92        modules: Arc<Modules>,
93        value: &Value,
94    ) -> Result<Self, StepWorkerError> {
95        log::debug!(
96            "Parsing step worker from value: {}",
97            value.to_json(JsonMode::Indented)
98        );
99
100        let id = match value.get("id") {
101            Some(id) => ID::from(id),
102            None => ID::new(),
103        };
104        let label: Option<String> = match value.get("label") {
105            Some(label) => Some(label.as_string()),
106            None => None,
107        };
108        let condition = {
109            if let Some(condition) = value
110                .get("condition")
111                .map(|condition| Condition::try_from_value(engine.clone(), condition))
112            {
113                Some(condition.map_err(StepWorkerError::ConditionError)?)
114            } else {
115                if let Some(condition) = value.get("assert").map(|assert| {
116                    Condition::try_build_with_assert(engine.clone(), assert.to_string())
117                }) {
118                    Some(condition.map_err(StepWorkerError::ConditionError)?)
119                } else {
120                    None
121                }
122            }
123        };
124        let payload = match value.get("payload") {
125            Some(payload) => match Script::try_build(engine.clone(), payload) {
126                Ok(payload) => Some(payload),
127                Err(err) => return Err(StepWorkerError::PayloadError(err)),
128            },
129            None => None,
130        };
131        let input = match value.get("input") {
132            Some(input) => match Script::try_build(engine.clone(), input) {
133                Ok(input) => Some(input),
134                Err(err) => return Err(StepWorkerError::InputError(err)),
135            },
136            None => None,
137        };
138        let then_case = match value.get("then") {
139            Some(then_case) => match then_case.to_u64() {
140                Some(then_case) => Some(then_case as usize),
141                None => None,
142            },
143            None => None,
144        };
145        let else_case = match value.get("else") {
146            Some(else_case) => match else_case.to_u64() {
147                Some(else_case) => Some(else_case as usize),
148                None => None,
149            },
150            None => None,
151        };
152        let return_case = match value.get("return") {
153            Some(return_case) => match Script::try_build(engine, return_case) {
154                Ok(return_case) => Some(return_case),
155                Err(err) => return Err(StepWorkerError::PayloadError(err)),
156            },
157            None => None,
158        };
159        let module = match value.get("use") {
160            Some(module) => Some(module.to_string()),
161            None => None,
162        };
163
164        let to = match value.get("to") {
165            Some(to_step) => match to_step.as_object() {
166                Some(to_step) => {
167                    let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
168                    let step = to_step.get("step").and_then(|v| v.to_u64());
169
170                    if pipeline.is_some() && step.is_some() {
171                        Some(StepReference {
172                            pipeline: pipeline.unwrap() as usize,
173                            step: step.unwrap() as usize,
174                        })
175                    } else {
176                        None
177                    }
178                }
179                None => None,
180            },
181            None => None,
182        };
183
184        #[cfg(debug_assertions)]
185        let step_raw = value.to_string();
186
187        Ok(Self {
188            id,
189            label,
190            module,
191            input,
192            condition,
193            payload,
194            then_case,
195            else_case,
196            modules,
197            return_case,
198            to,
199            #[cfg(debug_assertions)]
200            step_raw,
201        })
202    }
203
204    pub fn get_id(&self) -> &ID {
205        &self.id
206    }
207
208    fn evaluate_payload(
209        &self,
210        context: &Context,
211        default: Option<Value>,
212    ) -> Result<Option<Value>, StepWorkerError> {
213        if let Some(ref payload) = self.payload {
214            let value = Some(
215                payload
216                    .evaluate(context)
217                    .map_err(StepWorkerError::PayloadError)?,
218            );
219            Ok(value)
220        } else {
221            Ok(default)
222        }
223    }
224
225    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
226        if let Some(ref input) = self.input {
227            let value = Some(
228                input
229                    .evaluate(context)
230                    .map_err(StepWorkerError::InputError)?,
231            );
232            Ok(value)
233        } else {
234            Ok(None)
235        }
236    }
237
238    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
239        if let Some(ref return_case) = self.return_case {
240            let value = Some(
241                return_case
242                    .evaluate(context)
243                    .map_err(StepWorkerError::PayloadError)?,
244            );
245
246            #[cfg(debug_assertions)]
247            log::debug!(
248                "Evaluating return case for step {}: {}",
249                self.id,
250                value.as_ref().map_or("None".to_string(), |v| v.to_string())
251            );
252
253            Ok(value)
254        } else {
255            Ok(None)
256        }
257    }
258
259    async fn evaluate_module(
260        &self,
261        context: &Context,
262    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
263        if let Some(ref module) = self.module {
264            let input = self.evaluate_input(context)?;
265
266            let context = if let Some(input) = &input {
267                context.add_module_input(input.clone())
268            } else {
269                context.clone()
270            };
271
272            match self
273                .modules
274                .execute(module, &context.get_input(), &context.get_payload())
275                .await
276            {
277                Ok(response) => {
278                    #[cfg(debug_assertions)]
279                    log::debug!("Module response for step {}: {:?}", self.id, response);
280
281                    if let Some(err) = response.error {
282                        return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
283                            err,
284                        )));
285                    }
286
287                    Ok(Some((Some(module.clone()), Some(response.data), context)))
288                }
289                Err(err) => Err(StepWorkerError::ModulesError(err)),
290            }
291        } else {
292            Ok(None)
293        }
294    }
295
296    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
297        #[cfg(debug_assertions)]
298        log::debug!(
299            "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
300            self.step_raw,
301            &context.get_main().to_value().to_string(),
302            &context.get_payload().to_value().to_string(),
303            &context.get_setup().to_value().to_string()
304        );
305
306        let span = tracing::info_span!(
307            "step",
308            otel.name = field::Empty,
309            context.main = field::Empty,
310            context.params = field::Empty,
311            context.payload = field::Empty,
312            context.input = field::Empty,
313            step.id = field::Empty,
314            step.label = field::Empty,
315            step.module = field::Empty,
316            step.condition = field::Empty,
317            step.payload = field::Empty,
318            step.return = field::Empty,
319        );
320        let _guard = span.enter();
321
322        {
323            let step_name = self.label.clone().unwrap_or(self.id.to_string());
324
325            span.record("otel.name", format!("step {}", step_name));
326
327            if let Some(ref input) = context.get_input() {
328                span.record("context.input", input.to_string());
329            }
330
331            if let Some(ref payload) = context.get_payload() {
332                span.record("context.payload", truncate_string(&payload));
333            }
334
335            if let Some(ref main) = context.get_main() {
336                span.record("context.main", truncate_string(&main));
337            }
338
339            span.record("step.id", self.id.to_string());
340
341            if let Some(ref label) = self.label {
342                span.record("step.label", label.to_string());
343            }
344        }
345
346        if let Some(output) = self.evaluate_return(context)? {
347            {
348                span.record("step.return", output.to_string());
349            }
350
351            return Ok(StepOutput {
352                next_step: NextStep::Stop,
353                output: Some(output),
354            });
355        }
356
357        if let Some((module, output, context)) = self.evaluate_module(context).await? {
358            {
359                span.record("step.module", module.clone());
360
361                if let Some(ref output) = output {
362                    span.record("context.payload", truncate_string(output));
363                }
364            }
365
366            let context = if let Some(output) = output.clone() {
367                context.add_module_output(output)
368            } else {
369                context.clone()
370            };
371
372            let output = self.evaluate_payload(&context, output)?;
373
374            if let Some(to) = &self.to {
375                debug!(
376                    "Define switching to step {} in pipeline {}",
377                    to.step, to.pipeline
378                );
379                return Ok(StepOutput {
380                    next_step: NextStep::GoToStep(to.clone()),
381                    output,
382                });
383            }
384
385            return Ok(StepOutput {
386                next_step: NextStep::Next,
387                output,
388            });
389        }
390
391        if let Some(condition) = &self.condition {
392            let (next_step, output) = if condition
393                .evaluate(context)
394                .map_err(StepWorkerError::ConditionError)?
395            {
396                let next_step = if let Some(ref then_case) = self.then_case {
397                    NextStep::Pipeline(*then_case)
398                } else {
399                    NextStep::Next
400                };
401
402                (next_step, self.evaluate_payload(context, None)?)
403            } else {
404                let next_step = if let Some(ref else_case) = self.else_case {
405                    NextStep::Pipeline(*else_case)
406                } else {
407                    NextStep::Next
408                };
409
410                (next_step, None)
411            };
412
413            {
414                span.record("step.condition", condition.raw.to_string());
415
416                if let Some(ref output) = output {
417                    span.record("context.payload", truncate_string(output));
418                }
419            }
420
421            return Ok(StepOutput { next_step, output });
422        }
423
424        let output = self.evaluate_payload(context, None)?;
425
426        {
427            if let Some(ref output) = output {
428                span.record("context.payload", truncate_string(output));
429            }
430        }
431
432        if let Some(to) = &self.to {
433            debug!(
434                "Define switching to step {} in pipeline {}",
435                to.step, to.pipeline
436            );
437            return Ok(StepOutput {
438                next_step: NextStep::GoToStep(to.clone()),
439                output,
440            });
441        }
442
443        return Ok(StepOutput {
444            next_step: NextStep::Next,
445            output,
446        });
447    }
448}
449
450fn truncate_string(string: &Value) -> String {
451    let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
452    let string = string.to_string();
453    if string.len() > limit {
454        format!("{}...", &string[..limit])
455    } else {
456        string.to_string()
457    }
458}
459
#[cfg(test)]
mod test {
    use super::*;
    use crate::condition::Operator;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// Builds the `10 <operator> 20` condition shared by the branching tests.
    fn ten_against_twenty(engine: &Arc<Engine>, operator: Operator) -> Condition {
        Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            operator,
        )
        .unwrap()
    }

    /// Compiles `source` into a script, panicking on failure.
    fn script(engine: &Arc<Engine>, source: &str) -> Script {
        Script::try_build(engine.clone(), &source.to_value()).unwrap()
    }

    /// Executes `step` against a fresh, empty context.
    async fn run(step: &StepWorker) -> StepOutput {
        step.execute(&Context::new()).await.unwrap()
    }

    #[tokio::test]
    async fn test_step_get_reference_id() {
        let step = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(step.get_id(), &ID::from("id"));
    }

    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let step = StepWorker {
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_against_twenty(&engine, Operator::NotEqual)),
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_against_twenty(&engine, Operator::NotEqual)),
            payload: Some(script(&engine, "10")),
            then_case: Some(0),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(0));
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_against_twenty(&engine, Operator::Equal)),
            payload: Some(script(&engine, "10")),
            else_case: Some(1),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(1));
        assert_eq!(result.output, None);
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            payload: Some(script(&engine, "10")),
            return_case: Some(script(&engine, "20")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(20i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_against_twenty(&engine, Operator::Equal)),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_against_twenty(&engine, Operator::Equal)),
            then_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let output = run(&step).await;

        assert_eq!(output.next_step, NextStep::Stop);
        assert_eq!(output.output, Some(Value::from("Ok")));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_against_twenty(&engine, Operator::Equal)),
            else_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from("Ok")));
    }
}