Skip to main content

wfe_core/primitives/
recur.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::models::ExecutionResult;
7use crate::traits::step::{StepBody, StepExecutionContext};
8
9/// A step that repeatedly schedules child execution at an interval
10/// until a stop condition is met.
11#[derive(Default)]
12pub struct RecurStep {
13    /// Interval.
14    pub interval: Duration,
15    /// Stop condition.
16    pub stop_condition: bool,
17}
18
19#[async_trait]
20impl StepBody for RecurStep {
21    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
22        if self.stop_condition {
23            return Ok(ExecutionResult::next());
24        }
25
26        let children_active = context
27            .persistence_data
28            .and_then(|d| d.get("children_active"))
29            .and_then(|v| v.as_bool())
30            .unwrap_or(false);
31
32        if children_active {
33            let mut scope = context.execution_pointer.scope.clone();
34            scope.push(context.execution_pointer.id.clone());
35
36            if context.workflow.is_branch_complete(&scope) {
37                // Re-arm: sleep again for the next iteration.
38                Ok(ExecutionResult::sleep(
39                    self.interval,
40                    Some(json!({"children_active": true})),
41                ))
42            } else {
43                Ok(ExecutionResult::persist(json!({"children_active": true})))
44            }
45        } else {
46            // First run: sleep for the interval, then create children.
47            Ok(ExecutionResult::sleep(
48                self.interval,
49                Some(json!({"children_active": true})),
50            ))
51        }
52    }
53}
54
55#[cfg(test)]
56mod tests {
57    use super::*;
58    use crate::models::{ExecutionPointer, PointerStatus};
59    use crate::primitives::test_helpers::*;
60
61    #[tokio::test]
62    async fn stop_condition_true_proceeds() {
63        let mut step = RecurStep {
64            interval: Duration::from_secs(10),
65            stop_condition: true,
66        };
67        let pointer = ExecutionPointer::new(0);
68        let wf_step = default_step();
69        let workflow = default_workflow();
70        let ctx = make_context(&pointer, &wf_step, &workflow);
71
72        let result = step.run(&ctx).await.unwrap();
73        assert!(result.proceed);
74    }
75
76    #[tokio::test]
77    async fn first_run_sleeps() {
78        let mut step = RecurStep {
79            interval: Duration::from_secs(10),
80            stop_condition: false,
81        };
82        let pointer = ExecutionPointer::new(0);
83        let wf_step = default_step();
84        let workflow = default_workflow();
85        let ctx = make_context(&pointer, &wf_step, &workflow);
86
87        let result = step.run(&ctx).await.unwrap();
88        assert!(!result.proceed);
89        assert_eq!(result.sleep_for, Some(Duration::from_secs(10)));
90        assert_eq!(
91            result.persistence_data,
92            Some(json!({"children_active": true}))
93        );
94    }
95
96    #[tokio::test]
97    async fn children_complete_re_arms() {
98        let mut step = RecurStep {
99            interval: Duration::from_secs(10),
100            stop_condition: false,
101        };
102        let mut pointer = ExecutionPointer::new(0);
103        pointer.persistence_data = Some(json!({"children_active": true}));
104
105        let wf_step = default_step();
106        let mut workflow = default_workflow();
107        let mut child = ExecutionPointer::new(1);
108        child.scope = vec![pointer.id.clone()];
109        child.status = PointerStatus::Complete;
110        workflow.execution_pointers.push(child);
111
112        let ctx = make_context(&pointer, &wf_step, &workflow);
113        let result = step.run(&ctx).await.unwrap();
114        assert!(!result.proceed);
115        assert_eq!(result.sleep_for, Some(Duration::from_secs(10)));
116    }
117
118    #[tokio::test]
119    async fn children_incomplete_persists() {
120        let mut step = RecurStep {
121            interval: Duration::from_secs(10),
122            stop_condition: false,
123        };
124        let mut pointer = ExecutionPointer::new(0);
125        pointer.persistence_data = Some(json!({"children_active": true}));
126
127        let wf_step = default_step();
128        let mut workflow = default_workflow();
129        let mut child = ExecutionPointer::new(1);
130        child.scope = vec![pointer.id.clone()];
131        child.status = PointerStatus::Running;
132        workflow.execution_pointers.push(child);
133
134        let ctx = make_context(&pointer, &wf_step, &workflow);
135        let result = step.run(&ctx).await.unwrap();
136        assert!(!result.proceed);
137        assert!(result.sleep_for.is_none());
138        assert_eq!(
139            result.persistence_data,
140            Some(json!({"children_active": true}))
141        );
142    }
143}