1use serde_json::Value;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6#[derive(Debug, Clone)]
8pub struct CloudEvent {
9 pub event_type: String,
11 pub source: Option<String>,
13 pub data: Value,
15 pub attributes: HashMap<String, Value>,
17}
18
19impl CloudEvent {
20 pub fn new(event_type: &str, data: Value) -> Self {
21 Self {
22 event_type: event_type.to_string(),
23 source: None,
24 data,
25 attributes: HashMap::new(),
26 }
27 }
28
29 pub fn with_source(mut self, source: &str) -> Self {
30 self.source = Some(source.to_string());
31 self
32 }
33
34 pub fn with_attribute(mut self, key: &str, value: Value) -> Self {
35 self.attributes.insert(key.to_string(), value);
36 self
37 }
38
39 pub fn to_json_value(&self) -> Value {
41 let mut obj = serde_json::Map::new();
42 obj.insert("type".to_string(), Value::String(self.event_type.clone()));
43 if let Some(ref source) = self.source {
44 obj.insert("source".to_string(), Value::String(source.clone()));
45 }
46 obj.insert("data".to_string(), self.data.clone());
47 for (k, v) in &self.attributes {
48 obj.insert(k.clone(), v.clone());
49 }
50 Value::Object(obj)
51 }
52}
53
54pub struct EventSubscription {
56 pub id: usize,
58 pub event_type: Option<String>,
60 receiver: tokio::sync::broadcast::Receiver<CloudEvent>,
62}
63
64#[async_trait::async_trait]
69pub trait EventBus: Send + Sync {
70 async fn publish(&self, event: CloudEvent);
72
73 async fn subscribe(&self, event_type: &str) -> EventSubscription;
76
77 async fn subscribe_all(&self) -> EventSubscription;
79
80 async fn unsubscribe(&self, subscription: EventSubscription);
82
83 async fn recv(&self, subscription: &mut EventSubscription) -> Option<CloudEvent>;
85}
86
87pub struct InMemoryEventBus {
89 sender: tokio::sync::broadcast::Sender<CloudEvent>,
90 next_id: AtomicUsize,
91}
92
/// Broadcast channel capacity used when the bus is built via `new()`.
const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
94
95impl InMemoryEventBus {
96 pub fn new() -> Self {
97 Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
98 }
99
100 pub fn with_capacity(capacity: usize) -> Self {
101 let (sender, _) = tokio::sync::broadcast::channel(capacity);
102 Self {
103 sender,
104 next_id: AtomicUsize::new(0),
105 }
106 }
107
108 fn allocate_id(&self) -> usize {
109 self.next_id.fetch_add(1, Ordering::Relaxed)
110 }
111}
112
113impl Default for InMemoryEventBus {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119#[async_trait::async_trait]
120impl EventBus for InMemoryEventBus {
121 async fn publish(&self, event: CloudEvent) {
122 let _ = self.sender.send(event);
123 }
124
125 async fn subscribe(&self, event_type: &str) -> EventSubscription {
126 let id = self.allocate_id();
127 EventSubscription {
128 id,
129 event_type: Some(event_type.to_string()),
130 receiver: self.sender.subscribe(),
131 }
132 }
133
134 async fn subscribe_all(&self) -> EventSubscription {
135 let id = self.allocate_id();
136 EventSubscription {
137 id,
138 event_type: None,
139 receiver: self.sender.subscribe(),
140 }
141 }
142
143 async fn unsubscribe(&self, _subscription: EventSubscription) {
144 }
146
147 async fn recv(&self, subscription: &mut EventSubscription) -> Option<CloudEvent> {
148 loop {
149 match subscription.receiver.recv().await {
150 Ok(event) => {
151 if let Some(ref filter_type) = subscription.event_type {
152 if event.event_type != *filter_type {
153 continue;
154 }
155 }
156 return Some(event);
157 }
158 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
159 tokio::task::yield_now().await;
160 continue;
161 }
162 Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
163 }
164 }
165 }
166}
167
168pub type SharedEventBus = Arc<dyn EventBus>;
170
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A typed subscriber receives a matching event with its payload intact.
    #[tokio::test]
    async fn test_publish_subscribe() {
        let bus = InMemoryEventBus::new();
        let mut subscription = bus.subscribe("com.example.test").await;

        let outgoing = CloudEvent::new("com.example.test", json!({"msg": "hello"}));
        bus.publish(outgoing).await;

        let received = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(received.event_type, "com.example.test");
        assert_eq!(received.data["msg"], "hello");
    }

    /// Events of another type are skipped; the first matching one is returned.
    #[tokio::test]
    async fn test_subscribe_filters_by_type() {
        let bus = Arc::new(InMemoryEventBus::new());
        let mut subscription = bus.subscribe("com.example.target").await;

        let publisher = Arc::clone(&bus);
        tokio::spawn(async move {
            let ignored = CloudEvent::new("com.example.other", json!({}));
            publisher.publish(ignored).await;

            let wanted = CloudEvent::new("com.example.target", json!({"found": true}));
            publisher.publish(wanted).await;
        });

        let received = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(received.event_type, "com.example.target");
        assert_eq!(received.data["found"], true);
    }

    /// An unfiltered subscriber receives events regardless of their type.
    #[tokio::test]
    async fn test_subscribe_all() {
        let bus = InMemoryEventBus::new();
        let mut subscription = bus.subscribe_all().await;

        let outgoing = CloudEvent::new("type.a", json!({"a": 1}));
        bus.publish(outgoing).await;

        let received = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(received.event_type, "type.a");
    }

    /// The builder methods populate type, source and attributes.
    #[tokio::test]
    async fn test_cloud_event_builder() {
        let built = CloudEvent::new("test.event", json!({"key": "value"}))
            .with_source("https://example.com")
            .with_attribute("correlationId", json!("abc-123"));

        assert_eq!(built.event_type, "test.event");
        assert_eq!(built.source, Some("https://example.com".to_string()));
        assert_eq!(built.attributes["correlationId"], json!("abc-123"));
    }

    /// Several events published in a row are received in publication order.
    #[tokio::test]
    async fn test_multiple_events() {
        let bus = InMemoryEventBus::new();
        let mut subscription = bus.subscribe("test").await;

        bus.publish(CloudEvent::new("test", json!(1))).await;
        bus.publish(CloudEvent::new("test", json!(2))).await;
        bus.publish(CloudEvent::new("test", json!(3))).await;

        let first = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(first.data, json!(1));
        let second = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(second.data, json!(2));
        let third = bus.recv(&mut subscription).await.unwrap();
        assert_eq!(third.data, json!(3));
    }
}