phlow_engine/
step_worker.rs

1use crate::{
2    condition::{Condition, ConditionError},
3    context::Context,
4    id::ID,
5    script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::{
9    prelude::{log::debug, *},
10    tracing::field,
11};
12use rhai::Engine;
13use serde::Serialize;
14use std::{fmt::Display, sync::Arc};
15
16static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
17    Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
18        Ok(value) => value.parse::<usize>().unwrap_or(100),
19        Err(_) => 100,
20    });
21
22#[derive(Debug)]
23pub enum StepWorkerError {
24    ConditionError(ConditionError),
25    PayloadError(phs::ScriptError),
26    ModulesError(ModulesError),
27    InputError(phs::ScriptError),
28    LogError(phs::ScriptError),
29}
30
31impl Display for StepWorkerError {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        match self {
34            StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
35            StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
36            StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
37            StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
38            StepWorkerError::LogError(err) => write!(f, "Log error: {}", err),
39        }
40    }
41}
42
43impl std::error::Error for StepWorkerError {
44    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
45        match self {
46            StepWorkerError::ConditionError(err) => Some(err),
47            StepWorkerError::PayloadError(_) => None, // ScriptError doesn't implement std::error::Error
48            StepWorkerError::ModulesError(_) => None, // ModulesError doesn't implement std::error::Error
49            StepWorkerError::InputError(_) => None, // ScriptError doesn't implement std::error::Error
50            StepWorkerError::LogError(_) => None, // ScriptError doesn't implement std::error::Error
51        }
52    }
53}
54
/// Severity used by a step's built-in log action.
#[derive(Clone, Copy, Debug)]
enum LogLevel {
    /// Default level; also chosen for unrecognised level names.
    Info,
    /// Selected by the name `debug`.
    Debug,
    /// Selected by the names `warn` or `warning`.
    Warn,
    /// Selected by the name `error`.
    Error,
    /// Selected by the name `trace`.
    Trace,
}
63
64impl LogLevel {
65    fn from_str(level: &str) -> Self {
66        match level.to_ascii_lowercase().as_str() {
67            "debug" => LogLevel::Debug,
68            "warn" | "warning" => LogLevel::Warn,
69            "error" => LogLevel::Error,
70            "trace" => LogLevel::Trace,
71            _ => LogLevel::Info,
72        }
73    }
74
75    fn log(self, message: &str) {
76        match self {
77            LogLevel::Info => log::info!("{}", message),
78            LogLevel::Debug => log::debug!("{}", message),
79            LogLevel::Warn => log::warn!("{}", message),
80            LogLevel::Error => log::error!("{}", message),
81            LogLevel::Trace => log::trace!("{}", message),
82        }
83    }
84}
85
86#[derive(Debug, Clone)]
87pub(crate) struct LogStep {
88    level: LogLevel,
89    message: Option<Script>,
90}
91
92#[derive(Debug, Clone, PartialEq, Serialize)]
93pub enum NextStep {
94    Pipeline(usize),
95    GoToStep(StepReference),
96    Stop,
97    Next,
98}
99
100#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
101pub struct StepReference {
102    pub pipeline: usize,
103    pub step: usize,
104}
105
106#[derive(Debug)]
107pub struct StepOutput {
108    pub next_step: NextStep,
109    pub output: Option<Value>,
110}
111
112#[derive(Debug, Clone, Default)]
113pub struct StepWorker {
114    pub(crate) id: ID,
115    pub(crate) label: Option<String>,
116    pub(crate) module: Option<String>,
117    pub(crate) condition: Option<Condition>,
118    pub(crate) input: Option<Script>,
119    pub(crate) payload: Option<Script>,
120    pub(crate) then_case: Option<usize>,
121    pub(crate) else_case: Option<usize>,
122    pub(crate) modules: Arc<Modules>,
123    pub(crate) return_case: Option<Script>,
124    pub(crate) to: Option<StepReference>,
125    pub(crate) log: Option<LogStep>,
126    #[cfg(debug_assertions)]
127    pub(crate) step_raw: String,
128}
129
130impl StepWorker {
131    pub fn try_from_value(
132        engine: Arc<Engine>,
133        modules: Arc<Modules>,
134        value: &Value,
135    ) -> Result<Self, StepWorkerError> {
136        log::debug!(
137            "Parsing step worker from value: {}",
138            value.to_json(JsonMode::Indented)
139        );
140
141        let id = match value.get("id") {
142            Some(id) => ID::from(id),
143            None => ID::new(),
144        };
145        let label: Option<String> = match value.get("label") {
146            Some(label) => Some(label.as_string()),
147            None => None,
148        };
149        let condition = {
150            if let Some(condition) = value
151                .get("condition")
152                .map(|condition| Condition::try_from_value(engine.clone(), condition))
153            {
154                Some(condition.map_err(StepWorkerError::ConditionError)?)
155            } else {
156                if let Some(condition) = value.get("assert").map(|assert| {
157                    Condition::try_build_with_assert(engine.clone(), assert.to_string())
158                }) {
159                    Some(condition.map_err(StepWorkerError::ConditionError)?)
160                } else {
161                    None
162                }
163            }
164        };
165        let payload = match value.get("payload") {
166            Some(payload) => match Script::try_build(engine.clone(), payload) {
167                Ok(payload) => Some(payload),
168                Err(err) => return Err(StepWorkerError::PayloadError(err)),
169            },
170            None => None,
171        };
172        let input = match value.get("input") {
173            Some(input) => match Script::try_build(engine.clone(), input) {
174                Ok(input) => Some(input),
175                Err(err) => return Err(StepWorkerError::InputError(err)),
176            },
177            None => None,
178        };
179        let then_case = match value.get("then") {
180            Some(then_case) => match then_case.to_u64() {
181                Some(then_case) => Some(then_case as usize),
182                None => None,
183            },
184            None => None,
185        };
186        let else_case = match value.get("else") {
187            Some(else_case) => match else_case.to_u64() {
188                Some(else_case) => Some(else_case as usize),
189                None => None,
190            },
191            None => None,
192        };
193        let return_case = match value.get("return") {
194            Some(return_case) => match Script::try_build(engine.clone(), return_case) {
195                Ok(return_case) => Some(return_case),
196                Err(err) => return Err(StepWorkerError::PayloadError(err)),
197            },
198            None => None,
199        };
200        let module = match value.get("use") {
201            Some(module) => Some(module.to_string()),
202            None => None,
203        };
204        let log = build_log_step(engine.clone(), value, module.as_deref())?;
205
206        let to = match value.get("to") {
207            Some(to_step) => match to_step.as_object() {
208                Some(to_step) => {
209                    let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
210                    let step = to_step.get("step").and_then(|v| v.to_u64());
211
212                    if pipeline.is_some() && step.is_some() {
213                        Some(StepReference {
214                            pipeline: pipeline.unwrap() as usize,
215                            step: step.unwrap() as usize,
216                        })
217                    } else {
218                        None
219                    }
220                }
221                None => None,
222            },
223            None => None,
224        };
225
226        #[cfg(debug_assertions)]
227        let step_raw = value.to_string();
228
229        Ok(Self {
230            id,
231            label,
232            module,
233            input,
234            condition,
235            payload,
236            then_case,
237            else_case,
238            modules,
239            return_case,
240            to,
241            log,
242            #[cfg(debug_assertions)]
243            step_raw,
244        })
245    }
246
247    pub fn get_id(&self) -> &ID {
248        &self.id
249    }
250
251    fn evaluate_payload(
252        &self,
253        context: &Context,
254        default: Option<Value>,
255    ) -> Result<Option<Value>, StepWorkerError> {
256        if let Some(ref payload) = self.payload {
257            let value = Some(
258                payload
259                    .evaluate(context)
260                    .map_err(StepWorkerError::PayloadError)?,
261            );
262            Ok(value)
263        } else {
264            Ok(default)
265        }
266    }
267
268    fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
269        if let Some(ref input) = self.input {
270            let value = Some(
271                input
272                    .evaluate(context)
273                    .map_err(StepWorkerError::InputError)?,
274            );
275            Ok(value)
276        } else {
277            Ok(None)
278        }
279    }
280
281    fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
282        if let Some(ref return_case) = self.return_case {
283            let value = Some(
284                return_case
285                    .evaluate(context)
286                    .map_err(StepWorkerError::PayloadError)?,
287            );
288
289            #[cfg(debug_assertions)]
290            log::debug!(
291                "Evaluating return case for step {}: {}",
292                self.id,
293                value.as_ref().map_or("None".to_string(), |v| v.to_string())
294            );
295
296            Ok(value)
297        } else {
298            Ok(None)
299        }
300    }
301
302    async fn evaluate_module(
303        &self,
304        context: &Context,
305    ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
306        if self.module.as_deref() == Some("log") {
307            return Ok(None);
308        }
309
310        if let Some(ref module) = self.module {
311            let input = self.evaluate_input(context)?;
312
313            let context = if let Some(input) = &input {
314                context.clone_with_input(input.clone())
315            } else {
316                context.clone()
317            };
318
319            match self
320                .modules
321                .execute(module, &context.get_input(), &context.get_payload())
322                .await
323            {
324                Ok(response) => {
325                    #[cfg(debug_assertions)]
326                    log::debug!("Module response for step {}: {:?}", self.id, response);
327
328                    if let Some(err) = response.error {
329                        return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
330                            err,
331                        )));
332                    }
333
334                    Ok(Some((Some(module.clone()), Some(response.data), context)))
335                }
336                Err(err) => Err(StepWorkerError::ModulesError(err)),
337            }
338        } else {
339            Ok(None)
340        }
341    }
342
343    pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
344        #[cfg(debug_assertions)]
345        log::debug!(
346            "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
347            self.step_raw,
348            &context.get_main().to_value().to_string(),
349            &context.get_payload().to_value().to_string(),
350            &context.get_setup().to_value().to_string()
351        );
352
353        let span = tracing::info_span!(
354            "step",
355            otel.name = field::Empty,
356            context.main = field::Empty,
357            context.params = field::Empty,
358            context.payload = field::Empty,
359            context.input = field::Empty,
360            step.id = field::Empty,
361            step.label = field::Empty,
362            step.module = field::Empty,
363            step.condition = field::Empty,
364            step.payload = field::Empty,
365            step.return = field::Empty,
366        );
367        let _guard = span.enter();
368
369        {
370            let step_name = self.label.clone().unwrap_or(self.id.to_string());
371
372            span.record("otel.name", format!("step {}", step_name));
373
374            if let Some(ref input) = context.get_input() {
375                span.record("context.input", input.to_string());
376            }
377
378            if let Some(ref payload) = context.get_payload() {
379                span.record("context.payload", truncate_string(&payload));
380            }
381
382            if let Some(ref main) = context.get_main() {
383                span.record("context.main", truncate_string(&main));
384            }
385
386            span.record("step.id", self.id.to_string());
387
388            if let Some(ref label) = self.label {
389                span.record("step.label", label.to_string());
390            }
391        }
392
393        if let Some(log_step) = &self.log {
394            let message = match &log_step.message {
395                Some(script) => script
396                    .evaluate(context)
397                    .map_err(StepWorkerError::LogError)?
398                    .to_string(),
399                None => String::new(),
400            };
401            log_step.level.log(&message);
402        }
403
404        if let Some(output) = self.evaluate_return(context)? {
405            debug!(
406                "[step {}] return case acionado (condicional de parada)",
407                self.id
408            );
409            {
410                span.record("step.return", output.to_string());
411            }
412
413            return Ok(StepOutput {
414                next_step: NextStep::Stop,
415                output: Some(output),
416            });
417        }
418
419        if let Some((module, output, context)) = self.evaluate_module(context).await? {
420            debug!(
421                "[step {}] módulo '{}' executado; output inicial {:?}",
422                self.id,
423                module.as_deref().unwrap_or("<none>"),
424                output
425            );
426            {
427                span.record("step.module", module.clone());
428
429                if let Some(ref output) = output {
430                    span.record("context.payload", truncate_string(output));
431                }
432            }
433
434            let context = if let Some(output) = output.clone() {
435                debug!(
436                    "[step {}] definindo output no contexto após execução do módulo",
437                    self.id
438                );
439                context.clone_with_output(output)
440            } else {
441                context.clone()
442            };
443
444            let output = self.evaluate_payload(&context, output)?;
445
446            if let Some(to) = &self.to {
447                debug!(
448                    "[step {}] condição 'to' detectada após módulo -> pipeline {}, step {}",
449                    self.id, to.pipeline, to.step
450                );
451                debug!(
452                    "Define switching to step {} in pipeline {}",
453                    to.step, to.pipeline
454                );
455                return Ok(StepOutput {
456                    next_step: NextStep::GoToStep(to.clone()),
457                    output,
458                });
459            }
460
461            debug!("[step {}] seguindo para próximo step após módulo", self.id);
462            return Ok(StepOutput {
463                next_step: NextStep::Next,
464                output,
465            });
466        }
467
468        if let Some(condition) = &self.condition {
469            debug!("[step {}] avaliando condição", self.id);
470            let (next_step, output) = if condition
471                .evaluate(context)
472                .map_err(StepWorkerError::ConditionError)?
473            {
474                debug!("[step {}] condição verdadeira", self.id);
475                let next_step = if let Some(ref then_case) = self.then_case {
476                    debug!("[step {}] then_case -> pipeline {}", self.id, then_case);
477                    NextStep::Pipeline(*then_case)
478                } else {
479                    debug!("[step {}] then_case não definido -> Next", self.id);
480                    NextStep::Next
481                };
482
483                (next_step, self.evaluate_payload(context, None)?)
484            } else {
485                debug!("[step {}] condição falsa", self.id);
486                let next_step = if let Some(ref else_case) = self.else_case {
487                    debug!("[step {}] else_case -> pipeline {}", self.id, else_case);
488                    NextStep::Pipeline(*else_case)
489                } else {
490                    debug!("[step {}] else_case não definido -> Next", self.id);
491                    NextStep::Next
492                };
493
494                (next_step, None)
495            };
496
497            {
498                span.record("step.condition", condition.raw.to_string());
499
500                if let Some(ref output) = output {
501                    span.record("context.payload", truncate_string(output));
502                }
503            }
504
505            return Ok(StepOutput { next_step, output });
506        }
507
508        let default_output = if self.log.is_some() {
509            context.get_payload()
510        } else {
511            None
512        };
513        let output = self.evaluate_payload(context, default_output)?;
514
515        {
516            if let Some(ref output) = output {
517                span.record("context.payload", truncate_string(output));
518            }
519        }
520
521        if let Some(to) = &self.to {
522            debug!(
523                "[step {}] condição 'to' detectada (sem módulo) -> pipeline {}, step {}",
524                self.id, to.pipeline, to.step
525            );
526            debug!(
527                "Define switching to step {} in pipeline {}",
528                to.step, to.pipeline
529            );
530            return Ok(StepOutput {
531                next_step: NextStep::GoToStep(to.clone()),
532                output,
533            });
534        }
535
536        debug!("[step {}] nenhuma condição especial -> Next", self.id);
537        return Ok(StepOutput {
538            next_step: NextStep::Next,
539            output,
540        });
541    }
542}
543
544fn build_log_step(
545    engine: Arc<Engine>,
546    value: &Value,
547    module: Option<&str>,
548) -> Result<Option<LogStep>, StepWorkerError> {
549    if let Some(log_step) = extract_log_from_key(engine.clone(), value)? {
550        return Ok(Some(log_step));
551    }
552
553    if module == Some("log") {
554        let input_value = value.get("input");
555        let log_step = build_log_from_input(engine, input_value)?;
556        return Ok(Some(log_step));
557    }
558
559    Ok(None)
560}
561
562fn extract_log_from_key(
563    engine: Arc<Engine>,
564    value: &Value,
565) -> Result<Option<LogStep>, StepWorkerError> {
566    let Some(obj) = value.as_object() else {
567        return Ok(None);
568    };
569
570    for (key, value) in obj.iter() {
571        let key_str = key.to_string();
572        let Some(level_key) = key_str.strip_prefix("log.") else {
573            continue;
574        };
575        let level = level_key.split('.').next().unwrap_or(level_key);
576        let level = LogLevel::from_str(level);
577        let message_value = if let Some(obj) = value.as_object() {
578            obj.get("message").cloned().unwrap_or_else(|| value.clone())
579        } else {
580            value.clone()
581        };
582        let message =
583            Script::try_build(engine.clone(), &message_value).map_err(StepWorkerError::LogError)?;
584
585        return Ok(Some(LogStep {
586            level,
587            message: Some(message),
588        }));
589    }
590
591    Ok(None)
592}
593
594fn build_log_from_input(
595    engine: Arc<Engine>,
596    input_value: Option<&Value>,
597) -> Result<LogStep, StepWorkerError> {
598    let mut level = LogLevel::Info;
599    let mut message_value: Option<Value> = None;
600
601    if let Some(input_value) = input_value {
602        if let Some(obj) = input_value.as_object() {
603            if let Some(level_value) = obj.get("action").or_else(|| obj.get("level")) {
604                level = LogLevel::from_str(level_value.as_string().as_str());
605            }
606
607            message_value = obj.get("message").cloned();
608        } else {
609            message_value = Some(input_value.clone());
610        }
611    }
612
613    let message = if let Some(message_value) = message_value {
614        Some(
615            Script::try_build(engine, &message_value).map_err(StepWorkerError::LogError)?,
616        )
617    } else {
618        None
619    };
620
621    Ok(LogStep { level, message })
622}
623
624fn truncate_string(string: &Value) -> String {
625    let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
626    let string = string.to_string();
627    if string.len() > limit {
628        format!("{}...", &string[..limit])
629    } else {
630        string.to_string()
631    }
632}
633
#[cfg(test)]
mod test {
    use super::*;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// `get_id` hands back the id the worker was constructed with.
    #[tokio::test]
    async fn test_step_get_reference_id() {
        let worker = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(worker.get_id(), &ID::from("id"));
    }

    /// A step with only a payload evaluates it and continues to the next step.
    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let payload_script = Script::try_build(engine, &"10".to_value()).unwrap();
        let worker = StepWorker {
            payload: Some(payload_script),
            ..Default::default()
        };

        let ctx = Context::new();
        let outcome = worker.execute(&ctx).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Next);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    /// A return case stops execution and yields the return value.
    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let return_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            return_case: Some(return_script),
            ..Default::default()
        };

        let ctx = Context::new();
        let outcome = worker.execute(&ctx).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    /// When both are present, the return case wins over the payload.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let payload_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let return_script = Script::try_build(engine.clone(), &"20".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            payload: Some(payload_script),
            return_case: Some(return_script),
            ..Default::default()
        };

        let ctx = Context::new();
        let outcome = worker.execute(&ctx).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(20i64)));
    }
}