Skip to main content

punch_kernel/
event_bus.rs

1//! Event bus for publishing and subscribing to system-wide [`PunchEvent`]s.
2//!
3//! Built on top of [`tokio::sync::broadcast`] so that multiple subscribers can
4//! independently receive every event without blocking the publisher. Maintains
5//! an in-memory history ring buffer so that the dashboard audit trail can
6//! display recent events even if no WebSocket client was connected at the time.
7
8use std::collections::VecDeque;
9use std::sync::Mutex;
10
11use tokio::sync::broadcast;
12use tracing::{debug, warn};
13
14use punch_types::{EventPayload, PunchEvent};
15
/// Capacity handed to the broadcast channel when a bus is built without an
/// explicit size.
const DEFAULT_CAPACITY: usize = 1_024;

/// Upper bound on how many events the history ring buffer keeps; once it is
/// reached, the oldest entry is discarded to make room for each new one.
const HISTORY_CAPACITY: usize = 500;
21
22/// A broadcast-based event bus for the Punch system.
23///
24/// Cloning an `EventBus` yields a handle to the **same** underlying channel.
25#[derive(Clone)]
26pub struct EventBus {
27    sender: broadcast::Sender<EventPayload>,
28    /// Ring buffer of recent events for the audit trail.
29    history: std::sync::Arc<Mutex<VecDeque<EventPayload>>>,
30}
31
32impl EventBus {
33    /// Create a new event bus with the default capacity.
34    pub fn new() -> Self {
35        Self::with_capacity(DEFAULT_CAPACITY)
36    }
37
38    /// Create a new event bus with a specific channel capacity.
39    pub fn with_capacity(capacity: usize) -> Self {
40        let (sender, _) = broadcast::channel(capacity);
41        Self {
42            sender,
43            history: std::sync::Arc::new(Mutex::new(VecDeque::with_capacity(HISTORY_CAPACITY))),
44        }
45    }
46
47    /// Publish an event to all active subscribers.
48    ///
49    /// The event is also stored in the history buffer for the audit trail.
50    pub fn publish(&self, event: PunchEvent) {
51        let payload = EventPayload::new(event);
52        self.store_in_history(&payload);
53        match self.sender.send(payload) {
54            Ok(receivers) => {
55                debug!(receivers, "event published");
56            }
57            Err(_) => {
58                // No active receivers — this is not an error.
59                debug!("event published with no active subscribers");
60            }
61        }
62    }
63
64    /// Publish a pre-built [`EventPayload`] (useful when you need a custom
65    /// correlation ID or timestamp).
66    pub fn publish_payload(&self, payload: EventPayload) {
67        self.store_in_history(&payload);
68        match self.sender.send(payload) {
69            Ok(receivers) => {
70                debug!(receivers, "event payload published");
71            }
72            Err(_) => {
73                debug!("event payload published with no active subscribers");
74            }
75        }
76    }
77
78    /// Subscribe to all future events on this bus.
79    ///
80    /// Returns a [`broadcast::Receiver`] that will yield every event published
81    /// after the subscription is created. If the receiver falls behind by more
82    /// than the channel capacity, older events will be dropped and the receiver
83    /// will see a [`broadcast::error::RecvError::Lagged`] error.
84    pub fn subscribe(&self) -> broadcast::Receiver<EventPayload> {
85        self.sender.subscribe()
86    }
87
88    /// Subscribe and return only events matching a predicate.
89    ///
90    /// This is a convenience wrapper — filtering happens on the receiver side.
91    /// For high-throughput scenarios consider filtering inside the subscriber's
92    /// own task loop instead.
93    pub fn subscribe_filtered<F>(&self, predicate: F) -> FilteredReceiver<F>
94    where
95        F: Fn(&PunchEvent) -> bool + Send + 'static,
96    {
97        FilteredReceiver {
98            inner: self.sender.subscribe(),
99            predicate,
100        }
101    }
102
103    /// Return the current number of active subscribers.
104    pub fn subscriber_count(&self) -> usize {
105        self.sender.receiver_count()
106    }
107
108    /// Return recent events from the history buffer.
109    ///
110    /// Returns up to `limit` events, optionally filtering to only those with
111    /// a sequence (position) greater than `since`. Events are returned in
112    /// chronological order (oldest first).
113    pub fn recent_events(&self, limit: usize, since: usize) -> Vec<(usize, EventPayload)> {
114        let history = self.history.lock().expect("history lock poisoned");
115        history
116            .iter()
117            .enumerate()
118            .filter(|(idx, _)| *idx >= since)
119            .take(limit)
120            .map(|(idx, payload)| (idx + 1, payload.clone()))
121            .collect()
122    }
123
124    /// Store a payload in the history ring buffer.
125    fn store_in_history(&self, payload: &EventPayload) {
126        let mut history = self.history.lock().expect("history lock poisoned");
127        if history.len() >= HISTORY_CAPACITY {
128            history.pop_front();
129        }
130        history.push_back(payload.clone());
131    }
132}
133
134impl Default for EventBus {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140/// A receiver that applies a user-supplied predicate to incoming events.
141pub struct FilteredReceiver<F>
142where
143    F: Fn(&PunchEvent) -> bool,
144{
145    inner: broadcast::Receiver<EventPayload>,
146    predicate: F,
147}
148
149impl<F> FilteredReceiver<F>
150where
151    F: Fn(&PunchEvent) -> bool,
152{
153    /// Receive the next event that passes the filter.
154    ///
155    /// Skips events that do not match the predicate. Returns `None` when the
156    /// channel is closed.
157    pub async fn recv(&mut self) -> Option<EventPayload> {
158        loop {
159            match self.inner.recv().await {
160                Ok(payload) => {
161                    if (self.predicate)(&payload.event) {
162                        return Some(payload);
163                    }
164                    // Skip non-matching events.
165                }
166                Err(broadcast::error::RecvError::Lagged(n)) => {
167                    warn!(skipped = n, "filtered receiver lagged behind");
168                    // Continue receiving after the gap.
169                }
170                Err(broadcast::error::RecvError::Closed) => {
171                    return None;
172                }
173            }
174        }
175    }
176}
177
178// ---------------------------------------------------------------------------
179// Tests
180// ---------------------------------------------------------------------------
181
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use punch_types::FighterId;
    use tokio::time::{Duration, timeout};

    /// Upper bound on how long a single receive may block, so a regression
    /// fails the test instead of hanging the suite.
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);

    /// Build a `PunchEvent::Error` from string slices.
    fn error_event(source: &str, message: &str) -> PunchEvent {
        PunchEvent::Error {
            source: source.to_string(),
            message: message.to_string(),
        }
    }

    /// Return the message of an error payload, panicking on any other event.
    fn error_message(payload: &EventPayload) -> &str {
        match &payload.event {
            PunchEvent::Error { message, .. } => message.as_str(),
            other => panic!("unexpected event: {:?}", other),
        }
    }

    /// Receive the next payload from a plain subscription, bounded by
    /// `RECV_TIMEOUT`.
    async fn next_event(rx: &mut broadcast::Receiver<EventPayload>) -> EventPayload {
        timeout(RECV_TIMEOUT, rx.recv())
            .await
            .expect("timed out")
            .expect("channel closed")
    }

    /// Receive the next matching payload from a filtered subscription, bounded
    /// by `RECV_TIMEOUT`.
    async fn next_filtered<F>(rx: &mut FilteredReceiver<F>) -> EventPayload
    where
        F: Fn(&PunchEvent) -> bool,
    {
        timeout(RECV_TIMEOUT, rx.recv())
            .await
            .expect("timed out")
            .expect("channel closed")
    }

    #[tokio::test]
    async fn publish_and_receive_single_event() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let fighter_id = FighterId::new();
        bus.publish(PunchEvent::FighterSpawned {
            fighter_id,
            name: "test-fighter".to_string(),
        });

        let payload = next_event(&mut rx).await;

        match &payload.event {
            PunchEvent::FighterSpawned { name, .. } => {
                assert_eq!(name, "test-fighter");
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[tokio::test]
    async fn multiple_subscribers_receive_same_event() {
        let bus = EventBus::new();
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        bus.publish(error_event("test", "hello"));

        let p1 = next_event(&mut rx1).await;
        let p2 = next_event(&mut rx2).await;

        assert_eq!(p1.id, p2.id);
    }

    #[tokio::test]
    async fn no_subscribers_does_not_panic() {
        let bus = EventBus::new();
        // Should not panic even with zero receivers.
        bus.publish(error_event("test", "nobody listening"));
    }

    #[tokio::test]
    async fn filtered_receiver_only_gets_matching_events() {
        let bus = EventBus::new();
        let mut filtered =
            bus.subscribe_filtered(|event| matches!(event, PunchEvent::FighterSpawned { .. }));

        // Publish a non-matching event first.
        bus.publish(error_event("test", "should be skipped"));

        // Then a matching event.
        let fighter_id = FighterId::new();
        bus.publish(PunchEvent::FighterSpawned {
            fighter_id,
            name: "filtered-fighter".to_string(),
        });

        let payload = next_filtered(&mut filtered).await;

        match &payload.event {
            PunchEvent::FighterSpawned { name, .. } => {
                assert_eq!(name, "filtered-fighter");
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[tokio::test]
    async fn subscriber_count_tracks_active_receivers() {
        let bus = EventBus::new();
        assert_eq!(bus.subscriber_count(), 0);

        let rx1 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);

        let _rx2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);

        drop(rx1);
        assert_eq!(bus.subscriber_count(), 1);
    }

    #[test]
    fn default_creates_event_bus() {
        let bus = EventBus::default();
        assert_eq!(bus.subscriber_count(), 0);
    }

    #[test]
    fn clone_shares_same_channel() {
        let bus1 = EventBus::new();
        let bus2 = bus1.clone();
        let _rx = bus2.subscribe();
        // The subscriber from bus2 is visible on bus1 since they share the channel.
        assert_eq!(bus1.subscriber_count(), 1);
    }

    #[tokio::test]
    async fn publish_payload_delivers_custom_payload() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let correlation = uuid::Uuid::new_v4();
        let payload =
            EventPayload::new(error_event("custom", "test payload")).with_correlation(correlation);

        let expected_id = payload.id;
        bus.publish_payload(payload);

        let received = next_event(&mut rx).await;
        assert_eq!(received.id, expected_id);
        assert_eq!(received.correlation_id, Some(correlation));
    }

    #[tokio::test]
    async fn with_capacity_creates_bus_with_custom_size() {
        let bus = EventBus::with_capacity(2);
        let mut rx = bus.subscribe();

        // Publish two events (within capacity).
        bus.publish(error_event("a", "1"));
        bus.publish(error_event("b", "2"));

        let p1 = next_event(&mut rx).await;
        let p2 = next_event(&mut rx).await;
        assert!(matches!(p1.event, PunchEvent::Error { .. }));
        assert!(matches!(p2.event, PunchEvent::Error { .. }));
    }

    #[tokio::test]
    async fn subscriber_receives_multiple_events_in_order() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        for i in 0..5 {
            bus.publish(error_event("order-test", &format!("msg-{}", i)));
        }

        for i in 0..5 {
            let payload = next_event(&mut rx).await;
            assert_eq!(error_message(&payload), format!("msg-{}", i));
        }
    }

    #[tokio::test]
    async fn filtered_receiver_skips_all_non_matching() {
        let bus = EventBus::new();
        let mut filtered =
            bus.subscribe_filtered(|event| matches!(event, PunchEvent::GorillaUnleashed { .. }));

        // Publish many non-matching events.
        for _ in 0..10 {
            bus.publish(error_event("test", "skip"));
        }

        // Now the matching one.
        let gorilla_id = punch_types::GorillaId::new();
        bus.publish(PunchEvent::GorillaUnleashed {
            gorilla_id,
            name: "kong".to_string(),
        });

        let payload = next_filtered(&mut filtered).await;

        match &payload.event {
            PunchEvent::GorillaUnleashed { name, .. } => assert_eq!(name, "kong"),
            other => panic!("wrong event: {:?}", other),
        }
    }

    #[tokio::test]
    async fn concurrent_publish_from_multiple_tasks() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let mut handles = Vec::new();
        for i in 0..10 {
            let bus_clone = bus.clone();
            handles.push(tokio::spawn(async move {
                bus_clone.publish(error_event(&format!("task-{}", i), "concurrent"));
            }));
        }

        for h in handles {
            h.await.unwrap();
        }

        // All 10 events should arrive, one per publishing task.
        let mut sources = HashSet::new();
        for _ in 0..10 {
            let payload = next_event(&mut rx).await;
            match &payload.event {
                PunchEvent::Error { source, .. } => {
                    sources.insert(source.clone());
                }
                other => panic!("unexpected event: {:?}", other),
            }
        }
        assert_eq!(sources.len(), 10);
    }

    #[tokio::test]
    async fn subscriber_dropped_does_not_affect_other_subscribers() {
        let bus = EventBus::new();
        let rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        drop(rx1);

        bus.publish(error_event("test", "after drop"));

        let payload = next_event(&mut rx2).await;
        assert_eq!(error_message(&payload), "after drop");
    }

    #[tokio::test]
    async fn event_payload_has_timestamp() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe();

        let before = chrono::Utc::now();
        bus.publish(error_event("ts", "test"));

        let payload = next_event(&mut rx).await;
        assert!(payload.timestamp >= before);
    }

    #[test]
    fn history_retains_events_published_without_subscribers() {
        let bus = EventBus::new();
        for i in 0..3 {
            bus.publish(error_event("history", &format!("msg-{}", i)));
        }

        let events = bus.recent_events(10, 0);

        let sequences: Vec<usize> = events.iter().map(|(seq, _)| *seq).collect();
        assert_eq!(sequences, vec![1, 2, 3]);

        let messages: Vec<&str> = events.iter().map(|(_, p)| error_message(p)).collect();
        assert_eq!(messages, vec!["msg-0", "msg-1", "msg-2"]);
    }

    #[test]
    fn recent_events_honours_since_and_limit() {
        let bus = EventBus::new();
        for i in 0..5 {
            bus.publish(error_event("history", &format!("msg-{}", i)));
        }

        // Sequences 1 and 2 are excluded by `since`; `limit` stops after two.
        let events = bus.recent_events(2, 2);

        let sequences: Vec<usize> = events.iter().map(|(seq, _)| *seq).collect();
        assert_eq!(sequences, vec![3, 4]);

        let messages: Vec<&str> = events.iter().map(|(_, p)| error_message(p)).collect();
        assert_eq!(messages, vec!["msg-2", "msg-3"]);

        assert!(bus.recent_events(0, 0).is_empty());
        assert!(bus.recent_events(10, 5).is_empty());
    }

    #[test]
    fn history_is_capped_and_drops_oldest_first() {
        let bus = EventBus::new();
        let total = HISTORY_CAPACITY + 5;
        for i in 0..total {
            bus.publish(error_event("history", &format!("msg-{}", i)));
        }

        let events = bus.recent_events(usize::MAX, 0);
        assert_eq!(events.len(), HISTORY_CAPACITY);

        let (_, oldest) = events.first().expect("history is empty");
        assert_eq!(error_message(oldest), "msg-5");

        let (_, newest) = events.last().expect("history is empty");
        assert_eq!(error_message(newest), format!("msg-{}", total - 1));
    }
}