1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 id::ID,
5 script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::{
9 prelude::{log::debug, *},
10 tracing::field,
11};
12use rhai::Engine;
13use serde::Serialize;
14use std::{fmt::Display, sync::Arc};
15
16static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
17 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
18 Ok(value) => value.parse::<usize>().unwrap_or(100),
19 Err(_) => 100,
20 });
21
22#[derive(Debug)]
23pub enum StepWorkerError {
24 ConditionError(ConditionError),
25 PayloadError(phs::ScriptError),
26 ModulesError(ModulesError),
27 InputError(phs::ScriptError),
28 LogError(phs::ScriptError),
29}
30
31impl Display for StepWorkerError {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 match self {
34 StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
35 StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
36 StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
37 StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
38 StepWorkerError::LogError(err) => write!(f, "Log error: {}", err),
39 }
40 }
41}
42
43impl std::error::Error for StepWorkerError {
44 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
45 match self {
46 StepWorkerError::ConditionError(err) => Some(err),
47 StepWorkerError::PayloadError(_) => None, StepWorkerError::ModulesError(_) => None, StepWorkerError::InputError(_) => None, StepWorkerError::LogError(_) => None, }
52 }
53}
54
55#[derive(Debug, Clone, Copy)]
56enum LogLevel {
57 Info,
58 Debug,
59 Warn,
60 Error,
61 Trace,
62}
63
64impl LogLevel {
65 fn from_str(level: &str) -> Self {
66 match level.to_ascii_lowercase().as_str() {
67 "debug" => LogLevel::Debug,
68 "warn" | "warning" => LogLevel::Warn,
69 "error" => LogLevel::Error,
70 "trace" => LogLevel::Trace,
71 _ => LogLevel::Info,
72 }
73 }
74
75 fn log(self, message: &str) {
76 match self {
77 LogLevel::Info => log::info!("{}", message),
78 LogLevel::Debug => log::debug!("{}", message),
79 LogLevel::Warn => log::warn!("{}", message),
80 LogLevel::Error => log::error!("{}", message),
81 LogLevel::Trace => log::trace!("{}", message),
82 }
83 }
84}
85
86#[derive(Debug, Clone)]
87pub(crate) struct LogStep {
88 level: LogLevel,
89 message: Option<Script>,
90}
91
92#[derive(Debug, Clone, PartialEq, Serialize)]
93pub enum NextStep {
94 Pipeline(usize),
95 GoToStep(StepReference),
96 Stop,
97 Next,
98}
99
100#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
101pub struct StepReference {
102 pub pipeline: usize,
103 pub step: usize,
104}
105
106#[derive(Debug)]
107pub struct StepOutput {
108 pub next_step: NextStep,
109 pub output: Option<Value>,
110}
111
112#[derive(Debug, Clone, Default)]
113pub struct StepWorker {
114 pub(crate) id: ID,
115 pub(crate) label: Option<String>,
116 pub(crate) module: Option<String>,
117 pub(crate) condition: Option<Condition>,
118 pub(crate) input: Option<Script>,
119 pub(crate) payload: Option<Script>,
120 pub(crate) then_case: Option<usize>,
121 pub(crate) else_case: Option<usize>,
122 pub(crate) modules: Arc<Modules>,
123 pub(crate) return_case: Option<Script>,
124 pub(crate) to: Option<StepReference>,
125 pub(crate) log: Option<LogStep>,
126 #[cfg(debug_assertions)]
127 pub(crate) step_raw: String,
128}
129
130impl StepWorker {
131 pub fn try_from_value(
132 engine: Arc<Engine>,
133 modules: Arc<Modules>,
134 value: &Value,
135 ) -> Result<Self, StepWorkerError> {
136 log::debug!(
137 "Parsing step worker from value: {}",
138 value.to_json(JsonMode::Indented)
139 );
140
141 let id = match value.get("id") {
142 Some(id) => ID::from(id),
143 None => ID::new(),
144 };
145 let label: Option<String> = match value.get("label") {
146 Some(label) => Some(label.as_string()),
147 None => None,
148 };
149 let condition = {
150 if let Some(condition) = value
151 .get("condition")
152 .map(|condition| Condition::try_from_value(engine.clone(), condition))
153 {
154 Some(condition.map_err(StepWorkerError::ConditionError)?)
155 } else {
156 if let Some(condition) = value.get("assert").map(|assert| {
157 Condition::try_build_with_assert(engine.clone(), assert.to_string())
158 }) {
159 Some(condition.map_err(StepWorkerError::ConditionError)?)
160 } else {
161 None
162 }
163 }
164 };
165 let payload = match value.get("payload") {
166 Some(payload) => match Script::try_build(engine.clone(), payload) {
167 Ok(payload) => Some(payload),
168 Err(err) => return Err(StepWorkerError::PayloadError(err)),
169 },
170 None => None,
171 };
172 let input = match value.get("input") {
173 Some(input) => match Script::try_build(engine.clone(), input) {
174 Ok(input) => Some(input),
175 Err(err) => return Err(StepWorkerError::InputError(err)),
176 },
177 None => None,
178 };
179 let then_case = match value.get("then") {
180 Some(then_case) => match then_case.to_u64() {
181 Some(then_case) => Some(then_case as usize),
182 None => None,
183 },
184 None => None,
185 };
186 let else_case = match value.get("else") {
187 Some(else_case) => match else_case.to_u64() {
188 Some(else_case) => Some(else_case as usize),
189 None => None,
190 },
191 None => None,
192 };
193 let return_case = match value.get("return") {
194 Some(return_case) => match Script::try_build(engine.clone(), return_case) {
195 Ok(return_case) => Some(return_case),
196 Err(err) => return Err(StepWorkerError::PayloadError(err)),
197 },
198 None => None,
199 };
200 let module = match value.get("use") {
201 Some(module) => Some(module.to_string()),
202 None => None,
203 };
204 let log = build_log_step(engine.clone(), value, module.as_deref())?;
205
206 let to = match value.get("to") {
207 Some(to_step) => match to_step.as_object() {
208 Some(to_step) => {
209 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
210 let step = to_step.get("step").and_then(|v| v.to_u64());
211
212 if pipeline.is_some() && step.is_some() {
213 Some(StepReference {
214 pipeline: pipeline.unwrap() as usize,
215 step: step.unwrap() as usize,
216 })
217 } else {
218 None
219 }
220 }
221 None => None,
222 },
223 None => None,
224 };
225
226 #[cfg(debug_assertions)]
227 let step_raw = value.to_string();
228
229 Ok(Self {
230 id,
231 label,
232 module,
233 input,
234 condition,
235 payload,
236 then_case,
237 else_case,
238 modules,
239 return_case,
240 to,
241 log,
242 #[cfg(debug_assertions)]
243 step_raw,
244 })
245 }
246
247 pub fn get_id(&self) -> &ID {
248 &self.id
249 }
250
251 fn evaluate_payload(
252 &self,
253 context: &Context,
254 default: Option<Value>,
255 ) -> Result<Option<Value>, StepWorkerError> {
256 if let Some(ref payload) = self.payload {
257 let value = Some(
258 payload
259 .evaluate(context)
260 .map_err(StepWorkerError::PayloadError)?,
261 );
262 Ok(value)
263 } else {
264 Ok(default)
265 }
266 }
267
268 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
269 if let Some(ref input) = self.input {
270 let value = Some(
271 input
272 .evaluate(context)
273 .map_err(StepWorkerError::InputError)?,
274 );
275 Ok(value)
276 } else {
277 Ok(None)
278 }
279 }
280
281 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
282 if let Some(ref return_case) = self.return_case {
283 let value = Some(
284 return_case
285 .evaluate(context)
286 .map_err(StepWorkerError::PayloadError)?,
287 );
288
289 #[cfg(debug_assertions)]
290 log::debug!(
291 "Evaluating return case for step {}: {}",
292 self.id,
293 value.as_ref().map_or("None".to_string(), |v| v.to_string())
294 );
295
296 Ok(value)
297 } else {
298 Ok(None)
299 }
300 }
301
302 async fn evaluate_module(
303 &self,
304 context: &Context,
305 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
306 if self.module.as_deref() == Some("log") {
307 return Ok(None);
308 }
309
310 if let Some(ref module) = self.module {
311 let input = self.evaluate_input(context)?;
312
313 let context = if let Some(input) = &input {
314 context.clone_with_input(input.clone())
315 } else {
316 context.clone()
317 };
318
319 match self
320 .modules
321 .execute(module, &context.get_input(), &context.get_payload())
322 .await
323 {
324 Ok(response) => {
325 #[cfg(debug_assertions)]
326 log::debug!("Module response for step {}: {:?}", self.id, response);
327
328 if let Some(err) = response.error {
329 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
330 err,
331 )));
332 }
333
334 Ok(Some((Some(module.clone()), Some(response.data), context)))
335 }
336 Err(err) => Err(StepWorkerError::ModulesError(err)),
337 }
338 } else {
339 Ok(None)
340 }
341 }
342
343 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
344 #[cfg(debug_assertions)]
345 log::debug!(
346 "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
347 self.step_raw,
348 &context.get_main().to_value().to_string(),
349 &context.get_payload().to_value().to_string(),
350 &context.get_setup().to_value().to_string()
351 );
352
353 let span = tracing::info_span!(
354 "step",
355 otel.name = field::Empty,
356 context.main = field::Empty,
357 context.params = field::Empty,
358 context.payload = field::Empty,
359 context.input = field::Empty,
360 step.id = field::Empty,
361 step.label = field::Empty,
362 step.module = field::Empty,
363 step.condition = field::Empty,
364 step.payload = field::Empty,
365 step.return = field::Empty,
366 );
367 let _guard = span.enter();
368
369 {
370 let step_name = self.label.clone().unwrap_or(self.id.to_string());
371
372 span.record("otel.name", format!("step {}", step_name));
373
374 if let Some(ref input) = context.get_input() {
375 span.record("context.input", input.to_string());
376 }
377
378 if let Some(ref payload) = context.get_payload() {
379 span.record("context.payload", truncate_string(&payload));
380 }
381
382 if let Some(ref main) = context.get_main() {
383 span.record("context.main", truncate_string(&main));
384 }
385
386 span.record("step.id", self.id.to_string());
387
388 if let Some(ref label) = self.label {
389 span.record("step.label", label.to_string());
390 }
391 }
392
393 if let Some(log_step) = &self.log {
394 let message = match &log_step.message {
395 Some(script) => script
396 .evaluate(context)
397 .map_err(StepWorkerError::LogError)?
398 .to_string(),
399 None => String::new(),
400 };
401 log_step.level.log(&message);
402 }
403
404 if let Some(output) = self.evaluate_return(context)? {
405 debug!(
406 "[step {}] return case acionado (condicional de parada)",
407 self.id
408 );
409 {
410 span.record("step.return", output.to_string());
411 }
412
413 return Ok(StepOutput {
414 next_step: NextStep::Stop,
415 output: Some(output),
416 });
417 }
418
419 if let Some((module, output, context)) = self.evaluate_module(context).await? {
420 debug!(
421 "[step {}] módulo '{}' executado; output inicial {:?}",
422 self.id,
423 module.as_deref().unwrap_or("<none>"),
424 output
425 );
426 {
427 span.record("step.module", module.clone());
428
429 if let Some(ref output) = output {
430 span.record("context.payload", truncate_string(output));
431 }
432 }
433
434 let context = if let Some(output) = output.clone() {
435 debug!(
436 "[step {}] definindo output no contexto após execução do módulo",
437 self.id
438 );
439 context.clone_with_output(output)
440 } else {
441 context.clone()
442 };
443
444 let output = self.evaluate_payload(&context, output)?;
445
446 if let Some(to) = &self.to {
447 debug!(
448 "[step {}] condição 'to' detectada após módulo -> pipeline {}, step {}",
449 self.id, to.pipeline, to.step
450 );
451 debug!(
452 "Define switching to step {} in pipeline {}",
453 to.step, to.pipeline
454 );
455 return Ok(StepOutput {
456 next_step: NextStep::GoToStep(to.clone()),
457 output,
458 });
459 }
460
461 debug!("[step {}] seguindo para próximo step após módulo", self.id);
462 return Ok(StepOutput {
463 next_step: NextStep::Next,
464 output,
465 });
466 }
467
468 if let Some(condition) = &self.condition {
469 debug!("[step {}] avaliando condição", self.id);
470 let (next_step, output) = if condition
471 .evaluate(context)
472 .map_err(StepWorkerError::ConditionError)?
473 {
474 debug!("[step {}] condição verdadeira", self.id);
475 let next_step = if let Some(ref then_case) = self.then_case {
476 debug!("[step {}] then_case -> pipeline {}", self.id, then_case);
477 NextStep::Pipeline(*then_case)
478 } else {
479 debug!("[step {}] then_case não definido -> Next", self.id);
480 NextStep::Next
481 };
482
483 (next_step, self.evaluate_payload(context, None)?)
484 } else {
485 debug!("[step {}] condição falsa", self.id);
486 let next_step = if let Some(ref else_case) = self.else_case {
487 debug!("[step {}] else_case -> pipeline {}", self.id, else_case);
488 NextStep::Pipeline(*else_case)
489 } else {
490 debug!("[step {}] else_case não definido -> Next", self.id);
491 NextStep::Next
492 };
493
494 (next_step, None)
495 };
496
497 {
498 span.record("step.condition", condition.raw.to_string());
499
500 if let Some(ref output) = output {
501 span.record("context.payload", truncate_string(output));
502 }
503 }
504
505 return Ok(StepOutput { next_step, output });
506 }
507
508 let default_output = if self.log.is_some() {
509 context.get_payload()
510 } else {
511 None
512 };
513 let output = self.evaluate_payload(context, default_output)?;
514
515 {
516 if let Some(ref output) = output {
517 span.record("context.payload", truncate_string(output));
518 }
519 }
520
521 if let Some(to) = &self.to {
522 debug!(
523 "[step {}] condição 'to' detectada (sem módulo) -> pipeline {}, step {}",
524 self.id, to.pipeline, to.step
525 );
526 debug!(
527 "Define switching to step {} in pipeline {}",
528 to.step, to.pipeline
529 );
530 return Ok(StepOutput {
531 next_step: NextStep::GoToStep(to.clone()),
532 output,
533 });
534 }
535
536 debug!("[step {}] nenhuma condição especial -> Next", self.id);
537 return Ok(StepOutput {
538 next_step: NextStep::Next,
539 output,
540 });
541 }
542}
543
544fn build_log_step(
545 engine: Arc<Engine>,
546 value: &Value,
547 module: Option<&str>,
548) -> Result<Option<LogStep>, StepWorkerError> {
549 if let Some(log_step) = extract_log_from_key(engine.clone(), value)? {
550 return Ok(Some(log_step));
551 }
552
553 if module == Some("log") {
554 let input_value = value.get("input");
555 let log_step = build_log_from_input(engine, input_value)?;
556 return Ok(Some(log_step));
557 }
558
559 Ok(None)
560}
561
562fn extract_log_from_key(
563 engine: Arc<Engine>,
564 value: &Value,
565) -> Result<Option<LogStep>, StepWorkerError> {
566 let Some(obj) = value.as_object() else {
567 return Ok(None);
568 };
569
570 for (key, value) in obj.iter() {
571 let key_str = key.to_string();
572 let Some(level_key) = key_str.strip_prefix("log.") else {
573 continue;
574 };
575 let level = level_key.split('.').next().unwrap_or(level_key);
576 let level = LogLevel::from_str(level);
577 let message_value = if let Some(obj) = value.as_object() {
578 obj.get("message").cloned().unwrap_or_else(|| value.clone())
579 } else {
580 value.clone()
581 };
582 let message =
583 Script::try_build(engine.clone(), &message_value).map_err(StepWorkerError::LogError)?;
584
585 return Ok(Some(LogStep {
586 level,
587 message: Some(message),
588 }));
589 }
590
591 Ok(None)
592}
593
594fn build_log_from_input(
595 engine: Arc<Engine>,
596 input_value: Option<&Value>,
597) -> Result<LogStep, StepWorkerError> {
598 let mut level = LogLevel::Info;
599 let mut message_value: Option<Value> = None;
600
601 if let Some(input_value) = input_value {
602 if let Some(obj) = input_value.as_object() {
603 if let Some(level_value) = obj.get("action").or_else(|| obj.get("level")) {
604 level = LogLevel::from_str(level_value.as_string().as_str());
605 }
606
607 message_value = obj.get("message").cloned();
608 } else {
609 message_value = Some(input_value.clone());
610 }
611 }
612
613 let message = if let Some(message_value) = message_value {
614 Some(
615 Script::try_build(engine, &message_value).map_err(StepWorkerError::LogError)?,
616 )
617 } else {
618 None
619 };
620
621 Ok(LogStep { level, message })
622}
623
624fn truncate_string(string: &Value) -> String {
625 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
626 let string = string.to_string();
627 if string.len() > limit {
628 format!("{}...", &string[..limit])
629 } else {
630 string.to_string()
631 }
632}
633
634#[cfg(test)]
635mod test {
636 use super::*;
637 use phlow_sdk::valu3;
638 use phs::build_engine;
639 use valu3::prelude::ToValueBehavior;
640 use valu3::value::Value;
641
642 #[tokio::test]
643 async fn test_step_get_reference_id() {
644 let step = StepWorker {
645 id: ID::from("id"),
646 label: Some("label".to_string()),
647 ..Default::default()
648 };
649
650 assert_eq!(step.get_id(), &ID::from("id"));
651 }
652
653 #[tokio::test]
654 async fn test_step_execute() {
655 let engine = build_engine(None);
656 let step = StepWorker {
657 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
658 ..Default::default()
659 };
660
661 let context = Context::new();
662
663 let result = step.execute(&context).await.unwrap();
664
665 assert_eq!(result.next_step, NextStep::Next);
666 assert_eq!(result.output, Some(Value::from(10i64)));
667 }
668
669 #[tokio::test]
670 async fn test_step_execute_with_return_case() {
671 let engine = build_engine(None);
672 let step = StepWorker {
673 id: ID::new(),
674 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
675 ..Default::default()
676 };
677
678 let context = Context::new();
679
680 let result = step.execute(&context).await.unwrap();
681
682 assert_eq!(result.next_step, NextStep::Stop);
683 assert_eq!(result.output, Some(Value::from(10i64)));
684 }
685
686 #[tokio::test]
687 async fn test_step_execute_with_return_case_and_payload() {
688 let engine = build_engine(None);
689 let step = StepWorker {
690 id: ID::new(),
691 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
692 return_case: Some(Script::try_build(engine.clone(), &"20".to_value()).unwrap()),
693 ..Default::default()
694 };
695
696 let context = Context::new();
697
698 let result = step.execute(&context).await.unwrap();
699
700 assert_eq!(result.next_step, NextStep::Stop);
701 assert_eq!(result.output, Some(Value::from(20i64)));
702 }
703}