1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 id::ID,
5 script::Script,
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::prelude::*;
9use rhai::Engine;
10use serde::Serialize;
11use std::{fmt::Display, sync::Arc};
12
13static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
14 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
15 Ok(value) => value.parse::<usize>().unwrap_or(100),
16 Err(_) => 100,
17 });
18
19#[derive(Debug)]
20pub enum StepWorkerError {
21 ConditionError(ConditionError),
22 PayloadError(phs::ScriptError),
23 ModulesError(ModulesError),
24 InputError(phs::ScriptError),
25}
26
27impl Display for StepWorkerError {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 match self {
30 StepWorkerError::ConditionError(err) => write!(f, "Condition error: {}", err),
31 StepWorkerError::PayloadError(err) => write!(f, "Payload error: {}", err),
32 StepWorkerError::ModulesError(err) => write!(f, "Modules error: {}", err),
33 StepWorkerError::InputError(err) => write!(f, "Input error: {}", err),
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Serialize)]
39pub enum NextStep {
40 Pipeline(usize),
41 GoToStep(StepReference),
42 Stop,
43 Next,
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
47pub struct StepReference {
48 pub pipeline: usize,
49 pub step: usize,
50}
51
52#[derive(Debug)]
53pub struct StepOutput {
54 pub next_step: NextStep,
55 pub output: Option<Value>,
56}
57
58#[derive(Debug, Clone, Default)]
59pub struct StepWorker {
60 pub(crate) id: ID,
61 pub(crate) label: Option<String>,
62 pub(crate) module: Option<String>,
63 pub(crate) condition: Option<Condition>,
64 pub(crate) input: Option<Script>,
65 pub(crate) payload: Option<Script>,
66 pub(crate) then_case: Option<usize>,
67 pub(crate) else_case: Option<usize>,
68 pub(crate) modules: Arc<Modules>,
69 pub(crate) return_case: Option<Script>,
70 pub(crate) to: Option<StepReference>,
71}
72
73impl StepWorker {
74 pub fn try_from_value(
75 engine: Arc<Engine>,
76 modules: Arc<Modules>,
77 value: &Value,
78 ) -> Result<Self, StepWorkerError> {
79 let id = match value.get("id") {
80 Some(id) => ID::from(id),
81 None => ID::new(),
82 };
83 let label: Option<String> = match value.get("label") {
84 Some(label) => Some(label.as_string()),
85 None => None,
86 };
87 let condition = {
88 if let Some(condition) = value
89 .get("condition")
90 .map(|condition| Condition::try_from_value(engine.clone(), condition))
91 {
92 Some(condition.map_err(StepWorkerError::ConditionError)?)
93 } else {
94 if let Some(condition) = value.get("assert").map(|assert| {
95 Condition::try_build_with_assert(engine.clone(), assert.to_string())
96 }) {
97 Some(condition.map_err(StepWorkerError::ConditionError)?)
98 } else {
99 None
100 }
101 }
102 };
103 let payload = match value.get("payload") {
104 Some(payload) => match Script::try_build(engine.clone(), payload) {
105 Ok(payload) => Some(payload),
106 Err(err) => return Err(StepWorkerError::PayloadError(err)),
107 },
108 None => None,
109 };
110 let input = match value.get("input") {
111 Some(input) => match Script::try_build(engine.clone(), input) {
112 Ok(input) => Some(input),
113 Err(err) => return Err(StepWorkerError::InputError(err)),
114 },
115 None => None,
116 };
117 let then_case = match value.get("then") {
118 Some(then_case) => match then_case.to_u64() {
119 Some(then_case) => Some(then_case as usize),
120 None => None,
121 },
122 None => None,
123 };
124 let else_case = match value.get("else") {
125 Some(else_case) => match else_case.to_u64() {
126 Some(else_case) => Some(else_case as usize),
127 None => None,
128 },
129 None => None,
130 };
131 let return_case = match value.get("return") {
132 Some(return_case) => match Script::try_build(engine, return_case) {
133 Ok(return_case) => Some(return_case),
134 Err(err) => return Err(StepWorkerError::PayloadError(err)),
135 },
136 None => None,
137 };
138 let module = match value.get("use") {
139 Some(module) => Some(module.to_string()),
140 None => None,
141 };
142
143 let to = match value.get("to") {
144 Some(to_step) => match to_step.as_object() {
145 Some(to_step) => {
146 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
147 let step = to_step.get("step").and_then(|v| v.to_u64());
148
149 if pipeline.is_some() && step.is_some() {
150 Some(StepReference {
151 pipeline: pipeline.unwrap() as usize,
152 step: step.unwrap() as usize,
153 })
154 } else {
155 None
156 }
157 }
158 None => None,
159 },
160 None => None,
161 };
162
163 Ok(Self {
164 id,
165 label,
166 module,
167 input,
168 condition,
169 payload,
170 then_case,
171 else_case,
172 modules,
173 return_case,
174 to,
175 })
176 }
177
178 pub fn get_id(&self) -> &ID {
179 &self.id
180 }
181
182 fn evaluate_payload(
183 &self,
184 context: &Context,
185 default: Option<Value>,
186 ) -> Result<Option<Value>, StepWorkerError> {
187 if let Some(ref payload) = self.payload {
188 let value = Some(
189 payload
190 .evaluate(context)
191 .map_err(StepWorkerError::PayloadError)?,
192 );
193 Ok(value)
194 } else {
195 Ok(default)
196 }
197 }
198
199 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
200 if let Some(ref input) = self.input {
201 let value = Some(
202 input
203 .evaluate(context)
204 .map_err(StepWorkerError::InputError)?,
205 );
206 Ok(value)
207 } else {
208 Ok(None)
209 }
210 }
211
212 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
213 if let Some(ref return_case) = self.return_case {
214 let value = Some(
215 return_case
216 .evaluate(context)
217 .map_err(StepWorkerError::PayloadError)?,
218 );
219 Ok(value)
220 } else {
221 Ok(None)
222 }
223 }
224
225 async fn evaluate_module(
226 &self,
227 context: &Context,
228 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
229 if let Some(ref module) = self.module {
230 let input = self.evaluate_input(context)?;
231
232 let context = if let Some(input) = &input {
233 context.add_module_input(input.clone())
234 } else {
235 context.clone()
236 };
237
238 match self.modules.execute(module, &context.input).await {
239 Ok(response) => {
240 if let Some(err) = response.error {
241 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
242 err,
243 )));
244 }
245
246 Ok(Some((Some(module.clone()), Some(response.data), context)))
247 }
248 Err(err) => Err(StepWorkerError::ModulesError(err)),
249 }
250 } else {
251 Ok(None)
252 }
253 }
254
255 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
256 let span = tracing::info_span!(
257 "step",
258 otel.name = field::Empty,
259 context.main = field::Empty,
260 context.params = field::Empty,
261 context.payload = field::Empty,
262 context.input = field::Empty,
263 step.id = field::Empty,
264 step.label = field::Empty,
265 step.module = field::Empty,
266 step.condition = field::Empty,
267 step.payload = field::Empty,
268 step.return = field::Empty,
269 );
270 let _guard = span.enter();
271
272 {
273 let step_name = self.label.clone().unwrap_or(self.id.to_string());
274 span.record("otel.name", format!("step {}", step_name));
275
276 if let Some(ref input) = context.input {
277 span.record("context.input", input.to_string());
278 }
279
280 if let Some(ref payload) = context.payload {
281 span.record("context.payload", truncate_string(&payload));
282 }
283
284 if let Some(ref main) = context.main {
285 span.record("context.main", truncate_string(&main));
286 }
287
288 span.record("step.id", self.id.to_string());
289
290 if let Some(ref label) = self.label {
291 span.record("step.label", label.to_string());
292 }
293 }
294
295 if let Some(output) = self.evaluate_return(context)? {
296 {
297 span.record("step.return", output.to_string());
298 }
299
300 return Ok(StepOutput {
301 next_step: NextStep::Stop,
302 output: Some(output),
303 });
304 }
305
306 if let Some((module, output, context)) = self.evaluate_module(context).await? {
307 {
308 span.record("step.module", module.clone());
309
310 if let Some(ref output) = output {
311 span.record("context.payload", truncate_string(output));
312 }
313 }
314
315 let context = if let Some(output) = output.clone() {
316 context.add_module_output(output)
317 } else {
318 context.clone()
319 };
320
321 return Ok(StepOutput {
322 next_step: NextStep::Next,
323 output: self.evaluate_payload(&context, output)?,
324 });
325 }
326
327 if let Some(condition) = &self.condition {
328 let (next_step, output) = if condition
329 .evaluate(context)
330 .map_err(StepWorkerError::ConditionError)?
331 {
332 let next_step = if let Some(ref then_case) = self.then_case {
333 NextStep::Pipeline(*then_case)
334 } else {
335 NextStep::Next
336 };
337
338 (next_step, self.evaluate_payload(context, None)?)
339 } else {
340 let next_step = if let Some(ref else_case) = self.else_case {
341 NextStep::Pipeline(*else_case)
342 } else {
343 NextStep::Next
344 };
345
346 (next_step, None)
347 };
348
349 {
350 span.record("step.condition", condition.raw.to_string());
351
352 if let Some(ref output) = output {
353 span.record("context.payload", truncate_string(output));
354 }
355 }
356
357 return Ok(StepOutput { next_step, output });
358 }
359
360 let output = self.evaluate_payload(context, None)?;
361
362 {
363 if let Some(ref output) = output {
364 span.record("context.payload", truncate_string(output));
365 }
366 }
367
368 if let Some(to) = &self.to {
369 return Ok(StepOutput {
370 next_step: NextStep::GoToStep(to.clone()),
371 output,
372 });
373 }
374
375 return Ok(StepOutput {
376 next_step: NextStep::Next,
377 output,
378 });
379 }
380}
381
382fn truncate_string(string: &Value) -> String {
383 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
384 let string = string.to_string();
385 if string.len() > limit {
386 format!("{}...", &string[..limit])
387 } else {
388 string.to_string()
389 }
390}
391
392#[cfg(test)]
393mod test {
394 use super::*;
395 use phlow_sdk::valu3;
396 use phs::build_engine;
397 use valu3::prelude::ToValueBehavior;
398 use valu3::value::Value;
399
400 #[tokio::test]
401 async fn test_step_get_reference_id() {
402 let step = StepWorker {
403 id: ID::from("id"),
404 label: Some("label".to_string()),
405 ..Default::default()
406 };
407
408 assert_eq!(step.get_id(), &ID::from("id"));
409 }
410
411 #[tokio::test]
412 async fn test_step_execute() {
413 let engine = build_engine(None);
414 let step = StepWorker {
415 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
416 ..Default::default()
417 };
418
419 let context = Context::new();
420
421 let result = step.execute(&context).await.unwrap();
422
423 assert_eq!(result.next_step, NextStep::Next);
424 assert_eq!(result.output, Some(Value::from(10i64)));
425 }
426
427 #[tokio::test]
428 async fn test_step_execute_with_condition() {
429 let engine = build_engine(None);
430 let step = StepWorker {
431 id: ID::new(),
432 condition: Some(
433 Condition::try_build_with_operator(
434 engine.clone(),
435 "10".to_string(),
436 "20".to_string(),
437 crate::condition::Operator::NotEqual,
438 )
439 .unwrap(),
440 ),
441 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
442 ..Default::default()
443 };
444
445 let context = Context::new();
446
447 let result = step.execute(&context).await.unwrap();
448
449 assert_eq!(result.next_step, NextStep::Next);
450 assert_eq!(result.output, Some(Value::from(10i64)));
451 }
452
453 #[tokio::test]
454 async fn test_step_execute_with_condition_then_case() {
455 let engine = build_engine(None);
456 let step = StepWorker {
457 id: ID::new(),
458 condition: Some(
459 Condition::try_build_with_operator(
460 engine.clone(),
461 "10".to_string(),
462 "20".to_string(),
463 crate::condition::Operator::NotEqual,
464 )
465 .unwrap(),
466 ),
467 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
468 then_case: Some(0),
469 ..Default::default()
470 };
471
472 let context = Context::new();
473
474 let result = step.execute(&context).await.unwrap();
475
476 assert_eq!(result.next_step, NextStep::Pipeline(0));
477 assert_eq!(result.output, Some(Value::from(10i64)));
478 }
479
480 #[tokio::test]
481 async fn test_step_execute_with_condition_else_case() {
482 let engine = build_engine(None);
483 let step = StepWorker {
484 id: ID::new(),
485 condition: Some(
486 Condition::try_build_with_operator(
487 engine.clone(),
488 "10".to_string(),
489 "20".to_string(),
490 crate::condition::Operator::Equal,
491 )
492 .unwrap(),
493 ),
494 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
495 else_case: Some(1),
496 ..Default::default()
497 };
498
499 let context = Context::new();
500
501 let result = step.execute(&context).await.unwrap();
502
503 assert_eq!(result.next_step, NextStep::Pipeline(1));
504 assert_eq!(result.output, None);
505 }
506
507 #[tokio::test]
508 async fn test_step_execute_with_return_case() {
509 let engine = build_engine(None);
510 let step = StepWorker {
511 id: ID::new(),
512 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
513 ..Default::default()
514 };
515
516 let context = Context::new();
517
518 let result = step.execute(&context).await.unwrap();
519
520 assert_eq!(result.next_step, NextStep::Stop);
521 assert_eq!(result.output, Some(Value::from(10i64)));
522 }
523
524 #[tokio::test]
525 async fn test_step_execute_with_return_case_and_payload() {
526 let engine = build_engine(None);
527 let step = StepWorker {
528 id: ID::new(),
529 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
530 return_case: Some(Script::try_build(engine.clone(), &"20".to_value()).unwrap()),
531 ..Default::default()
532 };
533
534 let context = Context::new();
535
536 let result = step.execute(&context).await.unwrap();
537
538 assert_eq!(result.next_step, NextStep::Stop);
539 assert_eq!(result.output, Some(Value::from(20i64)));
540 }
541
542 #[tokio::test]
543 async fn test_step_execute_with_return_case_and_condition() {
544 let engine = build_engine(None);
545 let step = StepWorker {
546 id: ID::new(),
547 condition: Some(
548 Condition::try_build_with_operator(
549 engine.clone(),
550 "10".to_string(),
551 "20".to_string(),
552 crate::condition::Operator::Equal,
553 )
554 .unwrap(),
555 ),
556 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
557 ..Default::default()
558 };
559
560 let context = Context::new();
561
562 let result = step.execute(&context).await.unwrap();
563
564 assert_eq!(result.next_step, NextStep::Stop);
565 assert_eq!(result.output, Some(Value::from(10i64)));
566 }
567
568 #[tokio::test]
569 async fn test_step_execute_with_return_case_and_condition_then_case() {
570 let engine = build_engine(None);
571 let step = StepWorker {
572 id: ID::new(),
573 condition: Some(
574 Condition::try_build_with_operator(
575 engine.clone(),
576 "10".to_string(),
577 "20".to_string(),
578 crate::condition::Operator::Equal,
579 )
580 .unwrap(),
581 ),
582 then_case: Some(0),
583 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
584 ..Default::default()
585 };
586
587 let context = Context::new();
588 let output = step.execute(&context).await.unwrap();
589
590 assert_eq!(output.next_step, NextStep::Stop);
591 assert_eq!(output.output, Some(Value::from("Ok")));
592 }
593
594 #[tokio::test]
595 async fn test_step_execute_with_return_case_and_condition_else_case() {
596 let engine = build_engine(None);
597 let step = StepWorker {
598 id: ID::new(),
599 condition: Some(
600 Condition::try_build_with_operator(
601 engine.clone(),
602 "10".to_string(),
603 "20".to_string(),
604 crate::condition::Operator::Equal,
605 )
606 .unwrap(),
607 ),
608 else_case: Some(0),
609 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
610 ..Default::default()
611 };
612
613 let context = Context::new();
614 let result = step.execute(&context).await.unwrap();
615
616 assert_eq!(result.next_step, NextStep::Stop);
617 assert_eq!(result.output, Some(Value::from("Ok")));
618 }
619}