wfe_core/primitives/
schedule.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::models::ExecutionResult;
7use crate::traits::step::{StepBody, StepExecutionContext};
8
9#[derive(Default)]
11pub struct ScheduleStep {
12 pub interval: Duration,
13}
14
15#[async_trait]
16impl StepBody for ScheduleStep {
17 async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
18 let children_active = context
19 .persistence_data
20 .and_then(|d| d.get("children_active"))
21 .and_then(|v| v.as_bool())
22 .unwrap_or(false);
23
24 if children_active {
25 let mut scope = context.execution_pointer.scope.clone();
27 scope.push(context.execution_pointer.id.clone());
28
29 if context.workflow.is_branch_complete(&scope) {
30 Ok(ExecutionResult::next())
31 } else {
32 Ok(ExecutionResult::persist(json!({"children_active": true})))
33 }
34 } else {
35 Ok(ExecutionResult::sleep(
37 self.interval,
38 Some(json!({"children_active": true})),
39 ))
40 }
41 }
42}
43
44#[cfg(test)]
45mod tests {
46 use super::*;
47 use crate::models::{ExecutionPointer, PointerStatus};
48 use crate::primitives::test_helpers::*;
49
50 #[tokio::test]
51 async fn first_run_schedules_sleep() {
52 let mut step = ScheduleStep {
53 interval: Duration::from_secs(30),
54 };
55 let pointer = ExecutionPointer::new(0);
56 let wf_step = default_step();
57 let workflow = default_workflow();
58 let ctx = make_context(&pointer, &wf_step, &workflow);
59
60 let result = step.run(&ctx).await.unwrap();
61 assert!(!result.proceed);
62 assert_eq!(result.sleep_for, Some(Duration::from_secs(30)));
63 assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
64 }
65
66 #[tokio::test]
67 async fn children_complete_proceeds() {
68 let mut step = ScheduleStep {
69 interval: Duration::from_secs(30),
70 };
71 let mut pointer = ExecutionPointer::new(0);
72 pointer.persistence_data = Some(json!({"children_active": true}));
73
74 let wf_step = default_step();
75 let mut workflow = default_workflow();
76 let mut child = ExecutionPointer::new(1);
77 child.scope = vec![pointer.id.clone()];
78 child.status = PointerStatus::Complete;
79 workflow.execution_pointers.push(child);
80
81 let ctx = make_context(&pointer, &wf_step, &workflow);
82 let result = step.run(&ctx).await.unwrap();
83 assert!(result.proceed);
84 }
85
86 #[tokio::test]
87 async fn children_incomplete_persists() {
88 let mut step = ScheduleStep {
89 interval: Duration::from_secs(30),
90 };
91 let mut pointer = ExecutionPointer::new(0);
92 pointer.persistence_data = Some(json!({"children_active": true}));
93
94 let wf_step = default_step();
95 let mut workflow = default_workflow();
96 let mut child = ExecutionPointer::new(1);
97 child.scope = vec![pointer.id.clone()];
98 child.status = PointerStatus::Running;
99 workflow.execution_pointers.push(child);
100
101 let ctx = make_context(&pointer, &wf_step, &workflow);
102 let result = step.run(&ctx).await.unwrap();
103 assert!(!result.proceed);
104 assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
105 }
106}