somatize_runtime/
event_bus.rs1use somatize_core::event::Event;
7use tokio::sync::broadcast;
8
9pub struct EventBus {
14 sender: broadcast::Sender<Event>,
15}
16
17impl EventBus {
18 pub fn new(capacity: usize) -> Self {
20 let (sender, _) = broadcast::channel(capacity);
21 Self { sender }
22 }
23
24 pub fn emit(&self, event: Event) -> usize {
28 self.sender.send(event).unwrap_or(0)
29 }
30
31 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
33 self.sender.subscribe()
34 }
35
36 pub fn subscriber_count(&self) -> usize {
38 self.sender.receiver_count()
39 }
40}
41
42impl Default for EventBus {
43 fn default() -> Self {
44 Self::new(256)
45 }
46}
47
48#[cfg(test)]
49mod tests {
50 use super::*;
51 use somatize_core::event::PlanSummary;
52 use std::time::Duration;
53
54 #[tokio::test]
55 async fn emit_without_subscribers_succeeds() {
56 let bus = EventBus::new(16);
57 let count = bus.emit(Event::RunStarted {
58 run_id: "r1".into(),
59 plan_summary: PlanSummary {
60 total_nodes: 1,
61 cached_nodes: 0,
62 parallel_branches: 0,
63 },
64 });
65 assert_eq!(count, 0);
66 }
67
68 #[tokio::test]
69 async fn subscriber_receives_events() {
70 let bus = EventBus::new(16);
71 let mut rx = bus.subscribe();
72
73 bus.emit(Event::RunStarted {
74 run_id: "r1".into(),
75 plan_summary: PlanSummary {
76 total_nodes: 2,
77 cached_nodes: 0,
78 parallel_branches: 0,
79 },
80 });
81 bus.emit(Event::RunCompleted {
82 run_id: "r1".into(),
83 duration: Duration::from_millis(100),
84 });
85
86 let e1 = rx.recv().await.unwrap();
87 assert!(matches!(e1, Event::RunStarted { .. }));
88
89 let e2 = rx.recv().await.unwrap();
90 assert!(matches!(e2, Event::RunCompleted { .. }));
91 }
92
93 #[tokio::test]
94 async fn multiple_subscribers() {
95 let bus = EventBus::new(16);
96 let mut rx1 = bus.subscribe();
97 let mut rx2 = bus.subscribe();
98
99 assert_eq!(bus.subscriber_count(), 2);
100
101 bus.emit(Event::RunCompleted {
102 run_id: "r1".into(),
103 duration: Duration::from_secs(1),
104 });
105
106 let e1 = rx1.recv().await.unwrap();
107 let e2 = rx2.recv().await.unwrap();
108 assert!(matches!(e1, Event::RunCompleted { .. }));
109 assert!(matches!(e2, Event::RunCompleted { .. }));
110 }
111
112 #[tokio::test]
113 async fn subscriber_after_emit_misses_earlier_events() {
114 let bus = EventBus::new(16);
115 bus.emit(Event::RunCompleted {
116 run_id: "r1".into(),
117 duration: Duration::from_secs(1),
118 });
119
120 let mut rx = bus.subscribe();
121 bus.emit(Event::RunCompleted {
122 run_id: "r2".into(),
123 duration: Duration::from_secs(2),
124 });
125
126 let event = rx.recv().await.unwrap();
127 if let Event::RunCompleted { run_id, .. } = event {
128 assert_eq!(run_id, "r2"); } else {
130 panic!("wrong event type");
131 }
132 }
133}