1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 id::ID,
5 script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::{
9 prelude::{log::debug, *},
10 tracing::field,
11};
12use rhai::Engine;
13use serde::Serialize;
14use std::{fmt::Display, sync::Arc};
15
16static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
17 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
18 Ok(value) => value.parse::<usize>().unwrap_or(100),
19 Err(_) => 100,
20 });
21
22#[derive(Debug)]
23pub enum StepWorkerError {
24 ConditionError(ConditionError),
25 PayloadError(phs::ScriptError),
26 ModulesError(ModulesError),
27 InputError(phs::ScriptError),
28}
29
30impl Display for StepWorkerError {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
34 StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
35 StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
36 StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
37 }
38 }
39}
40
41impl std::error::Error for StepWorkerError {
42 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
43 match self {
44 StepWorkerError::ConditionError(err) => Some(err),
45 StepWorkerError::PayloadError(_) => None, StepWorkerError::ModulesError(_) => None, StepWorkerError::InputError(_) => None, }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Serialize)]
53pub enum NextStep {
54 Pipeline(usize),
55 GoToStep(StepReference),
56 Stop,
57 Next,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
61pub struct StepReference {
62 pub pipeline: usize,
63 pub step: usize,
64}
65
66#[derive(Debug)]
67pub struct StepOutput {
68 pub next_step: NextStep,
69 pub output: Option<Value>,
70}
71
72#[derive(Debug, Clone, Default)]
73pub struct StepWorker {
74 pub(crate) id: ID,
75 pub(crate) label: Option<String>,
76 pub(crate) module: Option<String>,
77 pub(crate) condition: Option<Condition>,
78 pub(crate) input: Option<Script>,
79 pub(crate) payload: Option<Script>,
80 pub(crate) then_case: Option<usize>,
81 pub(crate) else_case: Option<usize>,
82 pub(crate) modules: Arc<Modules>,
83 pub(crate) return_case: Option<Script>,
84 pub(crate) to: Option<StepReference>,
85 #[cfg(debug_assertions)]
86 pub(crate) step_raw: String,
87}
88
89impl StepWorker {
90 pub fn try_from_value(
91 engine: Arc<Engine>,
92 modules: Arc<Modules>,
93 value: &Value,
94 ) -> Result<Self, StepWorkerError> {
95 log::debug!(
96 "Parsing step worker from value: {}",
97 value.to_json(JsonMode::Indented)
98 );
99
100 let id = match value.get("id") {
101 Some(id) => ID::from(id),
102 None => ID::new(),
103 };
104 let label: Option<String> = match value.get("label") {
105 Some(label) => Some(label.as_string()),
106 None => None,
107 };
108 let condition = {
109 if let Some(condition) = value
110 .get("condition")
111 .map(|condition| Condition::try_from_value(engine.clone(), condition))
112 {
113 Some(condition.map_err(StepWorkerError::ConditionError)?)
114 } else {
115 if let Some(condition) = value.get("assert").map(|assert| {
116 Condition::try_build_with_assert(engine.clone(), assert.to_string())
117 }) {
118 Some(condition.map_err(StepWorkerError::ConditionError)?)
119 } else {
120 None
121 }
122 }
123 };
124 let payload = match value.get("payload") {
125 Some(payload) => match Script::try_build(engine.clone(), payload) {
126 Ok(payload) => Some(payload),
127 Err(err) => return Err(StepWorkerError::PayloadError(err)),
128 },
129 None => None,
130 };
131 let input = match value.get("input") {
132 Some(input) => match Script::try_build(engine.clone(), input) {
133 Ok(input) => Some(input),
134 Err(err) => return Err(StepWorkerError::InputError(err)),
135 },
136 None => None,
137 };
138 let then_case = match value.get("then") {
139 Some(then_case) => match then_case.to_u64() {
140 Some(then_case) => Some(then_case as usize),
141 None => None,
142 },
143 None => None,
144 };
145 let else_case = match value.get("else") {
146 Some(else_case) => match else_case.to_u64() {
147 Some(else_case) => Some(else_case as usize),
148 None => None,
149 },
150 None => None,
151 };
152 let return_case = match value.get("return") {
153 Some(return_case) => match Script::try_build(engine, return_case) {
154 Ok(return_case) => Some(return_case),
155 Err(err) => return Err(StepWorkerError::PayloadError(err)),
156 },
157 None => None,
158 };
159 let module = match value.get("use") {
160 Some(module) => Some(module.to_string()),
161 None => None,
162 };
163
164 let to = match value.get("to") {
165 Some(to_step) => match to_step.as_object() {
166 Some(to_step) => {
167 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
168 let step = to_step.get("step").and_then(|v| v.to_u64());
169
170 if pipeline.is_some() && step.is_some() {
171 Some(StepReference {
172 pipeline: pipeline.unwrap() as usize,
173 step: step.unwrap() as usize,
174 })
175 } else {
176 None
177 }
178 }
179 None => None,
180 },
181 None => None,
182 };
183
184 #[cfg(debug_assertions)]
185 let step_raw = value.to_string();
186
187 Ok(Self {
188 id,
189 label,
190 module,
191 input,
192 condition,
193 payload,
194 then_case,
195 else_case,
196 modules,
197 return_case,
198 to,
199 #[cfg(debug_assertions)]
200 step_raw,
201 })
202 }
203
204 pub fn get_id(&self) -> &ID {
205 &self.id
206 }
207
208 fn evaluate_payload(
209 &self,
210 context: &Context,
211 default: Option<Value>,
212 ) -> Result<Option<Value>, StepWorkerError> {
213 if let Some(ref payload) = self.payload {
214 let value = Some(
215 payload
216 .evaluate(context)
217 .map_err(StepWorkerError::PayloadError)?,
218 );
219 Ok(value)
220 } else {
221 Ok(default)
222 }
223 }
224
225 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
226 if let Some(ref input) = self.input {
227 let value = Some(
228 input
229 .evaluate(context)
230 .map_err(StepWorkerError::InputError)?,
231 );
232 Ok(value)
233 } else {
234 Ok(None)
235 }
236 }
237
238 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
239 if let Some(ref return_case) = self.return_case {
240 let value = Some(
241 return_case
242 .evaluate(context)
243 .map_err(StepWorkerError::PayloadError)?,
244 );
245
246 #[cfg(debug_assertions)]
247 log::debug!(
248 "Evaluating return case for step {}: {}",
249 self.id,
250 value.as_ref().map_or("None".to_string(), |v| v.to_string())
251 );
252
253 Ok(value)
254 } else {
255 Ok(None)
256 }
257 }
258
259 async fn evaluate_module(
260 &self,
261 context: &Context,
262 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
263 if let Some(ref module) = self.module {
264 let input = self.evaluate_input(context)?;
265
266 let context = if let Some(input) = &input {
267 context.clone_with_input(input.clone())
268 } else {
269 context.clone()
270 };
271
272 match self
273 .modules
274 .execute(module, &context.get_input(), &context.get_payload())
275 .await
276 {
277 Ok(response) => {
278 #[cfg(debug_assertions)]
279 log::debug!("Module response for step {}: {:?}", self.id, response);
280
281 if let Some(err) = response.error {
282 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
283 err,
284 )));
285 }
286
287 Ok(Some((Some(module.clone()), Some(response.data), context)))
288 }
289 Err(err) => Err(StepWorkerError::ModulesError(err)),
290 }
291 } else {
292 Ok(None)
293 }
294 }
295
296 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
297 #[cfg(debug_assertions)]
298 log::debug!(
299 "Entering step: {}, with: \n\tmain={:?}\n\tpayload={:?}\n\tsetup={:?}",
300 self.step_raw,
301 &context.get_main().to_value().to_string(),
302 &context.get_payload().to_value().to_string(),
303 &context.get_setup().to_value().to_string()
304 );
305
306 let span = tracing::info_span!(
307 "step",
308 otel.name = field::Empty,
309 context.main = field::Empty,
310 context.params = field::Empty,
311 context.payload = field::Empty,
312 context.input = field::Empty,
313 step.id = field::Empty,
314 step.label = field::Empty,
315 step.module = field::Empty,
316 step.condition = field::Empty,
317 step.payload = field::Empty,
318 step.return = field::Empty,
319 );
320 let _guard = span.enter();
321
322 {
323 let step_name = self.label.clone().unwrap_or(self.id.to_string());
324
325 span.record("otel.name", format!("step {}", step_name));
326
327 if let Some(ref input) = context.get_input() {
328 span.record("context.input", input.to_string());
329 }
330
331 if let Some(ref payload) = context.get_payload() {
332 span.record("context.payload", truncate_string(&payload));
333 }
334
335 if let Some(ref main) = context.get_main() {
336 span.record("context.main", truncate_string(&main));
337 }
338
339 span.record("step.id", self.id.to_string());
340
341 if let Some(ref label) = self.label {
342 span.record("step.label", label.to_string());
343 }
344 }
345
346 if let Some(output) = self.evaluate_return(context)? {
347 {
348 span.record("step.return", output.to_string());
349 }
350
351 return Ok(StepOutput {
352 next_step: NextStep::Stop,
353 output: Some(output),
354 });
355 }
356
357 if let Some((module, output, context)) = self.evaluate_module(context).await? {
358 {
359 span.record("step.module", module.clone());
360
361 if let Some(ref output) = output {
362 span.record("context.payload", truncate_string(output));
363 }
364 }
365
366 let context = if let Some(output) = output.clone() {
367 context.clone_with_output(output)
368 } else {
369 context.clone()
370 };
371
372 let output = self.evaluate_payload(&context, output)?;
373
374 if let Some(to) = &self.to {
375 debug!(
376 "Define switching to step {} in pipeline {}",
377 to.step, to.pipeline
378 );
379 return Ok(StepOutput {
380 next_step: NextStep::GoToStep(to.clone()),
381 output,
382 });
383 }
384
385 return Ok(StepOutput {
386 next_step: NextStep::Next,
387 output,
388 });
389 }
390
391 if let Some(condition) = &self.condition {
392 let (next_step, output) = if condition
393 .evaluate(context)
394 .map_err(StepWorkerError::ConditionError)?
395 {
396 let next_step = if let Some(ref then_case) = self.then_case {
397 NextStep::Pipeline(*then_case)
398 } else {
399 NextStep::Next
400 };
401
402 (next_step, self.evaluate_payload(context, None)?)
403 } else {
404 let next_step = if let Some(ref else_case) = self.else_case {
405 NextStep::Pipeline(*else_case)
406 } else {
407 NextStep::Next
408 };
409
410 (next_step, None)
411 };
412
413 {
414 span.record("step.condition", condition.raw.to_string());
415
416 if let Some(ref output) = output {
417 span.record("context.payload", truncate_string(output));
418 }
419 }
420
421 return Ok(StepOutput { next_step, output });
422 }
423
424 let output = self.evaluate_payload(context, None)?;
425
426 {
427 if let Some(ref output) = output {
428 span.record("context.payload", truncate_string(output));
429 }
430 }
431
432 if let Some(to) = &self.to {
433 debug!(
434 "Define switching to step {} in pipeline {}",
435 to.step, to.pipeline
436 );
437 return Ok(StepOutput {
438 next_step: NextStep::GoToStep(to.clone()),
439 output,
440 });
441 }
442
443 return Ok(StepOutput {
444 next_step: NextStep::Next,
445 output,
446 });
447 }
448}
449
450fn truncate_string(string: &Value) -> String {
451 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
452 let string = string.to_string();
453 if string.len() > limit {
454 format!("{}...", &string[..limit])
455 } else {
456 string.to_string()
457 }
458}
459
460#[cfg(test)]
461mod test {
462 use super::*;
463 use phlow_sdk::valu3;
464 use phs::build_engine;
465 use valu3::prelude::ToValueBehavior;
466 use valu3::value::Value;
467
468 #[tokio::test]
469 async fn test_step_get_reference_id() {
470 let step = StepWorker {
471 id: ID::from("id"),
472 label: Some("label".to_string()),
473 ..Default::default()
474 };
475
476 assert_eq!(step.get_id(), &ID::from("id"));
477 }
478
479 #[tokio::test]
480 async fn test_step_execute() {
481 let engine = build_engine(None);
482 let step = StepWorker {
483 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
484 ..Default::default()
485 };
486
487 let context = Context::new();
488
489 let result = step.execute(&context).await.unwrap();
490
491 assert_eq!(result.next_step, NextStep::Next);
492 assert_eq!(result.output, Some(Value::from(10i64)));
493 }
494
495 #[tokio::test]
496 async fn test_step_execute_with_condition() {
497 let engine = build_engine(None);
498 let step = StepWorker {
499 id: ID::new(),
500 condition: Some(
501 Condition::try_build_with_operator(
502 engine.clone(),
503 "10".to_string(),
504 "20".to_string(),
505 crate::condition::Operator::NotEqual,
506 )
507 .unwrap(),
508 ),
509 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
510 ..Default::default()
511 };
512
513 let context = Context::new();
514
515 let result = step.execute(&context).await.unwrap();
516
517 assert_eq!(result.next_step, NextStep::Next);
518 assert_eq!(result.output, Some(Value::from(10i64)));
519 }
520
521 #[tokio::test]
522 async fn test_step_execute_with_condition_then_case() {
523 let engine = build_engine(None);
524 let step = StepWorker {
525 id: ID::new(),
526 condition: Some(
527 Condition::try_build_with_operator(
528 engine.clone(),
529 "10".to_string(),
530 "20".to_string(),
531 crate::condition::Operator::NotEqual,
532 )
533 .unwrap(),
534 ),
535 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
536 then_case: Some(0),
537 ..Default::default()
538 };
539
540 let context = Context::new();
541
542 let result = step.execute(&context).await.unwrap();
543
544 assert_eq!(result.next_step, NextStep::Pipeline(0));
545 assert_eq!(result.output, Some(Value::from(10i64)));
546 }
547
548 #[tokio::test]
549 async fn test_step_execute_with_condition_else_case() {
550 let engine = build_engine(None);
551 let step = StepWorker {
552 id: ID::new(),
553 condition: Some(
554 Condition::try_build_with_operator(
555 engine.clone(),
556 "10".to_string(),
557 "20".to_string(),
558 crate::condition::Operator::Equal,
559 )
560 .unwrap(),
561 ),
562 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
563 else_case: Some(1),
564 ..Default::default()
565 };
566
567 let context = Context::new();
568
569 let result = step.execute(&context).await.unwrap();
570
571 assert_eq!(result.next_step, NextStep::Pipeline(1));
572 assert_eq!(result.output, None);
573 }
574
575 #[tokio::test]
576 async fn test_step_execute_with_return_case() {
577 let engine = build_engine(None);
578 let step = StepWorker {
579 id: ID::new(),
580 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
581 ..Default::default()
582 };
583
584 let context = Context::new();
585
586 let result = step.execute(&context).await.unwrap();
587
588 assert_eq!(result.next_step, NextStep::Stop);
589 assert_eq!(result.output, Some(Value::from(10i64)));
590 }
591
592 #[tokio::test]
593 async fn test_step_execute_with_return_case_and_payload() {
594 let engine = build_engine(None);
595 let step = StepWorker {
596 id: ID::new(),
597 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
598 return_case: Some(Script::try_build(engine.clone(), &"20".to_value()).unwrap()),
599 ..Default::default()
600 };
601
602 let context = Context::new();
603
604 let result = step.execute(&context).await.unwrap();
605
606 assert_eq!(result.next_step, NextStep::Stop);
607 assert_eq!(result.output, Some(Value::from(20i64)));
608 }
609
610 #[tokio::test]
611 async fn test_step_execute_with_return_case_and_condition() {
612 let engine = build_engine(None);
613 let step = StepWorker {
614 id: ID::new(),
615 condition: Some(
616 Condition::try_build_with_operator(
617 engine.clone(),
618 "10".to_string(),
619 "20".to_string(),
620 crate::condition::Operator::Equal,
621 )
622 .unwrap(),
623 ),
624 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
625 ..Default::default()
626 };
627
628 let context = Context::new();
629
630 let result = step.execute(&context).await.unwrap();
631
632 assert_eq!(result.next_step, NextStep::Stop);
633 assert_eq!(result.output, Some(Value::from(10i64)));
634 }
635
636 #[tokio::test]
637 async fn test_step_execute_with_return_case_and_condition_then_case() {
638 let engine = build_engine(None);
639 let step = StepWorker {
640 id: ID::new(),
641 condition: Some(
642 Condition::try_build_with_operator(
643 engine.clone(),
644 "10".to_string(),
645 "20".to_string(),
646 crate::condition::Operator::Equal,
647 )
648 .unwrap(),
649 ),
650 then_case: Some(0),
651 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
652 ..Default::default()
653 };
654
655 let context = Context::new();
656 let output = step.execute(&context).await.unwrap();
657
658 assert_eq!(output.next_step, NextStep::Stop);
659 assert_eq!(output.output, Some(Value::from("Ok")));
660 }
661
662 #[tokio::test]
663 async fn test_step_execute_with_return_case_and_condition_else_case() {
664 let engine = build_engine(None);
665 let step = StepWorker {
666 id: ID::new(),
667 condition: Some(
668 Condition::try_build_with_operator(
669 engine.clone(),
670 "10".to_string(),
671 "20".to_string(),
672 crate::condition::Operator::Equal,
673 )
674 .unwrap(),
675 ),
676 else_case: Some(0),
677 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
678 ..Default::default()
679 };
680
681 let context = Context::new();
682 let result = step.execute(&context).await.unwrap();
683
684 assert_eq!(result.next_step, NextStep::Stop);
685 assert_eq!(result.output, Some(Value::from("Ok")));
686 }
687}