1use std::collections::VecDeque;
9use std::sync::Mutex;
10
11use tokio::sync::broadcast;
12use tracing::{debug, warn};
13
14use punch_types::{EventPayload, PunchEvent};
15
/// Capacity of the broadcast channel created by `EventBus::new`.
const DEFAULT_CAPACITY: usize = 1024;

/// Upper bound on the number of payloads kept in the bus history; once the
/// history holds this many entries the oldest one is dropped for each new one.
const HISTORY_CAPACITY: usize = 500;
21
22#[derive(Clone)]
26pub struct EventBus {
27 sender: broadcast::Sender<EventPayload>,
28 history: std::sync::Arc<Mutex<VecDeque<EventPayload>>>,
30}
31
32impl EventBus {
33 pub fn new() -> Self {
35 Self::with_capacity(DEFAULT_CAPACITY)
36 }
37
38 pub fn with_capacity(capacity: usize) -> Self {
40 let (sender, _) = broadcast::channel(capacity);
41 Self {
42 sender,
43 history: std::sync::Arc::new(Mutex::new(VecDeque::with_capacity(HISTORY_CAPACITY))),
44 }
45 }
46
47 pub fn publish(&self, event: PunchEvent) {
51 let payload = EventPayload::new(event);
52 self.store_in_history(&payload);
53 match self.sender.send(payload) {
54 Ok(receivers) => {
55 debug!(receivers, "event published");
56 }
57 Err(_) => {
58 debug!("event published with no active subscribers");
60 }
61 }
62 }
63
64 pub fn publish_payload(&self, payload: EventPayload) {
67 self.store_in_history(&payload);
68 match self.sender.send(payload) {
69 Ok(receivers) => {
70 debug!(receivers, "event payload published");
71 }
72 Err(_) => {
73 debug!("event payload published with no active subscribers");
74 }
75 }
76 }
77
78 pub fn subscribe(&self) -> broadcast::Receiver<EventPayload> {
85 self.sender.subscribe()
86 }
87
88 pub fn subscribe_filtered<F>(&self, predicate: F) -> FilteredReceiver<F>
94 where
95 F: Fn(&PunchEvent) -> bool + Send + 'static,
96 {
97 FilteredReceiver {
98 inner: self.sender.subscribe(),
99 predicate,
100 }
101 }
102
103 pub fn subscriber_count(&self) -> usize {
105 self.sender.receiver_count()
106 }
107
108 pub fn recent_events(&self, limit: usize, since: usize) -> Vec<(usize, EventPayload)> {
114 let history = self.history.lock().expect("history lock poisoned");
115 history
116 .iter()
117 .enumerate()
118 .filter(|(idx, _)| *idx >= since)
119 .take(limit)
120 .map(|(idx, payload)| (idx + 1, payload.clone()))
121 .collect()
122 }
123
124 fn store_in_history(&self, payload: &EventPayload) {
126 let mut history = self.history.lock().expect("history lock poisoned");
127 if history.len() >= HISTORY_CAPACITY {
128 history.pop_front();
129 }
130 history.push_back(payload.clone());
131 }
132}
133
134impl Default for EventBus {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140pub struct FilteredReceiver<F>
142where
143 F: Fn(&PunchEvent) -> bool,
144{
145 inner: broadcast::Receiver<EventPayload>,
146 predicate: F,
147}
148
149impl<F> FilteredReceiver<F>
150where
151 F: Fn(&PunchEvent) -> bool,
152{
153 pub async fn recv(&mut self) -> Option<EventPayload> {
158 loop {
159 match self.inner.recv().await {
160 Ok(payload) => {
161 if (self.predicate)(&payload.event) {
162 return Some(payload);
163 }
164 }
166 Err(broadcast::error::RecvError::Lagged(n)) => {
167 warn!(skipped = n, "filtered receiver lagged behind");
168 }
170 Err(broadcast::error::RecvError::Closed) => {
171 return None;
172 }
173 }
174 }
175 }
176}
177
// Unit tests for `EventBus` and `FilteredReceiver`. Each test subscribes
// before publishing, and receives are wrapped in a timeout where a missing
// event would otherwise hang the test.
#[cfg(test)]
mod tests {
    use super::*;
    use punch_types::FighterId;
    use tokio::time::{Duration, timeout};

    /// A single published event reaches a subscriber with its fields intact.
    #[tokio::test]
    async fn publish_and_receive_single_event() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let fighter_id = FighterId::new();
        bus.publish(PunchEvent::FighterSpawned {
            fighter_id,
            name: "test-fighter".to_string(),
        });

        let payload = timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timed out")
            .expect("channel closed");

        match &payload.event {
            PunchEvent::FighterSpawned { name, .. } => {
                assert_eq!(name, "test-fighter");
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    /// Two subscribers both receive the same payload (same payload id).
    #[tokio::test]
    async fn multiple_subscribers_receive_same_event() {
        let bus = EventBus::new();
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        bus.publish(PunchEvent::Error {
            source: "test".to_string(),
            message: "hello".to_string(),
        });

        let p1 = rx1.recv().await.unwrap();
        let p2 = rx2.recv().await.unwrap();

        assert_eq!(p1.id, p2.id);
    }

    /// Publishing with no subscribers must complete without panicking.
    #[tokio::test]
    async fn no_subscribers_does_not_panic() {
        let bus = EventBus::new();
        bus.publish(PunchEvent::Error {
            source: "test".to_string(),
            message: "nobody listening".to_string(),
        });
    }

    /// A filtered receiver skips a non-matching event and yields the matching one.
    #[tokio::test]
    async fn filtered_receiver_only_gets_matching_events() {
        let bus = EventBus::new();
        let mut filtered =
            bus.subscribe_filtered(|event| matches!(event, PunchEvent::FighterSpawned { .. }));

        // Published first; the filter must discard it.
        bus.publish(PunchEvent::Error {
            source: "test".to_string(),
            message: "should be skipped".to_string(),
        });

        let fighter_id = FighterId::new();
        // Published second; this is the one the filter must let through.
        bus.publish(PunchEvent::FighterSpawned {
            fighter_id,
            name: "filtered-fighter".to_string(),
        });

        let payload = timeout(Duration::from_secs(1), filtered.recv())
            .await
            .expect("timed out")
            .expect("channel closed");

        match &payload.event {
            PunchEvent::FighterSpawned { name, .. } => {
                assert_eq!(name, "filtered-fighter");
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    /// `subscriber_count` goes up on subscribe and down when a receiver is dropped.
    #[tokio::test]
    async fn subscriber_count_tracks_active_receivers() {
        let bus = EventBus::new();
        assert_eq!(bus.subscriber_count(), 0);

        let _rx1 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);

        let _rx2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);

        drop(_rx1);
        assert_eq!(bus.subscriber_count(), 1);
    }

    /// `EventBus::default()` yields a usable bus with no subscribers.
    #[test]
    fn default_creates_event_bus() {
        let bus = EventBus::default();
        assert_eq!(bus.subscriber_count(), 0);
    }

    /// A subscription made through a clone is visible on the original bus.
    #[test]
    fn clone_shares_same_channel() {
        let bus1 = EventBus::new();
        let bus2 = bus1.clone();
        let _rx = bus2.subscribe();
        assert_eq!(bus1.subscriber_count(), 1);
    }

    /// `publish_payload` delivers the caller's payload unchanged (id and
    /// correlation id are preserved).
    #[tokio::test]
    async fn publish_payload_delivers_custom_payload() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let correlation = uuid::Uuid::new_v4();
        let payload = EventPayload::new(PunchEvent::Error {
            source: "custom".to_string(),
            message: "test payload".to_string(),
        })
        .with_correlation(correlation);

        let expected_id = payload.id;
        bus.publish_payload(payload);

        let received = timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timed out")
            .expect("channel closed");
        assert_eq!(received.id, expected_id);
        assert_eq!(received.correlation_id, Some(correlation));
    }

    /// A bus built with a channel capacity of 2 delivers two queued events.
    #[tokio::test]
    async fn with_capacity_creates_bus_with_custom_size() {
        let bus = EventBus::with_capacity(2);
        let mut rx = bus.subscribe();

        // Publish exactly as many events as the channel capacity.
        bus.publish(PunchEvent::Error {
            source: "a".to_string(),
            message: "1".to_string(),
        });
        bus.publish(PunchEvent::Error {
            source: "b".to_string(),
            message: "2".to_string(),
        });

        let p1 = rx.recv().await.unwrap();
        let p2 = rx.recv().await.unwrap();
        assert!(matches!(p1.event, PunchEvent::Error { .. }));
        assert!(matches!(p2.event, PunchEvent::Error { .. }));
    }

    /// Events are received in the order they were published.
    #[tokio::test]
    async fn subscriber_receives_multiple_events_in_order() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        for i in 0..5 {
            bus.publish(PunchEvent::Error {
                source: "order-test".to_string(),
                message: format!("msg-{}", i),
            });
        }

        for i in 0..5 {
            let payload = rx.recv().await.unwrap();
            match &payload.event {
                PunchEvent::Error { message, .. } => {
                    assert_eq!(message, &format!("msg-{}", i));
                }
                _ => panic!("unexpected event"),
            }
        }
    }

    /// A filtered receiver discards a run of non-matching events and still
    /// yields the first matching one.
    #[tokio::test]
    async fn filtered_receiver_skips_all_non_matching() {
        let bus = EventBus::new();
        let mut filtered =
            bus.subscribe_filtered(|event| matches!(event, PunchEvent::GorillaUnleashed { .. }));

        // Ten events the filter must reject.
        for _ in 0..10 {
            bus.publish(PunchEvent::Error {
                source: "test".to_string(),
                message: "skip".to_string(),
            });
        }

        let gorilla_id = punch_types::GorillaId::new();
        // The single event the filter accepts.
        bus.publish(PunchEvent::GorillaUnleashed {
            gorilla_id,
            name: "kong".to_string(),
        });

        let payload = timeout(Duration::from_secs(1), filtered.recv())
            .await
            .expect("timed out")
            .expect("channel closed");

        match &payload.event {
            PunchEvent::GorillaUnleashed { name, .. } => assert_eq!(name, "kong"),
            _ => panic!("wrong event"),
        }
    }

    /// Ten tasks publishing through clones of the bus all reach one subscriber.
    #[tokio::test]
    async fn concurrent_publish_from_multiple_tasks() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let mut handles = Vec::new();
        for i in 0..10 {
            let bus_clone = bus.clone();
            handles.push(tokio::spawn(async move {
                bus_clone.publish(PunchEvent::Error {
                    source: format!("task-{}", i),
                    message: "concurrent".to_string(),
                });
            }));
        }

        // Wait for every publisher to finish before draining the receiver.
        for h in handles {
            h.await.unwrap();
        }

        let mut count = 0;
        // Arrival order is not checked, only that all ten events are delivered.
        for _ in 0..10 {
            let _ = timeout(Duration::from_secs(1), rx.recv())
                .await
                .expect("timed out")
                .expect("channel closed");
            count += 1;
        }
        assert_eq!(count, 10);
    }

    /// Dropping one receiver does not stop delivery to the remaining one.
    #[tokio::test]
    async fn subscriber_dropped_does_not_affect_other_subscribers() {
        let bus = EventBus::new();
        let rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        drop(rx1);

        bus.publish(PunchEvent::Error {
            source: "test".to_string(),
            message: "after drop".to_string(),
        });

        let payload = rx2.recv().await.unwrap();
        match &payload.event {
            PunchEvent::Error { message, .. } => assert_eq!(message, "after drop"),
            _ => panic!("unexpected event"),
        }
    }

    /// The payload timestamp is not earlier than the instant taken just
    /// before publishing.
    #[tokio::test]
    async fn event_payload_has_timestamp() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let before = chrono::Utc::now();
        bus.publish(PunchEvent::Error {
            source: "ts".to_string(),
            message: "test".to_string(),
        });

        let payload = rx.recv().await.unwrap();
        assert!(payload.timestamp >= before);
    }
}