1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 debug::debug_controller,
5 id::ID,
6 script::Script,
7};
8use once_cell::sync::Lazy;
9use phlow_sdk::{
10 prelude::{log::debug, *},
11 tracing::field,
12};
13use rhai::Engine;
14use serde::Serialize;
15use std::{fmt::Display, sync::Arc};
16use uuid::Uuid;
17
18static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
19 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
20 Ok(value) => value.parse::<usize>().unwrap_or(100),
21 Err(_) => 100,
22 });
23
24#[derive(Debug)]
25pub enum StepWorkerError {
26 ConditionError(ConditionError),
27 PayloadError(phs::ScriptError),
28 ModulesError(ModulesError),
29 InputError(phs::ScriptError),
30 LogError(phs::ScriptError),
31}
32
33impl Display for StepWorkerError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
37 StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
38 StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
39 StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
40 StepWorkerError::LogError(err) => write!(f, "Log error: {}", err),
41 }
42 }
43}
44
45impl std::error::Error for StepWorkerError {
46 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
47 match self {
48 StepWorkerError::ConditionError(err) => Some(err),
49 StepWorkerError::PayloadError(_) => None, StepWorkerError::ModulesError(_) => None, StepWorkerError::InputError(_) => None, StepWorkerError::LogError(_) => None, }
54 }
55}
56
/// Severity at which a step's log message is emitted.
#[derive(Debug, Clone, Copy)]
enum LogLevel {
    /// Emitted with `log::info!`; also the fallback for unrecognised level names.
    Info,
    /// Emitted with `log::debug!`.
    Debug,
    /// Emitted with `log::warn!`.
    Warn,
    /// Emitted with `log::error!`.
    Error,
    /// Emitted with `log::trace!`.
    Trace,
}
65
66impl LogLevel {
67 fn from_str(level: &str) -> Self {
68 match level.to_ascii_lowercase().as_str() {
69 "debug" => LogLevel::Debug,
70 "warn" | "warning" => LogLevel::Warn,
71 "error" => LogLevel::Error,
72 "trace" => LogLevel::Trace,
73 _ => LogLevel::Info,
74 }
75 }
76
77 fn log(self, message: &str) {
78 match self {
79 LogLevel::Info => log::info!("{}", message),
80 LogLevel::Debug => log::debug!("{}", message),
81 LogLevel::Warn => log::warn!("{}", message),
82 LogLevel::Error => log::error!("{}", message),
83 LogLevel::Trace => log::trace!("{}", message),
84 }
85 }
86}
87
88#[derive(Debug, Clone)]
89pub(crate) struct LogStep {
90 level: LogLevel,
91 message: Option<Script>,
92}
93
94#[derive(Debug, Clone, PartialEq, Serialize)]
95pub enum NextStep {
96 Pipeline(usize),
97 GoToStep(StepReference),
98 Stop,
99 Next,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
103pub struct StepReference {
104 pub pipeline: usize,
105 pub step: usize,
106}
107
108#[derive(Debug)]
109pub struct StepOutput {
110 pub next_step: NextStep,
111 pub output: Option<Value>,
112}
113
114#[derive(Debug, Clone, Default)]
115pub struct StepWorker {
116 pub(crate) id: ID,
117 pub(crate) label: Option<String>,
118 pub(crate) module: Option<String>,
119 pub(crate) condition: Option<Condition>,
120 pub(crate) input: Option<Script>,
121 pub(crate) payload: Option<Script>,
122 pub(crate) then_case: Option<usize>,
123 pub(crate) else_case: Option<usize>,
124 pub(crate) modules: Arc<Modules>,
125 pub(crate) return_case: Option<Script>,
126 pub(crate) to: Option<StepReference>,
127 pub(crate) log: Option<LogStep>,
128 pub(crate) step_value: Option<Value>,
129 #[cfg(debug_assertions)]
130 pub(crate) step_raw: String,
131}
132
133impl StepWorker {
134 pub fn try_from_value(
135 engine: Arc<Engine>,
136 modules: Arc<Modules>,
137 value: &Value,
138 ) -> Result<Self, StepWorkerError> {
139 log::debug!(
140 "Parsing step worker from value: {}",
141 value.to_json(JsonMode::Indented)
142 );
143
144 let id = match value.get("id") {
145 Some(id) => ID::from(id),
146 None => ID::new(),
147 };
148 let label: Option<String> = match value.get("label") {
149 Some(label) => Some(label.as_string()),
150 None => None,
151 };
152 let condition = {
153 if let Some(condition) = value
154 .get("condition")
155 .map(|condition| Condition::try_from_value(engine.clone(), condition))
156 {
157 Some(condition.map_err(StepWorkerError::ConditionError)?)
158 } else {
159 if let Some(condition) = value.get("assert").map(|assert| {
160 Condition::try_build_with_assert(engine.clone(), assert.to_string())
161 }) {
162 Some(condition.map_err(StepWorkerError::ConditionError)?)
163 } else {
164 None
165 }
166 }
167 };
168 let payload = match value.get("payload") {
169 Some(payload) => match Script::try_build(engine.clone(), payload) {
170 Ok(payload) => Some(payload),
171 Err(err) => return Err(StepWorkerError::PayloadError(err)),
172 },
173 None => None,
174 };
175 let input = match value.get("input") {
176 Some(input) => match Script::try_build(engine.clone(), input) {
177 Ok(input) => Some(input),
178 Err(err) => return Err(StepWorkerError::InputError(err)),
179 },
180 None => None,
181 };
182 let then_case = match value.get("then") {
183 Some(then_case) => match then_case.to_u64() {
184 Some(then_case) => Some(then_case as usize),
185 None => None,
186 },
187 None => None,
188 };
189 let else_case = match value.get("else") {
190 Some(else_case) => match else_case.to_u64() {
191 Some(else_case) => Some(else_case as usize),
192 None => None,
193 },
194 None => None,
195 };
196 let return_case = match value.get("return") {
197 Some(return_case) => match Script::try_build(engine.clone(), return_case) {
198 Ok(return_case) => Some(return_case),
199 Err(err) => return Err(StepWorkerError::PayloadError(err)),
200 },
201 None => None,
202 };
203 let module = match value.get("use") {
204 Some(module) => Some(module.to_string()),
205 None => None,
206 };
207 let log = build_log_step(engine.clone(), value, module.as_deref())?;
208
209 let to = match value.get("to") {
210 Some(to_step) => match to_step.as_object() {
211 Some(to_step) => {
212 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
213 let step = to_step.get("step").and_then(|v| v.to_u64());
214
215 if pipeline.is_some() && step.is_some() {
216 Some(StepReference {
217 pipeline: pipeline.unwrap() as usize,
218 step: step.unwrap() as usize,
219 })
220 } else {
221 None
222 }
223 }
224 None => None,
225 },
226 None => None,
227 };
228
229 let mut step_value = value.clone();
230 if Self::should_add_uuid() {
231 if let Some(obj) = step_value.as_object_mut() {
232 if !obj.contains_key(&"#uuid".to_string()) {
233 obj.insert("#uuid".to_string(), Uuid::new_v4().to_string().to_value());
234 }
235 }
236 }
237 #[cfg(debug_assertions)]
238 let step_raw = value.to_string();
239
240 Ok(Self {
241 id,
242 label,
243 module,
244 input,
245 condition,
246 payload,
247 then_case,
248 else_case,
249 modules,
250 return_case,
251 to,
252 log,
253 step_value: Some(step_value),
254 #[cfg(debug_assertions)]
255 step_raw,
256 })
257 }
258
259 pub fn get_id(&self) -> &ID {
260 &self.id
261 }
262
263 pub(crate) fn compiled_debug(&self) -> Value {
264 let mut map = std::collections::HashMap::new();
265 if let Some(payload) = &self.payload {
266 map.insert("payload".to_string(), payload.compiled_debug());
267 }
268 if let Some(input) = &self.input {
269 map.insert("input".to_string(), input.compiled_debug());
270 }
271 if let Some(return_case) = &self.return_case {
272 map.insert("return".to_string(), return_case.compiled_debug());
273 }
274 if let Some(condition) = &self.condition {
275 map.insert("condition".to_string(), condition.expression.compiled_debug());
276 }
277 if let Some(log_step) = &self.log {
278 if let Some(message) = &log_step.message {
279 map.insert("log".to_string(), message.compiled_debug());
280 }
281 }
282 map.to_value()
283 }
284
285 fn should_add_uuid() -> bool {
286 if debug_controller().is_some() {
287 return true;
288 }
289 std::env::var("PHLOW_DEBUG")
290 .map(|value| value.eq_ignore_ascii_case("true"))
291 .unwrap_or(false)
292 }
293
294 fn evaluate_payload(
295 &self,
296 context: &Context,
297 default: Option<Value>,
298 ) -> Result<Option<Value>, StepWorkerError> {
299 if let Some(ref payload) = self.payload {
300 let value = Some(
301 payload
302 .evaluate(context)
303 .map_err(StepWorkerError::PayloadError)?,
304 );
305 Ok(value)
306 } else {
307 Ok(default)
308 }
309 }
310
311 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
312 if let Some(ref input) = self.input {
313 let value = Some(
314 input
315 .evaluate(context)
316 .map_err(StepWorkerError::InputError)?,
317 );
318 Ok(value)
319 } else {
320 Ok(None)
321 }
322 }
323
324 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
325 if let Some(ref return_case) = self.return_case {
326 let value = Some(
327 return_case
328 .evaluate(context)
329 .map_err(StepWorkerError::PayloadError)?,
330 );
331
332 #[cfg(debug_assertions)]
333 log::debug!(
334 "Evaluating return case for step {}: {}",
335 self.id,
336 value.as_ref().map_or("None".to_string(), |v| v.to_string())
337 );
338
339 Ok(value)
340 } else {
341 Ok(None)
342 }
343 }
344
345 async fn evaluate_module(
346 &self,
347 context: &Context,
348 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
349 if self.module.as_deref() == Some("log") {
350 return Ok(None);
351 }
352
353 if let Some(ref module) = self.module {
354 let input = self.evaluate_input(context)?;
355
356 let context = if let Some(input) = &input {
357 context.clone_with_input(input.clone())
358 } else {
359 context.clone()
360 };
361
362 match self
363 .modules
364 .execute(module, &context.get_input(), &context.get_payload())
365 .await
366 {
367 Ok(response) => {
368 #[cfg(debug_assertions)]
369 log::debug!("Module response for step {}: {:?}", self.id, response);
370
371 if let Some(err) = response.error {
372 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
373 err,
374 )));
375 }
376
377 Ok(Some((Some(module.clone()), Some(response.data), context)))
378 }
379 Err(err) => Err(StepWorkerError::ModulesError(err)),
380 }
381 } else {
382 Ok(None)
383 }
384 }
385
386 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
387 #[cfg(debug_assertions)]
388 log::debug!(
389 "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
390 self.step_raw,
391 &context.get_main().to_value().to_string(),
392 &context.get_payload().to_value().to_string(),
393 &context.get_setup().to_value().to_string()
394 );
395
396 let span = tracing::info_span!(
397 "step",
398 otel.name = field::Empty,
399 context.main = field::Empty,
400 context.params = field::Empty,
401 context.payload = field::Empty,
402 context.input = field::Empty,
403 step.id = field::Empty,
404 step.label = field::Empty,
405 step.module = field::Empty,
406 step.condition = field::Empty,
407 step.payload = field::Empty,
408 step.return = field::Empty,
409 );
410 let _guard = span.enter();
411
412 {
413 let step_name = self.label.clone().unwrap_or(self.id.to_string());
414
415 span.record("otel.name", format!("step {}", step_name));
416
417 if let Some(ref input) = context.get_input() {
418 span.record("context.input", input.to_string());
419 }
420
421 if let Some(ref payload) = context.get_payload() {
422 span.record("context.payload", truncate_string(&payload));
423 }
424
425 if let Some(ref main) = context.get_main() {
426 span.record("context.main", truncate_string(&main));
427 }
428
429 span.record("step.id", self.id.to_string());
430
431 if let Some(ref label) = self.label {
432 span.record("step.label", label.to_string());
433 }
434 }
435
436 if let Some(log_step) = &self.log {
437 let message = match &log_step.message {
438 Some(script) => script
439 .evaluate(context)
440 .map_err(StepWorkerError::LogError)?
441 .to_string(),
442 None => String::new(),
443 };
444 log_step.level.log(&message);
445 }
446
447 if let Some(condition) = &self.condition {
448 debug!("[step {}] avaliando condição", self.id);
449 let result = condition
450 .evaluate(context)
451 .map_err(StepWorkerError::ConditionError)?;
452
453 span.record("step.condition", condition.raw.to_string());
454
455 if self.then_case.is_some() || self.else_case.is_some() {
456 let (next_step, output) = if result {
457 debug!("[step {}] condição verdadeira", self.id);
458 let next_step = if let Some(ref then_case) = self.then_case {
459 debug!("[step {}] then_case -> pipeline {}", self.id, then_case);
460 NextStep::Pipeline(*then_case)
461 } else {
462 debug!("[step {}] then_case não definido -> Next", self.id);
463 NextStep::Next
464 };
465
466 (next_step, self.evaluate_payload(context, None)?)
467 } else {
468 debug!("[step {}] condição falsa", self.id);
469 let next_step = if let Some(ref else_case) = self.else_case {
470 debug!("[step {}] else_case -> pipeline {}", self.id, else_case);
471 NextStep::Pipeline(*else_case)
472 } else {
473 debug!("[step {}] else_case não definido -> Next", self.id);
474 NextStep::Next
475 };
476
477 (next_step, None)
478 };
479
480 if let Some(ref output) = output {
481 span.record("context.payload", truncate_string(output));
482 }
483
484 return Ok(StepOutput { next_step, output });
485 }
486
487 if !result {
488 debug!("[step {}] condição falsa - pulando step", self.id);
489 return Ok(StepOutput {
490 next_step: NextStep::Next,
491 output: None,
492 });
493 }
494 }
495
496 if let Some(output) = self.evaluate_return(context)? {
497 debug!(
498 "[step {}] return case acionado (condicional de parada)",
499 self.id
500 );
501 {
502 span.record("step.return", output.to_string());
503 }
504
505 return Ok(StepOutput {
506 next_step: NextStep::Stop,
507 output: Some(output),
508 });
509 }
510
511 if let Some((module, output, context)) = self.evaluate_module(context).await? {
512 debug!(
513 "[step {}] módulo '{}' executado; output inicial {:?}",
514 self.id,
515 module.as_deref().unwrap_or("<none>"),
516 output
517 );
518 {
519 span.record("step.module", module.clone());
520
521 if let Some(ref output) = output {
522 span.record("context.payload", truncate_string(output));
523 }
524 }
525
526 let context = if let Some(output) = output.clone() {
527 debug!(
528 "[step {}] definindo output no contexto após execução do módulo",
529 self.id
530 );
531 context.clone_with_output(output)
532 } else {
533 context.clone()
534 };
535
536 let output = self.evaluate_payload(&context, output)?;
537
538 if let Some(to) = &self.to {
539 debug!(
540 "[step {}] condição 'to' detectada após módulo -> pipeline {}, step {}",
541 self.id, to.pipeline, to.step
542 );
543 debug!(
544 "Define switching to step {} in pipeline {}",
545 to.step, to.pipeline
546 );
547 return Ok(StepOutput {
548 next_step: NextStep::GoToStep(to.clone()),
549 output,
550 });
551 }
552
553 debug!("[step {}] seguindo para próximo step após módulo", self.id);
554 return Ok(StepOutput {
555 next_step: NextStep::Next,
556 output,
557 });
558 }
559
560 let default_output = if self.log.is_some() {
561 context.get_payload()
562 } else {
563 None
564 };
565 let output = self.evaluate_payload(context, default_output)?;
566
567 {
568 if let Some(ref output) = output {
569 span.record("context.payload", truncate_string(output));
570 }
571 }
572
573 if let Some(to) = &self.to {
574 debug!(
575 "[step {}] condição 'to' detectada (sem módulo) -> pipeline {}, step {}",
576 self.id, to.pipeline, to.step
577 );
578 debug!(
579 "Define switching to step {} in pipeline {}",
580 to.step, to.pipeline
581 );
582 return Ok(StepOutput {
583 next_step: NextStep::GoToStep(to.clone()),
584 output,
585 });
586 }
587
588 debug!("[step {}] nenhuma condição especial -> Next", self.id);
589 return Ok(StepOutput {
590 next_step: NextStep::Next,
591 output,
592 });
593 }
594}
595
596fn build_log_step(
597 engine: Arc<Engine>,
598 value: &Value,
599 module: Option<&str>,
600) -> Result<Option<LogStep>, StepWorkerError> {
601 if let Some(log_step) = extract_log_from_key(engine.clone(), value)? {
602 return Ok(Some(log_step));
603 }
604
605 if module == Some("log") {
606 let input_value = value.get("input");
607 let log_step = build_log_from_input(engine, input_value)?;
608 return Ok(Some(log_step));
609 }
610
611 Ok(None)
612}
613
614fn extract_log_from_key(
615 engine: Arc<Engine>,
616 value: &Value,
617) -> Result<Option<LogStep>, StepWorkerError> {
618 let Some(obj) = value.as_object() else {
619 return Ok(None);
620 };
621
622 for (key, value) in obj.iter() {
623 let key_str = key.to_string();
624 let Some(level_key) = key_str.strip_prefix("log.") else {
625 continue;
626 };
627 let level = level_key.split('.').next().unwrap_or(level_key);
628 let level = LogLevel::from_str(level);
629 let message_value = if let Some(obj) = value.as_object() {
630 obj.get("message").cloned().unwrap_or_else(|| value.clone())
631 } else {
632 value.clone()
633 };
634 let message =
635 Script::try_build(engine.clone(), &message_value).map_err(StepWorkerError::LogError)?;
636
637 return Ok(Some(LogStep {
638 level,
639 message: Some(message),
640 }));
641 }
642
643 Ok(None)
644}
645
646fn build_log_from_input(
647 engine: Arc<Engine>,
648 input_value: Option<&Value>,
649) -> Result<LogStep, StepWorkerError> {
650 let mut level = LogLevel::Info;
651 let mut message_value: Option<Value> = None;
652
653 if let Some(input_value) = input_value {
654 if let Some(obj) = input_value.as_object() {
655 if let Some(level_value) = obj.get("action").or_else(|| obj.get("level")) {
656 level = LogLevel::from_str(level_value.as_string().as_str());
657 }
658
659 message_value = obj.get("message").cloned();
660 } else {
661 message_value = Some(input_value.clone());
662 }
663 }
664
665 let message = if let Some(message_value) = message_value {
666 Some(
667 Script::try_build(engine, &message_value).map_err(StepWorkerError::LogError)?,
668 )
669 } else {
670 None
671 };
672
673 Ok(LogStep { level, message })
674}
675
676fn truncate_string(string: &Value) -> String {
677 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
678 let string = string.to_string();
679 if string.len() > limit {
680 format!("{}...", &string[..limit])
681 } else {
682 string.to_string()
683 }
684}
685
#[cfg(test)]
mod test {
    use super::*;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// `get_id` hands back the id the worker was constructed with.
    #[tokio::test]
    async fn test_step_get_reference_id() {
        let worker = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(worker.get_id(), &ID::from("id"));
    }

    /// A payload-only step yields the payload value and moves on to the next step.
    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let payload_script = Script::try_build(engine, &"10".to_value()).unwrap();
        let worker = StepWorker {
            payload: Some(payload_script),
            ..Default::default()
        };

        let ctx = Context::new();

        let outcome = worker.execute(&ctx).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Next);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    /// A `return` script stops execution and supplies the output.
    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let return_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            return_case: Some(return_script),
            ..Default::default()
        };

        let ctx = Context::new();

        let outcome = worker.execute(&ctx).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(10i64)));
    }

    /// When both `payload` and `return` are set, the `return` value wins.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let payload_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let return_script = Script::try_build(engine.clone(), &"20".to_value()).unwrap();
        let worker = StepWorker {
            id: ID::new(),
            payload: Some(payload_script),
            return_case: Some(return_script),
            ..Default::default()
        };

        let ctx = Context::new();

        let outcome = worker.execute(&ctx).await.unwrap();

        assert_eq!(outcome.next_step, NextStep::Stop);
        assert_eq!(outcome.output, Some(Value::from(20i64)));
    }

    /// An `assert` gates the `return`: it fires when the assertion holds and the step
    /// is skipped otherwise.
    #[tokio::test]
    async fn test_step_execute_with_assert_and_return_case() {
        let engine = build_engine(None);
        let assertion =
            Condition::try_build_with_assert(engine.clone(), "{{ payload == 0 }}".to_string())
                .unwrap();
        let return_script = Script::try_build(engine.clone(), &"10".to_value()).unwrap();
        let worker = StepWorker {
            condition: Some(assertion),
            return_case: Some(return_script),
            ..Default::default()
        };

        let passing_ctx = Context::new().clone_with_output(Value::from(0i64));
        let passing = worker.execute(&passing_ctx).await.unwrap();

        assert_eq!(passing.next_step, NextStep::Stop);
        assert_eq!(passing.output, Some(Value::from(10i64)));

        let failing_ctx = Context::new().clone_with_output(Value::from(1i64));
        let failing = worker.execute(&failing_ctx).await.unwrap();

        assert_eq!(failing.next_step, NextStep::Next);
        assert_eq!(failing.output, None);
    }
}