1use crate::events::log::EventLog;
36use crate::events::types::SeqNo;
37use chrono::{DateTime, Utc};
38use serde::{Deserialize, Serialize};
39use std::sync::Arc;
40use tokio::sync::broadcast;
41use uuid::Uuid;
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45#[serde(tag = "action", rename_all = "snake_case")]
46pub enum EntityEvent {
47 Created {
49 entity_type: String,
50 entity_id: Uuid,
51 data: serde_json::Value,
52 },
53 Updated {
55 entity_type: String,
56 entity_id: Uuid,
57 data: serde_json::Value,
58 },
59 Deleted {
61 entity_type: String,
62 entity_id: Uuid,
63 },
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(tag = "action", rename_all = "snake_case")]
69pub enum LinkEvent {
70 Created {
72 link_type: String,
73 link_id: Uuid,
74 source_id: Uuid,
75 target_id: Uuid,
76 metadata: Option<serde_json::Value>,
77 },
78 Deleted {
80 link_type: String,
81 link_id: Uuid,
82 source_id: Uuid,
83 target_id: Uuid,
84 },
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "kind", rename_all = "snake_case")]
90pub enum FrameworkEvent {
91 Entity(EntityEvent),
93 Link(LinkEvent),
95}
96
97impl FrameworkEvent {
98 pub fn event_kind(&self) -> &str {
101 match self {
102 FrameworkEvent::Entity(_) => "entity",
103 FrameworkEvent::Link(_) => "link",
104 }
105 }
106
107 pub fn entity_type(&self) -> Option<&str> {
109 match self {
110 FrameworkEvent::Entity(e) => match e {
111 EntityEvent::Created { entity_type, .. }
112 | EntityEvent::Updated { entity_type, .. }
113 | EntityEvent::Deleted { entity_type, .. } => Some(entity_type),
114 },
115 FrameworkEvent::Link(_) => None,
116 }
117 }
118
119 pub fn entity_id(&self) -> Option<Uuid> {
121 match self {
122 FrameworkEvent::Entity(e) => match e {
123 EntityEvent::Created { entity_id, .. }
124 | EntityEvent::Updated { entity_id, .. }
125 | EntityEvent::Deleted { entity_id, .. } => Some(*entity_id),
126 },
127 FrameworkEvent::Link(l) => match l {
128 LinkEvent::Created { link_id, .. } | LinkEvent::Deleted { link_id, .. } => {
129 Some(*link_id)
130 }
131 },
132 }
133 }
134
135 pub fn action(&self) -> &str {
137 match self {
138 FrameworkEvent::Entity(e) => match e {
139 EntityEvent::Created { .. } => "created",
140 EntityEvent::Updated { .. } => "updated",
141 EntityEvent::Deleted { .. } => "deleted",
142 },
143 FrameworkEvent::Link(l) => match l {
144 LinkEvent::Created { .. } => "created",
145 LinkEvent::Deleted { .. } => "deleted",
146 },
147 }
148 }
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct EventEnvelope {
154 pub id: Uuid,
156 pub timestamp: DateTime<Utc>,
158 pub event: FrameworkEvent,
160 #[serde(skip_serializing_if = "Option::is_none", default)]
162 pub seq_no: Option<SeqNo>,
163}
164
165impl EventEnvelope {
166 pub fn new(event: FrameworkEvent) -> Self {
168 Self {
169 id: Uuid::new_v4(),
170 timestamp: Utc::now(),
171 event,
172 seq_no: None,
173 }
174 }
175}
176
177#[derive(Clone)]
195pub struct EventBus {
196 sender: broadcast::Sender<EventEnvelope>,
197 event_log: Option<Arc<dyn EventLog>>,
199}
200
201impl std::fmt::Debug for EventBus {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 f.debug_struct("EventBus")
204 .field("sender", &self.sender)
205 .field("has_event_log", &self.event_log.is_some())
206 .finish()
207 }
208}
209
210impl EventBus {
211 pub fn new(capacity: usize) -> Self {
220 let (sender, _) = broadcast::channel(capacity);
221 Self {
222 sender,
223 event_log: None,
224 }
225 }
226
227 pub fn with_event_log(mut self, event_log: Arc<dyn EventLog>) -> Self {
235 self.event_log = Some(event_log);
236 self
237 }
238
239 pub fn event_log(&self) -> Option<&Arc<dyn EventLog>> {
241 self.event_log.as_ref()
242 }
243
244 pub fn publish(&self, event: FrameworkEvent) -> usize {
255 let envelope = EventEnvelope::new(event);
257
258 if let Some(event_log) = &self.event_log {
260 let log = event_log.clone();
261 let envelope_clone = envelope.clone();
262 tokio::spawn(async move {
263 if let Err(e) = log.append(envelope_clone).await {
264 tracing::warn!("Failed to append event to EventLog: {}", e);
265 }
266 });
267 }
268
269 self.sender.send(envelope).unwrap_or(0)
271 }
272
273 pub fn subscribe(&self) -> broadcast::Receiver<EventEnvelope> {
278 self.sender.subscribe()
279 }
280
281 pub fn receiver_count(&self) -> usize {
283 self.sender.receiver_count()
284 }
285}
286
287impl Default for EventBus {
288 fn default() -> Self {
289 Self::new(1024)
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Upper bound on polling rounds while waiting for spawned log appends.
    const MAX_POLLS: usize = 200;
    /// Pause between two polling rounds.
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5);

    // ---- Serialization of the leaf event enums ----

    #[test]
    fn test_entity_event_created() {
        let event = EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({"name": "Order #1"}),
        };

        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["action"], "created");
        assert_eq!(json["entity_type"], "order");
    }

    #[test]
    fn test_link_event_created() {
        let event = LinkEvent::Created {
            link_type: "has_invoice".to_string(),
            link_id: Uuid::new_v4(),
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
            metadata: Some(json!({"priority": "high"})),
        };

        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["action"], "created");
        assert_eq!(json["link_type"], "has_invoice");
    }

    // ---- FrameworkEvent accessors ----

    #[test]
    fn test_framework_event_entity_type() {
        let event = FrameworkEvent::Entity(EntityEvent::Updated {
            entity_type: "invoice".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({"status": "paid"}),
        });

        assert_eq!(event.entity_type(), Some("invoice"));
        assert_eq!(event.action(), "updated");
        assert_eq!(event.event_kind(), "entity");
    }

    #[test]
    fn test_framework_event_link() {
        let event = FrameworkEvent::Link(LinkEvent::Deleted {
            link_type: "has_invoice".to_string(),
            link_id: Uuid::new_v4(),
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
        });

        assert_eq!(event.entity_type(), None);
        assert_eq!(event.action(), "deleted");
        assert_eq!(event.event_kind(), "link");
    }

    // ---- EventEnvelope ----

    #[test]
    fn test_event_envelope_has_metadata() {
        let event = FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({}),
        });

        let envelope = EventEnvelope::new(event);
        assert!(!envelope.id.is_nil());
        assert!(envelope.timestamp <= Utc::now());
    }

    #[test]
    fn test_event_envelope_serialization_roundtrip() {
        let event = FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({"amount": 42.0}),
        });

        let envelope = EventEnvelope::new(event);
        let json = serde_json::to_string(&envelope).unwrap();
        let deserialized: EventEnvelope = serde_json::from_str(&json).unwrap();

        assert_eq!(envelope.id, deserialized.id);
        assert_eq!(envelope.event.event_kind(), deserialized.event.event_kind());
    }

    // ---- EventBus: broadcast behaviour ----

    #[tokio::test]
    async fn test_event_bus_publish_subscribe() {
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        let entity_id = Uuid::new_v4();
        let event = FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id,
            data: json!({"name": "Test Order"}),
        });

        let receivers = bus.publish(event);
        assert_eq!(receivers, 1);

        let received = rx.recv().await.unwrap();
        assert_eq!(received.event.entity_id(), Some(entity_id));
        assert_eq!(received.event.action(), "created");
    }

    #[tokio::test]
    async fn test_event_bus_multiple_subscribers() {
        let bus = EventBus::new(16);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        assert_eq!(bus.receiver_count(), 2);

        let event = FrameworkEvent::Entity(EntityEvent::Deleted {
            entity_type: "order".to_string(),
            entity_id: Uuid::new_v4(),
        });

        let receivers = bus.publish(event);
        assert_eq!(receivers, 2);

        let e1 = rx1.recv().await.unwrap();
        let e2 = rx2.recv().await.unwrap();

        // Both subscribers must observe the very same envelope.
        assert_eq!(e1.id, e2.id);
    }

    #[test]
    fn test_event_bus_publish_without_subscribers() {
        let bus = EventBus::new(16);

        let event = FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({}),
        });

        // A failed broadcast send is reported as zero receivers.
        let receivers = bus.publish(event);
        assert_eq!(receivers, 0);
    }

    #[test]
    fn test_event_bus_default() {
        let bus = EventBus::default();
        assert_eq!(bus.receiver_count(), 0);
    }

    #[test]
    fn test_event_bus_clone() {
        let bus = EventBus::new(16);
        let _rx = bus.subscribe();

        // Clones share the same channel, so counts are visible on both.
        let bus2 = bus.clone();
        assert_eq!(bus2.receiver_count(), 1);

        let _rx2 = bus2.subscribe();
        assert_eq!(bus.receiver_count(), 2);
    }

    // ---- Serialization of the `Deleted` variants ----

    #[test]
    fn test_entity_event_deleted_serialization() {
        let entity_id = Uuid::new_v4();
        let event = EntityEvent::Deleted {
            entity_type: "invoice".to_string(),
            entity_id,
        };

        let json = serde_json::to_value(&event).expect("EntityEvent::Deleted should serialize");
        assert_eq!(json["action"], "deleted");
        assert_eq!(json["entity_type"], "invoice");
        assert_eq!(json["entity_id"], entity_id.to_string());
        assert!(json.get("data").is_none());
    }

    #[test]
    fn test_link_event_deleted_serialization() {
        let link_id = Uuid::new_v4();
        let source_id = Uuid::new_v4();
        let target_id = Uuid::new_v4();
        let event = LinkEvent::Deleted {
            link_type: "ownership".to_string(),
            link_id,
            source_id,
            target_id,
        };

        let json = serde_json::to_value(&event).expect("LinkEvent::Deleted should serialize");
        assert_eq!(json["action"], "deleted");
        assert_eq!(json["link_type"], "ownership");
        assert_eq!(json["link_id"], link_id.to_string());
        assert_eq!(json["source_id"], source_id.to_string());
        assert_eq!(json["target_id"], target_id.to_string());
        assert!(json.get("metadata").is_none());
    }

    // ---- Accessor coverage across all variants ----

    #[test]
    fn test_framework_event_entity_id_for_link_created() {
        let link_id = Uuid::new_v4();
        let event = FrameworkEvent::Link(LinkEvent::Created {
            link_type: "worker".to_string(),
            link_id,
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
            metadata: None,
        });

        // For link events `entity_id()` reports the link's own id.
        assert_eq!(event.entity_id(), Some(link_id));
        assert_eq!(event.entity_type(), None);
    }

    #[test]
    fn test_framework_event_pattern_matching_all_entity_actions() {
        let id = Uuid::new_v4();

        let created = FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id: id,
            data: json!({}),
        });
        assert_eq!(created.action(), "created");
        assert_eq!(created.event_kind(), "entity");
        assert_eq!(created.entity_type(), Some("order"));
        assert_eq!(created.entity_id(), Some(id));

        let updated = FrameworkEvent::Entity(EntityEvent::Updated {
            entity_type: "order".to_string(),
            entity_id: id,
            data: json!({"status": "shipped"}),
        });
        assert_eq!(updated.action(), "updated");

        let deleted = FrameworkEvent::Entity(EntityEvent::Deleted {
            entity_type: "order".to_string(),
            entity_id: id,
        });
        assert_eq!(deleted.action(), "deleted");
        assert_eq!(deleted.entity_id(), Some(id));
    }

    #[test]
    fn test_framework_event_pattern_matching_all_link_actions() {
        let link_id = Uuid::new_v4();

        let created = FrameworkEvent::Link(LinkEvent::Created {
            link_type: "driver".to_string(),
            link_id,
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
            metadata: Some(json!({"license": "B"})),
        });
        assert_eq!(created.action(), "created");
        assert_eq!(created.event_kind(), "link");
        assert_eq!(created.entity_id(), Some(link_id));

        let deleted = FrameworkEvent::Link(LinkEvent::Deleted {
            link_type: "driver".to_string(),
            link_id,
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
        });
        assert_eq!(deleted.action(), "deleted");
        assert_eq!(deleted.event_kind(), "link");
        assert_eq!(deleted.entity_id(), Some(link_id));
    }

    // ---- EventBus: event log bridge ----

    #[tokio::test]
    async fn test_event_bus_without_event_log() {
        let bus = EventBus::new(16);
        assert!(bus.event_log().is_none());

        let mut rx = bus.subscribe();
        bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({}),
        }));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.event.action(), "created");
    }

    #[tokio::test]
    async fn test_event_bus_with_event_log_bridge() {
        use crate::events::log::EventLog;
        use crate::events::memory::InMemoryEventLog;

        let event_log = Arc::new(InMemoryEventLog::new());
        let bus = EventBus::new(16).with_event_log(event_log.clone());

        assert!(bus.event_log().is_some());

        let mut rx = bus.subscribe();

        bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "user".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({"name": "Alice"}),
        }));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.event.entity_type(), Some("user"));

        // The append runs on a spawned task. Poll with a bounded budget
        // instead of trusting a single fixed sleep to be long enough.
        for _ in 0..MAX_POLLS {
            if event_log.last_seq_no().await == Some(1) {
                break;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        assert_eq!(event_log.last_seq_no().await, Some(1));
    }

    #[tokio::test]
    async fn test_event_bus_bridge_multiple_events() {
        use crate::events::log::EventLog;
        use crate::events::memory::InMemoryEventLog;
        use crate::events::types::SeekPosition;
        use tokio_stream::StreamExt;

        let event_log = Arc::new(InMemoryEventLog::new());
        let bus = EventBus::new(16).with_event_log(event_log.clone());

        for i in 0..5 {
            bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
                entity_type: format!("type_{i}"),
                entity_id: Uuid::new_v4(),
                data: json!({}),
            }));
        }

        // Wait (bounded) until all five spawned appends have landed.
        for _ in 0..MAX_POLLS {
            if event_log.last_seq_no().await == Some(5) {
                break;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        assert_eq!(event_log.last_seq_no().await, Some(5));

        let stream = event_log
            .subscribe("test", SeekPosition::Beginning)
            .await
            .unwrap();
        let events: Vec<_> = stream.take(5).collect().await;
        assert_eq!(events.len(), 5);
    }

    #[tokio::test]
    async fn test_event_bus_backward_compatible_default() {
        let bus = EventBus::default();
        assert!(bus.event_log().is_none());
        assert_eq!(bus.receiver_count(), 0);

        let receivers = bus.publish(FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "order".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({}),
        }));
        assert_eq!(receivers, 0);
    }
}