Skip to main content

basalt_api/events/
bus.rs

1//! Generic event bus with staged handler dispatch.
2//!
3//! The [`EventBus`] stores handlers indexed by event `TypeId` and
4//! sorted by `(Stage, priority)`. Registration is typed — handlers
5//! receive concrete event and context references. Dispatch is a
6//! single linear pass through pre-sorted handlers, with short-circuit
7//! on cancellation after the Validate stage.
8
9use std::any::TypeId;
10use std::collections::HashMap;
11
12use super::traits::{Event, Stage};
13
/// A type-erased handler function stored in the bus.
///
/// Takes the event as `&mut dyn Event` and a `&dyn Context` reference.
/// The concrete event type is recovered via `downcast_mut` inside the
/// wrapper closure created by [`EventBus::on`], so a stored handler must
/// only ever be invoked with an event of the type it was registered for.
///
/// The `Send + Sync` bounds are a prerequisite for the bus that owns
/// these handlers to be `Send + Sync` itself.
type ErasedHandler = Box<dyn Fn(&mut dyn Event, &dyn crate::context::Context) + Send + Sync>;
20
/// A registered handler together with the ordering key used for dispatch.
///
/// All entries for one event type live in a single `Vec` that
/// [`EventBus::on`] keeps ordered by `(stage, priority)`.
struct HandlerEntry {
    /// Which stage this handler runs in. Primary component of the sort key.
    stage: Stage,
    /// Priority within the stage. Lower values run first.
    priority: i32,
    /// The type-erased handler function (see [`ErasedHandler`]).
    handler: ErasedHandler,
}
30
/// Generic event bus that dispatches events through staged handlers.
///
/// Handlers register for specific event types at specific stages.
/// During dispatch, handlers run in `(Stage, priority)` order.
/// Once an event reports itself cancelled, dispatch stops at the first
/// handler whose stage is not `Stage::Validate`. In particular, if a
/// Validate handler cancels the event, Process and Post handlers are
/// skipped, while Validate handlers later in the order still run.
///
/// Registration takes `&mut self` and dispatch takes `&self`, and every
/// stored handler is required to be `Send + Sync`, so a fully configured
/// bus can be shared via `Arc` across connection tasks. Registration
/// happens at startup; dispatch happens per-task.
pub struct EventBus {
    /// Handlers indexed by the `TypeId` of the concrete event type.
    /// Each entry list is pre-sorted by `(Stage, priority)`.
    handlers: HashMap<TypeId, Vec<HandlerEntry>>,
}
46
47impl EventBus {
48    /// Creates an empty event bus with no registered handlers.
49    pub fn new() -> Self {
50        Self {
51            handlers: HashMap::new(),
52        }
53    }
54
55    /// Registers a handler for a specific event type at a given stage.
56    ///
57    /// The handler receives a mutable reference to the concrete event
58    /// and a `&dyn Context` reference. Lower priority values run first
59    /// within the same stage.
60    ///
61    /// `E` must be `'static` for type erasure via `Any`. The handler
62    /// closure must be `Send + Sync` since the bus is shared across
63    /// connection tasks.
64    pub fn on<E>(
65        &mut self,
66        stage: Stage,
67        priority: i32,
68        handler: impl Fn(&mut E, &dyn crate::context::Context) + Send + Sync + 'static,
69    ) where
70        E: Event + 'static,
71    {
72        let type_id = TypeId::of::<E>();
73        let erased: ErasedHandler = Box::new(move |event, ctx| {
74            let concrete_event = event.as_any_mut().downcast_mut::<E>().unwrap();
75            handler(concrete_event, ctx);
76        });
77
78        let entries = self.handlers.entry(type_id).or_default();
79        entries.push(HandlerEntry {
80            stage,
81            priority,
82            handler: erased,
83        });
84        // Keep entries sorted by (stage, priority) for linear dispatch
85        entries.sort_by_key(|e| (e.stage, e.priority));
86    }
87
88    /// Dispatches a concrete event through all registered handlers.
89    ///
90    /// Runs handlers in `(Stage, priority)` order. If the event is
91    /// cancelled during Validate, Process and Post are skipped.
92    pub fn dispatch<E>(&self, event: &mut E, ctx: &dyn crate::context::Context)
93    where
94        E: Event + 'static,
95    {
96        let type_id = TypeId::of::<E>();
97        let Some(entries) = self.handlers.get(&type_id) else {
98            return;
99        };
100
101        for entry in entries {
102            if event.is_cancelled() && entry.stage != Stage::Validate {
103                return;
104            }
105            (entry.handler)(event, ctx);
106        }
107    }
108
109    /// Dispatches a type-erased event using its runtime `TypeId`.
110    ///
111    /// Used when the concrete event type is not known at the call
112    /// site (e.g., `Box<dyn Event>` from `packet_to_event`).
113    pub fn dispatch_dyn(&self, event: &mut dyn Event, ctx: &dyn crate::context::Context) {
114        let type_id = event.as_any().type_id();
115        let Some(entries) = self.handlers.get(&type_id) else {
116            return;
117        };
118
119        for entry in entries {
120            if event.is_cancelled() && entry.stage != Stage::Validate {
121                return;
122            }
123            (entry.handler)(event, ctx);
124        }
125    }
126
127    /// Returns the number of event types that have registered handlers.
128    pub fn event_type_count(&self) -> usize {
129        self.handlers.len()
130    }
131
132    /// Returns the total number of registered handler entries.
133    pub fn handler_count(&self) -> usize {
134        self.handlers.values().map(|v| v.len()).sum()
135    }
136}
137
138impl Default for EventBus {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::traits::BusKind;
    use crate::testing::NoopContext;
    use std::any::Any;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, Mutex};

    // -- Test event types --

    /// Cancellable event carrying an integer payload.
    struct CounterEvent {
        value: i32,
        cancelled: bool,
    }

    impl Event for CounterEvent {
        fn is_cancelled(&self) -> bool {
            self.cancelled
        }
        fn cancel(&mut self) {
            self.cancelled = true;
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
        fn bus_kind(&self) -> BusKind {
            BusKind::Game
        }
    }

    /// Second event type; never cancelled, used to check type isolation.
    struct OtherEvent;

    impl Event for OtherEvent {
        fn is_cancelled(&self) -> bool {
            false
        }
        fn cancel(&mut self) {}
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
        fn bus_kind(&self) -> BusKind {
            BusKind::Game
        }
    }

    // -- Helpers --

    /// Builds a not-yet-cancelled `CounterEvent` holding `value`.
    fn fresh_event(value: i32) -> CounterEvent {
        CounterEvent {
            value,
            cancelled: false,
        }
    }

    /// Creates a shared invocation counter starting at zero.
    fn shared_counter() -> Arc<AtomicI32> {
        Arc::new(AtomicI32::new(0))
    }

    /// Reads the current value of an invocation counter.
    fn calls(counter: &AtomicI32) -> i32 {
        counter.load(Ordering::Relaxed)
    }

    /// Registers a `CounterEvent` handler that bumps `counter` on each call.
    fn count_calls(bus: &mut EventBus, stage: Stage, priority: i32, counter: &Arc<AtomicI32>) {
        let counter = Arc::clone(counter);
        bus.on::<CounterEvent>(stage, priority, move |_, _| {
            counter.fetch_add(1, Ordering::Relaxed);
        });
    }

    /// Registers a Validate-stage, priority-0 handler that cancels the event.
    fn cancel_in_validate(bus: &mut EventBus) {
        bus.on::<CounterEvent>(Stage::Validate, 0, |event, _| event.cancel());
    }

    // -- Tests --

    #[test]
    fn dispatch_runs_handlers_in_priority_order() {
        let mut bus = EventBus::new();
        let seen = Arc::new(Mutex::new(Vec::<i32>::new()));

        // Deliberately registered out of order.
        for priority in [10, 5, 1, 20] {
            let sink = Arc::clone(&seen);
            bus.on::<CounterEvent>(Stage::Process, priority, move |_, _| {
                sink.lock().unwrap().push(priority);
            });
        }

        bus.dispatch(&mut fresh_event(0), &NoopContext);

        assert_eq!(*seen.lock().unwrap(), vec![1, 5, 10, 20]);
    }

    #[test]
    fn dispatch_skips_post_when_cancelled_in_validate() {
        let mut bus = EventBus::new();
        let post_calls = shared_counter();

        cancel_in_validate(&mut bus);
        count_calls(&mut bus, Stage::Post, 0, &post_calls);

        bus.dispatch(&mut fresh_event(0), &NoopContext);

        assert_eq!(calls(&post_calls), 0);
    }

    #[test]
    fn dispatch_skips_process_when_cancelled_in_validate() {
        let mut bus = EventBus::new();
        let process_calls = shared_counter();

        cancel_in_validate(&mut bus);
        count_calls(&mut bus, Stage::Process, 0, &process_calls);

        bus.dispatch(&mut fresh_event(0), &NoopContext);

        assert_eq!(calls(&process_calls), 0);
    }

    #[test]
    fn dispatch_runs_remaining_validate_handlers_after_cancel() {
        let mut bus = EventBus::new();
        let later_validate_calls = shared_counter();

        // Priority 0 cancels first; the priority-1 handler shares the
        // Validate stage and therefore must still be invoked.
        cancel_in_validate(&mut bus);
        count_calls(&mut bus, Stage::Validate, 1, &later_validate_calls);

        bus.dispatch(&mut fresh_event(0), &NoopContext);

        assert_eq!(calls(&later_validate_calls), 1);
    }

    #[test]
    fn dispatch_with_no_handlers_is_noop() {
        let bus = EventBus::new();
        let mut event = fresh_event(42);

        bus.dispatch(&mut event, &NoopContext);

        assert_eq!(event.value, 42);
    }

    #[test]
    fn handlers_for_different_events_are_isolated() {
        let mut bus = EventBus::new();
        let counter_calls = shared_counter();
        let other_calls = shared_counter();

        count_calls(&mut bus, Stage::Process, 0, &counter_calls);
        let other_ref = Arc::clone(&other_calls);
        bus.on::<OtherEvent>(Stage::Process, 0, move |_, _| {
            other_ref.fetch_add(1, Ordering::Relaxed);
        });

        bus.dispatch(&mut fresh_event(0), &NoopContext);

        assert_eq!(calls(&counter_calls), 1);
        assert_eq!(calls(&other_calls), 0);
    }

    #[test]
    fn dispatch_dyn_routes_by_runtime_type_id() {
        let mut bus = EventBus::new();
        let handled = shared_counter();
        count_calls(&mut bus, Stage::Process, 0, &handled);

        let mut boxed: Box<dyn Event> = Box::new(fresh_event(0));
        bus.dispatch_dyn(&mut *boxed, &NoopContext);

        assert_eq!(calls(&handled), 1);
    }

    #[test]
    fn event_type_count_tracks_distinct_types() {
        let mut bus = EventBus::new();
        assert_eq!(bus.event_type_count(), 0);

        bus.on::<CounterEvent>(Stage::Process, 0, |_, _| {});
        assert_eq!(bus.event_type_count(), 1);

        // A second handler for an already-known type adds no new type.
        bus.on::<CounterEvent>(Stage::Post, 0, |_, _| {});
        assert_eq!(bus.event_type_count(), 1);

        bus.on::<OtherEvent>(Stage::Process, 0, |_, _| {});
        assert_eq!(bus.event_type_count(), 2);
    }

    #[test]
    fn handler_count_sums_across_types_and_stages() {
        let mut bus = EventBus::new();

        bus.on::<CounterEvent>(Stage::Process, 0, |_, _| {});
        bus.on::<CounterEvent>(Stage::Post, 0, |_, _| {});
        bus.on::<OtherEvent>(Stage::Process, 0, |_, _| {});

        assert_eq!(bus.handler_count(), 3);
    }
}