1use crate::events::CloudEvent;
2use serde_json::Value;
3use std::sync::Arc;
4
5#[derive(Debug, Clone)]
7pub enum WorkflowEvent {
8 WorkflowStarted { instance_id: String, input: Value },
10 WorkflowCompleted { instance_id: String, output: Value },
12 WorkflowFailed { instance_id: String, error: String },
14 WorkflowSuspended { instance_id: String },
16 WorkflowResumed { instance_id: String },
18 WorkflowCancelled { instance_id: String },
20 TaskStarted {
22 instance_id: String,
23 task_name: String,
24 },
25 TaskCompleted {
27 instance_id: String,
28 task_name: String,
29 output: Value,
30 },
31 TaskFailed {
33 instance_id: String,
34 task_name: String,
35 error: String,
36 },
37 TaskRetried {
39 instance_id: String,
40 task_name: String,
41 attempt: u32,
42 },
43 TaskSuspended {
45 instance_id: String,
46 task_name: String,
47 },
48 TaskResumed {
50 instance_id: String,
51 task_name: String,
52 },
53 TaskCancelled {
55 instance_id: String,
56 task_name: String,
57 },
58 WorkflowStatusChanged { instance_id: String, status: String },
60}
61
62impl WorkflowEvent {
63 pub const WORKFLOW_STARTED_TYPE: &'static str = "io.serverlessworkflow.workflow.started.v1";
65 pub const WORKFLOW_COMPLETED_TYPE: &'static str = "io.serverlessworkflow.workflow.completed.v1";
66 pub const WORKFLOW_FAILED_TYPE: &'static str = "io.serverlessworkflow.workflow.faulted.v1";
67 pub const WORKFLOW_SUSPENDED_TYPE: &'static str = "io.serverlessworkflow.workflow.suspended.v1";
68 pub const WORKFLOW_RESUMED_TYPE: &'static str = "io.serverlessworkflow.workflow.resumed.v1";
69 pub const WORKFLOW_CANCELLED_TYPE: &'static str = "io.serverlessworkflow.workflow.cancelled.v1";
70 pub const TASK_STARTED_TYPE: &'static str = "io.serverlessworkflow.task.started.v1";
71 pub const TASK_COMPLETED_TYPE: &'static str = "io.serverlessworkflow.task.completed.v1";
72 pub const TASK_FAILED_TYPE: &'static str = "io.serverlessworkflow.task.faulted.v1";
73 pub const TASK_RETRIED_TYPE: &'static str = "io.serverlessworkflow.task.retried.v1";
74 pub const TASK_SUSPENDED_TYPE: &'static str = "io.serverlessworkflow.task.suspended.v1";
75 pub const TASK_RESUMED_TYPE: &'static str = "io.serverlessworkflow.task.resumed.v1";
76 pub const TASK_CANCELLED_TYPE: &'static str = "io.serverlessworkflow.task.cancelled.v1";
77 pub const WORKFLOW_STATUS_CHANGED_TYPE: &'static str =
78 "io.serverlessworkflow.workflow.status-changed.v1";
79
80 pub fn to_cloud_event(&self) -> CloudEvent {
82 match self {
83 WorkflowEvent::WorkflowStarted { instance_id, input } => CloudEvent::new(
84 Self::WORKFLOW_STARTED_TYPE,
85 serde_json::json!({
86 "instanceId": instance_id,
87 "startedAt": now_millis(),
88 "input": input,
89 }),
90 ),
91 WorkflowEvent::WorkflowCompleted {
92 instance_id,
93 output,
94 } => CloudEvent::new(
95 Self::WORKFLOW_COMPLETED_TYPE,
96 serde_json::json!({
97 "instanceId": instance_id,
98 "completedAt": now_millis(),
99 "output": output,
100 }),
101 ),
102 WorkflowEvent::WorkflowFailed { instance_id, error } => CloudEvent::new(
103 Self::WORKFLOW_FAILED_TYPE,
104 serde_json::json!({
105 "instanceId": instance_id,
106 "failedAt": now_millis(),
107 "error": { "detail": error },
108 }),
109 ),
110 WorkflowEvent::WorkflowSuspended { instance_id } => CloudEvent::new(
111 Self::WORKFLOW_SUSPENDED_TYPE,
112 serde_json::json!({
113 "instanceId": instance_id,
114 "suspendedAt": now_millis(),
115 }),
116 ),
117 WorkflowEvent::WorkflowResumed { instance_id } => CloudEvent::new(
118 Self::WORKFLOW_RESUMED_TYPE,
119 serde_json::json!({
120 "instanceId": instance_id,
121 "resumedAt": now_millis(),
122 }),
123 ),
124 WorkflowEvent::WorkflowCancelled { instance_id } => CloudEvent::new(
125 Self::WORKFLOW_CANCELLED_TYPE,
126 serde_json::json!({
127 "instanceId": instance_id,
128 "cancelledAt": now_millis(),
129 }),
130 ),
131 WorkflowEvent::TaskStarted {
132 instance_id,
133 task_name,
134 } => CloudEvent::new(
135 Self::TASK_STARTED_TYPE,
136 serde_json::json!({
137 "instanceId": instance_id,
138 "taskName": task_name,
139 "startedAt": now_millis(),
140 }),
141 ),
142 WorkflowEvent::TaskCompleted {
143 instance_id,
144 task_name,
145 output,
146 } => CloudEvent::new(
147 Self::TASK_COMPLETED_TYPE,
148 serde_json::json!({
149 "instanceId": instance_id,
150 "taskName": task_name,
151 "completedAt": now_millis(),
152 "output": output,
153 }),
154 ),
155 WorkflowEvent::TaskFailed {
156 instance_id,
157 task_name,
158 error,
159 } => CloudEvent::new(
160 Self::TASK_FAILED_TYPE,
161 serde_json::json!({
162 "instanceId": instance_id,
163 "taskName": task_name,
164 "failedAt": now_millis(),
165 "error": { "detail": error },
166 }),
167 ),
168 WorkflowEvent::TaskRetried {
169 instance_id,
170 task_name,
171 attempt,
172 } => CloudEvent::new(
173 Self::TASK_RETRIED_TYPE,
174 serde_json::json!({
175 "instanceId": instance_id,
176 "taskName": task_name,
177 "attempt": attempt,
178 "retriedAt": now_millis(),
179 }),
180 ),
181 WorkflowEvent::TaskSuspended {
182 instance_id,
183 task_name,
184 } => CloudEvent::new(
185 Self::TASK_SUSPENDED_TYPE,
186 serde_json::json!({
187 "instanceId": instance_id,
188 "taskName": task_name,
189 "suspendedAt": now_millis(),
190 }),
191 ),
192 WorkflowEvent::TaskResumed {
193 instance_id,
194 task_name,
195 } => CloudEvent::new(
196 Self::TASK_RESUMED_TYPE,
197 serde_json::json!({
198 "instanceId": instance_id,
199 "taskName": task_name,
200 "resumedAt": now_millis(),
201 }),
202 ),
203 WorkflowEvent::TaskCancelled {
204 instance_id,
205 task_name,
206 } => CloudEvent::new(
207 Self::TASK_CANCELLED_TYPE,
208 serde_json::json!({
209 "instanceId": instance_id,
210 "taskName": task_name,
211 "cancelledAt": now_millis(),
212 }),
213 ),
214 WorkflowEvent::WorkflowStatusChanged {
215 instance_id,
216 status,
217 } => CloudEvent::new(
218 Self::WORKFLOW_STATUS_CHANGED_TYPE,
219 serde_json::json!({
220 "instanceId": instance_id,
221 "changedAt": now_millis(),
222 "status": status,
223 }),
224 ),
225 }
226 }
227}
228
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and saturates at
/// `u64::MAX` rather than silently wrapping the `u128` millisecond count.
pub fn now_millis() -> u64 {
    let millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // A clock set before the epoch yields an error; treat that as time zero.
        .unwrap_or_default()
        .as_millis();

    // Clamp first so the narrowing cast below can never drop high bits.
    millis.min(u128::from(u64::MAX)) as u64
}
236
237pub trait WorkflowExecutionListener: Send + Sync {
264 fn on_event(&self, event: &WorkflowEvent);
266}
267
268#[derive(Debug, Default)]
270pub struct CollectingListener {
271 events: std::sync::Mutex<Vec<WorkflowEvent>>,
272}
273
274impl CollectingListener {
275 pub fn new() -> Self {
277 Self::default()
278 }
279
280 pub fn events(&self) -> Vec<WorkflowEvent> {
282 self.events
283 .lock()
284 .unwrap_or_else(|e| e.into_inner())
285 .clone()
286 }
287
288 pub fn len(&self) -> usize {
290 self.events.lock().unwrap_or_else(|e| e.into_inner()).len()
291 }
292
293 pub fn is_empty(&self) -> bool {
295 self.len() == 0
296 }
297
298 pub fn clear(&self) {
300 self.events
301 .lock()
302 .unwrap_or_else(|e| e.into_inner())
303 .clear();
304 }
305}
306
307impl WorkflowExecutionListener for CollectingListener {
308 fn on_event(&self, event: &WorkflowEvent) {
309 self.events
310 .lock()
311 .unwrap_or_else(|e| e.into_inner())
312 .push(event.clone());
313 }
314}
315
/// Listener that ignores every event it is given.
#[derive(Debug, Default)]
pub struct NoOpListener;
319
320impl WorkflowExecutionListener for NoOpListener {
321 fn on_event(&self, _event: &WorkflowEvent) {}
322}
323
324pub struct MultiListener {
326 listeners: Vec<Arc<dyn WorkflowExecutionListener>>,
327}
328
329impl MultiListener {
330 pub fn new(listeners: Vec<Arc<dyn WorkflowExecutionListener>>) -> Self {
332 Self { listeners }
333 }
334}
335
336impl WorkflowExecutionListener for MultiListener {
337 fn on_event(&self, event: &WorkflowEvent) {
338 for listener in &self.listeners {
339 listener.on_event(event);
340 }
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a `WorkflowStarted` event with an empty JSON object as input.
    fn started(instance_id: &str) -> WorkflowEvent {
        WorkflowEvent::WorkflowStarted {
            instance_id: instance_id.to_string(),
            input: json!({}),
        }
    }

    // A fresh collector is empty, then records events in the order they are delivered.
    #[test]
    fn test_collecting_listener() {
        let collector = CollectingListener::new();
        assert!(collector.is_empty());

        collector.on_event(&started("test-1"));
        collector.on_event(&WorkflowEvent::TaskStarted {
            instance_id: "test-1".to_string(),
            task_name: "task1".to_string(),
        });
        assert_eq!(collector.len(), 2);

        let recorded = collector.events();
        match &recorded[0] {
            WorkflowEvent::WorkflowStarted { instance_id, .. } => {
                assert_eq!(instance_id, "test-1");
            }
            other => panic!("expected WorkflowStarted, got {:?}", other),
        }
        match &recorded[1] {
            WorkflowEvent::TaskStarted { task_name, .. } => {
                assert_eq!(task_name, "task1");
            }
            other => panic!("expected TaskStarted, got {:?}", other),
        }
    }

    // One event sent through the fan-out listener reaches both wrapped collectors.
    #[test]
    fn test_multi_listener() {
        let left = Arc::new(CollectingListener::new());
        let right = Arc::new(CollectingListener::new());

        let fan_out = MultiListener::new(vec![left.clone(), right.clone()]);
        fan_out.on_event(&started("test"));

        assert_eq!(left.len(), 1);
        assert_eq!(right.len(), 1);
    }
}