1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 debug::debug_controller,
5 id::ID,
6 script::Script,
7};
8use once_cell::sync::Lazy;
9use phlow_sdk::{
10 prelude::{log::debug, *},
11 tracing::field,
12};
13use rhai::Engine;
14use serde::Serialize;
15use std::{fmt::Display, sync::Arc};
16use uuid::Uuid;
17
18static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
19 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
20 Ok(value) => value.parse::<usize>().unwrap_or(100),
21 Err(_) => 100,
22 });
23
24#[derive(Debug)]
25pub enum StepWorkerError {
26 ConditionError(ConditionError),
27 PayloadError(phs::ScriptError),
28 ModulesError(ModulesError),
29 InputError(phs::ScriptError),
30 LogError(phs::ScriptError),
31}
32
33impl Display for StepWorkerError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
37 StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
38 StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
39 StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
40 StepWorkerError::LogError(err) => write!(f, "Log error: {}", err),
41 }
42 }
43}
44
45impl std::error::Error for StepWorkerError {
46 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
47 match self {
48 StepWorkerError::ConditionError(err) => Some(err),
49 StepWorkerError::PayloadError(_) => None, StepWorkerError::ModulesError(_) => None, StepWorkerError::InputError(_) => None, StepWorkerError::LogError(_) => None, }
54 }
55}
56
/// Severity used by a log step when emitting its message.
#[derive(Debug, Clone, Copy)]
enum LogLevel {
    /// Default level; also used for unrecognised level names.
    Info,
    Debug,
    Warn,
    Error,
    Trace,
}
65
66impl LogLevel {
67 fn from_str(level: &str) -> Self {
68 match level.to_ascii_lowercase().as_str() {
69 "debug" => LogLevel::Debug,
70 "warn" | "warning" => LogLevel::Warn,
71 "error" => LogLevel::Error,
72 "trace" => LogLevel::Trace,
73 _ => LogLevel::Info,
74 }
75 }
76
77 fn log(self, message: &str) {
78 match self {
79 LogLevel::Info => log::info!("{}", message),
80 LogLevel::Debug => log::debug!("{}", message),
81 LogLevel::Warn => log::warn!("{}", message),
82 LogLevel::Error => log::error!("{}", message),
83 LogLevel::Trace => log::trace!("{}", message),
84 }
85 }
86}
87
88#[derive(Debug, Clone)]
89pub(crate) struct LogStep {
90 level: LogLevel,
91 message: Option<Script>,
92}
93
94#[derive(Debug, Clone, PartialEq, Serialize)]
95pub enum NextStep {
96 Pipeline(usize),
97 GoToStep(StepReference),
98 Stop,
99 Next,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
103pub struct StepReference {
104 pub pipeline: usize,
105 pub step: usize,
106}
107
108#[derive(Debug)]
109pub struct StepOutput {
110 pub next_step: NextStep,
111 pub output: Option<Value>,
112}
113
114#[derive(Debug, Clone, Default)]
115pub struct StepWorker {
116 pub(crate) id: ID,
117 pub(crate) label: Option<String>,
118 pub(crate) module: Option<String>,
119 pub(crate) condition: Option<Condition>,
120 pub(crate) input: Option<Script>,
121 pub(crate) payload: Option<Script>,
122 pub(crate) then_case: Option<usize>,
123 pub(crate) else_case: Option<usize>,
124 pub(crate) modules: Arc<Modules>,
125 pub(crate) return_case: Option<Script>,
126 pub(crate) to: Option<StepReference>,
127 pub(crate) log: Option<LogStep>,
128 pub(crate) step_value: Option<Value>,
129 #[cfg(debug_assertions)]
130 pub(crate) step_raw: String,
131}
132
133impl StepWorker {
134 pub fn try_from_value(
135 engine: Arc<Engine>,
136 modules: Arc<Modules>,
137 value: &Value,
138 ) -> Result<Self, StepWorkerError> {
139 log::debug!(
140 "Parsing step worker from value: {}",
141 value.to_json(JsonMode::Indented)
142 );
143
144 let id = match value.get("id") {
145 Some(id) => ID::from(id),
146 None => ID::new(),
147 };
148 let label: Option<String> = match value.get("label") {
149 Some(label) => Some(label.as_string()),
150 None => None,
151 };
152 let condition = {
153 if let Some(condition) = value
154 .get("condition")
155 .map(|condition| Condition::try_from_value(engine.clone(), condition))
156 {
157 Some(condition.map_err(StepWorkerError::ConditionError)?)
158 } else {
159 if let Some(condition) = value.get("assert").map(|assert| {
160 Condition::try_build_with_assert(engine.clone(), assert.to_string())
161 }) {
162 Some(condition.map_err(StepWorkerError::ConditionError)?)
163 } else {
164 None
165 }
166 }
167 };
168 let payload = match value.get("payload") {
169 Some(payload) => match Script::try_build(engine.clone(), payload) {
170 Ok(payload) => Some(payload),
171 Err(err) => return Err(StepWorkerError::PayloadError(err)),
172 },
173 None => None,
174 };
175 let input = match value.get("input") {
176 Some(input) => match Script::try_build(engine.clone(), input) {
177 Ok(input) => Some(input),
178 Err(err) => return Err(StepWorkerError::InputError(err)),
179 },
180 None => None,
181 };
182 let then_case = match value.get("then") {
183 Some(then_case) => match then_case.to_u64() {
184 Some(then_case) => Some(then_case as usize),
185 None => None,
186 },
187 None => None,
188 };
189 let else_case = match value.get("else") {
190 Some(else_case) => match else_case.to_u64() {
191 Some(else_case) => Some(else_case as usize),
192 None => None,
193 },
194 None => None,
195 };
196 let return_case = match value.get("return") {
197 Some(return_case) => match Script::try_build(engine.clone(), return_case) {
198 Ok(return_case) => Some(return_case),
199 Err(err) => return Err(StepWorkerError::PayloadError(err)),
200 },
201 None => None,
202 };
203 let module = match value.get("use") {
204 Some(module) => Some(module.to_string()),
205 None => None,
206 };
207 let is_log_module = module.as_deref() == Some("log");
208 let log = build_log_step(engine.clone(), value, is_log_module)?;
209 let module = if is_log_module { None } else { module };
210
211 let to = match value.get("to") {
212 Some(to_step) => match to_step.as_object() {
213 Some(to_step) => {
214 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
215 let step = to_step.get("step").and_then(|v| v.to_u64());
216
217 if pipeline.is_some() && step.is_some() {
218 Some(StepReference {
219 pipeline: pipeline.unwrap() as usize,
220 step: step.unwrap() as usize,
221 })
222 } else {
223 None
224 }
225 }
226 None => None,
227 },
228 None => None,
229 };
230
231 let mut step_value = value.clone();
232 if Self::should_add_uuid() {
233 if let Some(obj) = step_value.as_object_mut() {
234 if !obj.contains_key(&"#uuid".to_string()) {
235 obj.insert("#uuid".to_string(), Uuid::new_v4().to_string().to_value());
236 }
237 }
238 }
239 #[cfg(debug_assertions)]
240 let step_raw = value.to_string();
241
242 Ok(Self {
243 id,
244 label,
245 module,
246 input,
247 condition,
248 payload,
249 then_case,
250 else_case,
251 modules,
252 return_case,
253 to,
254 log,
255 step_value: Some(step_value),
256 #[cfg(debug_assertions)]
257 step_raw,
258 })
259 }
260
261 pub fn get_id(&self) -> &ID {
262 &self.id
263 }
264
265 pub(crate) fn compiled_debug(&self) -> Value {
266 let mut map = std::collections::HashMap::new();
267 if let Some(payload) = &self.payload {
268 map.insert("payload".to_string(), payload.compiled_debug());
269 }
270 if let Some(input) = &self.input {
271 map.insert("input".to_string(), input.compiled_debug());
272 }
273 if let Some(return_case) = &self.return_case {
274 map.insert("return".to_string(), return_case.compiled_debug());
275 }
276 if let Some(condition) = &self.condition {
277 map.insert("condition".to_string(), condition.expression.compiled_debug());
278 }
279 if let Some(log_step) = &self.log {
280 if let Some(message) = &log_step.message {
281 map.insert("log".to_string(), message.compiled_debug());
282 }
283 }
284 map.to_value()
285 }
286
287 fn should_add_uuid() -> bool {
288 if debug_controller().is_some() {
289 return true;
290 }
291 std::env::var("PHLOW_DEBUG")
292 .map(|value| value.eq_ignore_ascii_case("true"))
293 .unwrap_or(false)
294 }
295
296 fn evaluate_payload(
297 &self,
298 context: &Context,
299 default: Option<Value>,
300 ) -> Result<Option<Value>, StepWorkerError> {
301 if let Some(ref payload) = self.payload {
302 let value = Some(
303 payload
304 .evaluate(context)
305 .map_err(StepWorkerError::PayloadError)?,
306 );
307 Ok(value)
308 } else {
309 Ok(default)
310 }
311 }
312
313 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
314 if let Some(ref input) = self.input {
315 let value = Some(
316 input
317 .evaluate(context)
318 .map_err(StepWorkerError::InputError)?,
319 );
320 Ok(value)
321 } else {
322 Ok(None)
323 }
324 }
325
326 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
327 if let Some(ref return_case) = self.return_case {
328 let value = Some(
329 return_case
330 .evaluate(context)
331 .map_err(StepWorkerError::PayloadError)?,
332 );
333
334 #[cfg(debug_assertions)]
335 log::debug!(
336 "Evaluating return case for step {}: {}",
337 self.id,
338 value.as_ref().map_or("None".to_string(), |v| v.to_string())
339 );
340
341 Ok(value)
342 } else {
343 Ok(None)
344 }
345 }
346
347 async fn evaluate_module(
348 &self,
349 context: &Context,
350 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
351 if let Some(ref module) = self.module {
352 let input = self.evaluate_input(context)?;
353
354 let context = if let Some(input) = &input {
355 context.clone_with_input(input.clone())
356 } else {
357 context.clone()
358 };
359
360 match self
361 .modules
362 .execute(module, &context.get_input(), &context.get_payload())
363 .await
364 {
365 Ok(response) => {
366 #[cfg(debug_assertions)]
367 log::debug!("Module response for step {}: {:?}", self.id, response);
368
369 if let Some(err) = response.error {
370 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
371 err,
372 )));
373 }
374
375 Ok(Some((Some(module.clone()), Some(response.data), context)))
376 }
377 Err(err) => Err(StepWorkerError::ModulesError(err)),
378 }
379 } else {
380 Ok(None)
381 }
382 }
383
384 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
385 #[cfg(debug_assertions)]
386 log::debug!(
387 "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
388 self.step_raw,
389 &context.get_main().to_value().to_string(),
390 &context.get_payload().to_value().to_string(),
391 &context.get_setup().to_value().to_string()
392 );
393
394 let span = tracing::info_span!(
395 "step",
396 otel.name = field::Empty,
397 context.main = field::Empty,
398 context.params = field::Empty,
399 context.payload = field::Empty,
400 context.input = field::Empty,
401 step.id = field::Empty,
402 step.label = field::Empty,
403 step.module = field::Empty,
404 step.condition = field::Empty,
405 step.payload = field::Empty,
406 step.return = field::Empty,
407 );
408 let _guard = span.enter();
409
410 {
411 let step_name = self.label.clone().unwrap_or(self.id.to_string());
412
413 span.record("otel.name", format!("step {}", step_name));
414
415 if let Some(ref input) = context.get_input() {
416 span.record("context.input", input.to_string());
417 }
418
419 if let Some(ref payload) = context.get_payload() {
420 span.record("context.payload", truncate_string(&payload));
421 }
422
423 if let Some(ref main) = context.get_main() {
424 span.record("context.main", truncate_string(&main));
425 }
426
427 span.record("step.id", self.id.to_string());
428
429 if let Some(ref label) = self.label {
430 span.record("step.label", label.to_string());
431 }
432 }
433
434 if let Some(log_step) = &self.log {
435 let message = match &log_step.message {
436 Some(script) => script
437 .evaluate(context)
438 .map_err(StepWorkerError::LogError)?
439 .to_string(),
440 None => String::new(),
441 };
442 log_step.level.log(&message);
443 }
444
445 if let Some(condition) = &self.condition {
446 debug!("[step {}] avaliando condição", self.id);
447 let result = condition
448 .evaluate(context)
449 .map_err(StepWorkerError::ConditionError)?;
450
451 span.record("step.condition", condition.raw.to_string());
452
453 if self.then_case.is_some() || self.else_case.is_some() {
454 let (mut next_step, output) = if result {
455 debug!("[step {}] condição verdadeira", self.id);
456 let next_step = if let Some(ref then_case) = self.then_case {
457 debug!("[step {}] then_case -> pipeline {}", self.id, then_case);
458 NextStep::Pipeline(*then_case)
459 } else {
460 debug!("[step {}] then_case não definido -> Next", self.id);
461 NextStep::Next
462 };
463
464 (next_step, self.evaluate_payload(context, None)?)
465 } else {
466 debug!("[step {}] condição falsa", self.id);
467 let next_step = if let Some(ref else_case) = self.else_case {
468 debug!("[step {}] else_case -> pipeline {}", self.id, else_case);
469 NextStep::Pipeline(*else_case)
470 } else {
471 debug!("[step {}] else_case não definido -> Next", self.id);
472 NextStep::Next
473 };
474
475 (next_step, None)
476 };
477
478 if matches!(next_step, NextStep::Next) {
479 if let Some(to) = &self.to {
480 debug!(
481 "[step {}] condição 'to' detectada após condicional -> pipeline {}, step {}",
482 self.id, to.pipeline, to.step
483 );
484 next_step = NextStep::GoToStep(to.clone());
485 }
486 }
487
488 if let Some(ref output) = output {
489 span.record("context.payload", truncate_string(output));
490 }
491
492 return Ok(StepOutput { next_step, output });
493 }
494
495 if !result {
496 debug!("[step {}] condição falsa - pulando step", self.id);
497 return Ok(StepOutput {
498 next_step: NextStep::Next,
499 output: None,
500 });
501 }
502 }
503
504 if let Some(output) = self.evaluate_return(context)? {
505 debug!(
506 "[step {}] return case acionado (condicional de parada)",
507 self.id
508 );
509 {
510 span.record("step.return", output.to_string());
511 }
512
513 return Ok(StepOutput {
514 next_step: NextStep::Stop,
515 output: Some(output),
516 });
517 }
518
519 if let Some((module, output, context)) = self.evaluate_module(context).await? {
520 debug!(
521 "[step {}] módulo '{}' executado; output inicial {:?}",
522 self.id,
523 module.as_deref().unwrap_or("<none>"),
524 output
525 );
526 {
527 span.record("step.module", module.clone());
528
529 if let Some(ref output) = output {
530 span.record("context.payload", truncate_string(output));
531 }
532 }
533
534 let context = if let Some(output) = output.clone() {
535 debug!(
536 "[step {}] definindo output no contexto após execução do módulo",
537 self.id
538 );
539 context.clone_with_output(output)
540 } else {
541 context.clone()
542 };
543
544 let output = self.evaluate_payload(&context, output)?;
545
546 if let Some(to) = &self.to {
547 debug!(
548 "[step {}] condição 'to' detectada após módulo -> pipeline {}, step {}",
549 self.id, to.pipeline, to.step
550 );
551 debug!(
552 "Define switching to step {} in pipeline {}",
553 to.step, to.pipeline
554 );
555 return Ok(StepOutput {
556 next_step: NextStep::GoToStep(to.clone()),
557 output,
558 });
559 }
560
561 debug!("[step {}] seguindo para próximo step após módulo", self.id);
562 return Ok(StepOutput {
563 next_step: NextStep::Next,
564 output,
565 });
566 }
567
568 let default_output = if self.log.is_some() {
569 context.get_payload()
570 } else {
571 None
572 };
573 let output = self.evaluate_payload(context, default_output)?;
574
575 {
576 if let Some(ref output) = output {
577 span.record("context.payload", truncate_string(output));
578 }
579 }
580
581 if let Some(to) = &self.to {
582 debug!(
583 "[step {}] condição 'to' detectada (sem módulo) -> pipeline {}, step {}",
584 self.id, to.pipeline, to.step
585 );
586 debug!(
587 "Define switching to step {} in pipeline {}",
588 to.step, to.pipeline
589 );
590 return Ok(StepOutput {
591 next_step: NextStep::GoToStep(to.clone()),
592 output,
593 });
594 }
595
596 debug!("[step {}] nenhuma condição especial -> Next", self.id);
597 return Ok(StepOutput {
598 next_step: NextStep::Next,
599 output,
600 });
601 }
602}
603
604fn build_log_step(
605 engine: Arc<Engine>,
606 value: &Value,
607 is_log_module: bool,
608) -> Result<Option<LogStep>, StepWorkerError> {
609 if let Some(log_step) = extract_log_from_key(engine.clone(), value)? {
610 return Ok(Some(log_step));
611 }
612
613 if is_log_module {
614 let input_value = value.get("input");
615 let log_step = build_log_from_input(engine, input_value)?;
616 return Ok(Some(log_step));
617 }
618
619 Ok(None)
620}
621
622fn extract_log_from_key(
623 engine: Arc<Engine>,
624 value: &Value,
625) -> Result<Option<LogStep>, StepWorkerError> {
626 let Some(obj) = value.as_object() else {
627 return Ok(None);
628 };
629
630 for (key, value) in obj.iter() {
631 let key_str = key.to_string();
632 let Some(level_key) = key_str.strip_prefix("log.") else {
633 continue;
634 };
635 let level = level_key.split('.').next().unwrap_or(level_key);
636 let level = LogLevel::from_str(level);
637 let message_value = if let Some(obj) = value.as_object() {
638 obj.get("message").cloned().unwrap_or_else(|| value.clone())
639 } else {
640 value.clone()
641 };
642 let message =
643 Script::try_build(engine.clone(), &message_value).map_err(StepWorkerError::LogError)?;
644
645 return Ok(Some(LogStep {
646 level,
647 message: Some(message),
648 }));
649 }
650
651 Ok(None)
652}
653
654fn build_log_from_input(
655 engine: Arc<Engine>,
656 input_value: Option<&Value>,
657) -> Result<LogStep, StepWorkerError> {
658 let mut level = LogLevel::Info;
659 let mut message_value: Option<Value> = None;
660
661 if let Some(input_value) = input_value {
662 if let Some(obj) = input_value.as_object() {
663 if let Some(level_value) = obj.get("action").or_else(|| obj.get("level")) {
664 level = LogLevel::from_str(level_value.as_string().as_str());
665 }
666
667 message_value = obj.get("message").cloned();
668 } else {
669 message_value = Some(input_value.clone());
670 }
671 }
672
673 let message = if let Some(message_value) = message_value {
674 Some(
675 Script::try_build(engine, &message_value).map_err(StepWorkerError::LogError)?,
676 )
677 } else {
678 None
679 };
680
681 Ok(LogStep { level, message })
682}
683
684fn truncate_string(string: &Value) -> String {
685 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
686 let string = string.to_string();
687 if string.len() <= limit {
688 return string;
689 }
690
691 let mut end = 0;
692 for (idx, ch) in string.char_indices() {
693 let next = idx + ch.len_utf8();
694 if next > limit {
695 break;
696 }
697 end = next;
698 }
699
700 format!("{}...", &string[..end])
701}
702
#[cfg(test)]
mod test {
    use super::*;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::json;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// `get_id` hands back the identifier the worker was created with.
    #[tokio::test]
    async fn test_step_get_reference_id() {
        let worker = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(worker.get_id(), &ID::from("id"));
    }

    /// A payload-only step yields the payload value and moves on.
    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let payload_script = Script::try_build(engine, &"10".to_value()).unwrap();
        let worker = StepWorker {
            payload: Some(payload_script),
            ..Default::default()
        };

        let outcome = worker.execute(&Context::new()).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Next);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    /// A `return` script stops execution with its value.
    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let return_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            return_case: Some(return_script),
            ..Default::default()
        };

        let outcome = worker.execute(&Context::new()).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    /// When both are set, `return` takes precedence over `payload`.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let payload_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let return_script = Script::try_build(engine.clone(), &"20".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            payload: Some(payload_script),
            return_case: Some(return_script),
            ..Default::default()
        };

        let outcome = worker.execute(&Context::new()).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(20i64)));
    }

    /// An `assert` guards `return`: it fires only when the assertion holds.
    #[tokio::test]
    async fn test_step_execute_with_assert_and_return_case() {
        let engine = build_engine(None);
        let guard =
            Condition::try_build_with_assert(engine.clone(), "{{ payload == 0 }}".to_string())
                .unwrap();
        let return_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            condition: Some(guard),
            return_case: Some(return_script),
            ..Default::default()
        };

        let passing_context = Context::new().clone_with_output(Value::from(0i64));
        let passing_outcome = worker.execute(&passing_context).await.unwrap();

        assert_eq!(passing_outcome.next_step, NextStep::Stop);
        assert_eq!(passing_outcome.output, Some(Value::from(10i64)));

        let failing_context = Context::new().clone_with_output(Value::from(1i64));
        let failing_outcome = worker.execute(&failing_context).await.unwrap();

        assert_eq!(failing_outcome.next_step, NextStep::Next);
        assert_eq!(failing_outcome.output, None);
    }

    /// A `log.<level>` key produces a log step and no module.
    #[test]
    fn test_step_from_value_log_key() {
        let engine = build_engine(None);
        let definition = json!({ "log.info": "Hello" });
        let worker =
            StepWorker::try_from_value(engine, Arc::new(Modules::default()), &definition).unwrap();

        assert!(worker.module.is_none());
        assert!(matches!(
            worker.log.map(|log| log.level),
            Some(LogLevel::Info)
        ));
    }

    /// `use: log` is handled internally, taking the level from the input.
    #[test]
    fn test_step_from_value_log_module_internal() {
        let engine = build_engine(None);
        let definition = json!({
            "use": "log",
            "input": {
                "level": "warn",
                "message": "User not found"
            }
        });
        let worker =
            StepWorker::try_from_value(engine, Arc::new(Modules::default()), &definition).unwrap();

        assert!(worker.module.is_none());
        assert!(matches!(
            worker.log.map(|log| log.level),
            Some(LogLevel::Warn)
        ));
    }
}