wfe_core/primitives/
recur.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::models::ExecutionResult;
7use crate::traits::step::{StepBody, StepExecutionContext};
8
9#[derive(Default)]
12pub struct RecurStep {
13 pub interval: Duration,
15 pub stop_condition: bool,
17}
18
19#[async_trait]
20impl StepBody for RecurStep {
21 async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
22 if self.stop_condition {
23 return Ok(ExecutionResult::next());
24 }
25
26 let children_active = context
27 .persistence_data
28 .and_then(|d| d.get("children_active"))
29 .and_then(|v| v.as_bool())
30 .unwrap_or(false);
31
32 if children_active {
33 let mut scope = context.execution_pointer.scope.clone();
34 scope.push(context.execution_pointer.id.clone());
35
36 if context.workflow.is_branch_complete(&scope) {
37 Ok(ExecutionResult::sleep(
39 self.interval,
40 Some(json!({"children_active": true})),
41 ))
42 } else {
43 Ok(ExecutionResult::persist(json!({"children_active": true})))
44 }
45 } else {
46 Ok(ExecutionResult::sleep(
48 self.interval,
49 Some(json!({"children_active": true})),
50 ))
51 }
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use super::*;
58 use crate::models::{ExecutionPointer, PointerStatus};
59 use crate::primitives::test_helpers::*;
60
61 #[tokio::test]
62 async fn stop_condition_true_proceeds() {
63 let mut step = RecurStep {
64 interval: Duration::from_secs(10),
65 stop_condition: true,
66 };
67 let pointer = ExecutionPointer::new(0);
68 let wf_step = default_step();
69 let workflow = default_workflow();
70 let ctx = make_context(&pointer, &wf_step, &workflow);
71
72 let result = step.run(&ctx).await.unwrap();
73 assert!(result.proceed);
74 }
75
76 #[tokio::test]
77 async fn first_run_sleeps() {
78 let mut step = RecurStep {
79 interval: Duration::from_secs(10),
80 stop_condition: false,
81 };
82 let pointer = ExecutionPointer::new(0);
83 let wf_step = default_step();
84 let workflow = default_workflow();
85 let ctx = make_context(&pointer, &wf_step, &workflow);
86
87 let result = step.run(&ctx).await.unwrap();
88 assert!(!result.proceed);
89 assert_eq!(result.sleep_for, Some(Duration::from_secs(10)));
90 assert_eq!(
91 result.persistence_data,
92 Some(json!({"children_active": true}))
93 );
94 }
95
96 #[tokio::test]
97 async fn children_complete_re_arms() {
98 let mut step = RecurStep {
99 interval: Duration::from_secs(10),
100 stop_condition: false,
101 };
102 let mut pointer = ExecutionPointer::new(0);
103 pointer.persistence_data = Some(json!({"children_active": true}));
104
105 let wf_step = default_step();
106 let mut workflow = default_workflow();
107 let mut child = ExecutionPointer::new(1);
108 child.scope = vec![pointer.id.clone()];
109 child.status = PointerStatus::Complete;
110 workflow.execution_pointers.push(child);
111
112 let ctx = make_context(&pointer, &wf_step, &workflow);
113 let result = step.run(&ctx).await.unwrap();
114 assert!(!result.proceed);
115 assert_eq!(result.sleep_for, Some(Duration::from_secs(10)));
116 }
117
118 #[tokio::test]
119 async fn children_incomplete_persists() {
120 let mut step = RecurStep {
121 interval: Duration::from_secs(10),
122 stop_condition: false,
123 };
124 let mut pointer = ExecutionPointer::new(0);
125 pointer.persistence_data = Some(json!({"children_active": true}));
126
127 let wf_step = default_step();
128 let mut workflow = default_workflow();
129 let mut child = ExecutionPointer::new(1);
130 child.scope = vec![pointer.id.clone()];
131 child.status = PointerStatus::Running;
132 workflow.execution_pointers.push(child);
133
134 let ctx = make_context(&pointer, &wf_step, &workflow);
135 let result = step.run(&ctx).await.unwrap();
136 assert!(!result.proceed);
137 assert!(result.sleep_for.is_none());
138 assert_eq!(
139 result.persistence_data,
140 Some(json!({"children_active": true}))
141 );
142 }
143}