wfe_core/primitives/
schedule.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use serde_json::json;
5
6use crate::models::ExecutionResult;
7use crate::traits::step::{StepBody, StepExecutionContext};
8
9#[derive(Default)]
11pub struct ScheduleStep {
12 pub interval: Duration,
14}
15
16#[async_trait]
17impl StepBody for ScheduleStep {
18 async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
19 let children_active = context
20 .persistence_data
21 .and_then(|d| d.get("children_active"))
22 .and_then(|v| v.as_bool())
23 .unwrap_or(false);
24
25 if children_active {
26 let mut scope = context.execution_pointer.scope.clone();
28 scope.push(context.execution_pointer.id.clone());
29
30 if context.workflow.is_branch_complete(&scope) {
31 Ok(ExecutionResult::next())
32 } else {
33 Ok(ExecutionResult::persist(json!({"children_active": true})))
34 }
35 } else {
36 Ok(ExecutionResult::sleep(
38 self.interval,
39 Some(json!({"children_active": true})),
40 ))
41 }
42 }
43}
44
45#[cfg(test)]
46mod tests {
47 use super::*;
48 use crate::models::{ExecutionPointer, PointerStatus};
49 use crate::primitives::test_helpers::*;
50
51 #[tokio::test]
52 async fn first_run_schedules_sleep() {
53 let mut step = ScheduleStep {
54 interval: Duration::from_secs(30),
55 };
56 let pointer = ExecutionPointer::new(0);
57 let wf_step = default_step();
58 let workflow = default_workflow();
59 let ctx = make_context(&pointer, &wf_step, &workflow);
60
61 let result = step.run(&ctx).await.unwrap();
62 assert!(!result.proceed);
63 assert_eq!(result.sleep_for, Some(Duration::from_secs(30)));
64 assert_eq!(
65 result.persistence_data,
66 Some(json!({"children_active": true}))
67 );
68 }
69
70 #[tokio::test]
71 async fn children_complete_proceeds() {
72 let mut step = ScheduleStep {
73 interval: Duration::from_secs(30),
74 };
75 let mut pointer = ExecutionPointer::new(0);
76 pointer.persistence_data = Some(json!({"children_active": true}));
77
78 let wf_step = default_step();
79 let mut workflow = default_workflow();
80 let mut child = ExecutionPointer::new(1);
81 child.scope = vec![pointer.id.clone()];
82 child.status = PointerStatus::Complete;
83 workflow.execution_pointers.push(child);
84
85 let ctx = make_context(&pointer, &wf_step, &workflow);
86 let result = step.run(&ctx).await.unwrap();
87 assert!(result.proceed);
88 }
89
90 #[tokio::test]
91 async fn children_incomplete_persists() {
92 let mut step = ScheduleStep {
93 interval: Duration::from_secs(30),
94 };
95 let mut pointer = ExecutionPointer::new(0);
96 pointer.persistence_data = Some(json!({"children_active": true}));
97
98 let wf_step = default_step();
99 let mut workflow = default_workflow();
100 let mut child = ExecutionPointer::new(1);
101 child.scope = vec![pointer.id.clone()];
102 child.status = PointerStatus::Running;
103 workflow.execution_pointers.push(child);
104
105 let ctx = make_context(&pointer, &wf_step, &workflow);
106 let result = step.run(&ctx).await.unwrap();
107 assert!(!result.proceed);
108 assert_eq!(
109 result.persistence_data,
110 Some(json!({"children_active": true}))
111 );
112 }
113}