1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 id::ID,
5 script::{Script, ScriptError},
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::prelude::*;
9use rhai::Engine;
10use serde::Serialize;
11use std::sync::Arc;
12
13static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
14 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
15 Ok(value) => value.parse::<usize>().unwrap_or(100),
16 Err(_) => 100,
17 });
18
19#[derive(Debug)]
20pub enum StepWorkerError {
21 ConditionError(ConditionError),
22 PayloadError(ScriptError),
23 ModulesError(ModulesError),
24 InputError(ScriptError),
25}
26
27#[derive(Debug, Clone, PartialEq, Serialize)]
28pub enum NextStep {
29 Pipeline(usize),
30 GoToStep(StepReference),
31 Stop,
32 Next,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
36pub struct StepReference {
37 pub pipeline: usize,
38 pub step: usize,
39}
40
41#[derive(Debug)]
42pub struct StepOutput {
43 pub next_step: NextStep,
44 pub output: Option<Value>,
45}
46
47#[derive(Debug, Clone, Default)]
48pub struct StepWorker {
49 pub(crate) id: ID,
50 pub(crate) label: Option<String>,
51 pub(crate) module: Option<String>,
52 pub(crate) condition: Option<Condition>,
53 pub(crate) input: Option<Script>,
54 pub(crate) payload: Option<Script>,
55 pub(crate) then_case: Option<usize>,
56 pub(crate) else_case: Option<usize>,
57 pub(crate) modules: Arc<Modules>,
58 pub(crate) return_case: Option<Script>,
59 pub(crate) to: Option<StepReference>,
60}
61
62impl StepWorker {
63 pub fn try_from_value(
64 engine: Arc<Engine>,
65 modules: Arc<Modules>,
66 value: &Value,
67 ) -> Result<Self, StepWorkerError> {
68 let id = match value.get("id") {
69 Some(id) => ID::from(id),
70 None => ID::new(),
71 };
72 let label: Option<String> = match value.get("label") {
73 Some(label) => Some(label.as_string()),
74 None => None,
75 };
76 let condition = {
77 if let Some(condition) = value
78 .get("condition")
79 .map(|condition| Condition::try_from_value(engine.clone(), condition))
80 {
81 Some(condition.map_err(StepWorkerError::ConditionError)?)
82 } else {
83 None
84 }
85 };
86 let payload = match value.get("payload") {
87 Some(payload) => match Script::try_build(engine.clone(), payload) {
88 Ok(payload) => Some(payload),
89 Err(err) => return Err(StepWorkerError::PayloadError(err)),
90 },
91 None => None,
92 };
93 let input = match value.get("input") {
94 Some(input) => match Script::try_build(engine.clone(), input) {
95 Ok(input) => Some(input),
96 Err(err) => return Err(StepWorkerError::InputError(err)),
97 },
98 None => None,
99 };
100 let then_case = match value.get("then") {
101 Some(then_case) => match then_case.to_u64() {
102 Some(then_case) => Some(then_case as usize),
103 None => None,
104 },
105 None => None,
106 };
107 let else_case = match value.get("else") {
108 Some(else_case) => match else_case.to_u64() {
109 Some(else_case) => Some(else_case as usize),
110 None => None,
111 },
112 None => None,
113 };
114 let return_case = match value.get("return") {
115 Some(return_case) => match Script::try_build(engine, return_case) {
116 Ok(return_case) => Some(return_case),
117 Err(err) => return Err(StepWorkerError::PayloadError(err)),
118 },
119 None => None,
120 };
121 let module = match value.get("use") {
122 Some(module) => Some(module.to_string()),
123 None => None,
124 };
125
126 let to = match value.get("to") {
127 Some(to_step) => match to_step.as_object() {
128 Some(to_step) => {
129 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
130 let step = to_step.get("step").and_then(|v| v.to_u64());
131
132 if pipeline.is_some() && step.is_some() {
133 Some(StepReference {
134 pipeline: pipeline.unwrap() as usize,
135 step: step.unwrap() as usize,
136 })
137 } else {
138 None
139 }
140 }
141 None => None,
142 },
143 None => None,
144 };
145
146 Ok(Self {
147 id,
148 label,
149 module,
150 input,
151 condition,
152 payload,
153 then_case,
154 else_case,
155 modules,
156 return_case,
157 to,
158 })
159 }
160
161 pub fn get_id(&self) -> &ID {
162 &self.id
163 }
164
165 fn evaluate_payload(
166 &self,
167 context: &Context,
168 default: Option<Value>,
169 ) -> Result<Option<Value>, StepWorkerError> {
170 if let Some(ref payload) = self.payload {
171 let value = Some(
172 payload
173 .evaluate(context)
174 .map_err(StepWorkerError::PayloadError)?,
175 );
176 Ok(value)
177 } else {
178 Ok(default)
179 }
180 }
181
182 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
183 if let Some(ref input) = self.input {
184 let value = Some(
185 input
186 .evaluate(context)
187 .map_err(StepWorkerError::InputError)?,
188 );
189 Ok(value)
190 } else {
191 Ok(None)
192 }
193 }
194
195 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
196 if let Some(ref return_case) = self.return_case {
197 let value = Some(
198 return_case
199 .evaluate(context)
200 .map_err(StepWorkerError::PayloadError)?,
201 );
202 Ok(value)
203 } else {
204 Ok(None)
205 }
206 }
207
208 async fn evaluate_module(
209 &self,
210 context: &Context,
211 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
212 if let Some(ref module) = self.module {
213 let input = self.evaluate_input(context)?;
214
215 let context = if let Some(input) = &input {
216 context.add_module_input(input.clone())
217 } else {
218 context.clone()
219 };
220
221 match self.modules.execute(module, &context).await {
222 Ok(response) => {
223 if let Some(err) = response.error {
224 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
225 err,
226 )));
227 }
228
229 Ok(Some((Some(module.clone()), Some(response.data), context)))
230 }
231 Err(err) => Err(StepWorkerError::ModulesError(err)),
232 }
233 } else {
234 Ok(None)
235 }
236 }
237
238 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
239 let span = tracing::info_span!(
240 "step",
241 otel.name = field::Empty,
242 context.main = field::Empty,
243 context.params = field::Empty,
244 context.payload = field::Empty,
245 context.input = field::Empty,
246 step.id = field::Empty,
247 step.label = field::Empty,
248 step.module = field::Empty,
249 step.condition = field::Empty,
250 step.payload = field::Empty,
251 step.return = field::Empty,
252 );
253 let _guard = span.enter();
254
255 {
256 let step_name = self.label.clone().unwrap_or(self.id.to_string());
257 span.record("otel.name", format!("step {}", step_name));
258
259 if let Some(ref input) = context.input {
260 span.record("context.input", input.to_string());
261 }
262
263 if let Some(ref payload) = context.payload {
264 span.record("context.payload", truncate_string(&payload));
265 }
266
267 if let Some(ref main) = context.main {
268 span.record("context.main", truncate_string(&main));
269 }
270
271 span.record("step.id", self.id.to_string());
272
273 if let Some(ref label) = self.label {
274 span.record("step.label", label.to_string());
275 }
276 }
277
278 if let Some(output) = self.evaluate_return(context)? {
279 {
280 span.record("step.return", output.to_string());
281 }
282
283 return Ok(StepOutput {
284 next_step: NextStep::Stop,
285 output: Some(output),
286 });
287 }
288
289 if let Some((module, output, context)) = self.evaluate_module(context).await? {
290 {
291 span.record("step.module", module.clone());
292
293 if let Some(ref output) = output {
294 span.record("context.payload", truncate_string(output));
295 }
296 }
297
298 let context = if let Some(output) = output.clone() {
299 context.add_module_output(output)
300 } else {
301 context.clone()
302 };
303
304 return Ok(StepOutput {
305 next_step: NextStep::Next,
306 output: self.evaluate_payload(&context, output)?,
307 });
308 }
309
310 if let Some(condition) = &self.condition {
311 let (next_step, output) = if condition
312 .evaluate(context)
313 .map_err(StepWorkerError::ConditionError)?
314 {
315 let next_step = if let Some(ref then_case) = self.then_case {
316 NextStep::Pipeline(*then_case)
317 } else {
318 NextStep::Next
319 };
320
321 (next_step, self.evaluate_payload(context, None)?)
322 } else {
323 let next_step = if let Some(ref else_case) = self.else_case {
324 NextStep::Pipeline(*else_case)
325 } else {
326 NextStep::Next
327 };
328
329 (next_step, None)
330 };
331
332 {
333 span.record("step.condition", condition.raw.to_string());
334
335 if let Some(ref output) = output {
336 span.record("context.payload", truncate_string(output));
337 }
338 }
339
340 return Ok(StepOutput { next_step, output });
341 }
342
343 let output = self.evaluate_payload(context, None)?;
344
345 {
346 if let Some(ref output) = output {
347 span.record("context.payload", truncate_string(output));
348 }
349 }
350
351 if let Some(to) = &self.to {
352 return Ok(StepOutput {
353 next_step: NextStep::GoToStep(to.clone()),
354 output,
355 });
356 }
357
358 return Ok(StepOutput {
359 next_step: NextStep::Next,
360 output,
361 });
362 }
363}
364
365fn truncate_string(string: &Value) -> String {
366 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
367 let string = string.to_string();
368 if string.len() > limit {
369 format!("{}...", &string[..limit])
370 } else {
371 string.to_string()
372 }
373}
374
#[cfg(test)]
mod test {
    use crate::condition::Operator;
    use crate::engine::build_engine_async;

    use super::*;
    use phlow_sdk::valu3;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// `get_id` returns the identifier the worker was created with.
    #[tokio::test]
    async fn test_step_get_reference_id() {
        let worker = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(worker.get_id(), &ID::from("id"));
    }

    /// A payload-only step outputs the payload and moves to the next step.
    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let worker = StepWorker {
            payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Next);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// A true condition without a `then` index evaluates the payload and
    /// moves to the next step.
    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::NotEqual,
        )
        .unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Next);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// A true condition with a `then` index jumps to that pipeline and still
    /// evaluates the payload.
    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::NotEqual,
        )
        .unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
            then_case: Some(0),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Pipeline(0));
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// A false condition with an `else` index jumps to that pipeline and
    /// produces no output.
    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
            else_case: Some(1),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Pipeline(1));
        assert_eq!(output, None);
    }

    /// A `return` script stops execution with its value as output.
    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let worker = StepWorker {
            id: ID::new(),
            return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// `return` wins over `payload`.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let worker = StepWorker {
            id: ID::new(),
            payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
            return_case: Some(Script::try_build(engine.clone(), &"20".to_value()).unwrap()),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from(20i64)));
    }

    /// `return` wins over a condition.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from(10i64)));
    }

    /// `return` wins over a condition that has a `then` index.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            then_case: Some(0),
            return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from("Ok")));
    }

    /// `return` wins over a condition that has an `else` index.
    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine_async(None);
        let context = Context::new();
        let condition = Condition::try_build_with_operator(
            engine.clone(),
            "10".to_string(),
            "20".to_string(),
            Operator::Equal,
        )
        .unwrap();
        let worker = StepWorker {
            id: ID::new(),
            condition: Some(condition),
            else_case: Some(0),
            return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
            ..Default::default()
        };

        let StepOutput { next_step, output } = worker.execute(&context).await.unwrap();

        assert_eq!(next_step, NextStep::Stop);
        assert_eq!(output, Some(Value::from("Ok")));
    }
}