Skip to main content

cuenv_events/
bus.rs

//! Multi-subscriber `EventBus` for cuenv events.
//!
//! Provides a broadcast-capable event bus that allows multiple subscribers
//! to receive events concurrently.
5
6use crate::event::CuenvEvent;
7use std::sync::Mutex;
8use tokio::sync::{broadcast, mpsc};
9
/// Broadcast channel capacity used when the caller does not pick one
/// explicitly (i.e. by `EventBus::new`).
const DEFAULT_BROADCAST_CAPACITY: usize = 1_000;
12
13/// Multi-subscriber event bus.
14///
15/// Events sent to this bus are broadcast to all subscribers.
16/// Uses tokio's broadcast channel for fan-out delivery.
17#[derive(Debug)]
18pub struct EventBus {
19    /// Sender for submitting events (wrapped in Option for shutdown support).
20    /// When `shutdown()` is called, this is set to None, which drops the sender
21    /// and causes the forwarding task to exit gracefully.
22    sender: Mutex<Option<mpsc::UnboundedSender<CuenvEvent>>>,
23    /// Broadcast sender for fan-out.
24    broadcast_tx: broadcast::Sender<CuenvEvent>,
25}
26
27impl EventBus {
28    /// Create a new event bus.
29    ///
30    /// Spawns a background task to forward events from the mpsc channel
31    /// to the broadcast channel.
32    #[must_use]
33    pub fn new() -> Self {
34        Self::with_capacity(DEFAULT_BROADCAST_CAPACITY)
35    }
36
37    /// Create a new event bus with a specific broadcast capacity.
38    #[must_use]
39    pub fn with_capacity(capacity: usize) -> Self {
40        let (sender, mut receiver) = mpsc::unbounded_channel::<CuenvEvent>();
41        let (broadcast_tx, _) = broadcast::channel(capacity);
42
43        let broadcast_tx_clone = broadcast_tx.clone();
44        tokio::spawn(async move {
45            while let Some(event) = receiver.recv().await {
46                // Ignore send errors - they occur when there are no subscribers
47                let _ = broadcast_tx_clone.send(event);
48            }
49        });
50
51        Self {
52            sender: Mutex::new(Some(sender)),
53            broadcast_tx,
54        }
55    }
56
57    /// Get a sender for submitting events to the bus.
58    ///
59    /// Returns `None` if the bus has been shut down.
60    #[must_use]
61    pub fn sender(&self) -> Option<EventSender> {
62        self.sender
63            .lock()
64            .ok()
65            .and_then(|guard| guard.as_ref().map(|s| EventSender { inner: s.clone() }))
66    }
67
68    /// Shut down the event bus.
69    ///
70    /// This drops the internal sender, causing the forwarding task to exit.
71    /// After shutdown, no more events can be sent and `sender()` returns `None`.
72    ///
73    /// This method is safe to call multiple times.
74    pub fn shutdown(&self) {
75        if let Ok(mut guard) = self.sender.lock() {
76            // Take and drop the sender to signal the forwarding task to exit
77            let _ = guard.take();
78        }
79    }
80
81    /// Subscribe to events from this bus.
82    ///
83    /// Returns a receiver that will receive all events sent to the bus
84    /// after this subscription is created.
85    #[must_use]
86    pub fn subscribe(&self) -> EventReceiver {
87        EventReceiver {
88            inner: self.broadcast_tx.subscribe(),
89        }
90    }
91
92    /// Get the number of active subscribers.
93    #[must_use]
94    pub fn subscriber_count(&self) -> usize {
95        self.broadcast_tx.receiver_count()
96    }
97}
98
99impl Default for EventBus {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105/// Sender handle for submitting events to an `EventBus`.
106#[derive(Debug, Clone)]
107pub struct EventSender {
108    inner: mpsc::UnboundedSender<CuenvEvent>,
109}
110
111impl EventSender {
112    /// Get the raw mpsc sender for use with the tracing layer.
113    ///
114    /// This is primarily used by `CuenvEventLayer` to send events directly.
115    #[must_use]
116    pub fn into_inner(self) -> mpsc::UnboundedSender<CuenvEvent> {
117        self.inner
118    }
119
120    /// Send an event to the bus.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the bus has been dropped.
125    pub fn send(&self, event: CuenvEvent) -> Result<(), SendError> {
126        self.inner.send(event).map_err(|_| SendError::Closed)
127    }
128
129    /// Check if the bus is still open.
130    #[must_use]
131    pub fn is_closed(&self) -> bool {
132        self.inner.is_closed()
133    }
134}
135
136/// Receiver handle for receiving events from an `EventBus`.
137#[derive(Debug)]
138pub struct EventReceiver {
139    inner: broadcast::Receiver<CuenvEvent>,
140}
141
142impl EventReceiver {
143    /// Receive the next event.
144    ///
145    /// Returns `None` if the bus has been dropped.
146    /// May skip events if the receiver falls behind.
147    pub async fn recv(&mut self) -> Option<CuenvEvent> {
148        loop {
149            match self.inner.recv().await {
150                Ok(event) => return Some(event),
151                Err(broadcast::error::RecvError::Lagged(n)) => {
152                    tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
153                }
154                Err(broadcast::error::RecvError::Closed) => return None,
155            }
156        }
157    }
158
159    /// Try to receive an event without waiting.
160    ///
161    /// Returns `None` if no event is immediately available or the bus is closed.
162    pub fn try_recv(&mut self) -> Option<CuenvEvent> {
163        loop {
164            match self.inner.try_recv() {
165                Ok(event) => return Some(event),
166                Err(broadcast::error::TryRecvError::Lagged(n)) => {
167                    tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
168                }
169                Err(
170                    broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
171                ) => {
172                    return None;
173                }
174            }
175        }
176    }
177}
178
/// Failure reported when an event cannot be submitted to the bus.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError {
    /// The event bus has been closed.
    Closed,
}

impl std::fmt::Display for SendError {
    /// Write the human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Closed => "event bus is closed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for SendError {}
195
#[cfg(test)]
// Paired bindings such as `receiver1`/`receiver2` and `sender1`/`sender2`
// are intentional in these tests.
#[allow(clippy::similar_names)]
mod tests {
    use super::*;
    use crate::event::{EventCategory, EventSource, OutputEvent};
    use uuid::Uuid;

    /// Build a throwaway stdout output event for use in tests.
    fn make_test_event() -> CuenvEvent {
        CuenvEvent::new(
            Uuid::new_v4(),
            EventSource::new("cuenv::test"),
            EventCategory::Output(OutputEvent::Stdout {
                content: "test".to_string(),
            }),
        )
    }

    #[tokio::test]
    async fn test_event_bus_creation() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        assert!(!sender.is_closed());
    }

    #[tokio::test]
    async fn test_event_bus_send_receive() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        let mut receiver = bus.subscribe();

        let event = make_test_event();
        let event_id = event.id;

        sender.send(event).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.id, event_id);
    }

    #[tokio::test]
    async fn test_event_bus_multiple_subscribers() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        let mut receiver1 = bus.subscribe();
        let mut receiver2 = bus.subscribe();

        assert_eq!(bus.subscriber_count(), 2);

        let event = make_test_event();
        let event_id = event.id;

        sender.send(event).unwrap();

        let received1 = receiver1.recv().await.unwrap();
        let received2 = receiver2.recv().await.unwrap();

        assert_eq!(received1.id, event_id);
        assert_eq!(received2.id, event_id);
    }

    #[tokio::test]
    async fn test_event_bus_sender_survives_bus_drop() {
        // EventSender clones the underlying mpsc sender, so it remains valid
        // even after the EventBus is dropped. This is intentional - senders
        // are independent handles that can outlive the bus.
        let sender = {
            let bus = EventBus::new();
            bus.sender().expect("sender should be available")
            // `bus` is dropped synchronously at the end of this block.
        };

        let event = make_test_event();
        let result = sender.send(event);
        // Sender still works because it has its own clone of the channel
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_event_bus_with_capacity() {
        let bus = EventBus::with_capacity(10);
        let sender = bus.sender().expect("sender should be available");
        assert!(!sender.is_closed());
        assert_eq!(bus.subscriber_count(), 0);
    }

    #[tokio::test]
    async fn test_event_bus_default() {
        let bus = EventBus::default();
        let sender = bus.sender().expect("sender should be available");
        assert!(!sender.is_closed());
    }

    #[tokio::test]
    async fn test_event_receiver_try_recv_empty() {
        let bus = EventBus::new();
        let _sender = bus.sender().expect("sender should be available");
        let mut receiver = bus.subscribe();

        // No events sent yet
        let result = receiver.try_recv();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_event_receiver_try_recv_with_event() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        let mut receiver = bus.subscribe();

        let event = make_test_event();
        let event_id = event.id;
        sender.send(event).unwrap();

        // The forwarding task runs concurrently, so poll until the event has
        // been re-published instead of relying on a fixed sleep.
        let received = tokio::time::timeout(tokio::time::Duration::from_secs(1), async {
            loop {
                if let Some(event) = receiver.try_recv() {
                    break event;
                }
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("event should be forwarded within the timeout");

        assert_eq!(received.id, event_id);
    }

    #[test]
    fn test_send_error_display() {
        let err = SendError::Closed;
        assert_eq!(format!("{err}"), "event bus is closed");
    }

    #[test]
    fn test_send_error_equality() {
        assert_eq!(SendError::Closed, SendError::Closed);
    }

    #[test]
    fn test_send_error_debug() {
        let err = SendError::Closed;
        let debug_str = format!("{err:?}");
        assert!(debug_str.contains("Closed"));
    }

    #[test]
    fn test_send_error_is_error() {
        let err = SendError::Closed;
        let _: &dyn std::error::Error = &err;
    }

    #[tokio::test]
    async fn test_event_sender_into_inner() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        let inner = sender.into_inner();
        assert!(!inner.is_closed());
    }

    #[tokio::test]
    async fn test_event_bus_debug() {
        let bus = EventBus::new();
        let debug_str = format!("{bus:?}");
        assert!(debug_str.contains("EventBus"));
    }

    #[tokio::test]
    async fn test_event_sender_debug() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        let debug_str = format!("{sender:?}");
        assert!(debug_str.contains("EventSender"));
    }

    #[tokio::test]
    async fn test_event_receiver_debug() {
        let bus = EventBus::new();
        let receiver = bus.subscribe();
        let debug_str = format!("{receiver:?}");
        assert!(debug_str.contains("EventReceiver"));
    }

    #[tokio::test]
    async fn test_multiple_events_in_order() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        let mut receiver = bus.subscribe();

        let event1 = make_test_event();
        let event2 = make_test_event();
        let event3 = make_test_event();

        let id1 = event1.id;
        let id2 = event2.id;
        let id3 = event3.id;

        sender.send(event1).unwrap();
        sender.send(event2).unwrap();
        sender.send(event3).unwrap();

        let r1 = receiver.recv().await.unwrap();
        let r2 = receiver.recv().await.unwrap();
        let r3 = receiver.recv().await.unwrap();

        assert_eq!(r1.id, id1);
        assert_eq!(r2.id, id2);
        assert_eq!(r3.id, id3);
    }

    #[tokio::test]
    async fn test_sender_clone() {
        let bus = EventBus::new();
        let sender1 = bus.sender().expect("sender should be available");
        let sender2 = sender1.clone();

        let mut receiver = bus.subscribe();

        let event1 = make_test_event();
        let event2 = make_test_event();

        let id1 = event1.id;
        let id2 = event2.id;

        sender1.send(event1).unwrap();
        sender2.send(event2).unwrap();

        let r1 = receiver.recv().await.unwrap();
        let r2 = receiver.recv().await.unwrap();

        assert_eq!(r1.id, id1);
        assert_eq!(r2.id, id2);
    }

    #[tokio::test]
    async fn test_subscriber_count_changes() {
        let bus = EventBus::new();
        assert_eq!(bus.subscriber_count(), 0);

        let recv1 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);

        let recv2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);

        drop(recv1);
        assert_eq!(bus.subscriber_count(), 1);

        drop(recv2);
        assert_eq!(bus.subscriber_count(), 0);
    }

    #[tokio::test]
    async fn test_shutdown_disables_sender_accessor() {
        let bus = EventBus::new();
        assert!(bus.sender().is_some());

        bus.shutdown();
        assert!(bus.sender().is_none());

        // Shutting down again must be a harmless no-op.
        bus.shutdown();
        assert!(bus.sender().is_none());
    }

    #[tokio::test]
    async fn test_existing_sender_still_delivers_after_shutdown() {
        let bus = EventBus::new();
        let sender = bus.sender().expect("sender should be available");
        let mut receiver = bus.subscribe();

        bus.shutdown();

        // A handle obtained before shutdown keeps the channel open, so the
        // forwarding task is still running and delivers the event.
        let event = make_test_event();
        let event_id = event.id;
        sender.send(event).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.id, event_id);
    }

    #[tokio::test]
    async fn test_receiver_closes_after_bus_dropped() {
        let bus = EventBus::new();
        let mut receiver = bus.subscribe();

        // With no outstanding `EventSender`, dropping the bus lets the
        // forwarding task exit, which closes the broadcast channel.
        drop(bus);

        assert!(receiver.recv().await.is_none());
    }
}