1use fluers_core::{EventSink, RunEvent};
13use tokio::sync::broadcast;
14
/// Alias for [`RunEvent`], the event type carried by [`EventBus`].
pub type Event = RunEvent;
20
21#[derive(Clone)]
28pub struct EventBus {
29 sender: broadcast::Sender<RunEvent>,
30}
31
32impl EventBus {
33 pub const DEFAULT_CAPACITY: usize = 256;
35
36 #[must_use]
41 pub fn new(capacity: usize) -> Self {
42 let capacity = capacity.max(1);
43 let (sender, _) = broadcast::channel(capacity);
44 Self { sender }
45 }
46
47 #[must_use]
49 pub fn new_default() -> Self {
50 Self::new(Self::DEFAULT_CAPACITY)
51 }
52
53 #[must_use]
58 pub fn subscribe(&self) -> broadcast::Receiver<RunEvent> {
59 self.sender.subscribe()
60 }
61
62 pub fn emit(&self, event: RunEvent) -> bool {
68 self.sender.send(event).is_ok()
69 }
70}
71
72impl EventSink for EventBus {
75 fn emit(&self, event: RunEvent) {
76 let _ = EventBus::emit(self, event);
78 }
79}
80
81impl Default for EventBus {
82 fn default() -> Self {
83 Self::new_default()
84 }
85}
86
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::time::timeout;

    use super::EventBus;
    use fluers_core::{EventSink, RunEvent};
    use uuid::Uuid;

    /// Upper bound on any single wait, so a missing event fails the test
    /// instead of hanging it.
    const TEST_TIMEOUT: Duration = Duration::from_secs(2);

    /// Result type that lets tests propagate timeout and join errors with `?`.
    type TestResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// An event emitted after subscribing reaches a receiver running on
    /// another task.
    #[tokio::test]
    async fn emit_delivers_to_subscriber() -> TestResult {
        let session = Uuid::nil();
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        // The receiver is created before the emit; only the waiting happens
        // on the spawned task.
        let waiter = tokio::spawn(async move { rx.recv().await.ok() });

        let delivered = bus.emit(RunEvent::SessionStarted { session });
        assert!(delivered);

        let outcome = timeout(TEST_TIMEOUT, waiter).await??;
        let is_expected = matches!(
            outcome,
            Some(RunEvent::SessionStarted { session: got }) if got == session
        );
        assert!(is_expected);

        Ok(())
    }

    /// Emitting through the `EventSink` trait reaches subscribers just like
    /// the inherent method.
    #[tokio::test]
    async fn event_sink_trait_delegates_to_emit() -> TestResult {
        let session = Uuid::nil();
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        EventSink::emit(&bus, RunEvent::SessionStarted { session });

        let outcome = timeout(TEST_TIMEOUT, rx.recv()).await?;
        let is_expected = matches!(
            outcome,
            Ok(RunEvent::SessionStarted { session: got }) if got == session
        );
        assert!(is_expected);

        Ok(())
    }

    /// With nobody subscribed, `emit` reports that the event was not
    /// delivered.
    #[tokio::test]
    async fn emit_with_no_receivers_returns_false() {
        let session = Uuid::nil();
        let bus = EventBus::new(16);

        let delivered = bus.emit(RunEvent::SessionStarted { session });
        assert!(!delivered);
    }

    /// Every subscriber gets its own copy of an emitted event.
    #[tokio::test]
    async fn multiple_subscribers_each_receive() -> TestResult {
        let session = Uuid::nil();
        let bus = EventBus::new(16);
        let mut rx_a = bus.subscribe();
        let mut rx_b = bus.subscribe();

        let delivered = bus.emit(RunEvent::TurnStarted { session, turn: 1 });
        assert!(delivered);

        let seen_by_a = timeout(TEST_TIMEOUT, rx_a.recv()).await?;
        let seen_by_b = timeout(TEST_TIMEOUT, rx_b.recv()).await?;

        let a_matches = matches!(
            seen_by_a,
            Ok(RunEvent::TurnStarted { session: got, turn: 1 }) if got == session
        );
        assert!(a_matches);

        let b_matches = matches!(
            seen_by_b,
            Ok(RunEvent::TurnStarted { session: got, turn: 1 }) if got == session
        );
        assert!(b_matches);

        Ok(())
    }
}