1use crate::event::CuenvEvent;
7use std::sync::Mutex;
8use tokio::sync::{broadcast, mpsc};
9
10const DEFAULT_BROADCAST_CAPACITY: usize = 1000;
12
13#[derive(Debug)]
18pub struct EventBus {
19 sender: Mutex<Option<mpsc::UnboundedSender<CuenvEvent>>>,
23 broadcast_tx: broadcast::Sender<CuenvEvent>,
25}
26
27impl EventBus {
28 #[must_use]
33 pub fn new() -> Self {
34 Self::with_capacity(DEFAULT_BROADCAST_CAPACITY)
35 }
36
37 #[must_use]
39 pub fn with_capacity(capacity: usize) -> Self {
40 let (sender, mut receiver) = mpsc::unbounded_channel::<CuenvEvent>();
41 let (broadcast_tx, _) = broadcast::channel(capacity);
42
43 let broadcast_tx_clone = broadcast_tx.clone();
44 tokio::spawn(async move {
45 while let Some(event) = receiver.recv().await {
46 let _ = broadcast_tx_clone.send(event);
48 }
49 });
50
51 Self {
52 sender: Mutex::new(Some(sender)),
53 broadcast_tx,
54 }
55 }
56
57 #[must_use]
61 pub fn sender(&self) -> Option<EventSender> {
62 self.sender
63 .lock()
64 .ok()
65 .and_then(|guard| guard.as_ref().map(|s| EventSender { inner: s.clone() }))
66 }
67
68 pub fn shutdown(&self) {
75 if let Ok(mut guard) = self.sender.lock() {
76 let _ = guard.take();
78 }
79 }
80
81 #[must_use]
86 pub fn subscribe(&self) -> EventReceiver {
87 EventReceiver {
88 inner: self.broadcast_tx.subscribe(),
89 }
90 }
91
92 #[must_use]
94 pub fn subscriber_count(&self) -> usize {
95 self.broadcast_tx.receiver_count()
96 }
97}
98
99impl Default for EventBus {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105#[derive(Debug, Clone)]
107pub struct EventSender {
108 inner: mpsc::UnboundedSender<CuenvEvent>,
109}
110
111impl EventSender {
112 #[must_use]
116 pub fn into_inner(self) -> mpsc::UnboundedSender<CuenvEvent> {
117 self.inner
118 }
119
120 pub fn send(&self, event: CuenvEvent) -> Result<(), SendError> {
126 self.inner.send(event).map_err(|_| SendError::Closed)
127 }
128
129 #[must_use]
131 pub fn is_closed(&self) -> bool {
132 self.inner.is_closed()
133 }
134}
135
136#[derive(Debug)]
138pub struct EventReceiver {
139 inner: broadcast::Receiver<CuenvEvent>,
140}
141
142impl EventReceiver {
143 pub async fn recv(&mut self) -> Option<CuenvEvent> {
148 loop {
149 match self.inner.recv().await {
150 Ok(event) => return Some(event),
151 Err(broadcast::error::RecvError::Lagged(n)) => {
152 tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
153 }
154 Err(broadcast::error::RecvError::Closed) => return None,
155 }
156 }
157 }
158
159 pub fn try_recv(&mut self) -> Option<CuenvEvent> {
163 loop {
164 match self.inner.try_recv() {
165 Ok(event) => return Some(event),
166 Err(broadcast::error::TryRecvError::Lagged(n)) => {
167 tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
168 }
169 Err(
170 broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
171 ) => {
172 return None;
173 }
174 }
175 }
176 }
177}
178
179#[derive(Debug, Clone, Copy, PartialEq, Eq)]
181pub enum SendError {
182 Closed,
184}
185
186impl std::fmt::Display for SendError {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 match self {
189 Self::Closed => write!(f, "event bus is closed"),
190 }
191 }
192}
193
194impl std::error::Error for SendError {}
195
196#[cfg(test)]
197#[allow(clippy::similar_names)]
198mod tests {
199 use super::*;
200 use crate::event::{EventCategory, EventSource, OutputEvent};
201 use uuid::Uuid;
202
203 fn make_test_event() -> CuenvEvent {
204 CuenvEvent::new(
205 Uuid::new_v4(),
206 EventSource::new("cuenv::test"),
207 EventCategory::Output(OutputEvent::Stdout {
208 content: "test".to_string(),
209 }),
210 )
211 }
212
213 #[tokio::test]
214 async fn test_event_bus_creation() {
215 let bus = EventBus::new();
216 let sender = bus.sender().expect("sender should be available");
217 assert!(!sender.is_closed());
218 }
219
220 #[tokio::test]
221 async fn test_event_bus_send_receive() {
222 let bus = EventBus::new();
223 let sender = bus.sender().expect("sender should be available");
224 let mut receiver = bus.subscribe();
225
226 let event = make_test_event();
227 let event_id = event.id;
228
229 sender.send(event).unwrap();
230
231 let received = receiver.recv().await.unwrap();
232 assert_eq!(received.id, event_id);
233 }
234
235 #[tokio::test]
236 async fn test_event_bus_multiple_subscribers() {
237 let bus = EventBus::new();
238 let sender = bus.sender().expect("sender should be available");
239 let mut receiver1 = bus.subscribe();
240 let mut receiver2 = bus.subscribe();
241
242 assert_eq!(bus.subscriber_count(), 2);
243
244 let event = make_test_event();
245 let event_id = event.id;
246
247 sender.send(event).unwrap();
248
249 let received1 = receiver1.recv().await.unwrap();
250 let received2 = receiver2.recv().await.unwrap();
251
252 assert_eq!(received1.id, event_id);
253 assert_eq!(received2.id, event_id);
254 }
255
256 #[tokio::test]
257 async fn test_event_bus_sender_survives_bus_drop() {
258 let sender = {
262 let bus = EventBus::new();
263 bus.sender().expect("sender should be available")
264 };
265
266 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
268
269 let event = make_test_event();
270 let result = sender.send(event);
271 assert!(result.is_ok());
273 }
274
275 #[tokio::test]
276 async fn test_event_bus_with_capacity() {
277 let bus = EventBus::with_capacity(10);
278 let sender = bus.sender().expect("sender should be available");
279 assert!(!sender.is_closed());
280 assert_eq!(bus.subscriber_count(), 0);
281 }
282
283 #[tokio::test]
284 async fn test_event_bus_default() {
285 let bus = EventBus::default();
286 let sender = bus.sender().expect("sender should be available");
287 assert!(!sender.is_closed());
288 }
289
290 #[tokio::test]
291 async fn test_event_receiver_try_recv_empty() {
292 let bus = EventBus::new();
293 let _sender = bus.sender().expect("sender should be available");
294 let mut receiver = bus.subscribe();
295
296 let result = receiver.try_recv();
298 assert!(result.is_none());
299 }
300
301 #[tokio::test]
302 async fn test_event_receiver_try_recv_with_event() {
303 let bus = EventBus::new();
304 let sender = bus.sender().expect("sender should be available");
305 let mut receiver = bus.subscribe();
306
307 let event = make_test_event();
308 let event_id = event.id;
309 sender.send(event).unwrap();
310
311 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
313
314 let result = receiver.try_recv();
315 assert!(result.is_some());
316 assert_eq!(result.unwrap().id, event_id);
317 }
318
319 #[test]
320 fn test_send_error_display() {
321 let err = SendError::Closed;
322 assert_eq!(format!("{err}"), "event bus is closed");
323 }
324
325 #[test]
326 fn test_send_error_equality() {
327 assert_eq!(SendError::Closed, SendError::Closed);
328 }
329
330 #[test]
331 fn test_send_error_debug() {
332 let err = SendError::Closed;
333 let debug_str = format!("{err:?}");
334 assert!(debug_str.contains("Closed"));
335 }
336
337 #[test]
338 fn test_send_error_is_error() {
339 let err = SendError::Closed;
340 let _: &dyn std::error::Error = &err;
341 }
342
343 #[tokio::test]
344 async fn test_event_sender_into_inner() {
345 let bus = EventBus::new();
346 let sender = bus.sender().expect("sender should be available");
347 let inner = sender.into_inner();
348 assert!(!inner.is_closed());
349 }
350
351 #[tokio::test]
352 async fn test_event_bus_debug() {
353 let bus = EventBus::new();
354 let debug_str = format!("{bus:?}");
355 assert!(debug_str.contains("EventBus"));
356 }
357
358 #[tokio::test]
359 async fn test_event_sender_debug() {
360 let bus = EventBus::new();
361 let sender = bus.sender().expect("sender should be available");
362 let debug_str = format!("{sender:?}");
363 assert!(debug_str.contains("EventSender"));
364 }
365
366 #[tokio::test]
367 async fn test_event_receiver_debug() {
368 let bus = EventBus::new();
369 let receiver = bus.subscribe();
370 let debug_str = format!("{receiver:?}");
371 assert!(debug_str.contains("EventReceiver"));
372 }
373
374 #[tokio::test]
375 async fn test_multiple_events_in_order() {
376 let bus = EventBus::new();
377 let sender = bus.sender().expect("sender should be available");
378 let mut receiver = bus.subscribe();
379
380 let event1 = make_test_event();
381 let event2 = make_test_event();
382 let event3 = make_test_event();
383
384 let id1 = event1.id;
385 let id2 = event2.id;
386 let id3 = event3.id;
387
388 sender.send(event1).unwrap();
389 sender.send(event2).unwrap();
390 sender.send(event3).unwrap();
391
392 let r1 = receiver.recv().await.unwrap();
393 let r2 = receiver.recv().await.unwrap();
394 let r3 = receiver.recv().await.unwrap();
395
396 assert_eq!(r1.id, id1);
397 assert_eq!(r2.id, id2);
398 assert_eq!(r3.id, id3);
399 }
400
401 #[tokio::test]
402 async fn test_sender_clone() {
403 let bus = EventBus::new();
404 let sender1 = bus.sender().expect("sender should be available");
405 let sender2 = sender1.clone();
406
407 let mut receiver = bus.subscribe();
408
409 let event1 = make_test_event();
410 let event2 = make_test_event();
411
412 let id1 = event1.id;
413 let id2 = event2.id;
414
415 sender1.send(event1).unwrap();
416 sender2.send(event2).unwrap();
417
418 let r1 = receiver.recv().await.unwrap();
419 let r2 = receiver.recv().await.unwrap();
420
421 assert_eq!(r1.id, id1);
422 assert_eq!(r2.id, id2);
423 }
424
425 #[tokio::test]
426 async fn test_subscriber_count_changes() {
427 let bus = EventBus::new();
428 assert_eq!(bus.subscriber_count(), 0);
429
430 let recv1 = bus.subscribe();
431 assert_eq!(bus.subscriber_count(), 1);
432
433 let recv2 = bus.subscribe();
434 assert_eq!(bus.subscriber_count(), 2);
435
436 drop(recv1);
437 assert_eq!(bus.subscriber_count(), 1);
438
439 drop(recv2);
440 assert_eq!(bus.subscriber_count(), 0);
441 }
442}