cuenv_events/
bus.rs

1//! Multi-subscriber `EventBus` for cuenv events.
2//!
3//! Provides a broadcast-capable event bus that allows multiple subscribers
4//! to receive events concurrently.
5
6use crate::event::CuenvEvent;
7use tokio::sync::{broadcast, mpsc};
8
9/// Default channel capacity for the broadcast channel.
10const DEFAULT_BROADCAST_CAPACITY: usize = 1000;
11
12/// Multi-subscriber event bus.
13///
14/// Events sent to this bus are broadcast to all subscribers.
15/// Uses tokio's broadcast channel for fan-out delivery.
16#[derive(Debug)]
17pub struct EventBus {
18    /// Sender for submitting events.
19    sender: mpsc::UnboundedSender<CuenvEvent>,
20    /// Broadcast sender for fan-out.
21    broadcast_tx: broadcast::Sender<CuenvEvent>,
22}
23
24impl EventBus {
25    /// Create a new event bus.
26    ///
27    /// Spawns a background task to forward events from the mpsc channel
28    /// to the broadcast channel.
29    #[must_use]
30    pub fn new() -> Self {
31        Self::with_capacity(DEFAULT_BROADCAST_CAPACITY)
32    }
33
34    /// Create a new event bus with a specific broadcast capacity.
35    #[must_use]
36    pub fn with_capacity(capacity: usize) -> Self {
37        let (sender, mut receiver) = mpsc::unbounded_channel::<CuenvEvent>();
38        let (broadcast_tx, _) = broadcast::channel(capacity);
39
40        let broadcast_tx_clone = broadcast_tx.clone();
41        tokio::spawn(async move {
42            while let Some(event) = receiver.recv().await {
43                // Ignore send errors - they occur when there are no subscribers
44                let _ = broadcast_tx_clone.send(event);
45            }
46        });
47
48        Self {
49            sender,
50            broadcast_tx,
51        }
52    }
53
54    /// Get a sender for submitting events to the bus.
55    #[must_use]
56    pub fn sender(&self) -> EventSender {
57        EventSender {
58            inner: self.sender.clone(),
59        }
60    }
61
62    /// Subscribe to events from this bus.
63    ///
64    /// Returns a receiver that will receive all events sent to the bus
65    /// after this subscription is created.
66    #[must_use]
67    pub fn subscribe(&self) -> EventReceiver {
68        EventReceiver {
69            inner: self.broadcast_tx.subscribe(),
70        }
71    }
72
73    /// Get the number of active subscribers.
74    #[must_use]
75    pub fn subscriber_count(&self) -> usize {
76        self.broadcast_tx.receiver_count()
77    }
78}
79
80impl Default for EventBus {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86/// Sender handle for submitting events to an `EventBus`.
87#[derive(Debug, Clone)]
88pub struct EventSender {
89    inner: mpsc::UnboundedSender<CuenvEvent>,
90}
91
92impl EventSender {
93    /// Get the raw mpsc sender for use with the tracing layer.
94    ///
95    /// This is primarily used by `CuenvEventLayer` to send events directly.
96    #[must_use]
97    pub fn into_inner(self) -> mpsc::UnboundedSender<CuenvEvent> {
98        self.inner
99    }
100
101    /// Send an event to the bus.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the bus has been dropped.
106    pub fn send(&self, event: CuenvEvent) -> Result<(), SendError> {
107        self.inner.send(event).map_err(|_| SendError::Closed)
108    }
109
110    /// Check if the bus is still open.
111    #[must_use]
112    pub fn is_closed(&self) -> bool {
113        self.inner.is_closed()
114    }
115}
116
117/// Receiver handle for receiving events from an `EventBus`.
118#[derive(Debug)]
119pub struct EventReceiver {
120    inner: broadcast::Receiver<CuenvEvent>,
121}
122
123impl EventReceiver {
124    /// Receive the next event.
125    ///
126    /// Returns `None` if the bus has been dropped.
127    /// May skip events if the receiver falls behind.
128    pub async fn recv(&mut self) -> Option<CuenvEvent> {
129        loop {
130            match self.inner.recv().await {
131                Ok(event) => return Some(event),
132                Err(broadcast::error::RecvError::Lagged(n)) => {
133                    tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
134                }
135                Err(broadcast::error::RecvError::Closed) => return None,
136            }
137        }
138    }
139
140    /// Try to receive an event without waiting.
141    ///
142    /// Returns `None` if no event is immediately available or the bus is closed.
143    pub fn try_recv(&mut self) -> Option<CuenvEvent> {
144        loop {
145            match self.inner.try_recv() {
146                Ok(event) => return Some(event),
147                Err(broadcast::error::TryRecvError::Lagged(n)) => {
148                    tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
149                }
150                Err(
151                    broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
152                ) => {
153                    return None;
154                }
155            }
156        }
157    }
158}
159
160/// Error returned when sending to a closed bus.
161#[derive(Debug, Clone, Copy, PartialEq, Eq)]
162pub enum SendError {
163    /// The event bus has been closed.
164    Closed,
165}
166
167impl std::fmt::Display for SendError {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        match self {
170            Self::Closed => write!(f, "event bus is closed"),
171        }
172    }
173}
174
175impl std::error::Error for SendError {}
176
177#[cfg(test)]
178#[allow(clippy::similar_names)]
179mod tests {
180    use super::*;
181    use crate::event::{EventCategory, EventSource, OutputEvent};
182    use uuid::Uuid;
183
184    fn make_test_event() -> CuenvEvent {
185        CuenvEvent::new(
186            Uuid::new_v4(),
187            EventSource::new("cuenv::test"),
188            EventCategory::Output(OutputEvent::Stdout {
189                content: "test".to_string(),
190            }),
191        )
192    }
193
194    #[tokio::test]
195    async fn test_event_bus_creation() {
196        let bus = EventBus::new();
197        let sender = bus.sender();
198        assert!(!sender.is_closed());
199    }
200
201    #[tokio::test]
202    async fn test_event_bus_send_receive() {
203        let bus = EventBus::new();
204        let sender = bus.sender();
205        let mut receiver = bus.subscribe();
206
207        let event = make_test_event();
208        let event_id = event.id;
209
210        sender.send(event).unwrap();
211
212        let received = receiver.recv().await.unwrap();
213        assert_eq!(received.id, event_id);
214    }
215
216    #[tokio::test]
217    async fn test_event_bus_multiple_subscribers() {
218        let bus = EventBus::new();
219        let sender = bus.sender();
220        let mut receiver1 = bus.subscribe();
221        let mut receiver2 = bus.subscribe();
222
223        assert_eq!(bus.subscriber_count(), 2);
224
225        let event = make_test_event();
226        let event_id = event.id;
227
228        sender.send(event).unwrap();
229
230        let received1 = receiver1.recv().await.unwrap();
231        let received2 = receiver2.recv().await.unwrap();
232
233        assert_eq!(received1.id, event_id);
234        assert_eq!(received2.id, event_id);
235    }
236
237    #[tokio::test]
238    async fn test_event_bus_sender_survives_bus_drop() {
239        // EventSender clones the underlying mpsc sender, so it remains valid
240        // even after the EventBus is dropped. This is intentional - senders
241        // are independent handles that can outlive the bus.
242        let sender = {
243            let bus = EventBus::new();
244            bus.sender()
245        };
246
247        // Give time for the bus to be dropped
248        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
249
250        let event = make_test_event();
251        let result = sender.send(event);
252        // Sender still works because it has its own clone of the channel
253        assert!(result.is_ok());
254    }
255
256    #[tokio::test]
257    async fn test_event_bus_with_capacity() {
258        let bus = EventBus::with_capacity(10);
259        let sender = bus.sender();
260        assert!(!sender.is_closed());
261        assert_eq!(bus.subscriber_count(), 0);
262    }
263
264    #[tokio::test]
265    async fn test_event_bus_default() {
266        let bus = EventBus::default();
267        let sender = bus.sender();
268        assert!(!sender.is_closed());
269    }
270
271    #[tokio::test]
272    async fn test_event_receiver_try_recv_empty() {
273        let bus = EventBus::new();
274        let _sender = bus.sender();
275        let mut receiver = bus.subscribe();
276
277        // No events sent yet
278        let result = receiver.try_recv();
279        assert!(result.is_none());
280    }
281
282    #[tokio::test]
283    async fn test_event_receiver_try_recv_with_event() {
284        let bus = EventBus::new();
285        let sender = bus.sender();
286        let mut receiver = bus.subscribe();
287
288        let event = make_test_event();
289        let event_id = event.id;
290        sender.send(event).unwrap();
291
292        // Give the background task time to process
293        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
294
295        let result = receiver.try_recv();
296        assert!(result.is_some());
297        assert_eq!(result.unwrap().id, event_id);
298    }
299
300    #[test]
301    fn test_send_error_display() {
302        let err = SendError::Closed;
303        assert_eq!(format!("{err}"), "event bus is closed");
304    }
305
306    #[test]
307    fn test_send_error_equality() {
308        assert_eq!(SendError::Closed, SendError::Closed);
309    }
310
311    #[test]
312    fn test_send_error_debug() {
313        let err = SendError::Closed;
314        let debug_str = format!("{err:?}");
315        assert!(debug_str.contains("Closed"));
316    }
317
318    #[test]
319    fn test_send_error_is_error() {
320        let err = SendError::Closed;
321        let _: &dyn std::error::Error = &err;
322    }
323
324    #[tokio::test]
325    async fn test_event_sender_into_inner() {
326        let bus = EventBus::new();
327        let sender = bus.sender();
328        let inner = sender.into_inner();
329        assert!(!inner.is_closed());
330    }
331
332    #[tokio::test]
333    async fn test_event_bus_debug() {
334        let bus = EventBus::new();
335        let debug_str = format!("{bus:?}");
336        assert!(debug_str.contains("EventBus"));
337    }
338
339    #[tokio::test]
340    async fn test_event_sender_debug() {
341        let bus = EventBus::new();
342        let sender = bus.sender();
343        let debug_str = format!("{sender:?}");
344        assert!(debug_str.contains("EventSender"));
345    }
346
347    #[tokio::test]
348    async fn test_event_receiver_debug() {
349        let bus = EventBus::new();
350        let receiver = bus.subscribe();
351        let debug_str = format!("{receiver:?}");
352        assert!(debug_str.contains("EventReceiver"));
353    }
354
355    #[tokio::test]
356    async fn test_multiple_events_in_order() {
357        let bus = EventBus::new();
358        let sender = bus.sender();
359        let mut receiver = bus.subscribe();
360
361        let event1 = make_test_event();
362        let event2 = make_test_event();
363        let event3 = make_test_event();
364
365        let id1 = event1.id;
366        let id2 = event2.id;
367        let id3 = event3.id;
368
369        sender.send(event1).unwrap();
370        sender.send(event2).unwrap();
371        sender.send(event3).unwrap();
372
373        let r1 = receiver.recv().await.unwrap();
374        let r2 = receiver.recv().await.unwrap();
375        let r3 = receiver.recv().await.unwrap();
376
377        assert_eq!(r1.id, id1);
378        assert_eq!(r2.id, id2);
379        assert_eq!(r3.id, id3);
380    }
381
382    #[tokio::test]
383    async fn test_sender_clone() {
384        let bus = EventBus::new();
385        let sender1 = bus.sender();
386        let sender2 = sender1.clone();
387
388        let mut receiver = bus.subscribe();
389
390        let event1 = make_test_event();
391        let event2 = make_test_event();
392
393        let id1 = event1.id;
394        let id2 = event2.id;
395
396        sender1.send(event1).unwrap();
397        sender2.send(event2).unwrap();
398
399        let r1 = receiver.recv().await.unwrap();
400        let r2 = receiver.recv().await.unwrap();
401
402        assert_eq!(r1.id, id1);
403        assert_eq!(r2.id, id2);
404    }
405
406    #[tokio::test]
407    async fn test_subscriber_count_changes() {
408        let bus = EventBus::new();
409        assert_eq!(bus.subscriber_count(), 0);
410
411        let recv1 = bus.subscribe();
412        assert_eq!(bus.subscriber_count(), 1);
413
414        let recv2 = bus.subscribe();
415        assert_eq!(bus.subscriber_count(), 2);
416
417        drop(recv1);
418        assert_eq!(bus.subscriber_count(), 1);
419
420        drop(recv2);
421        assert_eq!(bus.subscriber_count(), 0);
422    }
423}