1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 id::ID,
5 script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::{
9 prelude::{log::debug, *},
10 tracing::field,
11};
12use rhai::Engine;
13use serde::Serialize;
14use std::{fmt::Display, sync::Arc};
15
16static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
17 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
18 Ok(value) => value.parse::<usize>().unwrap_or(100),
19 Err(_) => 100,
20 });
21
22#[derive(Debug)]
23pub enum StepWorkerError {
24 ConditionError(ConditionError),
25 PayloadError(phs::ScriptError),
26 ModulesError(ModulesError),
27 InputError(phs::ScriptError),
28}
29
30impl Display for StepWorkerError {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
34 StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
35 StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
36 StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
37 }
38 }
39}
40
41impl std::error::Error for StepWorkerError {
42 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
43 match self {
44 StepWorkerError::ConditionError(err) => Some(err),
45 StepWorkerError::PayloadError(_) => None, StepWorkerError::ModulesError(_) => None, StepWorkerError::InputError(_) => None, }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Serialize)]
53pub enum NextStep {
54 Pipeline(usize),
55 GoToStep(StepReference),
56 Stop,
57 Next,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
61pub struct StepReference {
62 pub pipeline: usize,
63 pub step: usize,
64}
65
66#[derive(Debug)]
67pub struct StepOutput {
68 pub next_step: NextStep,
69 pub output: Option<Value>,
70}
71
72#[derive(Debug, Clone, Default)]
73pub struct StepWorker {
74 pub(crate) id: ID,
75 pub(crate) label: Option<String>,
76 pub(crate) module: Option<String>,
77 pub(crate) condition: Option<Condition>,
78 pub(crate) input: Option<Script>,
79 pub(crate) payload: Option<Script>,
80 pub(crate) then_case: Option<usize>,
81 pub(crate) else_case: Option<usize>,
82 pub(crate) modules: Arc<Modules>,
83 pub(crate) return_case: Option<Script>,
84 pub(crate) to: Option<StepReference>,
85 #[cfg(debug_assertions)]
86 pub(crate) step_raw: String,
87}
88
89impl StepWorker {
90 pub fn try_from_value(
91 engine: Arc<Engine>,
92 modules: Arc<Modules>,
93 value: &Value,
94 ) -> Result<Self, StepWorkerError> {
95 log::debug!(
96 "Parsing step worker from value: {}",
97 value.to_json(JsonMode::Indented)
98 );
99
100 let id = match value.get("id") {
101 Some(id) => ID::from(id),
102 None => ID::new(),
103 };
104 let label: Option<String> = match value.get("label") {
105 Some(label) => Some(label.as_string()),
106 None => None,
107 };
108 let condition = {
109 if let Some(condition) = value
110 .get("condition")
111 .map(|condition| Condition::try_from_value(engine.clone(), condition))
112 {
113 Some(condition.map_err(StepWorkerError::ConditionError)?)
114 } else {
115 if let Some(condition) = value.get("assert").map(|assert| {
116 Condition::try_build_with_assert(engine.clone(), assert.to_string())
117 }) {
118 Some(condition.map_err(StepWorkerError::ConditionError)?)
119 } else {
120 None
121 }
122 }
123 };
124 let payload = match value.get("payload") {
125 Some(payload) => match Script::try_build(engine.clone(), payload) {
126 Ok(payload) => Some(payload),
127 Err(err) => return Err(StepWorkerError::PayloadError(err)),
128 },
129 None => None,
130 };
131 let input = match value.get("input") {
132 Some(input) => match Script::try_build(engine.clone(), input) {
133 Ok(input) => Some(input),
134 Err(err) => return Err(StepWorkerError::InputError(err)),
135 },
136 None => None,
137 };
138 let then_case = match value.get("then") {
139 Some(then_case) => match then_case.to_u64() {
140 Some(then_case) => Some(then_case as usize),
141 None => None,
142 },
143 None => None,
144 };
145 let else_case = match value.get("else") {
146 Some(else_case) => match else_case.to_u64() {
147 Some(else_case) => Some(else_case as usize),
148 None => None,
149 },
150 None => None,
151 };
152 let return_case = match value.get("return") {
153 Some(return_case) => match Script::try_build(engine, return_case) {
154 Ok(return_case) => Some(return_case),
155 Err(err) => return Err(StepWorkerError::PayloadError(err)),
156 },
157 None => None,
158 };
159 let module = match value.get("use") {
160 Some(module) => Some(module.to_string()),
161 None => None,
162 };
163
164 let to = match value.get("to") {
165 Some(to_step) => match to_step.as_object() {
166 Some(to_step) => {
167 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
168 let step = to_step.get("step").and_then(|v| v.to_u64());
169
170 if pipeline.is_some() && step.is_some() {
171 Some(StepReference {
172 pipeline: pipeline.unwrap() as usize,
173 step: step.unwrap() as usize,
174 })
175 } else {
176 None
177 }
178 }
179 None => None,
180 },
181 None => None,
182 };
183
184 #[cfg(debug_assertions)]
185 let step_raw = value.to_string();
186
187 Ok(Self {
188 id,
189 label,
190 module,
191 input,
192 condition,
193 payload,
194 then_case,
195 else_case,
196 modules,
197 return_case,
198 to,
199 #[cfg(debug_assertions)]
200 step_raw,
201 })
202 }
203
204 pub fn get_id(&self) -> &ID {
205 &self.id
206 }
207
208 fn evaluate_payload(
209 &self,
210 context: &Context,
211 default: Option<Value>,
212 ) -> Result<Option<Value>, StepWorkerError> {
213 if let Some(ref payload) = self.payload {
214 let value = Some(
215 payload
216 .evaluate(context)
217 .map_err(StepWorkerError::PayloadError)?,
218 );
219 Ok(value)
220 } else {
221 Ok(default)
222 }
223 }
224
225 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
226 if let Some(ref input) = self.input {
227 let value = Some(
228 input
229 .evaluate(context)
230 .map_err(StepWorkerError::InputError)?,
231 );
232 Ok(value)
233 } else {
234 Ok(None)
235 }
236 }
237
238 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
239 if let Some(ref return_case) = self.return_case {
240 let value = Some(
241 return_case
242 .evaluate(context)
243 .map_err(StepWorkerError::PayloadError)?,
244 );
245
246 #[cfg(debug_assertions)]
247 log::debug!(
248 "Evaluating return case for step {}: {}",
249 self.id,
250 value.as_ref().map_or("None".to_string(), |v| v.to_string())
251 );
252
253 Ok(value)
254 } else {
255 Ok(None)
256 }
257 }
258
259 async fn evaluate_module(
260 &self,
261 context: &Context,
262 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
263 if let Some(ref module) = self.module {
264 let input = self.evaluate_input(context)?;
265
266 let context = if let Some(input) = &input {
267 context.clone_with_input(input.clone())
268 } else {
269 context.clone()
270 };
271
272 match self
273 .modules
274 .execute(module, &context.get_input(), &context.get_payload())
275 .await
276 {
277 Ok(response) => {
278 #[cfg(debug_assertions)]
279 log::debug!("Module response for step {}: {:?}", self.id, response);
280
281 if let Some(err) = response.error {
282 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
283 err,
284 )));
285 }
286
287 Ok(Some((Some(module.clone()), Some(response.data), context)))
288 }
289 Err(err) => Err(StepWorkerError::ModulesError(err)),
290 }
291 } else {
292 Ok(None)
293 }
294 }
295
296 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
297 #[cfg(debug_assertions)]
298 log::debug!(
299 "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
300 self.step_raw,
301 &context.get_main().to_value().to_string(),
302 &context.get_payload().to_value().to_string(),
303 &context.get_setup().to_value().to_string()
304 );
305
306 let span = tracing::info_span!(
307 "step",
308 otel.name = field::Empty,
309 context.main = field::Empty,
310 context.params = field::Empty,
311 context.payload = field::Empty,
312 context.input = field::Empty,
313 step.id = field::Empty,
314 step.label = field::Empty,
315 step.module = field::Empty,
316 step.condition = field::Empty,
317 step.payload = field::Empty,
318 step.return = field::Empty,
319 );
320 let _guard = span.enter();
321
322 {
323 let step_name = self.label.clone().unwrap_or(self.id.to_string());
324
325 span.record("otel.name", format!("step {}", step_name));
326
327 if let Some(ref input) = context.get_input() {
328 span.record("context.input", input.to_string());
329 }
330
331 if let Some(ref payload) = context.get_payload() {
332 span.record("context.payload", truncate_string(&payload));
333 }
334
335 if let Some(ref main) = context.get_main() {
336 span.record("context.main", truncate_string(&main));
337 }
338
339 span.record("step.id", self.id.to_string());
340
341 if let Some(ref label) = self.label {
342 span.record("step.label", label.to_string());
343 }
344 }
345
346 if let Some(output) = self.evaluate_return(context)? {
347 debug!(
348 "[step {}] return case acionado (condicional de parada)",
349 self.id
350 );
351 {
352 span.record("step.return", output.to_string());
353 }
354
355 return Ok(StepOutput {
356 next_step: NextStep::Stop,
357 output: Some(output),
358 });
359 }
360
361 if let Some((module, output, context)) = self.evaluate_module(context).await? {
362 debug!(
363 "[step {}] módulo '{}' executado; output inicial {:?}",
364 self.id,
365 module.as_deref().unwrap_or("<none>"),
366 output
367 );
368 {
369 span.record("step.module", module.clone());
370
371 if let Some(ref output) = output {
372 span.record("context.payload", truncate_string(output));
373 }
374 }
375
376 let context = if let Some(output) = output.clone() {
377 debug!(
378 "[step {}] definindo output no contexto após execução do módulo",
379 self.id
380 );
381 context.clone_with_output(output)
382 } else {
383 context.clone()
384 };
385
386 let output = self.evaluate_payload(&context, output)?;
387
388 if let Some(to) = &self.to {
389 debug!(
390 "[step {}] condição 'to' detectada após módulo -> pipeline {}, step {}",
391 self.id, to.pipeline, to.step
392 );
393 debug!(
394 "Define switching to step {} in pipeline {}",
395 to.step, to.pipeline
396 );
397 return Ok(StepOutput {
398 next_step: NextStep::GoToStep(to.clone()),
399 output,
400 });
401 }
402
403 debug!("[step {}] seguindo para próximo step após módulo", self.id);
404 return Ok(StepOutput {
405 next_step: NextStep::Next,
406 output,
407 });
408 }
409
410 if let Some(condition) = &self.condition {
411 debug!("[step {}] avaliando condição", self.id);
412 let (next_step, output) = if condition
413 .evaluate(context)
414 .map_err(StepWorkerError::ConditionError)?
415 {
416 debug!("[step {}] condição verdadeira", self.id);
417 let next_step = if let Some(ref then_case) = self.then_case {
418 debug!("[step {}] then_case -> pipeline {}", self.id, then_case);
419 NextStep::Pipeline(*then_case)
420 } else {
421 debug!("[step {}] then_case não definido -> Next", self.id);
422 NextStep::Next
423 };
424
425 (next_step, self.evaluate_payload(context, None)?)
426 } else {
427 debug!("[step {}] condição falsa", self.id);
428 let next_step = if let Some(ref else_case) = self.else_case {
429 debug!("[step {}] else_case -> pipeline {}", self.id, else_case);
430 NextStep::Pipeline(*else_case)
431 } else {
432 debug!("[step {}] else_case não definido -> Next", self.id);
433 NextStep::Next
434 };
435
436 (next_step, None)
437 };
438
439 {
440 span.record("step.condition", condition.raw.to_string());
441
442 if let Some(ref output) = output {
443 span.record("context.payload", truncate_string(output));
444 }
445 }
446
447 return Ok(StepOutput { next_step, output });
448 }
449
450 let output = self.evaluate_payload(context, None)?;
451
452 {
453 if let Some(ref output) = output {
454 span.record("context.payload", truncate_string(output));
455 }
456 }
457
458 if let Some(to) = &self.to {
459 debug!(
460 "[step {}] condição 'to' detectada (sem módulo) -> pipeline {}, step {}",
461 self.id, to.pipeline, to.step
462 );
463 debug!(
464 "Define switching to step {} in pipeline {}",
465 to.step, to.pipeline
466 );
467 return Ok(StepOutput {
468 next_step: NextStep::GoToStep(to.clone()),
469 output,
470 });
471 }
472
473 debug!("[step {}] nenhuma condição especial -> Next", self.id);
474 return Ok(StepOutput {
475 next_step: NextStep::Next,
476 output,
477 });
478 }
479}
480
481fn truncate_string(string: &Value) -> String {
482 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
483 let string = string.to_string();
484 if string.len() > limit {
485 format!("{}...", &string[..limit])
486 } else {
487 string.to_string()
488 }
489}
490
491#[cfg(test)]
492mod test {
493 use super::*;
494 use phlow_sdk::valu3;
495 use phs::build_engine;
496 use valu3::prelude::ToValueBehavior;
497 use valu3::value::Value;
498
499 #[tokio::test]
500 async fn test_step_get_reference_id() {
501 let step = StepWorker {
502 id: ID::from("id"),
503 label: Some("label".to_string()),
504 ..Default::default()
505 };
506
507 assert_eq!(step.get_id(), &ID::from("id"));
508 }
509
510 #[tokio::test]
511 async fn test_step_execute() {
512 let engine = build_engine(None);
513 let step = StepWorker {
514 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
515 ..Default::default()
516 };
517
518 let context = Context::new();
519
520 let result = step.execute(&context).await.unwrap();
521
522 assert_eq!(result.next_step, NextStep::Next);
523 assert_eq!(result.output, Some(Value::from(10i64)));
524 }
525
526 #[tokio::test]
527 async fn test_step_execute_with_condition() {
528 let engine = build_engine(None);
529 let step = StepWorker {
530 id: ID::new(),
531 condition: Some(
532 Condition::try_build_with_operator(
533 engine.clone(),
534 "10".to_string(),
535 "20".to_string(),
536 crate::condition::Operator::NotEqual,
537 )
538 .unwrap(),
539 ),
540 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
541 ..Default::default()
542 };
543
544 let context = Context::new();
545
546 let result = step.execute(&context).await.unwrap();
547
548 assert_eq!(result.next_step, NextStep::Next);
549 assert_eq!(result.output, Some(Value::from(10i64)));
550 }
551
552 #[tokio::test]
553 async fn test_step_execute_with_condition_then_case() {
554 let engine = build_engine(None);
555 let step = StepWorker {
556 id: ID::new(),
557 condition: Some(
558 Condition::try_build_with_operator(
559 engine.clone(),
560 "10".to_string(),
561 "20".to_string(),
562 crate::condition::Operator::NotEqual,
563 )
564 .unwrap(),
565 ),
566 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
567 then_case: Some(0),
568 ..Default::default()
569 };
570
571 let context = Context::new();
572
573 let result = step.execute(&context).await.unwrap();
574
575 assert_eq!(result.next_step, NextStep::Pipeline(0));
576 assert_eq!(result.output, Some(Value::from(10i64)));
577 }
578
579 #[tokio::test]
580 async fn test_step_execute_with_condition_else_case() {
581 let engine = build_engine(None);
582 let step = StepWorker {
583 id: ID::new(),
584 condition: Some(
585 Condition::try_build_with_operator(
586 engine.clone(),
587 "10".to_string(),
588 "20".to_string(),
589 crate::condition::Operator::Equal,
590 )
591 .unwrap(),
592 ),
593 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
594 else_case: Some(1),
595 ..Default::default()
596 };
597
598 let context = Context::new();
599
600 let result = step.execute(&context).await.unwrap();
601
602 assert_eq!(result.next_step, NextStep::Pipeline(1));
603 assert_eq!(result.output, None);
604 }
605
606 #[tokio::test]
607 async fn test_step_execute_with_return_case() {
608 let engine = build_engine(None);
609 let step = StepWorker {
610 id: ID::new(),
611 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
612 ..Default::default()
613 };
614
615 let context = Context::new();
616
617 let result = step.execute(&context).await.unwrap();
618
619 assert_eq!(result.next_step, NextStep::Stop);
620 assert_eq!(result.output, Some(Value::from(10i64)));
621 }
622
623 #[tokio::test]
624 async fn test_step_execute_with_return_case_and_payload() {
625 let engine = build_engine(None);
626 let step = StepWorker {
627 id: ID::new(),
628 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
629 return_case: Some(Script::try_build(engine.clone(), &"20".to_value()).unwrap()),
630 ..Default::default()
631 };
632
633 let context = Context::new();
634
635 let result = step.execute(&context).await.unwrap();
636
637 assert_eq!(result.next_step, NextStep::Stop);
638 assert_eq!(result.output, Some(Value::from(20i64)));
639 }
640
641 #[tokio::test]
642 async fn test_step_execute_with_return_case_and_condition() {
643 let engine = build_engine(None);
644 let step = StepWorker {
645 id: ID::new(),
646 condition: Some(
647 Condition::try_build_with_operator(
648 engine.clone(),
649 "10".to_string(),
650 "20".to_string(),
651 crate::condition::Operator::Equal,
652 )
653 .unwrap(),
654 ),
655 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
656 ..Default::default()
657 };
658
659 let context = Context::new();
660
661 let result = step.execute(&context).await.unwrap();
662
663 assert_eq!(result.next_step, NextStep::Stop);
664 assert_eq!(result.output, Some(Value::from(10i64)));
665 }
666
667 #[tokio::test]
668 async fn test_step_execute_with_return_case_and_condition_then_case() {
669 let engine = build_engine(None);
670 let step = StepWorker {
671 id: ID::new(),
672 condition: Some(
673 Condition::try_build_with_operator(
674 engine.clone(),
675 "10".to_string(),
676 "20".to_string(),
677 crate::condition::Operator::Equal,
678 )
679 .unwrap(),
680 ),
681 then_case: Some(0),
682 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
683 ..Default::default()
684 };
685
686 let context = Context::new();
687 let output = step.execute(&context).await.unwrap();
688
689 assert_eq!(output.next_step, NextStep::Stop);
690 assert_eq!(output.output, Some(Value::from("Ok")));
691 }
692
693 #[tokio::test]
694 async fn test_step_execute_with_return_case_and_condition_else_case() {
695 let engine = build_engine(None);
696 let step = StepWorker {
697 id: ID::new(),
698 condition: Some(
699 Condition::try_build_with_operator(
700 engine.clone(),
701 "10".to_string(),
702 "20".to_string(),
703 crate::condition::Operator::Equal,
704 )
705 .unwrap(),
706 ),
707 else_case: Some(0),
708 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
709 ..Default::default()
710 };
711
712 let context = Context::new();
713 let result = step.execute(&context).await.unwrap();
714
715 assert_eq!(result.next_step, NextStep::Stop);
716 assert_eq!(result.output, Some(Value::from("Ok")));
717 }
718}