1use crate::{
2 collector::{ContextSender, Step},
3 condition::{Condition, ConditionError},
4 context::Context,
5 id::ID,
6 modules::{Modules, ModulesError},
7 script::{Script, ScriptError},
8};
9use phlow_sdk::{sender_safe, tracing};
10use rhai::Engine;
11use serde::Serialize;
12use std::sync::Arc;
13use valu3::prelude::NumberBehavior;
14use valu3::{prelude::StringBehavior, value::Value};
15
16#[derive(Debug)]
17pub enum StepWorkerError {
18 ConditionError(ConditionError),
19 PayloadError(ScriptError),
20 ModulesError(ModulesError),
21 InputError(ScriptError),
22}
23
24#[derive(Debug, Clone, PartialEq, Serialize)]
25pub enum NextStep {
26 Pipeline(usize),
27 Stop,
28 Next,
29}
30
31#[derive(Debug)]
32pub struct StepOutput {
33 pub next_step: NextStep,
34 pub output: Option<Value>,
35}
36
37#[derive(Debug, Clone, Default)]
38pub struct StepWorker {
39 pub(crate) id: ID,
40 pub(crate) label: Option<String>,
41 pub(crate) module: Option<String>,
42 pub(crate) condition: Option<Condition>,
43 pub(crate) input: Option<Script>,
44 pub(crate) payload: Option<Script>,
45 pub(crate) then_case: Option<usize>,
46 pub(crate) else_case: Option<usize>,
47 pub(crate) modules: Arc<Modules>,
48 pub(crate) return_case: Option<Script>,
49 pub(crate) trace_sender: Option<ContextSender>,
50}
51
52impl StepWorker {
53 pub fn try_from_value(
54 engine: Arc<Engine>,
55 modules: Arc<Modules>,
56 trace_sender: Option<ContextSender>,
57 value: &Value,
58 ) -> Result<Self, StepWorkerError> {
59 let id = match value.get("id") {
60 Some(id) => ID::from(id),
61 None => ID::new(),
62 };
63 let label: Option<String> = match value.get("label") {
64 Some(label) => Some(label.as_string()),
65 None => None,
66 };
67 let condition = {
68 if let Some(condition) = value
69 .get("condition")
70 .map(|condition| Condition::try_from_value(engine.clone(), condition))
71 {
72 Some(condition.map_err(StepWorkerError::ConditionError)?)
73 } else {
74 None
75 }
76 };
77 let payload = match value.get("payload") {
78 Some(payload) => match Script::try_build(engine.clone(), payload) {
79 Ok(payload) => Some(payload),
80 Err(err) => return Err(StepWorkerError::PayloadError(err)),
81 },
82 None => None,
83 };
84 let input = match value.get("input") {
85 Some(input) => match Script::try_build(engine.clone(), input) {
86 Ok(input) => Some(input),
87 Err(err) => return Err(StepWorkerError::InputError(err)),
88 },
89 None => None,
90 };
91 let then_case = match value.get("then") {
92 Some(then_case) => match then_case.to_u64() {
93 Some(then_case) => Some(then_case as usize),
94 None => None,
95 },
96 None => None,
97 };
98 let else_case = match value.get("else") {
99 Some(else_case) => match else_case.to_u64() {
100 Some(else_case) => Some(else_case as usize),
101 None => None,
102 },
103 None => None,
104 };
105 let return_case = match value.get("return") {
106 Some(return_case) => match Script::try_build(engine, return_case) {
107 Ok(return_case) => Some(return_case),
108 Err(err) => return Err(StepWorkerError::PayloadError(err)),
109 },
110 None => None,
111 };
112 let module = match value.get("use") {
113 Some(module) => Some(module.to_string()),
114 None => None,
115 };
116
117 Ok(Self {
118 id,
119 label,
120 module,
121 input,
122 condition,
123 payload,
124 then_case,
125 else_case,
126 modules,
127 return_case,
128 trace_sender,
129 })
130 }
131
132 pub fn get_id(&self) -> &ID {
133 &self.id
134 }
135
136 fn evaluate_payload(
137 &self,
138 context: &Context,
139 default: Option<Value>,
140 ) -> Result<Option<Value>, StepWorkerError> {
141 if let Some(ref payload) = self.payload {
142 let value = Some(
143 payload
144 .evaluate(context)
145 .map_err(StepWorkerError::PayloadError)?,
146 );
147 Ok(value)
148 } else {
149 Ok(default)
150 }
151 }
152
153 fn evaluate_input(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
154 if let Some(ref input) = self.input {
155 let value = Some(
156 input
157 .evaluate(context)
158 .map_err(StepWorkerError::InputError)?,
159 );
160 Ok(value)
161 } else {
162 Ok(None)
163 }
164 }
165
166 fn evaluate_return(&self, context: &Context) -> Result<Option<Value>, StepWorkerError> {
167 if let Some(ref return_case) = self.return_case {
168 let value = Some(
169 return_case
170 .evaluate(context)
171 .map_err(StepWorkerError::PayloadError)?,
172 );
173 Ok(value)
174 } else {
175 Ok(None)
176 }
177 }
178
179 async fn evaluate_module(
180 &self,
181 context: &Context,
182 ) -> Result<Option<(Option<String>, Option<Value>, Context)>, StepWorkerError> {
183 if let Some(ref module) = self.module {
184 let input = self.evaluate_input(context)?;
185
186 let context = if let Some(input) = &input {
187 context.add_module_input(input.clone())
188 } else {
189 context.clone()
190 };
191
192 match self.modules.execute(module, &context).await {
193 Ok(value) => Ok(Some((Some(module.clone()), Some(value), context))),
194 Err(err) => Err(StepWorkerError::ModulesError(err)),
195 }
196 } else {
197 Ok(None)
198 }
199 }
200
201 #[tracing::instrument(skip(context))]
202 pub async fn execute(&self, context: &Context) -> Result<StepOutput, StepWorkerError> {
203 if let Some(output) = self.evaluate_return(context)? {
204 if let Some(sender) = &self.trace_sender {
205 sender_safe!(
206 sender,
207 Step {
208 id: self.id.clone(),
209 label: self.label.clone(),
210 input: None,
211 module: None,
212 condition: None,
213 payload: None,
214 return_case: Some(output.clone()),
215 }
216 );
217 }
218
219 return Ok(StepOutput {
220 next_step: NextStep::Stop,
221 output: Some(output),
222 });
223 }
224
225 if let Ok(Some((module, output, context))) = self.evaluate_module(context).await {
226 if let Some(sender) = &self.trace_sender {
227 sender_safe!(
228 sender,
229 Step {
230 id: self.id.clone(),
231 label: self.label.clone(),
232 input: context.input.clone(),
233 module,
234 condition: None,
235 payload: output.clone(),
236 return_case: None,
237 }
238 );
239 }
240
241 let context = if let Some(output) = output.clone() {
242 context.add_module_output(output)
243 } else {
244 context.clone()
245 };
246
247 return Ok(StepOutput {
248 next_step: NextStep::Next,
249 output: self.evaluate_payload(&context, output)?,
250 });
251 }
252
253 if let Some(condition) = &self.condition {
254 let (next_step, output) = if condition
255 .evaluate(context)
256 .map_err(StepWorkerError::ConditionError)?
257 {
258 let next_step = if let Some(ref then_case) = self.then_case {
259 NextStep::Pipeline(*then_case)
260 } else {
261 NextStep::Next
262 };
263
264 (next_step, self.evaluate_payload(context, None)?)
265 } else {
266 let next_step = if let Some(ref else_case) = self.else_case {
267 NextStep::Pipeline(*else_case)
268 } else {
269 NextStep::Next
270 };
271
272 (next_step, None)
273 };
274
275 if let Some(sender) = &self.trace_sender {
276 sender_safe!(
277 sender,
278 Step {
279 id: self.id.clone(),
280 label: self.label.clone(),
281 module: None,
282 input: None,
283 condition: Some(condition.raw.clone()),
284 payload: output.clone(),
285 return_case: None,
286 }
287 );
288 }
289
290 return Ok(StepOutput { next_step, output });
291 }
292
293 let output = self.evaluate_payload(context, None)?;
294
295 if let Some(sender) = &self.trace_sender {
296 sender_safe!(
297 sender,
298 Step {
299 id: self.id.clone(),
300 label: self.label.clone(),
301 module: None,
302 input: None,
303 condition: None,
304 payload: output.clone(),
305 return_case: None,
306 }
307 );
308 }
309
310 return Ok(StepOutput {
311 next_step: NextStep::Next,
312 output,
313 });
314 }
315}
316
317#[cfg(test)]
318mod test {
319 use crate::engine::build_engine_async;
320
321 use super::*;
322 use valu3::prelude::ToValueBehavior;
323 use valu3::value::Value;
324
325 #[tokio::test]
326 async fn test_step_get_reference_id() {
327 let step = StepWorker {
328 id: ID::from("id"),
329 label: Some("label".to_string()),
330 ..Default::default()
331 };
332
333 assert_eq!(step.get_id(), &ID::from("id"));
334 }
335
336 #[tokio::test]
337 async fn test_step_execute() {
338 let engine = build_engine_async(None);
339 let step = StepWorker {
340 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
341 ..Default::default()
342 };
343
344 let context = Context::new(None);
345
346 let result = step.execute(&context).await.unwrap();
347
348 assert_eq!(result.next_step, NextStep::Next);
349 assert_eq!(result.output, Some(Value::from(10i64)));
350 }
351
352 #[tokio::test]
353 async fn test_step_execute_with_condition() {
354 let engine = build_engine_async(None);
355 let step = StepWorker {
356 id: ID::new(),
357 condition: Some(
358 Condition::try_build_with_operator(
359 engine.clone(),
360 "10".to_string(),
361 "20".to_string(),
362 crate::condition::Operator::NotEqual,
363 )
364 .unwrap(),
365 ),
366 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
367 ..Default::default()
368 };
369
370 let context = Context::new(None);
371
372 let result = step.execute(&context).await.unwrap();
373
374 assert_eq!(result.next_step, NextStep::Next);
375 assert_eq!(result.output, Some(Value::from(10i64)));
376 }
377
378 #[tokio::test]
379 async fn test_step_execute_with_condition_then_case() {
380 let engine = build_engine_async(None);
381 let step = StepWorker {
382 id: ID::new(),
383 condition: Some(
384 Condition::try_build_with_operator(
385 engine.clone(),
386 "10".to_string(),
387 "20".to_string(),
388 crate::condition::Operator::NotEqual,
389 )
390 .unwrap(),
391 ),
392 payload: Some(Script::try_build(engine, &"10".to_value()).unwrap()),
393 then_case: Some(0),
394 ..Default::default()
395 };
396
397 let context = Context::new(None);
398
399 let result = step.execute(&context).await.unwrap();
400
401 assert_eq!(result.next_step, NextStep::Pipeline(0));
402 assert_eq!(result.output, Some(Value::from(10i64)));
403 }
404
405 #[tokio::test]
406 async fn test_step_execute_with_condition_else_case() {
407 let engine = build_engine_async(None);
408 let step = StepWorker {
409 id: ID::new(),
410 condition: Some(
411 Condition::try_build_with_operator(
412 engine.clone(),
413 "10".to_string(),
414 "20".to_string(),
415 crate::condition::Operator::Equal,
416 )
417 .unwrap(),
418 ),
419 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
420 else_case: Some(1),
421 ..Default::default()
422 };
423
424 let context = Context::new(None);
425
426 let result = step.execute(&context).await.unwrap();
427
428 assert_eq!(result.next_step, NextStep::Pipeline(1));
429 assert_eq!(result.output, None);
430 }
431
432 #[tokio::test]
433 async fn test_step_execute_with_return_case() {
434 let engine = build_engine_async(None);
435 let step = StepWorker {
436 id: ID::new(),
437 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
438 ..Default::default()
439 };
440
441 let context = Context::new(None);
442
443 let result = step.execute(&context).await.unwrap();
444
445 assert_eq!(result.next_step, NextStep::Stop);
446 assert_eq!(result.output, Some(Value::from(10i64)));
447 }
448
449 #[tokio::test]
450 async fn test_step_execute_with_return_case_and_payload() {
451 let engine = build_engine_async(None);
452 let step = StepWorker {
453 id: ID::new(),
454 payload: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
455 return_case: Some(Script::try_build(engine.clone(), &"20".to_value()).unwrap()),
456 ..Default::default()
457 };
458
459 let context = Context::new(None);
460
461 let result = step.execute(&context).await.unwrap();
462
463 assert_eq!(result.next_step, NextStep::Stop);
464 assert_eq!(result.output, Some(Value::from(20i64)));
465 }
466
467 #[tokio::test]
468 async fn test_step_execute_with_return_case_and_condition() {
469 let engine = build_engine_async(None);
470 let step = StepWorker {
471 id: ID::new(),
472 condition: Some(
473 Condition::try_build_with_operator(
474 engine.clone(),
475 "10".to_string(),
476 "20".to_string(),
477 crate::condition::Operator::Equal,
478 )
479 .unwrap(),
480 ),
481 return_case: Some(Script::try_build(engine.clone(), &"10".to_value()).unwrap()),
482 ..Default::default()
483 };
484
485 let context = Context::new(None);
486
487 let result = step.execute(&context).await.unwrap();
488
489 assert_eq!(result.next_step, NextStep::Stop);
490 assert_eq!(result.output, Some(Value::from(10i64)));
491 }
492
493 #[tokio::test]
494 async fn test_step_execute_with_return_case_and_condition_then_case() {
495 let engine = build_engine_async(None);
496 let step = StepWorker {
497 id: ID::new(),
498 condition: Some(
499 Condition::try_build_with_operator(
500 engine.clone(),
501 "10".to_string(),
502 "20".to_string(),
503 crate::condition::Operator::Equal,
504 )
505 .unwrap(),
506 ),
507 then_case: Some(0),
508 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
509 ..Default::default()
510 };
511
512 let context = Context::new(None);
513 let output = step.execute(&context).await.unwrap();
514
515 assert_eq!(output.next_step, NextStep::Stop);
516 assert_eq!(output.output, Some(Value::from("Ok")));
517 }
518
519 #[tokio::test]
520 async fn test_step_execute_with_return_case_and_condition_else_case() {
521 let engine = build_engine_async(None);
522 let step = StepWorker {
523 id: ID::new(),
524 condition: Some(
525 Condition::try_build_with_operator(
526 engine.clone(),
527 "10".to_string(),
528 "20".to_string(),
529 crate::condition::Operator::Equal,
530 )
531 .unwrap(),
532 ),
533 else_case: Some(0),
534 return_case: Some(Script::try_build(engine.clone(), &"Ok".to_value()).unwrap()),
535 ..Default::default()
536 };
537
538 let context = Context::new(None);
539 let result = step.execute(&context).await.unwrap();
540
541 assert_eq!(result.next_step, NextStep::Stop);
542 assert_eq!(result.output, Some(Value::from("Ok")));
543 }
544}