Skip to main content

swf_runtime/
listener.rs

1use crate::events::CloudEvent;
2use serde_json::Value;
3use std::sync::Arc;
4
5/// Events emitted during workflow execution
6#[derive(Debug, Clone)]
7pub enum WorkflowEvent {
8    /// Workflow started
9    WorkflowStarted { instance_id: String, input: Value },
10    /// Workflow completed successfully
11    WorkflowCompleted { instance_id: String, output: Value },
12    /// Workflow failed with an error
13    WorkflowFailed { instance_id: String, error: String },
14    /// Workflow suspended
15    WorkflowSuspended { instance_id: String },
16    /// Workflow resumed after suspension
17    WorkflowResumed { instance_id: String },
18    /// Workflow cancelled
19    WorkflowCancelled { instance_id: String },
20    /// Task started
21    TaskStarted {
22        instance_id: String,
23        task_name: String,
24    },
25    /// Task completed successfully
26    TaskCompleted {
27        instance_id: String,
28        task_name: String,
29        output: Value,
30    },
31    /// Task failed
32    TaskFailed {
33        instance_id: String,
34        task_name: String,
35        error: String,
36    },
37    /// Task retried
38    TaskRetried {
39        instance_id: String,
40        task_name: String,
41        attempt: u32,
42    },
43    /// Task suspended
44    TaskSuspended {
45        instance_id: String,
46        task_name: String,
47    },
48    /// Task resumed after suspension
49    TaskResumed {
50        instance_id: String,
51        task_name: String,
52    },
53    /// Task cancelled
54    TaskCancelled {
55        instance_id: String,
56        task_name: String,
57    },
58    /// Workflow status changed
59    WorkflowStatusChanged { instance_id: String, status: String },
60}
61
62impl WorkflowEvent {
63    /// CloudEvent type constants matching Java SDK's lifecycle event types
64    pub const WORKFLOW_STARTED_TYPE: &'static str = "io.serverlessworkflow.workflow.started.v1";
65    pub const WORKFLOW_COMPLETED_TYPE: &'static str = "io.serverlessworkflow.workflow.completed.v1";
66    pub const WORKFLOW_FAILED_TYPE: &'static str = "io.serverlessworkflow.workflow.faulted.v1";
67    pub const WORKFLOW_SUSPENDED_TYPE: &'static str = "io.serverlessworkflow.workflow.suspended.v1";
68    pub const WORKFLOW_RESUMED_TYPE: &'static str = "io.serverlessworkflow.workflow.resumed.v1";
69    pub const WORKFLOW_CANCELLED_TYPE: &'static str = "io.serverlessworkflow.workflow.cancelled.v1";
70    pub const TASK_STARTED_TYPE: &'static str = "io.serverlessworkflow.task.started.v1";
71    pub const TASK_COMPLETED_TYPE: &'static str = "io.serverlessworkflow.task.completed.v1";
72    pub const TASK_FAILED_TYPE: &'static str = "io.serverlessworkflow.task.faulted.v1";
73    pub const TASK_RETRIED_TYPE: &'static str = "io.serverlessworkflow.task.retried.v1";
74    pub const TASK_SUSPENDED_TYPE: &'static str = "io.serverlessworkflow.task.suspended.v1";
75    pub const TASK_RESUMED_TYPE: &'static str = "io.serverlessworkflow.task.resumed.v1";
76    pub const TASK_CANCELLED_TYPE: &'static str = "io.serverlessworkflow.task.cancelled.v1";
77    pub const WORKFLOW_STATUS_CHANGED_TYPE: &'static str =
78        "io.serverlessworkflow.workflow.status-changed.v1";
79
80    /// Converts this WorkflowEvent to a CloudEvent for publishing to the EventBus
81    pub fn to_cloud_event(&self) -> CloudEvent {
82        match self {
83            WorkflowEvent::WorkflowStarted { instance_id, input } => CloudEvent::new(
84                Self::WORKFLOW_STARTED_TYPE,
85                serde_json::json!({
86                    "instanceId": instance_id,
87                    "startedAt": now_millis(),
88                    "input": input,
89                }),
90            ),
91            WorkflowEvent::WorkflowCompleted {
92                instance_id,
93                output,
94            } => CloudEvent::new(
95                Self::WORKFLOW_COMPLETED_TYPE,
96                serde_json::json!({
97                    "instanceId": instance_id,
98                    "completedAt": now_millis(),
99                    "output": output,
100                }),
101            ),
102            WorkflowEvent::WorkflowFailed { instance_id, error } => CloudEvent::new(
103                Self::WORKFLOW_FAILED_TYPE,
104                serde_json::json!({
105                    "instanceId": instance_id,
106                    "failedAt": now_millis(),
107                    "error": { "detail": error },
108                }),
109            ),
110            WorkflowEvent::WorkflowSuspended { instance_id } => CloudEvent::new(
111                Self::WORKFLOW_SUSPENDED_TYPE,
112                serde_json::json!({
113                    "instanceId": instance_id,
114                    "suspendedAt": now_millis(),
115                }),
116            ),
117            WorkflowEvent::WorkflowResumed { instance_id } => CloudEvent::new(
118                Self::WORKFLOW_RESUMED_TYPE,
119                serde_json::json!({
120                    "instanceId": instance_id,
121                    "resumedAt": now_millis(),
122                }),
123            ),
124            WorkflowEvent::WorkflowCancelled { instance_id } => CloudEvent::new(
125                Self::WORKFLOW_CANCELLED_TYPE,
126                serde_json::json!({
127                    "instanceId": instance_id,
128                    "cancelledAt": now_millis(),
129                }),
130            ),
131            WorkflowEvent::TaskStarted {
132                instance_id,
133                task_name,
134            } => CloudEvent::new(
135                Self::TASK_STARTED_TYPE,
136                serde_json::json!({
137                    "instanceId": instance_id,
138                    "taskName": task_name,
139                    "startedAt": now_millis(),
140                }),
141            ),
142            WorkflowEvent::TaskCompleted {
143                instance_id,
144                task_name,
145                output,
146            } => CloudEvent::new(
147                Self::TASK_COMPLETED_TYPE,
148                serde_json::json!({
149                    "instanceId": instance_id,
150                    "taskName": task_name,
151                    "completedAt": now_millis(),
152                    "output": output,
153                }),
154            ),
155            WorkflowEvent::TaskFailed {
156                instance_id,
157                task_name,
158                error,
159            } => CloudEvent::new(
160                Self::TASK_FAILED_TYPE,
161                serde_json::json!({
162                    "instanceId": instance_id,
163                    "taskName": task_name,
164                    "failedAt": now_millis(),
165                    "error": { "detail": error },
166                }),
167            ),
168            WorkflowEvent::TaskRetried {
169                instance_id,
170                task_name,
171                attempt,
172            } => CloudEvent::new(
173                Self::TASK_RETRIED_TYPE,
174                serde_json::json!({
175                    "instanceId": instance_id,
176                    "taskName": task_name,
177                    "attempt": attempt,
178                    "retriedAt": now_millis(),
179                }),
180            ),
181            WorkflowEvent::TaskSuspended {
182                instance_id,
183                task_name,
184            } => CloudEvent::new(
185                Self::TASK_SUSPENDED_TYPE,
186                serde_json::json!({
187                    "instanceId": instance_id,
188                    "taskName": task_name,
189                    "suspendedAt": now_millis(),
190                }),
191            ),
192            WorkflowEvent::TaskResumed {
193                instance_id,
194                task_name,
195            } => CloudEvent::new(
196                Self::TASK_RESUMED_TYPE,
197                serde_json::json!({
198                    "instanceId": instance_id,
199                    "taskName": task_name,
200                    "resumedAt": now_millis(),
201                }),
202            ),
203            WorkflowEvent::TaskCancelled {
204                instance_id,
205                task_name,
206            } => CloudEvent::new(
207                Self::TASK_CANCELLED_TYPE,
208                serde_json::json!({
209                    "instanceId": instance_id,
210                    "taskName": task_name,
211                    "cancelledAt": now_millis(),
212                }),
213            ),
214            WorkflowEvent::WorkflowStatusChanged {
215                instance_id,
216                status,
217            } => CloudEvent::new(
218                Self::WORKFLOW_STATUS_CHANGED_TYPE,
219                serde_json::json!({
220                    "instanceId": instance_id,
221                    "changedAt": now_millis(),
222                    "status": status,
223                }),
224            ),
225        }
226    }
227}
228
229/// Returns current time as milliseconds since epoch
230pub fn now_millis() -> u64 {
231    std::time::SystemTime::now()
232        .duration_since(std::time::UNIX_EPOCH)
233        .unwrap_or_default()
234        .as_millis() as u64
235}
236
237/// Trait for listening to workflow execution events
238///
239/// Implement this trait to observe workflow execution lifecycle events.
240/// This is useful for logging, metrics, tracing, and debugging.
241///
242/// # Example
243///
244/// ```
245/// use swf_runtime::listener::{WorkflowExecutionListener, WorkflowEvent};
246///
247/// struct LoggingListener;
248///
249/// impl WorkflowExecutionListener for LoggingListener {
250///     fn on_event(&self, event: &WorkflowEvent) {
251///         match event {
252///             WorkflowEvent::WorkflowStarted { instance_id, .. } => {
253///                 println!("Workflow {} started", instance_id);
254///             }
255///             WorkflowEvent::TaskCompleted { task_name, .. } => {
256///                 println!("Task {} completed", task_name);
257///             }
258///             _ => {}
259///         }
260///     }
261/// }
262/// ```
263pub trait WorkflowExecutionListener: Send + Sync {
264    /// Called when a workflow execution event occurs
265    fn on_event(&self, event: &WorkflowEvent);
266}
267
268/// A listener that collects events in a thread-safe Vec
269#[derive(Debug, Default)]
270pub struct CollectingListener {
271    events: std::sync::Mutex<Vec<WorkflowEvent>>,
272}
273
274impl CollectingListener {
275    /// Creates a new empty CollectingListener
276    pub fn new() -> Self {
277        Self::default()
278    }
279
280    /// Returns all collected events
281    pub fn events(&self) -> Vec<WorkflowEvent> {
282        self.events
283            .lock()
284            .unwrap_or_else(|e| e.into_inner())
285            .clone()
286    }
287
288    /// Returns the number of collected events
289    pub fn len(&self) -> usize {
290        self.events.lock().unwrap_or_else(|e| e.into_inner()).len()
291    }
292
293    /// Returns true if no events have been collected
294    pub fn is_empty(&self) -> bool {
295        self.len() == 0
296    }
297
298    /// Clears all collected events
299    pub fn clear(&self) {
300        self.events
301            .lock()
302            .unwrap_or_else(|e| e.into_inner())
303            .clear();
304    }
305}
306
307impl WorkflowExecutionListener for CollectingListener {
308    fn on_event(&self, event: &WorkflowEvent) {
309        self.events
310            .lock()
311            .unwrap_or_else(|e| e.into_inner())
312            .push(event.clone());
313    }
314}
315
316/// A no-op listener that does nothing
317#[derive(Debug, Default)]
318pub struct NoOpListener;
319
320impl WorkflowExecutionListener for NoOpListener {
321    fn on_event(&self, _event: &WorkflowEvent) {}
322}
323
324/// Multi-listener that delegates to multiple listeners
325pub struct MultiListener {
326    listeners: Vec<Arc<dyn WorkflowExecutionListener>>,
327}
328
329impl MultiListener {
330    /// Creates a new MultiListener
331    pub fn new(listeners: Vec<Arc<dyn WorkflowExecutionListener>>) -> Self {
332        Self { listeners }
333    }
334}
335
336impl WorkflowExecutionListener for MultiListener {
337    fn on_event(&self, event: &WorkflowEvent) {
338        for listener in &self.listeners {
339            listener.on_event(event);
340        }
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347    use serde_json::json;
348
349    #[test]
350    fn test_collecting_listener() {
351        let listener = CollectingListener::new();
352        assert!(listener.is_empty());
353
354        listener.on_event(&WorkflowEvent::WorkflowStarted {
355            instance_id: "test-1".to_string(),
356            input: json!({}),
357        });
358        listener.on_event(&WorkflowEvent::TaskStarted {
359            instance_id: "test-1".to_string(),
360            task_name: "task1".to_string(),
361        });
362        assert_eq!(listener.len(), 2);
363
364        let events = listener.events();
365        assert!(
366            matches!(&events[0], WorkflowEvent::WorkflowStarted { instance_id, .. } if instance_id == "test-1")
367        );
368        assert!(
369            matches!(&events[1], WorkflowEvent::TaskStarted { task_name, .. } if task_name == "task1")
370        );
371    }
372
373    #[test]
374    fn test_multi_listener() {
375        let l1 = Arc::new(CollectingListener::new());
376        let l2 = Arc::new(CollectingListener::new());
377
378        let multi = MultiListener::new(vec![l1.clone(), l2.clone()]);
379        multi.on_event(&WorkflowEvent::WorkflowStarted {
380            instance_id: "test".to_string(),
381            input: json!({}),
382        });
383
384        assert_eq!(l1.len(), 1);
385        assert_eq!(l2.len(), 1);
386    }
387}