1use crate::{
2 condition::{Condition, ConditionError},
3 context::Context,
4 id::ID,
5 script::{Script, ScriptError},
6};
7use once_cell::sync::Lazy;
8use phlow_sdk::prelude::*;
9use rhai::Engine;
10use serde::Serialize;
11use std::sync::Arc;
12
13static PHLOW_TRUNCATE_SPAN_VALUE: Lazy<usize> =
14 Lazy::new(|| match std::env::var("PHLOW_TRUNCATE_SPAN_VALUE") {
15 Ok(value) => value.parse::<usize>().unwrap_or(100),
16 Err(_) => 100,
17 });
18
19#[derive(Debug)]
20pub enum StepWorkerError {
21 ConditionError(ConditionError),
22 PayloadError(ScriptError),
23 ModulesError(ModulesError),
24 InputError(ScriptError),
25}
26
27#[derive(Debug, Clone, PartialEq, Serialize)]
28pub enum NextStep {
29 Pipeline(usize),
30 GoToStep(StepReference),
31 Stop,
32 Next,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, ToValue)]
36pub struct StepReference {
37 pub pipeline: usize,
38 pub step: usize,
39}
40
41#[derive(Debug)]
42pub struct StepOutput {
43 pub next_step: NextStep,
44 pub output: Option<Value>,
45}
46
47#[derive(Debug, Clone, Default)]
48pub struct StepWorker {
49 pub(crate) id: ID,
50 pub(crate) label: Option<String>,
51 pub(crate) module: Option<String>,
52 pub(crate) condition: Option<Condition>,
53 pub(crate) input: Option<Script>,
54 pub(crate) payload: Option<Script>,
55 pub(crate) then_case: Option<usize>,
56 pub(crate) else_case: Option<usize>,
57 pub(crate) modules: Arc<Modules>,
58 pub(crate) return_case: Option<Script>,
59 pub(crate) to: Option<StepReference>,
60}
61
62impl StepWorker {
63 pub fn try_from_value(
64 engine: Arc<Engine>,
65 modules: Arc<Modules>,
66 value: &Value,
67 ) -> Result<Self, StepWorkerError> {
68 let id = match value.get("id") {
69 Some(id) => ID::from(id),
70 None => ID::new(),
71 };
72 let label: Option<String> = match value.get("label") {
73 Some(label) => Some(label.as_string()),
74 None => None,
75 };
76 let condition = {
77 if let Some(condition) = value
78 .get("condition")
79 .map(|condition| Condition::try_from_value(engine.clone(), condition))
80 {
81 Some(condition.map_err(StepWorkerError::ConditionError)?)
82 } else {
83 None
84 }
85 };
86 let payload = match value.get("payload") {
87 Some(payload) => match Script::try_build(engine.clone(), payload) {
88 Ok(payload) => Some(payload),
89 Err(err) => return Err(StepWorkerError::PayloadError(err)),
90 },
91 None => None,
92 };
93 let input = match value.get("input") {
94 Some(input) => match Script::try_build(engine.clone(), input) {
95 Ok(input) => Some(input),
96 Err(err) => return Err(StepWorkerError::InputError(err)),
97 },
98 None => None,
99 };
100 let then_case = match value.get("then") {
101 Some(then_case) => match then_case.to_u64() {
102 Some(then_case) => Some(then_case as usize),
103 None => None,
104 },
105 None => None,
106 };
107 let else_case = match value.get("else") {
108 Some(else_case) => match else_case.to_u64() {
109 Some(else_case) => Some(else_case as usize),
110 None => None,
111 },
112 None => None,
113 };
114 let return_case = match value.get("return") {
115 Some(return_case) => match Script::try_build(engine, return_case) {
116 Ok(return_case) => Some(return_case),
117 Err(err) => return Err(StepWorkerError::PayloadError(err)),
118 },
119 None => None,
120 };
121 let module = match value.get("use") {
122 Some(module) => Some(module.to_string()),
123 None => None,
124 };
125
126 let to = match value.get("to") {
127 Some(to_step) => match to_step.as_object() {
128 Some(to_step) => {
129 let pipeline = to_step.get("pipeline").and_then(|v| v.to_u64());
130 let step = to_step.get("step").and_then(|v| v.to_u64());
131
132 if pipeline.is_some() && step.is_some() {
133 Some(StepReference {
134 pipeline: pipeline.unwrap() as usize,
135 step: step.unwrap() as usize,
136 })
137 } else {
138 None
139 }
140 }
141 None => None,
142 },
143 None => None,
144 };
145
146 Ok(Self {
147 id,
148 label,
149 module,
150 input,
151 condition,
152 payload,
153 then_case,
154 else_case,
155 modules,
156 return_case,
157 to,
158 })
159 }
160
161 pub fn get_id(&self) -> &ID {
162 &self.id
163 }
164
165 fn evaluate_payload(
166 &self,
167 context: &Context,
168 default: Option<Value>,
169 ) -> Result<Option<Value>, StepWorkerError> {
170 if let Some(ref payload) = self.payload {
171 let value = Some(
172 payload
173 .evaluate(context)
174 .map_err(StepWorkerError::PayloadError)?,
175 );
176 Ok(value)
177 } else {
178 Ok(default)
179 }
180 }
181
182 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
183 if let Some(ref input) = self.input {
184 let value = Some(
185 input
186 .evaluate(context)
187 .map_err(StepWorkerError::InputError)?,
188 );
189 Ok(value)
190 } else {
191 Ok(None)
192 }
193 }
194
195 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
196 if let Some(ref return_case) = self.return_case {
197 let value = Some(
198 return_case
199 .evaluate(context)
200 .map_err(StepWorkerError::PayloadError)?,
201 );
202 Ok(value)
203 } else {
204 Ok(None)
205 }
206 }
207
208 async fn evaluate_module(
209 &self,
210 context: &Context,
211 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
212 if let Some(ref module) = self.module {
213 let input = self.evaluate_input(context)?;
214
215 let context = if let Some(input) = &input {
216 context.add_module_input(input.clone())
217 } else {
218 context.clone()
219 };
220
221 match self.modules.execute(module, &context.input).await {
222 Ok(response) => {
223 if let Some(err) = response.error {
224 return Err(StepWorkerError::ModulesError(ModulesError::ModuleError(
225 err,
226 )));
227 }
228
229 Ok(Some((Some(module.clone()), Some(response.data), context)))
230 }
231 Err(err) => Err(StepWorkerError::ModulesError(err)),
232 }
233 } else {
234 Ok(None)
235 }
236 }
237
238 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
239 let span = tracing::info_span!(
240 "step",
241 otel.name = field::Empty,
242 context.main = field::Empty,
243 context.params = field::Empty,
244 context.payload = field::Empty,
245 context.input = field::Empty,
246 step.id = field::Empty,
247 step.label = field::Empty,
248 step.module = field::Empty,
249 step.condition = field::Empty,
250 step.payload = field::Empty,
251 step.return = field::Empty,
252 );
253 let _guard = span.enter();
254
255 {
256 let step_name = self.label.clone().unwrap_or(self.id.to_string());
257 span.record("otel.name", format!("step {}", step_name));
258
259 if let Some(ref input) = context.input {
260 span.record("context.input", input.to_string());
261 }
262
263 if let Some(ref payload) = context.payload {
264 span.record("context.payload", truncate_string(&payload));
265 }
266
267 if let Some(ref main) = context.main {
268 span.record("context.main", truncate_string(&main));
269 }
270
271 span.record("step.id", self.id.to_string());
272
273 if let Some(ref label) = self.label {
274 span.record("step.label", label.to_string());
275 }
276 }
277
278 if let Some(output) = self.evaluate_return(context)? {
279 {
280 span.record("step.return", output.to_string());
281 }
282
283 return Ok(StepOutput {
284 next_step: NextStep::Stop,
285 output: Some(output),
286 });
287 }
288
289 if let Some((module, output, context)) = self.evaluate_module(context).await? {
290 {
291 span.record("step.module", module.clone());
292
293 if let Some(ref output) = output {
294 span.record("context.payload", truncate_string(output));
295 }
296 }
297
298 let context = if let Some(output) = output.clone() {
299 context.add_module_output(output)
300 } else {
301 context.clone()
302 };
303
304 return Ok(StepOutput {
305 next_step: NextStep::Next,
306 output: self.evaluate_payload(&context, output)?,
307 });
308 }
309
310 if let Some(condition) = &self.condition {
311 let (next_step, output) = if condition
312 .evaluate(context)
313 .map_err(StepWorkerError::ConditionError)?
314 {
315 let next_step = if let Some(ref then_case) = self.then_case {
316 NextStep::Pipeline(*then_case)
317 } else {
318 NextStep::Next
319 };
320
321 (next_step, self.evaluate_payload(context, None)?)
322 } else {
323 let next_step = if let Some(ref else_case) = self.else_case {
324 NextStep::Pipeline(*else_case)
325 } else {
326 NextStep::Next
327 };
328
329 (next_step, None)
330 };
331
332 {
333 span.record("step.condition", condition.raw.to_string());
334
335 if let Some(ref output) = output {
336 span.record("context.payload", truncate_string(output));
337 }
338 }
339
340 return Ok(StepOutput { next_step, output });
341 }
342
343 let output = self.evaluate_payload(context, None)?;
344
345 {
346 if let Some(ref output) = output {
347 span.record("context.payload", truncate_string(output));
348 }
349 }
350
351 if let Some(to) = &self.to {
352 return Ok(StepOutput {
353 next_step: NextStep::GoToStep(to.clone()),
354 output,
355 });
356 }
357
358 return Ok(StepOutput {
359 next_step: NextStep::Next,
360 output,
361 });
362 }
363}
364
365fn truncate_string(string: &Value) -> String {
366 let limit = *PHLOW_TRUNCATE_SPAN_VALUE;
367 let string = string.to_string();
368 if string.len() > limit {
369 format!("{}...", &string[..limit])
370 } else {
371 string.to_string()
372 }
373}
374
#[cfg(test)]
mod test {
    use super::*;
    use phlow_sdk::valu3;
    use phs::build_engine;
    use valu3::prelude::ToValueBehavior;
    use valu3::value::Value;

    /// Compiles `source` into a [`Script`], panicking when it does not build.
    fn script(engine: &Arc<Engine>, source: &str) -> Script {
        Script::try_build(Arc::clone(engine), &source.to_value()).unwrap()
    }

    /// Builds the condition `10 <operator> 20`.
    fn ten_vs_twenty(engine: &Arc<Engine>, operator: crate::condition::Operator) -> Condition {
        Condition::try_build_with_operator(
            Arc::clone(engine),
            "10".to_string(),
            "20".to_string(),
            operator,
        )
        .unwrap()
    }

    /// Executes `step` against a fresh context and unwraps the outcome.
    async fn run(step: &StepWorker) -> StepOutput {
        let context = Context::new();
        step.execute(&context).await.unwrap()
    }

    #[tokio::test]
    async fn test_step_get_reference_id() {
        let step = StepWorker {
            id: ID::from("id"),
            label: Some("label".to_string()),
            ..Default::default()
        };

        assert_eq!(step.get_id(), &ID::from("id"));
    }

    #[tokio::test]
    async fn test_step_execute() {
        let engine = build_engine(None);
        let step = StepWorker {
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Next);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(
                &engine,
                crate::condition::Operator::NotEqual,
            )),
            payload: Some(script(&engine, "10")),
            then_case: Some(0),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(0));
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            payload: Some(script(&engine, "10")),
            else_case: Some(1),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Pipeline(1));
        assert_eq!(result.output, None);
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_payload() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            payload: Some(script(&engine, "10")),
            return_case: Some(script(&engine, "20")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(20i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            return_case: Some(script(&engine, "10")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from(10i64)));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_then_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            then_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let output = run(&step).await;

        assert_eq!(output.next_step, NextStep::Stop);
        assert_eq!(output.output, Some(Value::from("Ok")));
    }

    #[tokio::test]
    async fn test_step_execute_with_return_case_and_condition_else_case() {
        let engine = build_engine(None);
        let step = StepWorker {
            id: ID::new(),
            condition: Some(ten_vs_twenty(&engine, crate::condition::Operator::Equal)),
            else_case: Some(0),
            return_case: Some(script(&engine, "Ok")),
            ..Default::default()
        };

        let result = run(&step).await;

        assert_eq!(result.next_step, NextStep::Stop);
        assert_eq!(result.output, Some(Value::from("Ok")));
    }
}