phlow_engine/
step_worker.rs

1use crate::{
2    condition::{Condition, ConditionError},
3    context::Context,
4    debug::debug_controller,
5    id::ID,
6    script::Script,
7};
8use once_cell::sync::Lazy;
9use phlow_sdk::{
10    prelude::{log::debug, *},
11    tracing::field,
12};
13use rhai::Engine;
14use serde::Serialize;
15use std::{fmt::Display, sync::Arc};
16use uuid::Uuid;
17
18static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
19    Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
20        Ok(value) => value.parse::<usize>().unwrap_or(100),
21        Err(_) => 100,
22    });
23
24#[derive(Debug)]
25pub enum StepWorkerError {
26    ConditionError(ConditionError),
27    PayloadError(phs::ScriptError),
28    ModulesError(ModulesError),
29    InputError(phs::ScriptError),
30    LogError(phs::ScriptError),
31}
32
33impl Display for StepWorkerError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
37            StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
38            StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
39            StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
40            StepWorkerError::LogError(err) => write!(f, "Log error: {}", err),
41        }
42    }
43}
44
45impl std::error::Error for StepWorkerError {
46    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
47        match self {
48            StepWorkerError::ConditionError(err) => Some(err),
49            StepWorkerError::PayloadError(_) => None, // ScriptError doesn't implement std::error::Error
50            StepWorkerError::ModulesError(_) => None, // ModulesError doesn't implement std::error::Error
51            StepWorkerError::InputError(_) => None, // ScriptError doesn't implement std::error::Error
52            StepWorkerError::LogError(_) => None, // ScriptError doesn't implement std::error::Error
53        }
54    }
55}
56
57#[derive(Debug, Clone, Copy)]
58enum LogLevel {
59    Info,
60    Debug,
61    Warn,
62    Error,
63    Trace,
64}
65
66impl LogLevel {
67    fn from_str(level: &str) -> Self {
68        match level.to_ascii_lowercase().as_str() {
69            "debug" => LogLevel::Debug,
70            "warn" | "warning" => LogLevel::Warn,
71            "error" => LogLevel::Error,
72            "trace" => LogLevel::Trace,
73            _ => LogLevel::Info,
74        }
75    }
76
77    fn log(self, message: &str) {
78        match self {
79            LogLevel::Info => log::info!("{}", message),
80            LogLevel::Debug => log::debug!("{}", message),
81            LogLevel::Warn => log::warn!("{}", message),
82            LogLevel::Error => log::error!("{}", message),
83            LogLevel::Trace => log::trace!("{}", message),
84        }
85    }
86}
87
88#[derive(Debug, Clone)]
89pub(crate) struct LogStep {
90    level: LogLevel,
91    message: Option<Script>,
92}
93
94#[derive(Debug, Clone, PartialEq, Serialize)]
95pub enum NextStep {
96    Pipeline(usize),
97    GoToStep(StepReference),
98    Stop,
99    Next,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
103pub struct StepReference {
104    pub pipeline: usize,
105    pub step: usize,
106}
107
108#[derive(Debug)]
109pub struct StepOutput {
110    pub next_step: NextStep,
111    pub output: Option<Value>,
112}
113
114#[derive(Debug, Clone, Default)]
115pub struct StepWorker {
116    pub(crate) id: ID,
117    pub(crate) label: Option<String>,
118    pub(crate) module: Option<String>,
119    pub(crate) condition: Option<Condition>,
120    pub(crate) input: Option<Script>,
121    pub(crate) payload: Option<Script>,
122    pub(crate) then_case: Option<usize>,
123    pub(crate) else_case: Option<usize>,
124    pub(crate) modules: Arc<Modules>,
125    pub(crate) return_case: Option<Script>,
126    pub(crate) to: Option<StepReference>,
127    pub(crate) log: Option<LogStep>,
128    pub(crate) step_value: Option<Value>,
129    #[cfg(debug_assertions)]
130    pub(crate) step_raw: String,
131}
132
133impl StepWorker {
134    pub fn try_from_value(
135        engine: Arc<Engine>,
136        modules: Arc<Modules>,
137        value: &Value,
138    ) -> Result<Self, StepWorkerError> {
139        log::debug!(
140            "Parsing step worker from value: {}",
141            value.to_json(JsonMode::Indented)
142        );
143
144        let id = match value.get("id") {
145            Some(id) => ID::from(id),
146            None => ID::new(),
147        };
148        let label: Option<String> = match value.get("label") {
149            Some(label) => Some(label.as_string()),
150            None => None,
151        };
152        let condition = {
153            if let Some(condition) = value
154                .get("condition")
155                .map(|condition| Condition::try_from_value(engine.clone(), condition))
156            {
157                Some(condition.map_err(StepWorkerError::ConditionError)?)
158            } else {
159                if let Some(condition) = value.get("assert").map(|assert| {
160                    Condition::try_build_with_assert(engine.clone(), assert.to_string())
161                }) {
162                    Some(condition.map_err(StepWorkerError::ConditionError)?)
163                } else {
164                    None
165                }
166            }
167        };
168        let payload = match value.get("payload") {
169            Some(payload) => match Script::try_build(engine.clone(), payload) {
170                Ok(payload) => Some(payload),
171                Err(err) => return Err(StepWorkerError::PayloadError(err)),
172            },
173            None => None,
174        };
175        let input = match value.get("input") {
176            Some(input) => match Script::try_build(engine.clone(), input) {
177                Ok(input) => Some(input),
178                Err(err) => return Err(StepWorkerError::InputError(err)),
179            },
180            None => None,
181        };
182        let then_case = match value.get("then") {
183            Some(then_case) => match then_case.to_u64() {
184                Some(then_case) => Some(then_case as usize),
185                None => None,
186            },
187            None => None,
188        };
189        let else_case = match value.get("else") {
190            Some(else_case) => match else_case.to_u64() {
191                Some(else_case) => Some(else_case as usize),
192                None => None,
193            },
194            None => None,
195        };
196        let return_case = match value.get("return") {
197            Some(return_case) => match Script::try_build(engine.clone(), return_case) {
198                Ok(return_case) => Some(return_case),
199                Err(err) => return Err(StepWorkerError::PayloadError(err)),
200            },
201            None => None,
202        };
203        let module = match value.get("use") {
204            Some(module) => Some(module.to_string()),
205            None => None,
206        };
207        let log = build_log_step(engine.clone(), value, module.as_deref())?;
208
209        let to = match value.get("to") {
210            Some(to_step) => match to_step.as_object() {
211                Some(to_step) => {
212                    let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
213                    let step = to_step.get("step").and_then(|v| v.to_u64());
214
215                    if pipeline.is_some() && step.is_some() {
216                        Some(StepReference {
217                            pipeline: pipeline.unwrap() as usize,
218                            step: step.unwrap() as usize,
219                        })
220                    } else {
221                        None
222                    }
223                }
224                None => None,
225            },
226            None => None,
227        };
228
229        let mut step_value = value.clone();
230        if Self::should_add_uuid() {
231            if let Some(obj) = step_value.as_object_mut() {
232                if !obj.contains_key(&"#uuid".to_string()) {
233                    obj.insert("#uuid".to_string(), Uuid::new_v4().to_string().to_value());
234                }
235            }
236        }
237        #[cfg(debug_assertions)]
238        let step_raw = value.to_string();
239
240        Ok(Self {
241            id,
242            label,
243            module,
244            input,
245            condition,
246            payload,
247            then_case,
248            else_case,
249            modules,
250            return_case,
251            to,
252            log,
253            step_value: Some(step_value),
254            #[cfg(debug_assertions)]
255            step_raw,
256        })
257    }
258
259    pub fn get_id(&self) -> &ID {
260        &self.id
261    }
262
263    pub(crate) fn compiled_debug(&self) -> Value {
264        let mut map = std::collections::HashMap::new();
265        if let Some(payload) = &self.payload {
266            map.insert("payload".to_string(), payload.compiled_debug());
267        }
268        if let Some(input) = &self.input {
269            map.insert("input".to_string(), input.compiled_debug());
270        }
271        if let Some(return_case) = &self.return_case {
272            map.insert("return".to_string(), return_case.compiled_debug());
273        }
274        if let Some(condition) = &self.condition {
275            map.insert("condition".to_string(), condition.expression.compiled_debug());
276        }
277        if let Some(log_step) = &self.log {
278            if let Some(message) = &log_step.message {
279                map.insert("log".to_string(), message.compiled_debug());
280            }
281        }
282        map.to_value()
283    }
284
285    fn should_add_uuid() -> bool {
286        if debug_controller().is_some() {
287            return true;
288        }
289        std::env::var("PHLOW_DEBUG")
290            .map(|value| value.eq_ignore_ascii_case("true"))
291            .unwrap_or(false)
292    }
293
294    fn evaluate_payload(
295        &self,
296        context: &Context,
297        default: Option<Value>,
298    ) -> Result<Option<Value>, StepWorkerError> {
299        if let Some(ref payload) = self.payload {
300            let value = Some(
301                payload
302                    .evaluate(context)
303                    .map_err(StepWorkerError::PayloadError)?,
304            );
305            Ok(value)
306        } else {
307            Ok(default)
308        }
309    }
310
311    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
312        if let Some(ref input) = self.input {
313            let value = Some(
314                input
315                    .evaluate(context)
316                    .map_err(StepWorkerError::InputError)?,
317            );
318            Ok(value)
319        } else {
320            Ok(None)
321        }
322    }
323
324    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
325        if let Some(ref return_case) = self.return_case {
326            let value = Some(
327                return_case
328                    .evaluate(context)
329                    .map_err(StepWorkerError::PayloadError)?,
330            );
331
332            #[cfg(debug_assertions)]
333            log::debug!(
334                "Evaluating return case for step {}: {}",
335                self.id,
336                value.as_ref().map_or("None".to_string(), |v| v.to_string())
337            );
338
339            Ok(value)
340        } else {
341            Ok(None)
342        }
343    }
344
345    async fn evaluate_module(
346        &self,
347        context: &Context,
348    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
349        if self.module.as_deref() == Some("log") {
350            return Ok(None);
351        }
352
353        if let Some(ref module) = self.module {
354            let input = self.evaluate_input(context)?;
355
356            let context = if let Some(input) = &input {
357                context.clone_with_input(input.clone())
358            } else {
359                context.clone()
360            };
361
362            match self
363                .modules
364                .execute(module, &context.get_input(), &context.get_payload())
365                .await
366            {
367                Ok(response) => {
368                    #[cfg(debug_assertions)]
369                    log::debug!("Module response for step {}: {:?}", self.id, response);
370
371                    if let Some(err) = response.error {
372                        return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
373                            err,
374                        )));
375                    }
376
377                    Ok(Some((Some(module.clone()), Some(response.data), context)))
378                }
379                Err(err) => Err(StepWorkerError::ModulesError(err)),
380            }
381        } else {
382            Ok(None)
383        }
384    }
385
386    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
387        #[cfg(debug_assertions)]
388        log::debug!(
389            "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
390            self.step_raw,
391            &context.get_main().to_value().to_string(),
392            &context.get_payload().to_value().to_string(),
393            &context.get_setup().to_value().to_string()
394        );
395
396        let span = tracing::info_span!(
397            "step",
398            otel.name = field::Empty,
399            context.main = field::Empty,
400            context.params = field::Empty,
401            context.payload = field::Empty,
402            context.input = field::Empty,
403            step.id = field::Empty,
404            step.label = field::Empty,
405            step.module = field::Empty,
406            step.condition = field::Empty,
407            step.payload = field::Empty,
408            step.return = field::Empty,
409        );
410        let _guard = span.enter();
411
412        {
413            let step_name = self.label.clone().unwrap_or(self.id.to_string());
414
415            span.record("otel.name", format!("step {}", step_name));
416
417            if let Some(ref input) = context.get_input() {
418                span.record("context.input", input.to_string());
419            }
420
421            if let Some(ref payload) = context.get_payload() {
422                span.record("context.payload", truncate_string(&payload));
423            }
424
425            if let Some(ref main) = context.get_main() {
426                span.record("context.main", truncate_string(&main));
427            }
428
429            span.record("step.id", self.id.to_string());
430
431            if let Some(ref label) = self.label {
432                span.record("step.label", label.to_string());
433            }
434        }
435
436        if let Some(log_step) = &self.log {
437            let message = match &log_step.message {
438                Some(script) => script
439                    .evaluate(context)
440                    .map_err(StepWorkerError::LogError)?
441                    .to_string(),
442                None => String::new(),
443            };
444            log_step.level.log(&message);
445        }
446
447        if let Some(condition) = &self.condition {
448            debug!("[step {}] avaliando condição", self.id);
449            let result = condition
450                .evaluate(context)
451                .map_err(StepWorkerError::ConditionError)?;
452
453            span.record("step.condition", condition.raw.to_string());
454
455            if self.then_case.is_some() || self.else_case.is_some() {
456                let (next_step, output) = if result {
457                    debug!("[step {}] condição verdadeira", self.id);
458                    let next_step = if let Some(ref then_case) = self.then_case {
459                        debug!("[step {}] then_case -> pipeline {}", self.id, then_case);
460                        NextStep::Pipeline(*then_case)
461                    } else {
462                        debug!("[step {}] then_case não definido -> Next", self.id);
463                        NextStep::Next
464                    };
465
466                    (next_step, self.evaluate_payload(context, None)?)
467                } else {
468                    debug!("[step {}] condição falsa", self.id);
469                    let next_step = if let Some(ref else_case) = self.else_case {
470                        debug!("[step {}] else_case -> pipeline {}", self.id, else_case);
471                        NextStep::Pipeline(*else_case)
472                    } else {
473                        debug!("[step {}] else_case não definido -> Next", self.id);
474                        NextStep::Next
475                    };
476
477                    (next_step, None)
478                };
479
480                if let Some(ref output) = output {
481                    span.record("context.payload", truncate_string(output));
482                }
483
484                return Ok(StepOutput { next_step, output });
485            }
486
487            if !result {
488                debug!("[step {}] condição falsa - pulando step", self.id);
489                return Ok(StepOutput {
490                    next_step: NextStep::Next,
491                    output: None,
492                });
493            }
494        }
495
496        if let Some(output) = self.evaluate_return(context)? {
497            debug!(
498                "[step {}] return case acionado (condicional de parada)",
499                self.id
500            );
501            {
502                span.record("step.return", output.to_string());
503            }
504
505            return Ok(StepOutput {
506                next_step: NextStep::Stop,
507                output: Some(output),
508            });
509        }
510
511        if let Some((module, output, context)) = self.evaluate_module(context).await? {
512            debug!(
513                "[step {}] módulo '{}' executado; output inicial {:?}",
514                self.id,
515                module.as_deref().unwrap_or("<none>"),
516                output
517            );
518            {
519                span.record("step.module", module.clone());
520
521                if let Some(ref output) = output {
522                    span.record("context.payload", truncate_string(output));
523                }
524            }
525
526            let context = if let Some(output) = output.clone() {
527                debug!(
528                    "[step {}] definindo output no contexto após execução do módulo",
529                    self.id
530                );
531                context.clone_with_output(output)
532            } else {
533                context.clone()
534            };
535
536            let output = self.evaluate_payload(&context, output)?;
537
538            if let Some(to) = &self.to {
539                debug!(
540                    "[step {}] condição 'to' detectada após módulo -> pipeline {}, step {}",
541                    self.id, to.pipeline, to.step
542                );
543                debug!(
544                    "Define switching to step {} in pipeline {}",
545                    to.step, to.pipeline
546                );
547                return Ok(StepOutput {
548                    next_step: NextStep::GoToStep(to.clone()),
549                    output,
550                });
551            }
552
553            debug!("[step {}] seguindo para próximo step após módulo", self.id);
554            return Ok(StepOutput {
555                next_step: NextStep::Next,
556                output,
557            });
558        }
559
560        let default_output = if self.log.is_some() {
561            context.get_payload()
562        } else {
563            None
564        };
565        let output = self.evaluate_payload(context, default_output)?;
566
567        {
568            if let Some(ref output) = output {
569                span.record("context.payload", truncate_string(output));
570            }
571        }
572
573        if let Some(to) = &self.to {
574            debug!(
575                "[step {}] condição 'to' detectada (sem módulo) -> pipeline {}, step {}",
576                self.id, to.pipeline, to.step
577            );
578            debug!(
579                "Define switching to step {} in pipeline {}",
580                to.step, to.pipeline
581            );
582            return Ok(StepOutput {
583                next_step: NextStep::GoToStep(to.clone()),
584                output,
585            });
586        }
587
588        debug!("[step {}] nenhuma condição especial -> Next", self.id);
589        return Ok(StepOutput {
590            next_step: NextStep::Next,
591            output,
592        });
593    }
594}
595
596fn build_log_step(
597    engine: Arc<Engine>,
598    value: &Value,
599    module: Option<&str>,
600) -> Result<Option<LogStep>, StepWorkerError> {
601    if let Some(log_step) = extract_log_from_key(engine.clone(), value)? {
602        return Ok(Some(log_step));
603    }
604
605    if module == Some("log") {
606        let input_value = value.get("input");
607        let log_step = build_log_from_input(engine, input_value)?;
608        return Ok(Some(log_step));
609    }
610
611    Ok(None)
612}
613
614fn extract_log_from_key(
615    engine: Arc<Engine>,
616    value: &Value,
617) -> Result<Option<LogStep>, StepWorkerError> {
618    let Some(obj) = value.as_object() else {
619        return Ok(None);
620    };
621
622    for (key, value) in obj.iter() {
623        let key_str = key.to_string();
624        let Some(level_key) = key_str.strip_prefix("log.") else {
625            continue;
626        };
627        let level = level_key.split('.').next().unwrap_or(level_key);
628        let level = LogLevel::from_str(level);
629        let message_value = if let Some(obj) = value.as_object() {
630            obj.get("message").cloned().unwrap_or_else(|| value.clone())
631        } else {
632            value.clone()
633        };
634        let message =
635            Script::try_build(engine.clone(), &message_value).map_err(StepWorkerError::LogError)?;
636
637        return Ok(Some(LogStep {
638            level,
639            message: Some(message),
640        }));
641    }
642
643    Ok(None)
644}
645
646fn build_log_from_input(
647    engine: Arc<Engine>,
648    input_value: Option<&Value>,
649) -> Result<LogStep, StepWorkerError> {
650    let mut level = LogLevel::Info;
651    let mut message_value: Option<Value> = None;
652
653    if let Some(input_value) = input_value {
654        if let Some(obj) = input_value.as_object() {
655            if let Some(level_value) = obj.get("action").or_else(|| obj.get("level")) {
656                level = LogLevel::from_str(level_value.as_string().as_str());
657            }
658
659            message_value = obj.get("message").cloned();
660        } else {
661            message_value = Some(input_value.clone());
662        }
663    }
664
665    let message = if let Some(message_value) = message_value {
666        Some(
667            Script::try_build(engine, &message_value).map_err(StepWorkerError::LogError)?,
668        )
669    } else {
670        None
671    };
672
673    Ok(LogStep { level, message })
674}
675
676fn truncate_string(string: &Value) -> String {
677    let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
678    let string = string.to_string();
679    if string.len() > limit {
680        format!("{}...", &string[..limit])
681    } else {
682        string.to_string()
683    }
684}
685
686#[cfg(test)]
687mod test {
688    use super::*;
689    use phlow_sdk::valu3;
690    use phs::build_engine;
691    use valu3::prelude::ToValueBehavior;
692    use valu3::value::Value;
693
694    #[tokio::test]
695    async fn test_step_get_reference_id() {
696        let step = StepWorker {
697            id: ID::from("id"),
698            label: Some("label".to_string()),
699            ..Default::default()
700        };
701
702        assert_eq!(step.get_id(), &ID::from("id"));
703    }
704
705    #[tokio::test]
706    async fn test_step_execute() {
707        let engine = build_engine(None);
708        let step = StepWorker {
709            payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
710            ..Default::default()
711        };
712
713        let context = Context::new();
714
715        let result = step.execute(&context).await.unwrap();
716
717        assert_eq!(result.next_step, NextStep::Next);
718        assert_eq!(result.output, Some(Value::from(10i64)));
719    }
720
721    #[tokio::test]
722    async fn test_step_execute_with_return_case() {
723        let engine = build_engine(None);
724        let step = StepWorker {
725            id: ID::new(),
726            return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
727            ..Default::default()
728        };
729
730        let context = Context::new();
731
732        let result = step.execute(&context).await.unwrap();
733
734        assert_eq!(result.next_step, NextStep::Stop);
735        assert_eq!(result.output, Some(Value::from(10i64)));
736    }
737
738    #[tokio::test]
739    async fn test_step_execute_with_return_case_and_payload() {
740        let engine = build_engine(None);
741        let step = StepWorker {
742            id: ID::new(),
743            payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
744            return_case: Some(Script::try_build(engine.clone(), &"20".to_value()).unwrap()),
745            ..Default::default()
746        };
747
748        let context = Context::new();
749
750        let result = step.execute(&context).await.unwrap();
751
752        assert_eq!(result.next_step, NextStep::Stop);
753        assert_eq!(result.output, Some(Value::from(20i64)));
754    }
755
756    #[tokio::test]
757    async fn test_step_execute_with_assert_and_return_case() {
758        let engine = build_engine(None);
759        let step = StepWorker {
760            condition: Some(
761                Condition::try_build_with_assert(engine.clone(), "{{ payload == 0 }}".to_string())
762                    .unwrap(),
763            ),
764            return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
765            ..Default::default()
766        };
767
768        let context_true = Context::new().clone_with_output(Value::from(0i64));
769        let result_true = step.execute(&context_true).await.unwrap();
770
771        assert_eq!(result_true.next_step, NextStep::Stop);
772        assert_eq!(result_true.output, Some(Value::from(10i64)));
773
774        let context_false = Context::new().clone_with_output(Value::from(1i64));
775        let result_false = step.execute(&context_false).await.unwrap();
776
777        assert_eq!(result_false.next_step, NextStep::Next);
778        assert_eq!(result_false.output, None);
779    }
780}