Skip to main content

swf_runtime/
events.rs

1use serde_json::Value;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
/// A simplified CloudEvent-like structure for event publishing and consumption.
///
/// Values are built with `CloudEvent::new` and the chainable `with_*`
/// methods. All fields are public, so consumers may also read or modify
/// them directly.
#[derive(Debug, Clone)]
pub struct CloudEvent {
    /// Event type (e.g., "com.example.event.occurred").
    /// Typed subscriptions on the in-memory bus compare against this string.
    pub event_type: String,
    /// Event source; `None` when no source has been set.
    pub source: Option<String>,
    /// Event data payload (arbitrary JSON).
    pub data: Value,
    /// Additional attributes, keyed by attribute name.
    pub attributes: HashMap<String, Value>,
}
18
19impl CloudEvent {
20    pub fn new(event_type: &str, data: Value) -> Self {
21        Self {
22            event_type: event_type.to_string(),
23            source: None,
24            data,
25            attributes: HashMap::new(),
26        }
27    }
28
29    pub fn with_source(mut self, source: &str) -> Self {
30        self.source = Some(source.to_string());
31        self
32    }
33
34    pub fn with_attribute(mut self, key: &str, value: Value) -> Self {
35        self.attributes.insert(key.to_string(), value);
36        self
37    }
38
39    /// Converts this CloudEvent to a JSON Value for expression evaluation
40    pub fn to_json_value(&self) -> Value {
41        let mut obj = serde_json::Map::new();
42        obj.insert("type".to_string(), Value::String(self.event_type.clone()));
43        if let Some(ref source) = self.source {
44            obj.insert("source".to_string(), Value::String(source.clone()));
45        }
46        obj.insert("data".to_string(), self.data.clone());
47        for (k, v) in &self.attributes {
48            obj.insert(k.clone(), v.clone());
49        }
50        Value::Object(obj)
51    }
52}
53
54/// A subscription handle that holds a receiver for events
55pub struct EventSubscription {
56    /// Unique subscription ID
57    pub id: usize,
58    /// The event type filter (None means subscribe to all events)
59    pub event_type: Option<String>,
60    /// The broadcast receiver — stored so events published after subscribe are captured
61    receiver: tokio::sync::broadcast::Receiver<CloudEvent>,
62}
63
/// Trait for event bus implementations
///
/// Provides publish/subscribe functionality for workflow events.
/// Used by `emit` tasks to publish and `listen` tasks to consume events.
///
/// Implementors must be `Send + Sync` so a bus can be shared behind an
/// `Arc<dyn EventBus>`.
#[async_trait::async_trait]
pub trait EventBus: Send + Sync {
    /// Publish an event to the bus.
    ///
    /// Nothing is returned, so callers get no delivery confirmation.
    async fn publish(&self, event: CloudEvent);

    /// Subscribe to events matching a specific type.
    /// The subscription starts receiving events from the point of subscription.
    async fn subscribe(&self, event_type: &str) -> EventSubscription;

    /// Subscribe to all events, regardless of type.
    async fn subscribe_all(&self) -> EventSubscription;

    /// Unsubscribe a previously registered subscription.
    ///
    /// The handle is taken by value, so it cannot be used afterwards.
    async fn unsubscribe(&self, subscription: EventSubscription);

    /// Receive the next event matching a subscription, waiting
    /// asynchronously until one is available.
    ///
    /// NOTE(review): the trait does not define when `None` is returned;
    /// `InMemoryEventBus` returns it once its channel is closed.
    async fn recv(&self, subscription: &mut EventSubscription) -> Option<CloudEvent>;
}
86
87/// In-memory event bus implementation using broadcast channels
88pub struct InMemoryEventBus {
89    sender: tokio::sync::broadcast::Sender<CloudEvent>,
90    next_id: AtomicUsize,
91}
92
/// Broadcast channel capacity used by `InMemoryEventBus::new`.
const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
94
95impl InMemoryEventBus {
96    pub fn new() -> Self {
97        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
98    }
99
100    pub fn with_capacity(capacity: usize) -> Self {
101        let (sender, _) = tokio::sync::broadcast::channel(capacity);
102        Self {
103            sender,
104            next_id: AtomicUsize::new(0),
105        }
106    }
107
108    fn allocate_id(&self) -> usize {
109        self.next_id.fetch_add(1, Ordering::Relaxed)
110    }
111}
112
113impl Default for InMemoryEventBus {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119#[async_trait::async_trait]
120impl EventBus for InMemoryEventBus {
121    async fn publish(&self, event: CloudEvent) {
122        let _ = self.sender.send(event);
123    }
124
125    async fn subscribe(&self, event_type: &str) -> EventSubscription {
126        let id = self.allocate_id();
127        EventSubscription {
128            id,
129            event_type: Some(event_type.to_string()),
130            receiver: self.sender.subscribe(),
131        }
132    }
133
134    async fn subscribe_all(&self) -> EventSubscription {
135        let id = self.allocate_id();
136        EventSubscription {
137            id,
138            event_type: None,
139            receiver: self.sender.subscribe(),
140        }
141    }
142
143    async fn unsubscribe(&self, _subscription: EventSubscription) {
144        // Receiver is dropped when subscription is dropped
145    }
146
147    async fn recv(&self, subscription: &mut EventSubscription) -> Option<CloudEvent> {
148        loop {
149            match subscription.receiver.recv().await {
150                Ok(event) => {
151                    if let Some(ref filter_type) = subscription.event_type {
152                        if event.event_type != *filter_type {
153                            continue;
154                        }
155                    }
156                    return Some(event);
157                }
158                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
159                    tokio::task::yield_now().await;
160                    continue;
161                }
162                Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
163            }
164        }
165    }
166}
167
/// A shared, thread-safe event bus.
///
/// A reference-counted trait object, so any `EventBus` implementation can
/// sit behind it and clones refer to the same bus.
pub type SharedEventBus = Arc<dyn EventBus>;
170
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A typed subscriber receives an event of its type published after it subscribed.
    #[tokio::test]
    async fn test_publish_subscribe() {
        let bus = InMemoryEventBus::new();
        let mut subscription = bus.subscribe("com.example.test").await;

        let outgoing = CloudEvent::new("com.example.test", json!({"msg": "hello"}));
        bus.publish(outgoing).await;

        let received = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(received.event_type, "com.example.test");
        assert_eq!(received.data["msg"], "hello");
    }

    /// Events of a different type are skipped by a typed subscription.
    #[tokio::test]
    async fn test_subscribe_filters_by_type() {
        let bus = Arc::new(InMemoryEventBus::new());
        let mut subscription = bus.subscribe("com.example.target").await;

        // Publish from another task: first an unrelated event, then the wanted one.
        let publisher = Arc::clone(&bus);
        tokio::spawn(async move {
            let unrelated = CloudEvent::new("com.example.other", json!({}));
            publisher.publish(unrelated).await;

            let wanted = CloudEvent::new("com.example.target", json!({"found": true}));
            publisher.publish(wanted).await;
        });

        let received = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(received.event_type, "com.example.target");
        assert_eq!(received.data["found"], true);
    }

    /// An unfiltered subscription receives events of any type.
    #[tokio::test]
    async fn test_subscribe_all() {
        let bus = InMemoryEventBus::new();
        let mut subscription = bus.subscribe_all().await;

        let outgoing = CloudEvent::new("type.a", json!({"a": 1}));
        bus.publish(outgoing).await;

        let received = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(received.event_type, "type.a");
    }

    /// The builder methods populate type, source and attributes.
    #[tokio::test]
    async fn test_cloud_event_builder() {
        let built = CloudEvent::new("test.event", json!({"key": "value"}))
            .with_source("https://example.com")
            .with_attribute("correlationId", json!("abc-123"));

        assert_eq!(built.event_type, "test.event");
        assert_eq!(built.source.as_deref(), Some("https://example.com"));
        assert_eq!(
            built.attributes.get("correlationId"),
            Some(&json!("abc-123"))
        );
    }

    /// Several events published in a row arrive in publication order.
    #[tokio::test]
    async fn test_multiple_events() {
        let bus = InMemoryEventBus::new();
        let mut subscription = bus.subscribe("test").await;

        for payload in 1..=3 {
            bus.publish(CloudEvent::new("test", json!(payload))).await;
        }

        for expected in 1..=3 {
            let received = bus.recv(&mut subscription).await.unwrap();
            assert_eq!(received.data, json!(expected));
        }
    }
}