Skip to main content

wfe_core/primitives/
wait_for.rs

1use async_trait::async_trait;
2use chrono::Utc;
3
4use crate::models::ExecutionResult;
5use crate::traits::step::{StepBody, StepExecutionContext};
6
/// A workflow step that pauses until an external event has been delivered.
///
/// The event to wait for is identified by a name/key pair. Either value may be
/// left empty, in which case the step's `run` implementation looks it up in the
/// step's `step_config` under the `"event_name"` / `"event_key"` entries.
#[derive(Debug, Clone, Default)]
pub struct WaitForStep {
    /// Name of the event to wait for. When empty, `run` falls back to the
    /// `"event_name"` entry of the step configuration.
    pub event_name: String,
    /// Key of the event to wait for. When empty, `run` falls back to the
    /// `"event_key"` entry of the step configuration.
    pub event_key: String,
}
15
16#[async_trait]
17impl StepBody for WaitForStep {
18    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
19        // If event data has arrived, proceed.
20        if context.execution_pointer.event_data.is_some() {
21            return Ok(ExecutionResult::next());
22        }
23
24        // Read event_name/event_key from step_config if our fields are empty.
25        let event_name = if self.event_name.is_empty() {
26            context
27                .step
28                .step_config
29                .as_ref()
30                .and_then(|c| c.get("event_name"))
31                .and_then(|v| v.as_str())
32                .unwrap_or_default()
33                .to_string()
34        } else {
35            self.event_name.clone()
36        };
37        let event_key = if self.event_key.is_empty() {
38            context
39                .step
40                .step_config
41                .as_ref()
42                .and_then(|c| c.get("event_key"))
43                .and_then(|v| v.as_str())
44                .unwrap_or_default()
45                .to_string()
46        } else {
47            self.event_key.clone()
48        };
49
50        // Otherwise, subscribe and wait for the event.
51        Ok(ExecutionResult::wait_for_event(
52            event_name,
53            event_key,
54            Utc::now(),
55        ))
56    }
57}
58
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::ExecutionPointer;
    use crate::primitives::test_helpers::*;
    use serde_json::json;

    /// Builds the step used by every test: it waits for the
    /// `order.completed` event with key `order-123`.
    fn order_completed_step() -> WaitForStep {
        WaitForStep {
            event_name: "order.completed".into(),
            event_key: "order-123".into(),
        }
    }

    /// With no event data on the pointer, the step must not proceed and must
    /// report the event name, key, and an as-of timestamp.
    #[tokio::test]
    async fn first_run_waits_for_event() {
        let pointer = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&pointer, &wf_step, &workflow);

        let mut step = order_completed_step();
        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.event_name.as_deref(), Some("order.completed"));
        assert_eq!(outcome.event_key.as_deref(), Some("order-123"));
        assert!(outcome.event_as_of.is_some());
    }

    /// Once event data is present on the pointer, the step proceeds and the
    /// result carries no event name.
    #[tokio::test]
    async fn event_arrived_proceeds() {
        let mut pointer = ExecutionPointer::new(0);
        pointer.event_data = Some(json!({"status": "done"}));
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&pointer, &wf_step, &workflow);

        let mut step = order_completed_step();
        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
        assert!(outcome.event_name.is_none());
    }
}