mod_events/dispatcher.rs
1//! Main event dispatcher implementation
2
3use crate::error::panic_payload_to_listener_error;
4use crate::metrics::EventMetricsCounters;
5use crate::middleware::MiddlewareManager;
6use crate::type_id_map::TypeIdMap;
7use crate::{
8 DispatchResult, Event, EventMetadata, ListenerError, ListenerId, ListenerWrapper, Priority,
9};
10use parking_lot::RwLock;
11use std::any::TypeId;
12use std::collections::HashMap;
13use std::panic::{catch_unwind, AssertUnwindSafe};
14use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::Arc;
16
17#[cfg(feature = "async")]
18use crate::{AsyncEventResult, AsyncListenerWrapper};
19
// Shared, type-erased async listener callable. The higher-ranked `'a`
// ties the returned future's lifetime to the borrowed event, so the
// future may keep borrowing the event it was created from.
#[cfg(feature = "async")]
type AsyncHandler = Arc<
    dyn for<'a> Fn(&'a dyn Event) -> AsyncEventResult<'a> + Send + Sync,
>;
22
23/// High-performance event dispatcher
24///
25/// The main component of the Mod Events system. Thread-safe and optimized
26/// for high-performance event dispatch with minimal overhead.
27///
28/// # Example
29///
30/// ```rust
31/// use mod_events::{EventDispatcher, Event};
32///
33/// #[derive(Debug, Clone)]
34/// struct MyEvent {
35/// message: String,
36/// }
37///
38/// impl Event for MyEvent {
39/// fn as_any(&self) -> &dyn std::any::Any {
40/// self
41/// }
42/// }
43///
44/// let dispatcher = EventDispatcher::new();
45///
46/// dispatcher.on(|event: &MyEvent| {
47/// println!("Received: {}", event.message);
48/// });
49///
50/// dispatcher.emit(MyEvent {
51/// message: "Hello, World!".to_string(),
52/// });
53/// ```
54pub struct EventDispatcher {
55 listeners: RwLock<TypeIdMap<Vec<ListenerWrapper>>>,
56 #[cfg(feature = "async")]
57 async_listeners: RwLock<TypeIdMap<Vec<AsyncListenerWrapper>>>,
58 next_id: AtomicUsize,
59 // Per-event-type counters live behind an `Arc` so the dispatch hot
60 // path can clone the counter pointer under a read lock and increment
61 // its atomics without ever touching the outer map's write lock.
62 metrics: RwLock<TypeIdMap<Arc<EventMetricsCounters>>>,
63 middleware: RwLock<MiddlewareManager>,
64}
65
66impl EventDispatcher {
67 /// Create a new event dispatcher.
68 #[must_use]
69 pub fn new() -> Self {
70 Self {
71 listeners: RwLock::new(TypeIdMap::default()),
72 #[cfg(feature = "async")]
73 async_listeners: RwLock::new(TypeIdMap::default()),
74 next_id: AtomicUsize::new(0),
75 metrics: RwLock::new(TypeIdMap::default()),
76 middleware: RwLock::new(MiddlewareManager::new()),
77 }
78 }
79
80 /// Subscribe to an event with a closure that can return errors.
81 ///
82 /// # Example
83 ///
84 /// ```rust
85 /// use mod_events::{EventDispatcher, Event};
86 ///
87 /// #[derive(Debug, Clone)]
88 /// struct MyEvent {
89 /// message: String,
90 /// }
91 ///
92 /// impl Event for MyEvent {
93 /// fn as_any(&self) -> &dyn std::any::Any {
94 /// self
95 /// }
96 /// }
97 ///
98 /// let dispatcher = EventDispatcher::new();
99 /// dispatcher.subscribe(|event: &MyEvent| {
100 /// if event.message.is_empty() {
101 /// return Err("Message cannot be empty".into());
102 /// }
103 /// println!("Message: {}", event.message);
104 /// Ok(())
105 /// });
106 /// ```
107 pub fn subscribe<T, F>(&self, listener: F) -> ListenerId
108 where
109 T: Event + 'static,
110 F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
111 {
112 self.subscribe_with_priority(listener, Priority::Normal)
113 }
114
115 /// Subscribe to an event with a specific priority.
116 pub fn subscribe_with_priority<T, F>(&self, listener: F, priority: Priority) -> ListenerId
117 where
118 T: Event + 'static,
119 F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
120 {
121 let type_id = TypeId::of::<T>();
122 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
123
124 let wrapper = ListenerWrapper::new(listener, priority, id);
125
126 {
127 let mut listeners = self.listeners.write();
128 let event_listeners = listeners.entry(type_id).or_default();
129 // Binary insertion preserves descending-priority order in O(n)
130 // (one shift), avoiding the O(n log n) full re-sort the
131 // previous push + sort_by_key combination performed.
132 // `partition_point` finds the first index whose listener has
133 // a strictly lower priority than the new one, which is also
134 // where to insert. Equal-priority listeners run in
135 // registration order (FIFO).
136 let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
137 event_listeners.insert(pos, wrapper);
138 }
139
140 // Make sure a metrics entry exists for this type so `metrics()`
141 // can report it even before the first dispatch.
142 let _counters = self.counters_for::<T>();
143
144 ListenerId::new(id, type_id)
145 }
146
147 /// Subscribe to an event with simple closure (no error handling).
148 ///
149 /// This is the most convenient method for simple event handling.
150 ///
151 /// # Example
152 ///
153 /// ```rust
154 /// use mod_events::{EventDispatcher, Event};
155 ///
156 /// #[derive(Debug, Clone)]
157 /// struct MyEvent {
158 /// message: String,
159 /// }
160 ///
161 /// impl Event for MyEvent {
162 /// fn as_any(&self) -> &dyn std::any::Any {
163 /// self
164 /// }
165 /// }
166 ///
167 /// let dispatcher = EventDispatcher::new();
168 /// dispatcher.on(|event: &MyEvent| {
169 /// println!("Received: {}", event.message);
170 /// });
171 /// ```
172 pub fn on<T, F>(&self, listener: F) -> ListenerId
173 where
174 T: Event + 'static,
175 F: Fn(&T) + Send + Sync + 'static,
176 {
177 self.subscribe(move |event: &T| {
178 listener(event);
179 Ok(())
180 })
181 }
182
183 /// Subscribe an async listener at [`Priority::Normal`] (requires the
184 /// `async` feature).
185 ///
186 /// The listener receives a borrowed event and returns a future that
187 /// resolves to `Result<(), ListenerError>`.
188 ///
189 /// # Example
190 ///
191 /// ```rust
192 /// # #[cfg(feature = "async")]
193 /// # {
194 /// use mod_events::{Event, EventDispatcher};
195 ///
196 /// #[derive(Debug, Clone)]
197 /// struct EmailSent { to: String }
198 ///
199 /// impl Event for EmailSent {
200 /// fn as_any(&self) -> &dyn std::any::Any { self }
201 /// }
202 ///
203 /// let dispatcher = EventDispatcher::new();
204 /// dispatcher.subscribe_async(|event: &EmailSent| {
205 /// let to = event.to.clone();
206 /// async move {
207 /// println!("delivered to {}", to);
208 /// Ok(())
209 /// }
210 /// });
211 /// # }
212 /// ```
213 #[cfg(feature = "async")]
214 pub fn subscribe_async<T, F, Fut>(&self, listener: F) -> ListenerId
215 where
216 T: Event + 'static,
217 F: Fn(&T) -> Fut + Send + Sync + 'static,
218 Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
219 {
220 self.subscribe_async_with_priority(listener, Priority::Normal)
221 }
222
223 /// Subscribe an async listener at a specific priority (requires the
224 /// `async` feature).
225 ///
226 /// Higher-priority listeners are awaited first within a single
227 /// [`Self::dispatch_async`] call.
228 ///
229 /// # Example
230 ///
231 /// ```rust
232 /// # #[cfg(feature = "async")]
233 /// # {
234 /// use mod_events::{Event, EventDispatcher, Priority};
235 ///
236 /// #[derive(Debug, Clone)]
237 /// struct EmailSent { to: String }
238 ///
239 /// impl Event for EmailSent {
240 /// fn as_any(&self) -> &dyn std::any::Any { self }
241 /// }
242 ///
243 /// let dispatcher = EventDispatcher::new();
244 /// dispatcher.subscribe_async_with_priority(
245 /// |_event: &EmailSent| async move {
246 /// // logged before any other listener
247 /// Ok(())
248 /// },
249 /// Priority::High,
250 /// );
251 /// # }
252 /// ```
253 #[cfg(feature = "async")]
254 pub fn subscribe_async_with_priority<T, F, Fut>(
255 &self,
256 listener: F,
257 priority: Priority,
258 ) -> ListenerId
259 where
260 T: Event + 'static,
261 F: Fn(&T) -> Fut + Send + Sync + 'static,
262 Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
263 {
264 let type_id = TypeId::of::<T>();
265 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
266
267 let wrapper = AsyncListenerWrapper::new(listener, priority, id);
268
269 {
270 let mut async_listeners = self.async_listeners.write();
271 let event_listeners = async_listeners.entry(type_id).or_default();
272 // Binary insertion preserves descending-priority order in O(n).
273 // See `subscribe_with_priority` for the rationale.
274 let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
275 event_listeners.insert(pos, wrapper);
276 }
277
278 // Make sure a metrics entry exists for this type so `metrics()`
279 // can report it even before the first dispatch.
280 let _counters = self.counters_for::<T>();
281
282 ListenerId::new(id, type_id)
283 }
284
285 /// Dispatch an event synchronously.
286 ///
287 /// Returns a [`DispatchResult`] containing per-listener outcomes.
288 ///
289 /// # Example
290 ///
291 /// ```rust
292 /// use mod_events::{EventDispatcher, Event};
293 ///
294 /// #[derive(Debug, Clone)]
295 /// struct MyEvent {
296 /// message: String,
297 /// }
298 ///
299 /// impl Event for MyEvent {
300 /// fn as_any(&self) -> &dyn std::any::Any {
301 /// self
302 /// }
303 /// }
304 ///
305 /// let dispatcher = EventDispatcher::new();
306 /// let result = dispatcher.dispatch(MyEvent {
307 /// message: "Hello".to_string(),
308 /// });
309 ///
310 /// if result.all_succeeded() {
311 /// println!("All listeners handled the event successfully");
312 /// }
313 /// ```
314 pub fn dispatch<T: Event>(&self, event: T) -> DispatchResult {
315 // Update metrics
316 self.update_metrics(&event);
317
318 // Check middleware
319 if !self.check_middleware(&event) {
320 return DispatchResult::blocked();
321 }
322
323 let type_id = TypeId::of::<T>();
324 let listeners = self.listeners.read();
325
326 // Most dispatches succeed. `errors` stays empty (and
327 // unallocated — `Vec::new()` does not allocate) on the
328 // success path; we only push when a listener returns `Err`
329 // or panics.
330 let mut errors: Vec<ListenerError> = Vec::new();
331 let mut listener_count = 0_usize;
332
333 if let Some(event_listeners) = listeners.get(&type_id) {
334 for listener in event_listeners {
335 listener_count += 1;
336 // `catch_unwind` keeps a panicking listener from
337 // unwinding the dispatch thread (and the read lock we
338 // are holding above) into the caller. Panics are
339 // converted into the same `ListenerError` shape a
340 // well-behaved listener would have returned, so the
341 // caller's `DispatchResult::errors()` is the single
342 // place to look for failures. `parking_lot::RwLock`
343 // does not poison, so dropping the guard after a
344 // caught panic leaves the registry in a usable state.
345 match catch_unwind(AssertUnwindSafe(|| (listener.handler)(&event))) {
346 Ok(Ok(())) => {}
347 Ok(Err(err)) => errors.push(err),
348 Err(payload) => errors.push(panic_payload_to_listener_error(payload)),
349 }
350 }
351 }
352
353 DispatchResult::new(listener_count, errors)
354 }
355
356 /// Dispatch an event asynchronously and await every async listener
357 /// in priority order (requires the `async` feature).
358 ///
359 /// Listeners are awaited **sequentially** in descending priority
360 /// order. This preserves the priority contract — a high-priority
361 /// listener completes (or errors) before the next listener is
362 /// polled. Concurrent execution would lose ordering. If you want
363 /// true concurrent execution, drive `spawn` (e.g. `tokio::spawn`,
364 /// `async_std::task::spawn`) from inside each listener and return
365 /// `Ok(())` immediately.
366 ///
367 /// Each listener future is wrapped in `catch_unwind`, mirroring
368 /// the panic-safety guarantee of the sync [`Self::dispatch`]
369 /// path. A panic during `.await` becomes a `ListenerError` in
370 /// [`DispatchResult::errors`] with the prefix
371 /// `"listener panicked: "`. Subsequent listeners still run.
372 ///
373 /// Sync listeners registered via [`Self::on`] / [`Self::subscribe`]
374 /// are not invoked here; only listeners registered through
375 /// [`Self::subscribe_async`] / [`Self::subscribe_async_with_priority`]
376 /// are awaited.
377 ///
378 /// # Example
379 ///
380 /// ```rust
381 /// # #[cfg(feature = "async")]
382 /// # async fn doc_example() {
383 /// use mod_events::{Event, EventDispatcher};
384 ///
385 /// #[derive(Debug, Clone)]
386 /// struct OrderShipped { order_id: u64 }
387 ///
388 /// impl Event for OrderShipped {
389 /// fn as_any(&self) -> &dyn std::any::Any { self }
390 /// }
391 ///
392 /// let dispatcher = EventDispatcher::new();
393 /// dispatcher.subscribe_async(|event: &OrderShipped| {
394 /// let id = event.order_id;
395 /// async move {
396 /// println!("notifying customer about order {}", id);
397 /// Ok(())
398 /// }
399 /// });
400 ///
401 /// let result = dispatcher
402 /// .dispatch_async(OrderShipped { order_id: 42 })
403 /// .await;
404 /// assert!(result.all_succeeded());
405 /// # }
406 /// ```
407 #[cfg(feature = "async")]
408 pub async fn dispatch_async<T: Event>(&self, event: T) -> DispatchResult {
409 // Update metrics
410 self.update_metrics(&event);
411
412 // Check middleware
413 if !self.check_middleware(&event) {
414 return DispatchResult::blocked();
415 }
416
417 let type_id = TypeId::of::<T>();
418
419 // Collect cloned handlers without holding the lock across await points.
420 let handlers: Vec<AsyncHandler> = {
421 let async_listeners = self.async_listeners.read();
422 async_listeners
423 .get(&type_id)
424 .map(|event_listeners| {
425 event_listeners
426 .iter()
427 .map(|listener| listener.handler.clone())
428 .collect()
429 })
430 .unwrap_or_default()
431 };
432
433 // Wrap each listener future in `catch_unwind` so a panicking
434 // listener becomes a `ListenerError` in `DispatchResult` rather
435 // than unwinding the dispatching task. Mirrors the panic-safety
436 // wrapping on the sync `dispatch` path.
437 //
438 // `AssertUnwindSafe` is required because the listener future
439 // closes over arbitrary user state that may not implement
440 // `UnwindSafe`. The contract is: if a listener panics, the
441 // dispatcher catches it and reports it; the listener author is
442 // responsible for not leaving captured state in a broken
443 // condition before panicking. Same contract as the sync path.
444 use futures_util::future::FutureExt;
445 use std::panic::AssertUnwindSafe;
446
447 // `errors` stays empty (and unallocated) on the success path.
448 let listener_count = handlers.len();
449 let mut errors: Vec<ListenerError> = Vec::new();
450 for handler in handlers {
451 match AssertUnwindSafe(handler(&event)).catch_unwind().await {
452 Ok(Ok(())) => {}
453 Ok(Err(err)) => errors.push(err),
454 Err(payload) => errors.push(panic_payload_to_listener_error(payload)),
455 }
456 }
457
458 DispatchResult::new(listener_count, errors)
459 }
460
461 /// Fire and forget — dispatch without inspecting the result.
462 ///
463 /// Use this when listeners' success or failure is not actionable at
464 /// the call site (logging, fanout to passive observers). The errors
465 /// returned by failing listeners are discarded; if you need them,
466 /// call [`Self::dispatch`] instead.
467 ///
468 /// # Example
469 ///
470 /// ```rust
471 /// use mod_events::{EventDispatcher, Event};
472 ///
473 /// #[derive(Debug, Clone)]
474 /// struct MyEvent {
475 /// message: String,
476 /// }
477 ///
478 /// impl Event for MyEvent {
479 /// fn as_any(&self) -> &dyn std::any::Any {
480 /// self
481 /// }
482 /// }
483 ///
484 /// let dispatcher = EventDispatcher::new();
485 /// dispatcher.emit(MyEvent {
486 /// message: "Fire and forget".to_string(),
487 /// });
488 /// ```
489 pub fn emit<T: Event>(&self, event: T) {
490 // Update metrics
491 self.update_metrics(&event);
492
493 // Check middleware
494 if !self.check_middleware(&event) {
495 return;
496 }
497
498 let type_id = TypeId::of::<T>();
499 let listeners = self.listeners.read();
500
501 if let Some(event_listeners) = listeners.get(&type_id) {
502 for listener in event_listeners {
503 // Same panic-safety contract as `dispatch`, but the
504 // outcome is intentionally discarded. This avoids
505 // allocating the per-call result vector that `dispatch`
506 // returns — the documented win of `emit` over
507 // `dispatch` for fire-and-forget callers.
508 let _ = catch_unwind(AssertUnwindSafe(|| (listener.handler)(&event)));
509 }
510 }
511 }
512
513 /// Add middleware that can block events.
514 ///
515 /// Middleware functions receive events and return `true` to allow
516 /// processing or `false` to block the event.
517 ///
518 /// # Example
519 ///
520 /// ```rust
521 /// use mod_events::{EventDispatcher, Event};
522 ///
523 /// let dispatcher = EventDispatcher::new();
524 /// dispatcher.add_middleware(|event: &dyn Event| {
525 /// println!("Processing event: {}", event.event_name());
526 /// true // Allow all events
527 /// });
528 /// ```
529 pub fn add_middleware<F>(&self, middleware: F)
530 where
531 F: Fn(&dyn Event) -> bool + Send + Sync + 'static,
532 {
533 self.middleware.write().add(middleware);
534 }
535
536 /// Remove a previously registered listener.
537 ///
538 /// Returns `true` if the listener was found and removed, `false`
539 /// if no listener with that id was registered (already removed,
540 /// never registered, or registered against a different event type).
541 ///
542 /// # Example
543 ///
544 /// ```rust
545 /// use mod_events::{Event, EventDispatcher};
546 ///
547 /// #[derive(Debug, Clone)]
548 /// struct Tick;
549 ///
550 /// impl Event for Tick {
551 /// fn as_any(&self) -> &dyn std::any::Any { self }
552 /// }
553 ///
554 /// let dispatcher = EventDispatcher::new();
555 /// let id = dispatcher.on(|_: &Tick| {});
556 /// assert!(dispatcher.unsubscribe(id));
557 /// // Subsequent removals of the same id return false.
558 /// assert!(!dispatcher.unsubscribe(id));
559 /// ```
560 pub fn unsubscribe(&self, listener_id: ListenerId) -> bool {
561 // Try sync listeners first
562 {
563 let mut listeners = self.listeners.write();
564 if let Some(event_listeners) = listeners.get_mut(&listener_id.type_id) {
565 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
566 let _removed = event_listeners.remove(pos);
567 return true;
568 }
569 }
570 }
571
572 // Try async listeners
573 #[cfg(feature = "async")]
574 {
575 let mut async_listeners = self.async_listeners.write();
576 if let Some(event_listeners) = async_listeners.get_mut(&listener_id.type_id) {
577 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
578 let _removed = event_listeners.remove(pos);
579 return true;
580 }
581 }
582 }
583
584 false
585 }
586
587 /// Get the total number of listeners (sync + async, when the
588 /// `async` feature is enabled) registered for an event type.
589 ///
590 /// # Example
591 ///
592 /// ```rust
593 /// use mod_events::{Event, EventDispatcher};
594 ///
595 /// #[derive(Debug, Clone)]
596 /// struct Tick;
597 ///
598 /// impl Event for Tick {
599 /// fn as_any(&self) -> &dyn std::any::Any { self }
600 /// }
601 ///
602 /// let dispatcher = EventDispatcher::new();
603 /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
604 /// let _ = dispatcher.on(|_: &Tick| {});
605 /// let _ = dispatcher.on(|_: &Tick| {});
606 /// assert_eq!(dispatcher.listener_count::<Tick>(), 2);
607 /// ```
608 #[must_use]
609 pub fn listener_count<T: Event + 'static>(&self) -> usize {
610 let type_id = TypeId::of::<T>();
611 let sync_count = self
612 .listeners
613 .read()
614 .get(&type_id)
615 .map(Vec::len)
616 .unwrap_or(0);
617
618 #[cfg(feature = "async")]
619 let async_count = self
620 .async_listeners
621 .read()
622 .get(&type_id)
623 .map(Vec::len)
624 .unwrap_or(0);
625
626 #[cfg(not(feature = "async"))]
627 let async_count = 0;
628
629 sync_count + async_count
630 }
631
632 /// Get a snapshot of per-event-type [`EventMetadata`] keyed by
633 /// `TypeId`.
634 ///
635 /// The returned map is a fresh snapshot. Subsequent dispatches do
636 /// not mutate it, but the snapshot may be slightly behind the live
637 /// counters because `dispatch_count` is read independently of
638 /// `last_dispatch`.
639 ///
640 /// # Example
641 ///
642 /// ```rust
643 /// use mod_events::{Event, EventDispatcher};
644 /// use std::any::TypeId;
645 ///
646 /// #[derive(Debug, Clone)]
647 /// struct Tick;
648 ///
649 /// impl Event for Tick {
650 /// fn as_any(&self) -> &dyn std::any::Any { self }
651 /// }
652 ///
653 /// let dispatcher = EventDispatcher::new();
654 /// let _ = dispatcher.on(|_: &Tick| {});
655 /// dispatcher.emit(Tick);
656 /// dispatcher.emit(Tick);
657 ///
658 /// let snapshot = dispatcher.metrics();
659 /// let meta = snapshot.get(&TypeId::of::<Tick>()).unwrap();
660 /// assert_eq!(meta.dispatch_count, 2);
661 /// ```
662 #[must_use]
663 pub fn metrics(&self) -> HashMap<TypeId, EventMetadata> {
664 // Snapshot the metric counters first, then enrich each entry
665 // with the live listener count derived from the registry. This
666 // guarantees `listener_count` cannot drift because it is never
667 // stored — it is computed at the moment of observation.
668 let counters_map = self.metrics.read();
669 let listeners_map = self.listeners.read();
670 #[cfg(feature = "async")]
671 let async_listeners_map = self.async_listeners.read();
672
673 counters_map
674 .iter()
675 .map(|(type_id, counters)| {
676 let mut snap = counters.snapshot();
677 let sync_count = listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
678 #[cfg(feature = "async")]
679 let async_count = async_listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
680 #[cfg(not(feature = "async"))]
681 let async_count = 0;
682 snap.listener_count = sync_count + async_count;
683 (*type_id, snap)
684 })
685 .collect()
686 }
687
688 /// Drop every registered listener, both sync and async.
689 ///
690 /// Middleware and accumulated metrics are unaffected. Use
691 /// [`Self::clear_middleware`] if you also need to drop the
692 /// middleware chain.
693 ///
694 /// # Example
695 ///
696 /// ```rust
697 /// use mod_events::{Event, EventDispatcher};
698 ///
699 /// #[derive(Debug, Clone)]
700 /// struct Tick;
701 ///
702 /// impl Event for Tick {
703 /// fn as_any(&self) -> &dyn std::any::Any { self }
704 /// }
705 ///
706 /// let dispatcher = EventDispatcher::new();
707 /// let _ = dispatcher.on(|_: &Tick| {});
708 /// dispatcher.clear();
709 /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
710 /// ```
711 pub fn clear(&self) {
712 self.listeners.write().clear();
713
714 #[cfg(feature = "async")]
715 self.async_listeners.write().clear();
716 }
717
718 /// Drop every registered middleware function.
719 ///
720 /// Listeners and accumulated metrics are unaffected. Useful in
721 /// test setup/teardown when you want to reset the middleware
722 /// chain between cases without rebuilding the dispatcher.
723 pub fn clear_middleware(&self) {
724 self.middleware.write().clear();
725 }
726
727 /// Hot-path metric update. Tries a read-only fast path first; only
728 /// promotes to a write lock if the entry doesn't exist yet.
729 #[inline]
730 fn update_metrics<T: Event>(&self, _event: &T) {
731 let counters = self.counters_for::<T>();
732 counters.record_dispatch();
733 }
734
735 /// Look up (or create) the per-type counters. The fast path holds
736 /// only a read lock; the slow path promotes to a write lock and
737 /// double-checks the entry to avoid a torn-creation race.
738 #[inline]
739 fn counters_for<T: Event + 'static>(&self) -> Arc<EventMetricsCounters> {
740 let type_id = TypeId::of::<T>();
741
742 if let Some(existing) = self.metrics.read().get(&type_id) {
743 return Arc::clone(existing);
744 }
745
746 let mut metrics = self.metrics.write();
747 Arc::clone(
748 metrics
749 .entry(type_id)
750 .or_insert_with(|| Arc::new(EventMetricsCounters::new::<T>())),
751 )
752 }
753
754 #[inline]
755 fn check_middleware(&self, event: &dyn Event) -> bool {
756 self.middleware.read().process(event)
757 }
758}
759
760impl Default for EventDispatcher {
761 fn default() -> Self {
762 Self::new()
763 }
764}