1use chrono::{DateTime, Utc};
36use serde::{Deserialize, Serialize};
37use tokio::sync::broadcast;
38use uuid::Uuid;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "action", rename_all = "snake_case")]
43pub enum EntityEvent {
44 Created {
46 entity_type: String,
47 entity_id: Uuid,
48 data: serde_json::Value,
49 },
50 Updated {
52 entity_type: String,
53 entity_id: Uuid,
54 data: serde_json::Value,
55 },
56 Deleted {
58 entity_type: String,
59 entity_id: Uuid,
60 },
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
65#[serde(tag = "action", rename_all = "snake_case")]
66pub enum LinkEvent {
67 Created {
69 link_type: String,
70 link_id: Uuid,
71 source_id: Uuid,
72 target_id: Uuid,
73 metadata: Option<serde_json::Value>,
74 },
75 Deleted {
77 link_type: String,
78 link_id: Uuid,
79 source_id: Uuid,
80 target_id: Uuid,
81 },
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86#[serde(tag = "kind", rename_all = "snake_case")]
87pub enum FrameworkEvent {
88 Entity(EntityEvent),
90 Link(LinkEvent),
92}
93
94impl FrameworkEvent {
95 pub fn event_kind(&self) -> &str {
98 match self {
99 FrameworkEvent::Entity(_) => "entity",
100 FrameworkEvent::Link(_) => "link",
101 }
102 }
103
104 pub fn entity_type(&self) -> Option<&str> {
106 match self {
107 FrameworkEvent::Entity(e) => match e {
108 EntityEvent::Created { entity_type, .. }
109 | EntityEvent::Updated { entity_type, .. }
110 | EntityEvent::Deleted { entity_type, .. } => Some(entity_type),
111 },
112 FrameworkEvent::Link(_) => None,
113 }
114 }
115
116 pub fn entity_id(&self) -> Option<Uuid> {
118 match self {
119 FrameworkEvent::Entity(e) => match e {
120 EntityEvent::Created { entity_id, .. }
121 | EntityEvent::Updated { entity_id, .. }
122 | EntityEvent::Deleted { entity_id, .. } => Some(*entity_id),
123 },
124 FrameworkEvent::Link(l) => match l {
125 LinkEvent::Created { link_id, .. } | LinkEvent::Deleted { link_id, .. } => {
126 Some(*link_id)
127 }
128 },
129 }
130 }
131
132 pub fn action(&self) -> &str {
134 match self {
135 FrameworkEvent::Entity(e) => match e {
136 EntityEvent::Created { .. } => "created",
137 EntityEvent::Updated { .. } => "updated",
138 EntityEvent::Deleted { .. } => "deleted",
139 },
140 FrameworkEvent::Link(l) => match l {
141 LinkEvent::Created { .. } => "created",
142 LinkEvent::Deleted { .. } => "deleted",
143 },
144 }
145 }
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct EventEnvelope {
151 pub id: Uuid,
153 pub timestamp: DateTime<Utc>,
155 pub event: FrameworkEvent,
157}
158
159impl EventEnvelope {
160 pub fn new(event: FrameworkEvent) -> Self {
162 Self {
163 id: Uuid::new_v4(),
164 timestamp: Utc::now(),
165 event,
166 }
167 }
168}
169
170#[derive(Debug, Clone)]
177pub struct EventBus {
178 sender: broadcast::Sender<EventEnvelope>,
179}
180
181impl EventBus {
182 pub fn new(capacity: usize) -> Self {
191 let (sender, _) = broadcast::channel(capacity);
192 Self { sender }
193 }
194
195 pub fn publish(&self, event: FrameworkEvent) -> usize {
203 let envelope = EventEnvelope::new(event);
204 self.sender.send(envelope).unwrap_or(0)
206 }
207
208 pub fn subscribe(&self) -> broadcast::Receiver<EventEnvelope> {
213 self.sender.subscribe()
214 }
215
216 pub fn receiver_count(&self) -> usize {
218 self.sender.receiver_count()
219 }
220}
221
222impl Default for EventBus {
223 fn default() -> Self {
224 Self::new(1024)
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231 use serde_json::json;
232
233 #[test]
234 fn test_entity_event_created() {
235 let event = EntityEvent::Created {
236 entity_type: "order".to_string(),
237 entity_id: Uuid::new_v4(),
238 data: json!({"name": "Order #1"}),
239 };
240
241 let json = serde_json::to_value(&event).unwrap();
242 assert_eq!(json["action"], "created");
243 assert_eq!(json["entity_type"], "order");
244 }
245
246 #[test]
247 fn test_link_event_created() {
248 let event = LinkEvent::Created {
249 link_type: "has_invoice".to_string(),
250 link_id: Uuid::new_v4(),
251 source_id: Uuid::new_v4(),
252 target_id: Uuid::new_v4(),
253 metadata: Some(json!({"priority": "high"})),
254 };
255
256 let json = serde_json::to_value(&event).unwrap();
257 assert_eq!(json["action"], "created");
258 assert_eq!(json["link_type"], "has_invoice");
259 }
260
261 #[test]
262 fn test_framework_event_entity_type() {
263 let event = FrameworkEvent::Entity(EntityEvent::Updated {
264 entity_type: "invoice".to_string(),
265 entity_id: Uuid::new_v4(),
266 data: json!({"status": "paid"}),
267 });
268
269 assert_eq!(event.entity_type(), Some("invoice"));
270 assert_eq!(event.action(), "updated");
271 assert_eq!(event.event_kind(), "entity");
272 }
273
274 #[test]
275 fn test_framework_event_link() {
276 let event = FrameworkEvent::Link(LinkEvent::Deleted {
277 link_type: "has_invoice".to_string(),
278 link_id: Uuid::new_v4(),
279 source_id: Uuid::new_v4(),
280 target_id: Uuid::new_v4(),
281 });
282
283 assert_eq!(event.entity_type(), None);
284 assert_eq!(event.action(), "deleted");
285 assert_eq!(event.event_kind(), "link");
286 }
287
288 #[test]
289 fn test_event_envelope_has_metadata() {
290 let event = FrameworkEvent::Entity(EntityEvent::Created {
291 entity_type: "order".to_string(),
292 entity_id: Uuid::new_v4(),
293 data: json!({}),
294 });
295
296 let envelope = EventEnvelope::new(event);
297 assert!(!envelope.id.is_nil());
298 assert!(envelope.timestamp <= Utc::now());
299 }
300
301 #[test]
302 fn test_event_envelope_serialization_roundtrip() {
303 let event = FrameworkEvent::Entity(EntityEvent::Created {
304 entity_type: "order".to_string(),
305 entity_id: Uuid::new_v4(),
306 data: json!({"amount": 42.0}),
307 });
308
309 let envelope = EventEnvelope::new(event);
310 let json = serde_json::to_string(&envelope).unwrap();
311 let deserialized: EventEnvelope = serde_json::from_str(&json).unwrap();
312
313 assert_eq!(envelope.id, deserialized.id);
314 assert_eq!(envelope.event.event_kind(), deserialized.event.event_kind());
315 }
316
317 #[tokio::test]
318 async fn test_event_bus_publish_subscribe() {
319 let bus = EventBus::new(16);
320 let mut rx = bus.subscribe();
321
322 let entity_id = Uuid::new_v4();
323 let event = FrameworkEvent::Entity(EntityEvent::Created {
324 entity_type: "order".to_string(),
325 entity_id,
326 data: json!({"name": "Test Order"}),
327 });
328
329 let receivers = bus.publish(event);
330 assert_eq!(receivers, 1);
331
332 let received = rx.recv().await.unwrap();
333 assert_eq!(received.event.entity_id(), Some(entity_id));
334 assert_eq!(received.event.action(), "created");
335 }
336
337 #[tokio::test]
338 async fn test_event_bus_multiple_subscribers() {
339 let bus = EventBus::new(16);
340 let mut rx1 = bus.subscribe();
341 let mut rx2 = bus.subscribe();
342
343 assert_eq!(bus.receiver_count(), 2);
344
345 let event = FrameworkEvent::Entity(EntityEvent::Deleted {
346 entity_type: "order".to_string(),
347 entity_id: Uuid::new_v4(),
348 });
349
350 let receivers = bus.publish(event);
351 assert_eq!(receivers, 2);
352
353 let e1 = rx1.recv().await.unwrap();
354 let e2 = rx2.recv().await.unwrap();
355
356 assert_eq!(e1.id, e2.id); }
358
359 #[test]
360 fn test_event_bus_publish_without_subscribers() {
361 let bus = EventBus::new(16);
362
363 let event = FrameworkEvent::Entity(EntityEvent::Created {
364 entity_type: "order".to_string(),
365 entity_id: Uuid::new_v4(),
366 data: json!({}),
367 });
368
369 let receivers = bus.publish(event);
371 assert_eq!(receivers, 0);
372 }
373
374 #[test]
375 fn test_event_bus_default() {
376 let bus = EventBus::default();
377 assert_eq!(bus.receiver_count(), 0);
378 }
379
380 #[test]
381 fn test_event_bus_clone() {
382 let bus = EventBus::new(16);
383 let _rx = bus.subscribe();
384
385 let bus2 = bus.clone();
386 assert_eq!(bus2.receiver_count(), 1);
387
388 let _rx2 = bus2.subscribe();
389 assert_eq!(bus.receiver_count(), 2);
390 }
391}