Skip to main content

mod_events/
dispatcher.rs

1//! Main event dispatcher implementation
2
3use crate::error::panic_payload_to_listener_error;
4use crate::metrics::EventMetricsCounters;
5use crate::{
6    DispatchResult, Event, EventMetadata, ListenerError, ListenerId, ListenerWrapper,
7    MiddlewareManager, Priority,
8};
9use parking_lot::RwLock;
10use std::any::TypeId;
11use std::collections::HashMap;
12use std::panic::{catch_unwind, AssertUnwindSafe};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Arc;
15
16#[cfg(feature = "async")]
17use crate::{AsyncEventResult, AsyncListenerWrapper};
18
19#[cfg(feature = "async")]
20type AsyncHandler = Arc<dyn for<'a> Fn(&'a dyn Event) -> AsyncEventResult<'a> + Send + Sync>;
21
22/// High-performance event dispatcher
23///
24/// The main component of the Mod Events system. Thread-safe and optimized
25/// for high-performance event dispatch with minimal overhead.
26///
27/// # Example
28///
29/// ```rust
30/// use mod_events::{EventDispatcher, Event};
31///
32/// #[derive(Debug, Clone)]
33/// struct MyEvent {
34///     message: String,
35/// }
36///
37/// impl Event for MyEvent {
38///     fn as_any(&self) -> &dyn std::any::Any {
39///         self
40///     }
41/// }
42///
43/// let dispatcher = EventDispatcher::new();
44///
45/// dispatcher.on(|event: &MyEvent| {
46///     println!("Received: {}", event.message);
47/// });
48///
49/// dispatcher.emit(MyEvent {
50///     message: "Hello, World!".to_string(),
51/// });
52/// ```
53pub struct EventDispatcher {
54    listeners: Arc<RwLock<HashMap<TypeId, Vec<ListenerWrapper>>>>,
55    #[cfg(feature = "async")]
56    async_listeners: Arc<RwLock<HashMap<TypeId, Vec<AsyncListenerWrapper>>>>,
57    next_id: AtomicUsize,
58    // Per-event-type counters live behind an `Arc` so the dispatch hot
59    // path can clone the counter pointer under a read lock and increment
60    // its atomics without ever touching the outer map's write lock.
61    metrics: Arc<RwLock<HashMap<TypeId, Arc<EventMetricsCounters>>>>,
62    middleware: Arc<RwLock<MiddlewareManager>>,
63}
64
65impl EventDispatcher {
66    /// Create a new event dispatcher.
67    #[must_use]
68    pub fn new() -> Self {
69        Self {
70            listeners: Arc::new(RwLock::new(HashMap::new())),
71            #[cfg(feature = "async")]
72            async_listeners: Arc::new(RwLock::new(HashMap::new())),
73            next_id: AtomicUsize::new(0),
74            metrics: Arc::new(RwLock::new(HashMap::new())),
75            middleware: Arc::new(RwLock::new(MiddlewareManager::new())),
76        }
77    }
78
79    /// Subscribe to an event with a closure that can return errors.
80    ///
81    /// # Example
82    ///
83    /// ```rust
84    /// use mod_events::{EventDispatcher, Event};
85    ///
86    /// #[derive(Debug, Clone)]
87    /// struct MyEvent {
88    ///     message: String,
89    /// }
90    ///
91    /// impl Event for MyEvent {
92    ///     fn as_any(&self) -> &dyn std::any::Any {
93    ///         self
94    ///     }
95    /// }
96    ///
97    /// let dispatcher = EventDispatcher::new();
98    /// dispatcher.subscribe(|event: &MyEvent| {
99    ///     if event.message.is_empty() {
100    ///         return Err("Message cannot be empty".into());
101    ///     }
102    ///     println!("Message: {}", event.message);
103    ///     Ok(())
104    /// });
105    /// ```
106    pub fn subscribe<T, F>(&self, listener: F) -> ListenerId
107    where
108        T: Event + 'static,
109        F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
110    {
111        self.subscribe_with_priority(listener, Priority::Normal)
112    }
113
114    /// Subscribe to an event with a specific priority.
115    pub fn subscribe_with_priority<T, F>(&self, listener: F, priority: Priority) -> ListenerId
116    where
117        T: Event + 'static,
118        F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
119    {
120        let type_id = TypeId::of::<T>();
121        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
122
123        let wrapper = ListenerWrapper::new(listener, priority, id);
124
125        {
126            let mut listeners = self.listeners.write();
127            let event_listeners = listeners.entry(type_id).or_default();
128            // Binary insertion preserves descending-priority order in O(n)
129            // (one shift), avoiding the O(n log n) full re-sort the
130            // previous push + sort_by_key combination performed.
131            // `partition_point` finds the first index whose listener has
132            // a strictly lower priority than the new one, which is also
133            // where to insert. Equal-priority listeners run in
134            // registration order (FIFO).
135            let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
136            event_listeners.insert(pos, wrapper);
137        }
138
139        // Make sure a metrics entry exists for this type so `metrics()`
140        // can report it even before the first dispatch.
141        let _counters = self.counters_for::<T>();
142
143        ListenerId::new(id, type_id)
144    }
145
146    /// Subscribe to an event with simple closure (no error handling).
147    ///
148    /// This is the most convenient method for simple event handling.
149    ///
150    /// # Example
151    ///
152    /// ```rust
153    /// use mod_events::{EventDispatcher, Event};
154    ///
155    /// #[derive(Debug, Clone)]
156    /// struct MyEvent {
157    ///     message: String,
158    /// }
159    ///
160    /// impl Event for MyEvent {
161    ///     fn as_any(&self) -> &dyn std::any::Any {
162    ///         self
163    ///     }
164    /// }
165    ///
166    /// let dispatcher = EventDispatcher::new();
167    /// dispatcher.on(|event: &MyEvent| {
168    ///     println!("Received: {}", event.message);
169    /// });
170    /// ```
171    pub fn on<T, F>(&self, listener: F) -> ListenerId
172    where
173        T: Event + 'static,
174        F: Fn(&T) + Send + Sync + 'static,
175    {
176        self.subscribe(move |event: &T| {
177            listener(event);
178            Ok(())
179        })
180    }
181
182    /// Subscribe an async listener at [`Priority::Normal`] (requires the
183    /// `async` feature).
184    ///
185    /// The listener receives a borrowed event and returns a future that
186    /// resolves to `Result<(), ListenerError>`.
187    ///
188    /// # Example
189    ///
190    /// ```rust
191    /// # #[cfg(feature = "async")]
192    /// # {
193    /// use mod_events::{Event, EventDispatcher};
194    ///
195    /// #[derive(Debug, Clone)]
196    /// struct EmailSent { to: String }
197    ///
198    /// impl Event for EmailSent {
199    ///     fn as_any(&self) -> &dyn std::any::Any { self }
200    /// }
201    ///
202    /// let dispatcher = EventDispatcher::new();
203    /// dispatcher.subscribe_async(|event: &EmailSent| {
204    ///     let to = event.to.clone();
205    ///     async move {
206    ///         println!("delivered to {}", to);
207    ///         Ok(())
208    ///     }
209    /// });
210    /// # }
211    /// ```
212    #[cfg(feature = "async")]
213    pub fn subscribe_async<T, F, Fut>(&self, listener: F) -> ListenerId
214    where
215        T: Event + 'static,
216        F: Fn(&T) -> Fut + Send + Sync + 'static,
217        Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
218    {
219        self.subscribe_async_with_priority(listener, Priority::Normal)
220    }
221
222    /// Subscribe an async listener at a specific priority (requires the
223    /// `async` feature).
224    ///
225    /// Higher-priority listeners are awaited first within a single
226    /// [`Self::dispatch_async`] call.
227    ///
228    /// # Example
229    ///
230    /// ```rust
231    /// # #[cfg(feature = "async")]
232    /// # {
233    /// use mod_events::{Event, EventDispatcher, Priority};
234    ///
235    /// #[derive(Debug, Clone)]
236    /// struct EmailSent { to: String }
237    ///
238    /// impl Event for EmailSent {
239    ///     fn as_any(&self) -> &dyn std::any::Any { self }
240    /// }
241    ///
242    /// let dispatcher = EventDispatcher::new();
243    /// dispatcher.subscribe_async_with_priority(
244    ///     |_event: &EmailSent| async move {
245    ///         // logged before any other listener
246    ///         Ok(())
247    ///     },
248    ///     Priority::High,
249    /// );
250    /// # }
251    /// ```
252    #[cfg(feature = "async")]
253    pub fn subscribe_async_with_priority<T, F, Fut>(
254        &self,
255        listener: F,
256        priority: Priority,
257    ) -> ListenerId
258    where
259        T: Event + 'static,
260        F: Fn(&T) -> Fut + Send + Sync + 'static,
261        Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
262    {
263        let type_id = TypeId::of::<T>();
264        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
265
266        let wrapper = AsyncListenerWrapper::new(listener, priority, id);
267
268        {
269            let mut async_listeners = self.async_listeners.write();
270            let event_listeners = async_listeners.entry(type_id).or_default();
271            // Binary insertion preserves descending-priority order in O(n).
272            // See `subscribe_with_priority` for the rationale.
273            let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
274            event_listeners.insert(pos, wrapper);
275        }
276
277        // Make sure a metrics entry exists for this type so `metrics()`
278        // can report it even before the first dispatch.
279        let _counters = self.counters_for::<T>();
280
281        ListenerId::new(id, type_id)
282    }
283
284    /// Dispatch an event synchronously.
285    ///
286    /// Returns a [`DispatchResult`] containing per-listener outcomes.
287    ///
288    /// # Example
289    ///
290    /// ```rust
291    /// use mod_events::{EventDispatcher, Event};
292    ///
293    /// #[derive(Debug, Clone)]
294    /// struct MyEvent {
295    ///     message: String,
296    /// }
297    ///
298    /// impl Event for MyEvent {
299    ///     fn as_any(&self) -> &dyn std::any::Any {
300    ///         self
301    ///     }
302    /// }
303    ///
304    /// let dispatcher = EventDispatcher::new();
305    /// let result = dispatcher.dispatch(MyEvent {
306    ///     message: "Hello".to_string(),
307    /// });
308    ///
309    /// if result.all_succeeded() {
310    ///     println!("All listeners handled the event successfully");
311    /// }
312    /// ```
313    pub fn dispatch<T: Event>(&self, event: T) -> DispatchResult {
314        // Update metrics
315        self.update_metrics(&event);
316
317        // Check middleware
318        if !self.check_middleware(&event) {
319            return DispatchResult::blocked();
320        }
321
322        let type_id = TypeId::of::<T>();
323        let listeners = self.listeners.read();
324
325        // Most dispatches succeed. `errors` stays empty (and
326        // unallocated — `Vec::new()` does not allocate) on the
327        // success path; we only push when a listener returns `Err`
328        // or panics.
329        let mut errors: Vec<ListenerError> = Vec::new();
330        let mut listener_count = 0_usize;
331
332        if let Some(event_listeners) = listeners.get(&type_id) {
333            for listener in event_listeners {
334                listener_count += 1;
335                // `catch_unwind` keeps a panicking listener from
336                // unwinding the dispatch thread (and the read lock we
337                // are holding above) into the caller. Panics are
338                // converted into the same `ListenerError` shape a
339                // well-behaved listener would have returned, so the
340                // caller's `DispatchResult::errors()` is the single
341                // place to look for failures. `parking_lot::RwLock`
342                // does not poison, so dropping the guard after a
343                // caught panic leaves the registry in a usable state.
344                match catch_unwind(AssertUnwindSafe(|| (listener.handler)(&event))) {
345                    Ok(Ok(())) => {}
346                    Ok(Err(err)) => errors.push(err),
347                    Err(payload) => errors.push(panic_payload_to_listener_error(payload)),
348                }
349            }
350        }
351
352        DispatchResult::new(listener_count, errors)
353    }
354
355    /// Dispatch an event asynchronously and await every async listener
356    /// in priority order (requires the `async` feature).
357    ///
358    /// Listeners are awaited **sequentially** in descending priority
359    /// order. This preserves the priority contract — a high-priority
360    /// listener completes (or errors) before the next listener is
361    /// polled. Concurrent execution would lose ordering. If you want
362    /// true concurrent execution, drive `spawn` (e.g. `tokio::spawn`,
363    /// `async_std::task::spawn`) from inside each listener and return
364    /// `Ok(())` immediately.
365    ///
366    /// Each listener future is wrapped in `catch_unwind`, mirroring
367    /// the panic-safety guarantee of the sync [`Self::dispatch`]
368    /// path. A panic during `.await` becomes a `ListenerError` in
369    /// [`DispatchResult::errors`] with the prefix
370    /// `"listener panicked: "`. Subsequent listeners still run.
371    ///
372    /// Sync listeners registered via [`Self::on`] / [`Self::subscribe`]
373    /// are not invoked here; only listeners registered through
374    /// [`Self::subscribe_async`] / [`Self::subscribe_async_with_priority`]
375    /// are awaited.
376    ///
377    /// # Example
378    ///
379    /// ```rust
380    /// # #[cfg(feature = "async")]
381    /// # async fn doc_example() {
382    /// use mod_events::{Event, EventDispatcher};
383    ///
384    /// #[derive(Debug, Clone)]
385    /// struct OrderShipped { order_id: u64 }
386    ///
387    /// impl Event for OrderShipped {
388    ///     fn as_any(&self) -> &dyn std::any::Any { self }
389    /// }
390    ///
391    /// let dispatcher = EventDispatcher::new();
392    /// dispatcher.subscribe_async(|event: &OrderShipped| {
393    ///     let id = event.order_id;
394    ///     async move {
395    ///         println!("notifying customer about order {}", id);
396    ///         Ok(())
397    ///     }
398    /// });
399    ///
400    /// let result = dispatcher
401    ///     .dispatch_async(OrderShipped { order_id: 42 })
402    ///     .await;
403    /// assert!(result.all_succeeded());
404    /// # }
405    /// ```
406    #[cfg(feature = "async")]
407    pub async fn dispatch_async<T: Event>(&self, event: T) -> DispatchResult {
408        // Update metrics
409        self.update_metrics(&event);
410
411        // Check middleware
412        if !self.check_middleware(&event) {
413            return DispatchResult::blocked();
414        }
415
416        let type_id = TypeId::of::<T>();
417
418        // Collect cloned handlers without holding the lock across await points.
419        let handlers: Vec<AsyncHandler> = {
420            let async_listeners = self.async_listeners.read();
421            async_listeners
422                .get(&type_id)
423                .map(|event_listeners| {
424                    event_listeners
425                        .iter()
426                        .map(|listener| listener.handler.clone())
427                        .collect()
428                })
429                .unwrap_or_default()
430        };
431
432        // Wrap each listener future in `catch_unwind` so a panicking
433        // listener becomes a `ListenerError` in `DispatchResult` rather
434        // than unwinding the dispatching task. Mirrors the panic-safety
435        // wrapping on the sync `dispatch` path.
436        //
437        // `AssertUnwindSafe` is required because the listener future
438        // closes over arbitrary user state that may not implement
439        // `UnwindSafe`. The contract is: if a listener panics, the
440        // dispatcher catches it and reports it; the listener author is
441        // responsible for not leaving captured state in a broken
442        // condition before panicking. Same contract as the sync path.
443        use futures_util::future::FutureExt;
444        use std::panic::AssertUnwindSafe;
445
446        // `errors` stays empty (and unallocated) on the success path.
447        let listener_count = handlers.len();
448        let mut errors: Vec<ListenerError> = Vec::new();
449        for handler in handlers {
450            match AssertUnwindSafe(handler(&event)).catch_unwind().await {
451                Ok(Ok(())) => {}
452                Ok(Err(err)) => errors.push(err),
453                Err(payload) => errors.push(panic_payload_to_listener_error(payload)),
454            }
455        }
456
457        DispatchResult::new(listener_count, errors)
458    }
459
460    /// Fire and forget — dispatch without inspecting the result.
461    ///
462    /// Use this when listeners' success or failure is not actionable at
463    /// the call site (logging, fanout to passive observers). The errors
464    /// returned by failing listeners are discarded; if you need them,
465    /// call [`Self::dispatch`] instead.
466    ///
467    /// # Example
468    ///
469    /// ```rust
470    /// use mod_events::{EventDispatcher, Event};
471    ///
472    /// #[derive(Debug, Clone)]
473    /// struct MyEvent {
474    ///     message: String,
475    /// }
476    ///
477    /// impl Event for MyEvent {
478    ///     fn as_any(&self) -> &dyn std::any::Any {
479    ///         self
480    ///     }
481    /// }
482    ///
483    /// let dispatcher = EventDispatcher::new();
484    /// dispatcher.emit(MyEvent {
485    ///     message: "Fire and forget".to_string(),
486    /// });
487    /// ```
488    pub fn emit<T: Event>(&self, event: T) {
489        // Update metrics
490        self.update_metrics(&event);
491
492        // Check middleware
493        if !self.check_middleware(&event) {
494            return;
495        }
496
497        let type_id = TypeId::of::<T>();
498        let listeners = self.listeners.read();
499
500        if let Some(event_listeners) = listeners.get(&type_id) {
501            for listener in event_listeners {
502                // Same panic-safety contract as `dispatch`, but the
503                // outcome is intentionally discarded. This avoids
504                // allocating the per-call result vector that `dispatch`
505                // returns — the documented win of `emit` over
506                // `dispatch` for fire-and-forget callers.
507                let _ = catch_unwind(AssertUnwindSafe(|| (listener.handler)(&event)));
508            }
509        }
510    }
511
512    /// Add middleware that can block events.
513    ///
514    /// Middleware functions receive events and return `true` to allow
515    /// processing or `false` to block the event.
516    ///
517    /// # Example
518    ///
519    /// ```rust
520    /// use mod_events::{EventDispatcher, Event};
521    ///
522    /// let dispatcher = EventDispatcher::new();
523    /// dispatcher.add_middleware(|event: &dyn Event| {
524    ///     println!("Processing event: {}", event.event_name());
525    ///     true // Allow all events
526    /// });
527    /// ```
528    pub fn add_middleware<F>(&self, middleware: F)
529    where
530        F: Fn(&dyn Event) -> bool + Send + Sync + 'static,
531    {
532        self.middleware.write().add(middleware);
533    }
534
535    /// Remove a previously registered listener.
536    ///
537    /// Returns `true` if the listener was found and removed, `false`
538    /// if no listener with that id was registered (already removed,
539    /// never registered, or registered against a different event type).
540    ///
541    /// # Example
542    ///
543    /// ```rust
544    /// use mod_events::{Event, EventDispatcher};
545    ///
546    /// #[derive(Debug, Clone)]
547    /// struct Tick;
548    ///
549    /// impl Event for Tick {
550    ///     fn as_any(&self) -> &dyn std::any::Any { self }
551    /// }
552    ///
553    /// let dispatcher = EventDispatcher::new();
554    /// let id = dispatcher.on(|_: &Tick| {});
555    /// assert!(dispatcher.unsubscribe(id));
556    /// // Subsequent removals of the same id return false.
557    /// assert!(!dispatcher.unsubscribe(id));
558    /// ```
559    pub fn unsubscribe(&self, listener_id: ListenerId) -> bool {
560        // Try sync listeners first
561        {
562            let mut listeners = self.listeners.write();
563            if let Some(event_listeners) = listeners.get_mut(&listener_id.type_id) {
564                if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
565                    let _removed = event_listeners.remove(pos);
566                    return true;
567                }
568            }
569        }
570
571        // Try async listeners
572        #[cfg(feature = "async")]
573        {
574            let mut async_listeners = self.async_listeners.write();
575            if let Some(event_listeners) = async_listeners.get_mut(&listener_id.type_id) {
576                if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
577                    let _removed = event_listeners.remove(pos);
578                    return true;
579                }
580            }
581        }
582
583        false
584    }
585
586    /// Get the total number of listeners (sync + async, when the
587    /// `async` feature is enabled) registered for an event type.
588    ///
589    /// # Example
590    ///
591    /// ```rust
592    /// use mod_events::{Event, EventDispatcher};
593    ///
594    /// #[derive(Debug, Clone)]
595    /// struct Tick;
596    ///
597    /// impl Event for Tick {
598    ///     fn as_any(&self) -> &dyn std::any::Any { self }
599    /// }
600    ///
601    /// let dispatcher = EventDispatcher::new();
602    /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
603    /// let _ = dispatcher.on(|_: &Tick| {});
604    /// let _ = dispatcher.on(|_: &Tick| {});
605    /// assert_eq!(dispatcher.listener_count::<Tick>(), 2);
606    /// ```
607    #[must_use]
608    pub fn listener_count<T: Event + 'static>(&self) -> usize {
609        let type_id = TypeId::of::<T>();
610        let sync_count = self
611            .listeners
612            .read()
613            .get(&type_id)
614            .map(Vec::len)
615            .unwrap_or(0);
616
617        #[cfg(feature = "async")]
618        let async_count = self
619            .async_listeners
620            .read()
621            .get(&type_id)
622            .map(Vec::len)
623            .unwrap_or(0);
624
625        #[cfg(not(feature = "async"))]
626        let async_count = 0;
627
628        sync_count + async_count
629    }
630
631    /// Get a snapshot of per-event-type [`EventMetadata`] keyed by
632    /// `TypeId`.
633    ///
634    /// The returned map is a fresh snapshot. Subsequent dispatches do
635    /// not mutate it, but the snapshot may be slightly behind the live
636    /// counters because `dispatch_count` is read independently of
637    /// `last_dispatch`.
638    ///
639    /// # Example
640    ///
641    /// ```rust
642    /// use mod_events::{Event, EventDispatcher};
643    /// use std::any::TypeId;
644    ///
645    /// #[derive(Debug, Clone)]
646    /// struct Tick;
647    ///
648    /// impl Event for Tick {
649    ///     fn as_any(&self) -> &dyn std::any::Any { self }
650    /// }
651    ///
652    /// let dispatcher = EventDispatcher::new();
653    /// let _ = dispatcher.on(|_: &Tick| {});
654    /// dispatcher.emit(Tick);
655    /// dispatcher.emit(Tick);
656    ///
657    /// let snapshot = dispatcher.metrics();
658    /// let meta = snapshot.get(&TypeId::of::<Tick>()).unwrap();
659    /// assert_eq!(meta.dispatch_count, 2);
660    /// ```
661    #[must_use]
662    pub fn metrics(&self) -> HashMap<TypeId, EventMetadata> {
663        // Snapshot the metric counters first, then enrich each entry
664        // with the live listener count derived from the registry. This
665        // guarantees `listener_count` cannot drift because it is never
666        // stored — it is computed at the moment of observation.
667        let counters_map = self.metrics.read();
668        let listeners_map = self.listeners.read();
669        #[cfg(feature = "async")]
670        let async_listeners_map = self.async_listeners.read();
671
672        counters_map
673            .iter()
674            .map(|(type_id, counters)| {
675                let mut snap = counters.snapshot();
676                let sync_count = listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
677                #[cfg(feature = "async")]
678                let async_count = async_listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
679                #[cfg(not(feature = "async"))]
680                let async_count = 0;
681                snap.listener_count = sync_count + async_count;
682                (*type_id, snap)
683            })
684            .collect()
685    }
686
687    /// Drop every registered listener, both sync and async.
688    ///
689    /// Middleware and accumulated metrics are unaffected. Use
690    /// [`Self::clear_middleware`] if you also need to drop the
691    /// middleware chain.
692    ///
693    /// # Example
694    ///
695    /// ```rust
696    /// use mod_events::{Event, EventDispatcher};
697    ///
698    /// #[derive(Debug, Clone)]
699    /// struct Tick;
700    ///
701    /// impl Event for Tick {
702    ///     fn as_any(&self) -> &dyn std::any::Any { self }
703    /// }
704    ///
705    /// let dispatcher = EventDispatcher::new();
706    /// let _ = dispatcher.on(|_: &Tick| {});
707    /// dispatcher.clear();
708    /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
709    /// ```
710    pub fn clear(&self) {
711        self.listeners.write().clear();
712
713        #[cfg(feature = "async")]
714        self.async_listeners.write().clear();
715    }
716
717    /// Drop every registered middleware function.
718    ///
719    /// Listeners and accumulated metrics are unaffected. Useful in
720    /// test setup/teardown when you want to reset the middleware
721    /// chain between cases without rebuilding the dispatcher.
722    pub fn clear_middleware(&self) {
723        self.middleware.write().clear();
724    }
725
726    /// Hot-path metric update. Tries a read-only fast path first; only
727    /// promotes to a write lock if the entry doesn't exist yet.
728    fn update_metrics<T: Event>(&self, _event: &T) {
729        let counters = self.counters_for::<T>();
730        counters.record_dispatch();
731    }
732
733    /// Look up (or create) the per-type counters. The fast path holds
734    /// only a read lock; the slow path promotes to a write lock and
735    /// double-checks the entry to avoid a torn-creation race.
736    fn counters_for<T: Event + 'static>(&self) -> Arc<EventMetricsCounters> {
737        let type_id = TypeId::of::<T>();
738
739        if let Some(existing) = self.metrics.read().get(&type_id) {
740            return Arc::clone(existing);
741        }
742
743        let mut metrics = self.metrics.write();
744        Arc::clone(
745            metrics
746                .entry(type_id)
747                .or_insert_with(|| Arc::new(EventMetricsCounters::new::<T>())),
748        )
749    }
750
751    fn check_middleware(&self, event: &dyn Event) -> bool {
752        self.middleware.read().process(event)
753    }
754}
755
756impl Default for EventDispatcher {
757    fn default() -> Self {
758        Self::new()
759    }
760}