phlow_engine/
step_worker.rs

1use crate::{
2    condition::{Condition, ConditionError},
3    context::Context,
4    id::ID,
5    script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::{
9    prelude::{log::debug, *},
10    tracing::field,
11};
12use rhai::Engine;
13use serde::Serialize;
14use std::{fmt::Display, sync::Arc};
15
16static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
17    Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
18        Ok(value) => value.parse::<usize>().unwrap_or(100),
19        Err(_) => 100,
20    });
21
/// Errors produced while building or executing a [`StepWorker`].
#[derive(Debug)]
pub enum StepWorkerError {
    /// The step's condition (or `assert`) could not be built or evaluated.
    ConditionError(ConditionError),
    /// The `payload` script — or the `return` script, which reuses this
    /// variant — could not be built or evaluated.
    PayloadError(phs::ScriptError),
    /// Executing the step's module failed, or the module response carried an error.
    ModulesError(ModulesError),
    /// The `input` script could not be built or evaluated.
    InputError(phs::ScriptError),
}
29
30impl Display for StepWorkerError {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
34            StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
35            StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
36            StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
37        }
38    }
39}
40
41impl std::error::Error for StepWorkerError {
42    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
43        match self {
44            StepWorkerError::ConditionError(err) => Some(err),
45            StepWorkerError::PayloadError(_) => None, // ScriptError doesn't implement std::error::Error
46            StepWorkerError::ModulesError(_) => None, // ModulesError doesn't implement std::error::Error
47            StepWorkerError::InputError(_) => None, // ScriptError doesn't implement std::error::Error
48        }
49    }
50}
51
/// Where the engine should continue after a step has run.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum NextStep {
    /// Continue in the pipeline with this index (taken from a step's `then` / `else`).
    Pipeline(usize),
    /// Jump to a specific step of a specific pipeline (taken from a step's `to`).
    GoToStep(StepReference),
    /// Stop execution; produced when a step's `return` script yields a value.
    Stop,
    /// Continue with the following step.
    Next,
}
59
/// Address of a step: the pipeline index plus the step index inside that pipeline.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
pub struct StepReference {
    /// Index of the target pipeline.
    pub pipeline: usize,
    /// Index of the target step within `pipeline`.
    pub step: usize,
}
65
/// Result of executing a single step.
#[derive(Debug)]
pub struct StepOutput {
    /// Where execution should continue.
    pub next_step: NextStep,
    /// Value produced by the step, if any.
    pub output: Option<Value>,
}
71
/// A single executable step, parsed from a step definition value.
#[derive(Debug, Clone, Default)]
pub struct StepWorker {
    /// Step identifier (`id` key, or a freshly generated one).
    pub(crate) id: ID,
    /// Optional label (`label` key); used as the span name when present.
    pub(crate) label: Option<String>,
    /// Name of the module to execute (`use` key).
    pub(crate) module: Option<String>,
    /// Branching condition (`condition` key, or built from `assert`).
    pub(crate) condition: Option<Condition>,
    /// Script producing the module input (`input` key).
    pub(crate) input: Option<Script>,
    /// Script producing the step output (`payload` key).
    pub(crate) payload: Option<Script>,
    /// Pipeline index to continue with when the condition is true (`then` key).
    pub(crate) then_case: Option<usize>,
    /// Pipeline index to continue with when the condition is false (`else` key).
    pub(crate) else_case: Option<usize>,
    /// Shared module registry used to execute `module`.
    pub(crate) modules: Arc<Modules>,
    /// Script whose result stops execution (`return` key).
    pub(crate) return_case: Option<Script>,
    /// Explicit jump target (`to` key).
    pub(crate) to: Option<StepReference>,
    /// Raw textual form of the step definition; only kept in debug builds for logging.
    #[cfg(debug_assertions)]
    pub(crate) step_raw: String,
}
88
89impl StepWorker {
90    pub fn try_from_value(
91        engine: Arc<Engine>,
92        modules: Arc<Modules>,
93        value: &Value,
94    ) -> Result<Self, StepWorkerError> {
95        log::debug!(
96            "Parsing step worker from value: {}",
97            value.to_json(JsonMode::Indented)
98        );
99
100        let id = match value.get("id") {
101            Some(id) => ID::from(id),
102            None => ID::new(),
103        };
104        let label: Option<String> = match value.get("label") {
105            Some(label) => Some(label.as_string()),
106            None => None,
107        };
108        let condition = {
109            if let Some(condition) = value
110                .get("condition")
111                .map(|condition| Condition::try_from_value(engine.clone(), condition))
112            {
113                Some(condition.map_err(StepWorkerError::ConditionError)?)
114            } else {
115                if let Some(condition) = value.get("assert").map(|assert| {
116                    Condition::try_build_with_assert(engine.clone(), assert.to_string())
117                }) {
118                    Some(condition.map_err(StepWorkerError::ConditionError)?)
119                } else {
120                    None
121                }
122            }
123        };
124        let payload = match value.get("payload") {
125            Some(payload) => match Script::try_build(engine.clone(), payload) {
126                Ok(payload) => Some(payload),
127                Err(err) => return Err(StepWorkerError::PayloadError(err)),
128            },
129            None => None,
130        };
131        let input = match value.get("input") {
132            Some(input) => match Script::try_build(engine.clone(), input) {
133                Ok(input) => Some(input),
134                Err(err) => return Err(StepWorkerError::InputError(err)),
135            },
136            None => None,
137        };
138        let then_case = match value.get("then") {
139            Some(then_case) => match then_case.to_u64() {
140                Some(then_case) => Some(then_case as usize),
141                None => None,
142            },
143            None => None,
144        };
145        let else_case = match value.get("else") {
146            Some(else_case) => match else_case.to_u64() {
147                Some(else_case) => Some(else_case as usize),
148                None => None,
149            },
150            None => None,
151        };
152        let return_case = match value.get("return") {
153            Some(return_case) => match Script::try_build(engine, return_case) {
154                Ok(return_case) => Some(return_case),
155                Err(err) => return Err(StepWorkerError::PayloadError(err)),
156            },
157            None => None,
158        };
159        let module = match value.get("use") {
160            Some(module) => Some(module.to_string()),
161            None => None,
162        };
163
164        let to = match value.get("to") {
165            Some(to_step) => match to_step.as_object() {
166                Some(to_step) => {
167                    let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
168                    let step = to_step.get("step").and_then(|v| v.to_u64());
169
170                    if pipeline.is_some() && step.is_some() {
171                        Some(StepReference {
172                            pipeline: pipeline.unwrap() as usize,
173                            step: step.unwrap() as usize,
174                        })
175                    } else {
176                        None
177                    }
178                }
179                None => None,
180            },
181            None => None,
182        };
183
184        #[cfg(debug_assertions)]
185        let step_raw = value.to_string();
186
187        Ok(Self {
188            id,
189            label,
190            module,
191            input,
192            condition,
193            payload,
194            then_case,
195            else_case,
196            modules,
197            return_case,
198            to,
199            #[cfg(debug_assertions)]
200            step_raw,
201        })
202    }
203
    /// Returns a reference to this step's identifier.
    pub fn get_id(&self) -> &ID {
        &self.id
    }
207
208    fn evaluate_payload(
209        &self,
210        context: &Context,
211        default: Option<Value>,
212    ) -> Result<Option<Value>, StepWorkerError> {
213        if let Some(ref payload) = self.payload {
214            let value = Some(
215                payload
216                    .evaluate(context)
217                    .map_err(StepWorkerError::PayloadError)?,
218            );
219            Ok(value)
220        } else {
221            Ok(default)
222        }
223    }
224
225    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
226        if let Some(ref input) = self.input {
227            let value = Some(
228                input
229                    .evaluate(context)
230                    .map_err(StepWorkerError::InputError)?,
231            );
232            Ok(value)
233        } else {
234            Ok(None)
235        }
236    }
237
238    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
239        if let Some(ref return_case) = self.return_case {
240            let value = Some(
241                return_case
242                    .evaluate(context)
243                    .map_err(StepWorkerError::PayloadError)?,
244            );
245
246            #[cfg(debug_assertions)]
247            log::debug!(
248                "Evaluating return case for step {}: {}",
249                self.id,
250                value.as_ref().map_or("None".to_string(), |v| v.to_string())
251            );
252
253            Ok(value)
254        } else {
255            Ok(None)
256        }
257    }
258
259    async fn evaluate_module(
260        &self,
261        context: &Context,
262    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
263        if let Some(ref module) = self.module {
264            let input = self.evaluate_input(context)?;
265
266            let context = if let Some(input) = &input {
267                context.clone_with_input(input.clone())
268            } else {
269                context.clone()
270            };
271
272            match self
273                .modules
274                .execute(module, &context.get_input(), &context.get_payload())
275                .await
276            {
277                Ok(response) => {
278                    #[cfg(debug_assertions)]
279                    log::debug!("Module response for step {}: {:?}", self.id, response);
280
281                    if let Some(err) = response.error {
282                        return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
283                            err,
284                        )));
285                    }
286
287                    Ok(Some((Some(module.clone()), Some(response.data), context)))
288                }
289                Err(err) => Err(StepWorkerError::ModulesError(err)),
290            }
291        } else {
292            Ok(None)
293        }
294    }
295
296    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
297        #[cfg(debug_assertions)]
298        log::debug!(
299            "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
300            self.step_raw,
301            &context.get_main().to_value().to_string(),
302            &context.get_payload().to_value().to_string(),
303            &context.get_setup().to_value().to_string()
304        );
305
306        let span = tracing::info_span!(
307            "step",
308            otel.name = field::Empty,
309            context.main = field::Empty,
310            context.params = field::Empty,
311            context.payload = field::Empty,
312            context.input = field::Empty,
313            step.id = field::Empty,
314            step.label = field::Empty,
315            step.module = field::Empty,
316            step.condition = field::Empty,
317            step.payload = field::Empty,
318            step.return = field::Empty,
319        );
320        let _guard = span.enter();
321
322        {
323            let step_name = self.label.clone().unwrap_or(self.id.to_string());
324
325            span.record("otel.name", format!("step {}", step_name));
326
327            if let Some(ref input) = context.get_input() {
328                span.record("context.input", input.to_string());
329            }
330
331            if let Some(ref payload) = context.get_payload() {
332                span.record("context.payload", truncate_string(&payload));
333            }
334
335            if let Some(ref main) = context.get_main() {
336                span.record("context.main", truncate_string(&main));
337            }
338
339            span.record("step.id", self.id.to_string());
340
341            if let Some(ref label) = self.label {
342                span.record("step.label", label.to_string());
343            }
344        }
345
346        if let Some(output) = self.evaluate_return(context)? {
347            debug!(
348                "[step {}] return case acionado (condicional de parada)",
349                self.id
350            );
351            {
352                span.record("step.return", output.to_string());
353            }
354
355            return Ok(StepOutput {
356                next_step: NextStep::Stop,
357                output: Some(output),
358            });
359        }
360
361        if let Some((module, output, context)) = self.evaluate_module(context).await? {
362            debug!(
363                "[step {}] módulo '{}' executado; output inicial {:?}",
364                self.id,
365                module.as_deref().unwrap_or("<none>"),
366                output
367            );
368            {
369                span.record("step.module", module.clone());
370
371                if let Some(ref output) = output {
372                    span.record("context.payload", truncate_string(output));
373                }
374            }
375
376            let context = if let Some(output) = output.clone() {
377                debug!(
378                    "[step {}] definindo output no contexto após execução do módulo",
379                    self.id
380                );
381                context.clone_with_output(output)
382            } else {
383                context.clone()
384            };
385
386            let output = self.evaluate_payload(&context, output)?;
387
388            if let Some(to) = &self.to {
389                debug!(
390                    "[step {}] condição 'to' detectada após módulo -> pipeline {}, step {}",
391                    self.id, to.pipeline, to.step
392                );
393                debug!(
394                    "Define switching to step {} in pipeline {}",
395                    to.step, to.pipeline
396                );
397                return Ok(StepOutput {
398                    next_step: NextStep::GoToStep(to.clone()),
399                    output,
400                });
401            }
402
403            debug!("[step {}] seguindo para próximo step após módulo", self.id);
404            return Ok(StepOutput {
405                next_step: NextStep::Next,
406                output,
407            });
408        }
409
410        if let Some(condition) = &self.condition {
411            debug!("[step {}] avaliando condição", self.id);
412            let (next_step, output) = if condition
413                .evaluate(context)
414                .map_err(StepWorkerError::ConditionError)?
415            {
416                debug!("[step {}] condição verdadeira", self.id);
417                let next_step = if let Some(ref then_case) = self.then_case {
418                    debug!("[step {}] then_case -> pipeline {}", self.id, then_case);
419                    NextStep::Pipeline(*then_case)
420                } else {
421                    debug!("[step {}] then_case não definido -> Next", self.id);
422                    NextStep::Next
423                };
424
425                (next_step, self.evaluate_payload(context, None)?)
426            } else {
427                debug!("[step {}] condição falsa", self.id);
428                let next_step = if let Some(ref else_case) = self.else_case {
429                    debug!("[step {}] else_case -> pipeline {}", self.id, else_case);
430                    NextStep::Pipeline(*else_case)
431                } else {
432                    debug!("[step {}] else_case não definido -> Next", self.id);
433                    NextStep::Next
434                };
435
436                (next_step, None)
437            };
438
439            {
440                span.record("step.condition", condition.raw.to_string());
441
442                if let Some(ref output) = output {
443                    span.record("context.payload", truncate_string(output));
444                }
445            }
446
447            return Ok(StepOutput { next_step, output });
448        }
449
450        let output = self.evaluate_payload(context, None)?;
451
452        {
453            if let Some(ref output) = output {
454                span.record("context.payload", truncate_string(output));
455            }
456        }
457
458        if let Some(to) = &self.to {
459            debug!(
460                "[step {}] condição 'to' detectada (sem módulo) -> pipeline {}, step {}",
461                self.id, to.pipeline, to.step
462            );
463            debug!(
464                "Define switching to step {} in pipeline {}",
465                to.step, to.pipeline
466            );
467            return Ok(StepOutput {
468                next_step: NextStep::GoToStep(to.clone()),
469                output,
470            });
471        }
472
473        debug!("[step {}] nenhuma condição especial -> Next", self.id);
474        return Ok(StepOutput {
475            next_step: NextStep::Next,
476            output,
477        });
478    }
479}
480
481fn truncate_string(string: &Value) -> String {
482    let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
483    let string = string.to_string();
484    if string.len() > limit {
485        format!("{}...", &string[..limit])
486    } else {
487        string.to_string()
488    }
489}
490
#[cfg(test)]
mod test {
    use super::*;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// Compiles `source` into a [`Script`], panicking if the build fails.
    fn script(engine: &Arc<Engine>, source: &'static str) -> Script {
        Script::try_build(engine.clone(), &source.to_value()).unwrap()
    }

    /// Builds the condition `10 <operator> 20`.
    fn ten_versus_twenty(engine: &Arc<Engine>, operator: crate::condition::Operator) -> Condition {
        Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            operator,
        )
        .unwrap()
    }

    /// Executes `step` against a fresh, empty context and unwraps the result.
    async fn run(step: &StepWorker) -> StepOutput {
        let context = Context::new();
        step.execute(&context).await.unwrap()
    }

    #[tokio::test]
    async fn test_step_get_reference_id() {
        let step = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(step.get_id(), &ID::from("id"));
    }

    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let step = StepWorker {
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            then_case: Some(0),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(0));
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            payload: Some(script(&engine, "10")),
            else_case: Some(1),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(1));
        assert_eq!(result.output, None);
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            payload: Some(script(&engine, "10")),
            return_case: Some(script(&engine, "20")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(20i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            then_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let output = run(&step).await;

        assert_eq!(output.next_step, NextStep::Stop);
        assert_eq!(output.output, Some(Value::from("Ok")));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_versus_twenty(
                &engine,
                crate::condition::Operator::Equal,
            )),
            else_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from("Ok")));
    }
}
718}