Skip to main content

wfe_core/primitives/
schedule.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::models::ExecutionResult;
7use crate::traits::step::{StepBody, StepExecutionContext};
8
/// A step that schedules child execution after a delay.
///
/// On its first run the step asks the engine to sleep for [`interval`] and
/// records that its children are active; on later runs it waits until the
/// child branch is complete before letting the workflow proceed.
///
/// [`interval`]: ScheduleStep::interval
#[derive(Debug, Clone, Default)]
pub struct ScheduleStep {
    /// How long the step sleeps on its first run. `Default` yields a zero
    /// duration.
    pub interval: Duration,
}
15
16#[async_trait]
17impl StepBody for ScheduleStep {
18    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
19        let children_active = context
20            .persistence_data
21            .and_then(|d| d.get("children_active"))
22            .and_then(|v| v.as_bool())
23            .unwrap_or(false);
24
25        if children_active {
26            // Children have been created, check if they are complete.
27            let mut scope = context.execution_pointer.scope.clone();
28            scope.push(context.execution_pointer.id.clone());
29
30            if context.workflow.is_branch_complete(&scope) {
31                Ok(ExecutionResult::next())
32            } else {
33                Ok(ExecutionResult::persist(json!({"children_active": true})))
34            }
35        } else {
36            // First run: sleep for the interval, then create children.
37            Ok(ExecutionResult::sleep(
38                self.interval,
39                Some(json!({"children_active": true})),
40            ))
41        }
42    }
43}
44
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ExecutionPointer, PointerStatus};
    use crate::primitives::test_helpers::*;

    /// Interval used by every test case.
    const INTERVAL: Duration = Duration::from_secs(30);

    /// Builds a parent pointer whose persisted state already marks the
    /// children as active.
    fn parent_with_active_children() -> ExecutionPointer {
        let mut parent = ExecutionPointer::new(0);
        parent.persistence_data = Some(json!({"children_active": true}));
        parent
    }

    /// Builds a child pointer scoped under `parent` with the given status.
    fn child_of(parent: &ExecutionPointer, status: PointerStatus) -> ExecutionPointer {
        let mut child = ExecutionPointer::new(1);
        child.scope = vec![parent.id.clone()];
        child.status = status;
        child
    }

    #[tokio::test]
    async fn first_run_schedules_sleep() {
        let pointer = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&pointer, &wf_step, &workflow);

        let mut step = ScheduleStep { interval: INTERVAL };
        let result = step.run(&ctx).await.unwrap();

        assert!(!result.proceed);
        assert_eq!(result.sleep_for, Some(INTERVAL));
        assert_eq!(
            result.persistence_data,
            Some(json!({"children_active": true}))
        );
    }

    #[tokio::test]
    async fn children_complete_proceeds() {
        let pointer = parent_with_active_children();
        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow
            .execution_pointers
            .push(child_of(&pointer, PointerStatus::Complete));

        let ctx = make_context(&pointer, &wf_step, &workflow);
        let mut step = ScheduleStep { interval: INTERVAL };
        let result = step.run(&ctx).await.unwrap();

        assert!(result.proceed);
    }

    #[tokio::test]
    async fn children_incomplete_persists() {
        let pointer = parent_with_active_children();
        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow
            .execution_pointers
            .push(child_of(&pointer, PointerStatus::Running));

        let ctx = make_context(&pointer, &wf_step, &workflow);
        let mut step = ScheduleStep { interval: INTERVAL };
        let result = step.run(&ctx).await.unwrap();

        assert!(!result.proceed);
        assert_eq!(
            result.persistence_data,
            Some(json!({"children_active": true}))
        );
    }
}