1use crate::event::CuenvEvent;
7use tokio::sync::{broadcast, mpsc};
8
9const DEFAULT_BROADCAST_CAPACITY: usize = 1000;
11
12#[derive(Debug)]
17pub struct EventBus {
18 sender: mpsc::UnboundedSender<CuenvEvent>,
20 broadcast_tx: broadcast::Sender<CuenvEvent>,
22}
23
24impl EventBus {
25 #[must_use]
30 pub fn new() -> Self {
31 Self::with_capacity(DEFAULT_BROADCAST_CAPACITY)
32 }
33
34 #[must_use]
36 pub fn with_capacity(capacity: usize) -> Self {
37 let (sender, mut receiver) = mpsc::unbounded_channel::<CuenvEvent>();
38 let (broadcast_tx, _) = broadcast::channel(capacity);
39
40 let broadcast_tx_clone = broadcast_tx.clone();
41 tokio::spawn(async move {
42 while let Some(event) = receiver.recv().await {
43 let _ = broadcast_tx_clone.send(event);
45 }
46 });
47
48 Self {
49 sender,
50 broadcast_tx,
51 }
52 }
53
54 #[must_use]
56 pub fn sender(&self) -> EventSender {
57 EventSender {
58 inner: self.sender.clone(),
59 }
60 }
61
62 #[must_use]
67 pub fn subscribe(&self) -> EventReceiver {
68 EventReceiver {
69 inner: self.broadcast_tx.subscribe(),
70 }
71 }
72
73 #[must_use]
75 pub fn subscriber_count(&self) -> usize {
76 self.broadcast_tx.receiver_count()
77 }
78}
79
80impl Default for EventBus {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86#[derive(Debug, Clone)]
88pub struct EventSender {
89 inner: mpsc::UnboundedSender<CuenvEvent>,
90}
91
92impl EventSender {
93 #[must_use]
97 pub fn into_inner(self) -> mpsc::UnboundedSender<CuenvEvent> {
98 self.inner
99 }
100
101 pub fn send(&self, event: CuenvEvent) -> Result<(), SendError> {
107 self.inner.send(event).map_err(|_| SendError::Closed)
108 }
109
110 #[must_use]
112 pub fn is_closed(&self) -> bool {
113 self.inner.is_closed()
114 }
115}
116
117#[derive(Debug)]
119pub struct EventReceiver {
120 inner: broadcast::Receiver<CuenvEvent>,
121}
122
123impl EventReceiver {
124 pub async fn recv(&mut self) -> Option<CuenvEvent> {
129 loop {
130 match self.inner.recv().await {
131 Ok(event) => return Some(event),
132 Err(broadcast::error::RecvError::Lagged(n)) => {
133 tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
134 }
135 Err(broadcast::error::RecvError::Closed) => return None,
136 }
137 }
138 }
139
140 pub fn try_recv(&mut self) -> Option<CuenvEvent> {
144 loop {
145 match self.inner.try_recv() {
146 Ok(event) => return Some(event),
147 Err(broadcast::error::TryRecvError::Lagged(n)) => {
148 tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
149 }
150 Err(
151 broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
152 ) => {
153 return None;
154 }
155 }
156 }
157 }
158}
159
160#[derive(Debug, Clone, Copy, PartialEq, Eq)]
162pub enum SendError {
163 Closed,
165}
166
167impl std::fmt::Display for SendError {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 match self {
170 Self::Closed => write!(f, "event bus is closed"),
171 }
172 }
173}
174
175impl std::error::Error for SendError {}
176
177#[cfg(test)]
178#[allow(clippy::similar_names)]
179mod tests {
180 use super::*;
181 use crate::event::{EventCategory, EventSource, OutputEvent};
182 use uuid::Uuid;
183
184 fn make_test_event() -> CuenvEvent {
185 CuenvEvent::new(
186 Uuid::new_v4(),
187 EventSource::new("cuenv::test"),
188 EventCategory::Output(OutputEvent::Stdout {
189 content: "test".to_string(),
190 }),
191 )
192 }
193
194 #[tokio::test]
195 async fn test_event_bus_creation() {
196 let bus = EventBus::new();
197 let sender = bus.sender();
198 assert!(!sender.is_closed());
199 }
200
201 #[tokio::test]
202 async fn test_event_bus_send_receive() {
203 let bus = EventBus::new();
204 let sender = bus.sender();
205 let mut receiver = bus.subscribe();
206
207 let event = make_test_event();
208 let event_id = event.id;
209
210 sender.send(event).unwrap();
211
212 let received = receiver.recv().await.unwrap();
213 assert_eq!(received.id, event_id);
214 }
215
216 #[tokio::test]
217 async fn test_event_bus_multiple_subscribers() {
218 let bus = EventBus::new();
219 let sender = bus.sender();
220 let mut receiver1 = bus.subscribe();
221 let mut receiver2 = bus.subscribe();
222
223 assert_eq!(bus.subscriber_count(), 2);
224
225 let event = make_test_event();
226 let event_id = event.id;
227
228 sender.send(event).unwrap();
229
230 let received1 = receiver1.recv().await.unwrap();
231 let received2 = receiver2.recv().await.unwrap();
232
233 assert_eq!(received1.id, event_id);
234 assert_eq!(received2.id, event_id);
235 }
236
237 #[tokio::test]
238 async fn test_event_bus_sender_survives_bus_drop() {
239 let sender = {
243 let bus = EventBus::new();
244 bus.sender()
245 };
246
247 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
249
250 let event = make_test_event();
251 let result = sender.send(event);
252 assert!(result.is_ok());
254 }
255
256 #[tokio::test]
257 async fn test_event_bus_with_capacity() {
258 let bus = EventBus::with_capacity(10);
259 let sender = bus.sender();
260 assert!(!sender.is_closed());
261 assert_eq!(bus.subscriber_count(), 0);
262 }
263
264 #[tokio::test]
265 async fn test_event_bus_default() {
266 let bus = EventBus::default();
267 let sender = bus.sender();
268 assert!(!sender.is_closed());
269 }
270
271 #[tokio::test]
272 async fn test_event_receiver_try_recv_empty() {
273 let bus = EventBus::new();
274 let _sender = bus.sender();
275 let mut receiver = bus.subscribe();
276
277 let result = receiver.try_recv();
279 assert!(result.is_none());
280 }
281
282 #[tokio::test]
283 async fn test_event_receiver_try_recv_with_event() {
284 let bus = EventBus::new();
285 let sender = bus.sender();
286 let mut receiver = bus.subscribe();
287
288 let event = make_test_event();
289 let event_id = event.id;
290 sender.send(event).unwrap();
291
292 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
294
295 let result = receiver.try_recv();
296 assert!(result.is_some());
297 assert_eq!(result.unwrap().id, event_id);
298 }
299
300 #[test]
301 fn test_send_error_display() {
302 let err = SendError::Closed;
303 assert_eq!(format!("{err}"), "event bus is closed");
304 }
305
306 #[test]
307 fn test_send_error_equality() {
308 assert_eq!(SendError::Closed, SendError::Closed);
309 }
310
311 #[test]
312 fn test_send_error_debug() {
313 let err = SendError::Closed;
314 let debug_str = format!("{err:?}");
315 assert!(debug_str.contains("Closed"));
316 }
317
318 #[test]
319 fn test_send_error_is_error() {
320 let err = SendError::Closed;
321 let _: &dyn std::error::Error = &err;
322 }
323
324 #[tokio::test]
325 async fn test_event_sender_into_inner() {
326 let bus = EventBus::new();
327 let sender = bus.sender();
328 let inner = sender.into_inner();
329 assert!(!inner.is_closed());
330 }
331
332 #[tokio::test]
333 async fn test_event_bus_debug() {
334 let bus = EventBus::new();
335 let debug_str = format!("{bus:?}");
336 assert!(debug_str.contains("EventBus"));
337 }
338
339 #[tokio::test]
340 async fn test_event_sender_debug() {
341 let bus = EventBus::new();
342 let sender = bus.sender();
343 let debug_str = format!("{sender:?}");
344 assert!(debug_str.contains("EventSender"));
345 }
346
347 #[tokio::test]
348 async fn test_event_receiver_debug() {
349 let bus = EventBus::new();
350 let receiver = bus.subscribe();
351 let debug_str = format!("{receiver:?}");
352 assert!(debug_str.contains("EventReceiver"));
353 }
354
355 #[tokio::test]
356 async fn test_multiple_events_in_order() {
357 let bus = EventBus::new();
358 let sender = bus.sender();
359 let mut receiver = bus.subscribe();
360
361 let event1 = make_test_event();
362 let event2 = make_test_event();
363 let event3 = make_test_event();
364
365 let id1 = event1.id;
366 let id2 = event2.id;
367 let id3 = event3.id;
368
369 sender.send(event1).unwrap();
370 sender.send(event2).unwrap();
371 sender.send(event3).unwrap();
372
373 let r1 = receiver.recv().await.unwrap();
374 let r2 = receiver.recv().await.unwrap();
375 let r3 = receiver.recv().await.unwrap();
376
377 assert_eq!(r1.id, id1);
378 assert_eq!(r2.id, id2);
379 assert_eq!(r3.id, id3);
380 }
381
382 #[tokio::test]
383 async fn test_sender_clone() {
384 let bus = EventBus::new();
385 let sender1 = bus.sender();
386 let sender2 = sender1.clone();
387
388 let mut receiver = bus.subscribe();
389
390 let event1 = make_test_event();
391 let event2 = make_test_event();
392
393 let id1 = event1.id;
394 let id2 = event2.id;
395
396 sender1.send(event1).unwrap();
397 sender2.send(event2).unwrap();
398
399 let r1 = receiver.recv().await.unwrap();
400 let r2 = receiver.recv().await.unwrap();
401
402 assert_eq!(r1.id, id1);
403 assert_eq!(r2.id, id2);
404 }
405
406 #[tokio::test]
407 async fn test_subscriber_count_changes() {
408 let bus = EventBus::new();
409 assert_eq!(bus.subscriber_count(), 0);
410
411 let recv1 = bus.subscribe();
412 assert_eq!(bus.subscriber_count(), 1);
413
414 let recv2 = bus.subscribe();
415 assert_eq!(bus.subscriber_count(), 2);
416
417 drop(recv1);
418 assert_eq!(bus.subscriber_count(), 1);
419
420 drop(recv2);
421 assert_eq!(bus.subscriber_count(), 0);
422 }
423}