Skip to main content

wfe_core/primitives/
wait_for.rs

1use async_trait::async_trait;
2use chrono::Utc;
3
4use crate::models::ExecutionResult;
5use crate::traits::step::{StepBody, StepExecutionContext};
6
7/// A step that waits for an external event before proceeding.
8#[derive(Default)]
9pub struct WaitForStep {
10    pub event_name: String,
11    pub event_key: String,
12}
13
14#[async_trait]
15impl StepBody for WaitForStep {
16    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
17        // If event data has arrived, proceed.
18        if context.execution_pointer.event_data.is_some() {
19            return Ok(ExecutionResult::next());
20        }
21
22        // Read event_name/event_key from step_config if our fields are empty.
23        let event_name = if self.event_name.is_empty() {
24            context
25                .step
26                .step_config
27                .as_ref()
28                .and_then(|c| c.get("event_name"))
29                .and_then(|v| v.as_str())
30                .unwrap_or_default()
31                .to_string()
32        } else {
33            self.event_name.clone()
34        };
35        let event_key = if self.event_key.is_empty() {
36            context
37                .step
38                .step_config
39                .as_ref()
40                .and_then(|c| c.get("event_key"))
41                .and_then(|v| v.as_str())
42                .unwrap_or_default()
43                .to_string()
44        } else {
45            self.event_key.clone()
46        };
47
48        // Otherwise, subscribe and wait for the event.
49        Ok(ExecutionResult::wait_for_event(
50            event_name,
51            event_key,
52            Utc::now(),
53        ))
54    }
55}
56
57#[cfg(test)]
58mod tests {
59    use super::*;
60    use crate::models::ExecutionPointer;
61    use crate::primitives::test_helpers::*;
62    use serde_json::json;
63
64    #[tokio::test]
65    async fn first_run_waits_for_event() {
66        let mut step = WaitForStep {
67            event_name: "order.completed".into(),
68            event_key: "order-123".into(),
69        };
70        let pointer = ExecutionPointer::new(0);
71        let wf_step = default_step();
72        let workflow = default_workflow();
73        let ctx = make_context(&pointer, &wf_step, &workflow);
74
75        let result = step.run(&ctx).await.unwrap();
76        assert!(!result.proceed);
77        assert_eq!(result.event_name.as_deref(), Some("order.completed"));
78        assert_eq!(result.event_key.as_deref(), Some("order-123"));
79        assert!(result.event_as_of.is_some());
80    }
81
82    #[tokio::test]
83    async fn event_arrived_proceeds() {
84        let mut step = WaitForStep {
85            event_name: "order.completed".into(),
86            event_key: "order-123".into(),
87        };
88        let mut pointer = ExecutionPointer::new(0);
89        pointer.event_data = Some(json!({"status": "done"}));
90        let wf_step = default_step();
91        let workflow = default_workflow();
92        let ctx = make_context(&pointer, &wf_step, &workflow);
93
94        let result = step.run(&ctx).await.unwrap();
95        assert!(result.proceed);
96        assert!(result.event_name.is_none());
97    }
98}