1use crate::event::CuenvEvent;
7use tokio::sync::{broadcast, mpsc};
8
9const DEFAULT_BROADCAST_CAPACITY: usize = 1000;
11
12#[derive(Debug)]
17pub struct EventBus {
18 sender: mpsc::UnboundedSender<CuenvEvent>,
20 broadcast_tx: broadcast::Sender<CuenvEvent>,
22}
23
24impl EventBus {
25 #[must_use]
30 pub fn new() -> Self {
31 Self::with_capacity(DEFAULT_BROADCAST_CAPACITY)
32 }
33
34 #[must_use]
36 pub fn with_capacity(capacity: usize) -> Self {
37 let (sender, mut receiver) = mpsc::unbounded_channel::<CuenvEvent>();
38 let (broadcast_tx, _) = broadcast::channel(capacity);
39
40 let broadcast_tx_clone = broadcast_tx.clone();
41 tokio::spawn(async move {
42 while let Some(event) = receiver.recv().await {
43 let _ = broadcast_tx_clone.send(event);
45 }
46 });
47
48 Self {
49 sender,
50 broadcast_tx,
51 }
52 }
53
54 #[must_use]
56 pub fn sender(&self) -> EventSender {
57 EventSender {
58 inner: self.sender.clone(),
59 }
60 }
61
62 #[must_use]
67 pub fn subscribe(&self) -> EventReceiver {
68 EventReceiver {
69 inner: self.broadcast_tx.subscribe(),
70 }
71 }
72
73 #[must_use]
75 pub fn subscriber_count(&self) -> usize {
76 self.broadcast_tx.receiver_count()
77 }
78}
79
80impl Default for EventBus {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86#[derive(Debug, Clone)]
88pub struct EventSender {
89 inner: mpsc::UnboundedSender<CuenvEvent>,
90}
91
92impl EventSender {
93 #[must_use]
97 pub fn into_inner(self) -> mpsc::UnboundedSender<CuenvEvent> {
98 self.inner
99 }
100
101 pub fn send(&self, event: CuenvEvent) -> Result<(), SendError> {
107 self.inner.send(event).map_err(|_| SendError::Closed)
108 }
109
110 #[must_use]
112 pub fn is_closed(&self) -> bool {
113 self.inner.is_closed()
114 }
115}
116
117#[derive(Debug)]
119pub struct EventReceiver {
120 inner: broadcast::Receiver<CuenvEvent>,
121}
122
123impl EventReceiver {
124 pub async fn recv(&mut self) -> Option<CuenvEvent> {
129 loop {
130 match self.inner.recv().await {
131 Ok(event) => return Some(event),
132 Err(broadcast::error::RecvError::Lagged(n)) => {
133 tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
134 }
135 Err(broadcast::error::RecvError::Closed) => return None,
136 }
137 }
138 }
139
140 pub fn try_recv(&mut self) -> Option<CuenvEvent> {
144 loop {
145 match self.inner.try_recv() {
146 Ok(event) => return Some(event),
147 Err(broadcast::error::TryRecvError::Lagged(n)) => {
148 tracing::warn!(skipped = n, "Event receiver lagged, skipped events");
149 }
150 Err(
151 broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
152 ) => {
153 return None;
154 }
155 }
156 }
157 }
158}
159
160#[derive(Debug, Clone, Copy, PartialEq, Eq)]
162pub enum SendError {
163 Closed,
165}
166
167impl std::fmt::Display for SendError {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 match self {
170 SendError::Closed => write!(f, "event bus is closed"),
171 }
172 }
173}
174
175impl std::error::Error for SendError {}
176
177#[cfg(test)]
178#[allow(clippy::similar_names)]
179mod tests {
180 use super::*;
181 use crate::event::{EventCategory, EventSource, OutputEvent};
182 use uuid::Uuid;
183
184 fn make_test_event() -> CuenvEvent {
185 CuenvEvent::new(
186 Uuid::new_v4(),
187 EventSource::new("cuenv::test"),
188 EventCategory::Output(OutputEvent::Stdout {
189 content: "test".to_string(),
190 }),
191 )
192 }
193
194 #[tokio::test]
195 async fn test_event_bus_creation() {
196 let bus = EventBus::new();
197 let sender = bus.sender();
198 assert!(!sender.is_closed());
199 }
200
201 #[tokio::test]
202 async fn test_event_bus_send_receive() {
203 let bus = EventBus::new();
204 let sender = bus.sender();
205 let mut receiver = bus.subscribe();
206
207 let event = make_test_event();
208 let event_id = event.id;
209
210 sender.send(event).unwrap();
211
212 let received = receiver.recv().await.unwrap();
213 assert_eq!(received.id, event_id);
214 }
215
216 #[tokio::test]
217 async fn test_event_bus_multiple_subscribers() {
218 let bus = EventBus::new();
219 let sender = bus.sender();
220 let mut receiver1 = bus.subscribe();
221 let mut receiver2 = bus.subscribe();
222
223 assert_eq!(bus.subscriber_count(), 2);
224
225 let event = make_test_event();
226 let event_id = event.id;
227
228 sender.send(event).unwrap();
229
230 let received1 = receiver1.recv().await.unwrap();
231 let received2 = receiver2.recv().await.unwrap();
232
233 assert_eq!(received1.id, event_id);
234 assert_eq!(received2.id, event_id);
235 }
236
237 #[tokio::test]
238 async fn test_event_bus_sender_survives_bus_drop() {
239 let sender = {
243 let bus = EventBus::new();
244 bus.sender()
245 };
246
247 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
249
250 let event = make_test_event();
251 let result = sender.send(event);
252 assert!(result.is_ok());
254 }
255}