Skip to main content

wfe_core/primitives/
schedule.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::models::ExecutionResult;
7use crate::traits::step::{StepBody, StepExecutionContext};
8
/// A step that schedules child execution after a delay.
///
/// The first time the step runs it requests a sleep of [`ScheduleStep::interval`];
/// on later runs it waits for the branch scoped under its own execution pointer
/// to complete before letting the workflow proceed.
///
/// The derived [`Default`] yields a zero-length interval.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ScheduleStep {
    /// How long to sleep on the first run before the children are considered active.
    pub interval: Duration,
}
14
15#[async_trait]
16impl StepBody for ScheduleStep {
17    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
18        let children_active = context
19            .persistence_data
20            .and_then(|d| d.get("children_active"))
21            .and_then(|v| v.as_bool())
22            .unwrap_or(false);
23
24        if children_active {
25            // Children have been created, check if they are complete.
26            let mut scope = context.execution_pointer.scope.clone();
27            scope.push(context.execution_pointer.id.clone());
28
29            if context.workflow.is_branch_complete(&scope) {
30                Ok(ExecutionResult::next())
31            } else {
32                Ok(ExecutionResult::persist(json!({"children_active": true})))
33            }
34        } else {
35            // First run: sleep for the interval, then create children.
36            Ok(ExecutionResult::sleep(
37                self.interval,
38                Some(json!({"children_active": true})),
39            ))
40        }
41    }
42}
43
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ExecutionPointer, PointerStatus};
    use crate::primitives::test_helpers::*;

    /// Interval used by every test case.
    const INTERVAL: Duration = Duration::from_secs(30);

    /// Builds the pointer for the schedule step with the marker already set,
    /// i.e. as it looks on any run after the first.
    fn parent_awaiting_children() -> ExecutionPointer {
        let mut parent = ExecutionPointer::new(0);
        parent.persistence_data = Some(json!({"children_active": true}));
        parent
    }

    /// Builds a child pointer scoped directly under `parent` with the given status.
    fn child_of(parent: &ExecutionPointer, status: PointerStatus) -> ExecutionPointer {
        let mut child = ExecutionPointer::new(1);
        child.scope = vec![parent.id.clone()];
        child.status = status;
        child
    }

    /// Without persistence data the step must request a sleep and set the marker.
    #[tokio::test]
    async fn first_run_schedules_sleep() {
        let pointer = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&pointer, &wf_step, &workflow);

        let mut step = ScheduleStep { interval: INTERVAL };
        let result = step.run(&ctx).await.unwrap();

        assert!(!result.proceed);
        assert_eq!(result.sleep_for, Some(INTERVAL));
        assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
    }

    /// With the marker set and the only child complete, the step proceeds.
    #[tokio::test]
    async fn children_complete_proceeds() {
        let pointer = parent_awaiting_children();
        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow
            .execution_pointers
            .push(child_of(&pointer, PointerStatus::Complete));
        let ctx = make_context(&pointer, &wf_step, &workflow);

        let mut step = ScheduleStep { interval: INTERVAL };
        let result = step.run(&ctx).await.unwrap();

        assert!(result.proceed);
    }

    /// With the marker set and a child still running, the step persists and waits.
    #[tokio::test]
    async fn children_incomplete_persists() {
        let pointer = parent_awaiting_children();
        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow
            .execution_pointers
            .push(child_of(&pointer, PointerStatus::Running));
        let ctx = make_context(&pointer, &wf_step, &workflow);

        let mut step = ScheduleStep { interval: INTERVAL };
        let result = step.run(&ctx).await.unwrap();

        assert!(!result.proceed);
        assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
    }
}