mod_events/dispatcher.rs
//! Main event dispatcher implementation.
2
3use crate::metrics::EventMetricsCounters;
4use crate::{
5 DispatchResult, Event, EventMetadata, ListenerError, ListenerId, ListenerWrapper,
6 MiddlewareManager, Priority,
7};
8use parking_lot::RwLock;
9use std::any::TypeId;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::sync::Arc;
13
14#[cfg(feature = "async")]
15use crate::{AsyncEventResult, AsyncListenerWrapper};
16
/// Type-erased async listener callback.
///
/// Wrapped in an `Arc` so `dispatch_async` can clone the handlers out of
/// the registry and release the lock before awaiting any of them.
#[cfg(feature = "async")]
type AsyncHandler =
    Arc<dyn for<'evt> Fn(&'evt dyn Event) -> AsyncEventResult<'evt> + Send + Sync>;
19
20/// High-performance event dispatcher
21///
22/// The main component of the Mod Events system. Thread-safe and optimized
23/// for high-performance event dispatch with minimal overhead.
24///
25/// # Example
26///
27/// ```rust
28/// use mod_events::{EventDispatcher, Event};
29///
30/// #[derive(Debug, Clone)]
31/// struct MyEvent {
32/// message: String,
33/// }
34///
35/// impl Event for MyEvent {
36/// fn as_any(&self) -> &dyn std::any::Any {
37/// self
38/// }
39/// }
40///
41/// let dispatcher = EventDispatcher::new();
42///
43/// dispatcher.on(|event: &MyEvent| {
44/// println!("Received: {}", event.message);
45/// });
46///
47/// dispatcher.emit(MyEvent {
48/// message: "Hello, World!".to_string(),
49/// });
50/// ```
51pub struct EventDispatcher {
52 listeners: Arc<RwLock<HashMap<TypeId, Vec<ListenerWrapper>>>>,
53 #[cfg(feature = "async")]
54 async_listeners: Arc<RwLock<HashMap<TypeId, Vec<AsyncListenerWrapper>>>>,
55 next_id: AtomicUsize,
56 // Per-event-type counters live behind an `Arc` so the dispatch hot
57 // path can clone the counter pointer under a read lock and increment
58 // its atomics without ever touching the outer map's write lock.
59 metrics: Arc<RwLock<HashMap<TypeId, Arc<EventMetricsCounters>>>>,
60 middleware: Arc<RwLock<MiddlewareManager>>,
61}
62
63impl EventDispatcher {
64 /// Create a new event dispatcher.
65 #[must_use]
66 pub fn new() -> Self {
67 Self {
68 listeners: Arc::new(RwLock::new(HashMap::new())),
69 #[cfg(feature = "async")]
70 async_listeners: Arc::new(RwLock::new(HashMap::new())),
71 next_id: AtomicUsize::new(0),
72 metrics: Arc::new(RwLock::new(HashMap::new())),
73 middleware: Arc::new(RwLock::new(MiddlewareManager::new())),
74 }
75 }
76
77 /// Subscribe to an event with a closure that can return errors.
78 ///
79 /// # Example
80 ///
81 /// ```rust
82 /// use mod_events::{EventDispatcher, Event};
83 ///
84 /// #[derive(Debug, Clone)]
85 /// struct MyEvent {
86 /// message: String,
87 /// }
88 ///
89 /// impl Event for MyEvent {
90 /// fn as_any(&self) -> &dyn std::any::Any {
91 /// self
92 /// }
93 /// }
94 ///
95 /// let dispatcher = EventDispatcher::new();
96 /// dispatcher.subscribe(|event: &MyEvent| {
97 /// if event.message.is_empty() {
98 /// return Err("Message cannot be empty".into());
99 /// }
100 /// println!("Message: {}", event.message);
101 /// Ok(())
102 /// });
103 /// ```
104 pub fn subscribe<T, F>(&self, listener: F) -> ListenerId
105 where
106 T: Event + 'static,
107 F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
108 {
109 self.subscribe_with_priority(listener, Priority::Normal)
110 }
111
112 /// Subscribe to an event with a specific priority.
113 pub fn subscribe_with_priority<T, F>(&self, listener: F, priority: Priority) -> ListenerId
114 where
115 T: Event + 'static,
116 F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
117 {
118 let type_id = TypeId::of::<T>();
119 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
120
121 let wrapper = ListenerWrapper::new(listener, priority, id);
122
123 {
124 let mut listeners = self.listeners.write();
125 let event_listeners = listeners.entry(type_id).or_default();
126 // Binary insertion preserves descending-priority order in O(n)
127 // (one shift), avoiding the O(n log n) full re-sort the
128 // previous push + sort_by_key combination performed.
129 // `partition_point` finds the first index whose listener has
130 // a strictly lower priority than the new one, which is also
131 // where to insert. Equal-priority listeners run in
132 // registration order (FIFO).
133 let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
134 event_listeners.insert(pos, wrapper);
135 }
136
137 // Make sure a metrics entry exists for this type so `metrics()`
138 // can report it even before the first dispatch.
139 let _counters = self.counters_for::<T>();
140
141 ListenerId::new(id, type_id)
142 }
143
144 /// Subscribe to an event with simple closure (no error handling).
145 ///
146 /// This is the most convenient method for simple event handling.
147 ///
148 /// # Example
149 ///
150 /// ```rust
151 /// use mod_events::{EventDispatcher, Event};
152 ///
153 /// #[derive(Debug, Clone)]
154 /// struct MyEvent {
155 /// message: String,
156 /// }
157 ///
158 /// impl Event for MyEvent {
159 /// fn as_any(&self) -> &dyn std::any::Any {
160 /// self
161 /// }
162 /// }
163 ///
164 /// let dispatcher = EventDispatcher::new();
165 /// dispatcher.on(|event: &MyEvent| {
166 /// println!("Received: {}", event.message);
167 /// });
168 /// ```
169 pub fn on<T, F>(&self, listener: F) -> ListenerId
170 where
171 T: Event + 'static,
172 F: Fn(&T) + Send + Sync + 'static,
173 {
174 self.subscribe(move |event: &T| {
175 listener(event);
176 Ok(())
177 })
178 }
179
/// Register an async listener for events of type `T` at
/// [`Priority::Normal`] (requires the `async` feature).
///
/// The listener is handed a borrowed event and must return a future
/// resolving to `Result<(), ListenerError>`. The future is `'static`,
/// so anything it needs from the event has to be copied or cloned out
/// before the `async` block.
///
/// # Example
///
/// ```rust
/// # #[cfg(feature = "async")]
/// # {
/// use mod_events::{Event, EventDispatcher};
///
/// #[derive(Debug, Clone)]
/// struct EmailSent { to: String }
///
/// impl Event for EmailSent {
///     fn as_any(&self) -> &dyn std::any::Any { self }
/// }
///
/// let dispatcher = EventDispatcher::new();
/// dispatcher.subscribe_async(|event: &EmailSent| {
///     let to = event.to.clone();
///     async move {
///         println!("delivered to {}", to);
///         Ok(())
///     }
/// });
/// # }
/// ```
#[cfg(feature = "async")]
pub fn subscribe_async<T, F, Fut>(&self, listener: F) -> ListenerId
where
    T: Event + 'static,
    F: Fn(&T) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
{
    let default_priority = Priority::Normal;
    self.subscribe_async_with_priority(listener, default_priority)
}
219
/// Register an async listener for events of type `T` at the given
/// priority (requires the `async` feature).
///
/// Within one [`Self::dispatch_async`] call, listeners are awaited in
/// descending priority order; listeners of equal priority are awaited
/// in registration order.
///
/// # Example
///
/// ```rust
/// # #[cfg(feature = "async")]
/// # {
/// use mod_events::{Event, EventDispatcher, Priority};
///
/// #[derive(Debug, Clone)]
/// struct EmailSent { to: String }
///
/// impl Event for EmailSent {
///     fn as_any(&self) -> &dyn std::any::Any { self }
/// }
///
/// let dispatcher = EventDispatcher::new();
/// dispatcher.subscribe_async_with_priority(
///     |_event: &EmailSent| async move {
///         // logged before any other listener
///         Ok(())
///     },
///     Priority::High,
/// );
/// # }
/// ```
#[cfg(feature = "async")]
pub fn subscribe_async_with_priority<T, F, Fut>(
    &self,
    listener: F,
    priority: Priority,
) -> ListenerId
where
    T: Event + 'static,
    F: Fn(&T) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
{
    let event_type = TypeId::of::<T>();
    let raw_id = self.next_id.fetch_add(1, Ordering::Relaxed);
    let entry = AsyncListenerWrapper::new(listener, priority, raw_id);

    let mut registry = self.async_listeners.write();
    let bucket = registry.entry(event_type).or_default();
    // The bucket is kept sorted by descending priority; inserting at the
    // first strictly-lower-priority position keeps it sorted and keeps
    // equal-priority listeners in registration order.
    let slot = bucket.partition_point(|registered| registered.priority >= priority);
    bucket.insert(slot, entry);
    // Release the registry before touching the metrics map.
    drop(registry);

    // Touch the counters so `metrics()` lists this event type even if
    // nothing has been dispatched yet.
    let _ensured = self.counters_for::<T>();

    ListenerId::new(raw_id, event_type)
}
281
282 /// Dispatch an event synchronously.
283 ///
284 /// Returns a [`DispatchResult`] containing per-listener outcomes.
285 ///
286 /// # Example
287 ///
288 /// ```rust
289 /// use mod_events::{EventDispatcher, Event};
290 ///
291 /// #[derive(Debug, Clone)]
292 /// struct MyEvent {
293 /// message: String,
294 /// }
295 ///
296 /// impl Event for MyEvent {
297 /// fn as_any(&self) -> &dyn std::any::Any {
298 /// self
299 /// }
300 /// }
301 ///
302 /// let dispatcher = EventDispatcher::new();
303 /// let result = dispatcher.dispatch(MyEvent {
304 /// message: "Hello".to_string(),
305 /// });
306 ///
307 /// if result.all_succeeded() {
308 /// println!("All listeners handled the event successfully");
309 /// }
310 /// ```
311 pub fn dispatch<T: Event>(&self, event: T) -> DispatchResult {
312 // Update metrics
313 self.update_metrics(&event);
314
315 // Check middleware
316 if !self.check_middleware(&event) {
317 return DispatchResult::blocked();
318 }
319
320 let type_id = TypeId::of::<T>();
321 let listeners = self.listeners.read();
322 let mut results = Vec::new();
323
324 if let Some(event_listeners) = listeners.get(&type_id) {
325 results.reserve(event_listeners.len());
326 for listener in event_listeners {
327 results.push((listener.handler)(&event));
328 }
329 }
330
331 DispatchResult::new(results)
332 }
333
/// Dispatch an event to the async listeners registered for `T`,
/// awaiting them one after another in priority order (requires the
/// `async` feature).
///
/// Only listeners added through [`Self::subscribe_async`] /
/// [`Self::subscribe_async_with_priority`] are awaited. Sync listeners
/// registered via [`Self::on`] / [`Self::subscribe`] are not invoked
/// here.
///
/// As with [`Self::dispatch`], the dispatch is recorded in the metrics
/// first and the middleware chain may then block the event, in which
/// case [`DispatchResult::blocked`] is returned.
///
/// # Example
///
/// ```rust
/// # #[cfg(feature = "async")]
/// # async fn doc_example() {
/// use mod_events::{Event, EventDispatcher};
///
/// #[derive(Debug, Clone)]
/// struct OrderShipped { order_id: u64 }
///
/// impl Event for OrderShipped {
///     fn as_any(&self) -> &dyn std::any::Any { self }
/// }
///
/// let dispatcher = EventDispatcher::new();
/// dispatcher.subscribe_async(|event: &OrderShipped| {
///     let id = event.order_id;
///     async move {
///         println!("notifying customer about order {}", id);
///         Ok(())
///     }
/// });
///
/// let result = dispatcher
///     .dispatch_async(OrderShipped { order_id: 42 })
///     .await;
/// assert!(result.all_succeeded());
/// # }
/// ```
#[cfg(feature = "async")]
pub async fn dispatch_async<T: Event>(&self, event: T) -> DispatchResult {
    // Record the attempt first; blocked events are counted as well.
    self.update_metrics(&event);

    // Let middleware veto the event before any listener sees it.
    if !self.check_middleware(&event) {
        return DispatchResult::blocked();
    }

    let event_type = TypeId::of::<T>();

    // Copy the handler pointers out inside a tight scope so the registry
    // guard is gone before the first `.await`.
    let callbacks: Vec<AsyncHandler> = {
        let registry = self.async_listeners.read();
        match registry.get(&event_type) {
            Some(bucket) => bucket
                .iter()
                .map(|entry| Arc::clone(&entry.handler))
                .collect(),
            None => Vec::new(),
        }
    };

    // Await sequentially so outcomes are reported in priority order.
    let mut outcomes = Vec::with_capacity(callbacks.len());
    for callback in callbacks {
        let outcome = callback(&event).await;
        outcomes.push(outcome);
    }

    DispatchResult::new(outcomes)
}
404
405 /// Fire and forget — dispatch without inspecting the result.
406 ///
407 /// Use this when listeners' success or failure is not actionable at
408 /// the call site (logging, fanout to passive observers). The errors
409 /// returned by failing listeners are discarded; if you need them,
410 /// call [`Self::dispatch`] instead.
411 ///
412 /// # Example
413 ///
414 /// ```rust
415 /// use mod_events::{EventDispatcher, Event};
416 ///
417 /// #[derive(Debug, Clone)]
418 /// struct MyEvent {
419 /// message: String,
420 /// }
421 ///
422 /// impl Event for MyEvent {
423 /// fn as_any(&self) -> &dyn std::any::Any {
424 /// self
425 /// }
426 /// }
427 ///
428 /// let dispatcher = EventDispatcher::new();
429 /// dispatcher.emit(MyEvent {
430 /// message: "Fire and forget".to_string(),
431 /// });
432 /// ```
433 pub fn emit<T: Event>(&self, event: T) {
434 // Intentional discard: emit is the documented fire-and-forget entry
435 // point. Callers that need per-listener outcomes use `dispatch`.
436 drop(self.dispatch(event));
437 }
438
439 /// Add middleware that can block events.
440 ///
441 /// Middleware functions receive events and return `true` to allow
442 /// processing or `false` to block the event.
443 ///
444 /// # Example
445 ///
446 /// ```rust
447 /// use mod_events::{EventDispatcher, Event};
448 ///
449 /// let dispatcher = EventDispatcher::new();
450 /// dispatcher.add_middleware(|event: &dyn Event| {
451 /// println!("Processing event: {}", event.event_name());
452 /// true // Allow all events
453 /// });
454 /// ```
455 pub fn add_middleware<F>(&self, middleware: F)
456 where
457 F: Fn(&dyn Event) -> bool + Send + Sync + 'static,
458 {
459 self.middleware.write().add(middleware);
460 }
461
462 /// Remove a previously registered listener.
463 ///
464 /// Returns `true` if the listener was found and removed, `false`
465 /// if no listener with that id was registered (already removed,
466 /// never registered, or registered against a different event type).
467 ///
468 /// # Example
469 ///
470 /// ```rust
471 /// use mod_events::{Event, EventDispatcher};
472 ///
473 /// #[derive(Debug, Clone)]
474 /// struct Tick;
475 ///
476 /// impl Event for Tick {
477 /// fn as_any(&self) -> &dyn std::any::Any { self }
478 /// }
479 ///
480 /// let dispatcher = EventDispatcher::new();
481 /// let id = dispatcher.on(|_: &Tick| {});
482 /// assert!(dispatcher.unsubscribe(id));
483 /// // Subsequent removals of the same id return false.
484 /// assert!(!dispatcher.unsubscribe(id));
485 /// ```
486 pub fn unsubscribe(&self, listener_id: ListenerId) -> bool {
487 // Try sync listeners first
488 {
489 let mut listeners = self.listeners.write();
490 if let Some(event_listeners) = listeners.get_mut(&listener_id.type_id) {
491 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
492 let _removed = event_listeners.remove(pos);
493 return true;
494 }
495 }
496 }
497
498 // Try async listeners
499 #[cfg(feature = "async")]
500 {
501 let mut async_listeners = self.async_listeners.write();
502 if let Some(event_listeners) = async_listeners.get_mut(&listener_id.type_id) {
503 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
504 let _removed = event_listeners.remove(pos);
505 return true;
506 }
507 }
508 }
509
510 false
511 }
512
513 /// Get the total number of listeners (sync + async, when the
514 /// `async` feature is enabled) registered for an event type.
515 ///
516 /// # Example
517 ///
518 /// ```rust
519 /// use mod_events::{Event, EventDispatcher};
520 ///
521 /// #[derive(Debug, Clone)]
522 /// struct Tick;
523 ///
524 /// impl Event for Tick {
525 /// fn as_any(&self) -> &dyn std::any::Any { self }
526 /// }
527 ///
528 /// let dispatcher = EventDispatcher::new();
529 /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
530 /// let _ = dispatcher.on(|_: &Tick| {});
531 /// let _ = dispatcher.on(|_: &Tick| {});
532 /// assert_eq!(dispatcher.listener_count::<Tick>(), 2);
533 /// ```
534 #[must_use]
535 pub fn listener_count<T: Event + 'static>(&self) -> usize {
536 let type_id = TypeId::of::<T>();
537 let sync_count = self
538 .listeners
539 .read()
540 .get(&type_id)
541 .map(Vec::len)
542 .unwrap_or(0);
543
544 #[cfg(feature = "async")]
545 let async_count = self
546 .async_listeners
547 .read()
548 .get(&type_id)
549 .map(Vec::len)
550 .unwrap_or(0);
551
552 #[cfg(not(feature = "async"))]
553 let async_count = 0;
554
555 sync_count + async_count
556 }
557
558 /// Get a snapshot of per-event-type [`EventMetadata`] keyed by
559 /// `TypeId`.
560 ///
561 /// The returned map is a fresh snapshot. Subsequent dispatches do
562 /// not mutate it, but the snapshot may be slightly behind the live
563 /// counters because `dispatch_count` is read independently of
564 /// `last_dispatch`.
565 ///
566 /// # Example
567 ///
568 /// ```rust
569 /// use mod_events::{Event, EventDispatcher};
570 /// use std::any::TypeId;
571 ///
572 /// #[derive(Debug, Clone)]
573 /// struct Tick;
574 ///
575 /// impl Event for Tick {
576 /// fn as_any(&self) -> &dyn std::any::Any { self }
577 /// }
578 ///
579 /// let dispatcher = EventDispatcher::new();
580 /// let _ = dispatcher.on(|_: &Tick| {});
581 /// dispatcher.emit(Tick);
582 /// dispatcher.emit(Tick);
583 ///
584 /// let snapshot = dispatcher.metrics();
585 /// let meta = snapshot.get(&TypeId::of::<Tick>()).unwrap();
586 /// assert_eq!(meta.dispatch_count, 2);
587 /// ```
588 #[must_use]
589 pub fn metrics(&self) -> HashMap<TypeId, EventMetadata> {
590 // Snapshot the metric counters first, then enrich each entry
591 // with the live listener count derived from the registry. This
592 // guarantees `listener_count` cannot drift because it is never
593 // stored — it is computed at the moment of observation.
594 let counters_map = self.metrics.read();
595 let listeners_map = self.listeners.read();
596 #[cfg(feature = "async")]
597 let async_listeners_map = self.async_listeners.read();
598
599 counters_map
600 .iter()
601 .map(|(type_id, counters)| {
602 let mut snap = counters.snapshot();
603 let sync_count = listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
604 #[cfg(feature = "async")]
605 let async_count = async_listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
606 #[cfg(not(feature = "async"))]
607 let async_count = 0;
608 snap.listener_count = sync_count + async_count;
609 (*type_id, snap)
610 })
611 .collect()
612 }
613
614 /// Drop every registered listener, both sync and async.
615 ///
616 /// Middleware and accumulated metrics are unaffected.
617 ///
618 /// # Example
619 ///
620 /// ```rust
621 /// use mod_events::{Event, EventDispatcher};
622 ///
623 /// #[derive(Debug, Clone)]
624 /// struct Tick;
625 ///
626 /// impl Event for Tick {
627 /// fn as_any(&self) -> &dyn std::any::Any { self }
628 /// }
629 ///
630 /// let dispatcher = EventDispatcher::new();
631 /// let _ = dispatcher.on(|_: &Tick| {});
632 /// dispatcher.clear();
633 /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
634 /// ```
635 pub fn clear(&self) {
636 self.listeners.write().clear();
637
638 #[cfg(feature = "async")]
639 self.async_listeners.write().clear();
640 }
641
642 /// Hot-path metric update. Tries a read-only fast path first; only
643 /// promotes to a write lock if the entry doesn't exist yet.
644 fn update_metrics<T: Event>(&self, _event: &T) {
645 let counters = self.counters_for::<T>();
646 counters.record_dispatch();
647 }
648
649 /// Look up (or create) the per-type counters. The fast path holds
650 /// only a read lock; the slow path promotes to a write lock and
651 /// double-checks the entry to avoid a torn-creation race.
652 fn counters_for<T: Event + 'static>(&self) -> Arc<EventMetricsCounters> {
653 let type_id = TypeId::of::<T>();
654
655 if let Some(existing) = self.metrics.read().get(&type_id) {
656 return Arc::clone(existing);
657 }
658
659 let mut metrics = self.metrics.write();
660 Arc::clone(
661 metrics
662 .entry(type_id)
663 .or_insert_with(|| Arc::new(EventMetricsCounters::new::<T>())),
664 )
665 }
666
667 fn check_middleware(&self, event: &dyn Event) -> bool {
668 self.middleware.read().process(event)
669 }
670}
671
672impl Default for EventDispatcher {
673 fn default() -> Self {
674 Self::new()
675 }
676}