mod_events/dispatcher.rs
1//! Main event dispatcher implementation
2
3use crate::error::panic_payload_to_listener_error;
4use crate::metrics::EventMetricsCounters;
5use crate::{
6 DispatchResult, Event, EventMetadata, ListenerError, ListenerId, ListenerWrapper,
7 MiddlewareManager, Priority,
8};
9use parking_lot::RwLock;
10use std::any::TypeId;
11use std::collections::HashMap;
12use std::panic::{catch_unwind, AssertUnwindSafe};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Arc;
15
16#[cfg(feature = "async")]
17use crate::{AsyncEventResult, AsyncListenerWrapper};
18
19#[cfg(feature = "async")]
20type AsyncHandler = Arc<dyn for<'a> Fn(&'a dyn Event) -> AsyncEventResult<'a> + Send + Sync>;
21
22/// High-performance event dispatcher
23///
24/// The main component of the Mod Events system. Thread-safe and optimized
25/// for high-performance event dispatch with minimal overhead.
26///
27/// # Example
28///
29/// ```rust
30/// use mod_events::{EventDispatcher, Event};
31///
32/// #[derive(Debug, Clone)]
33/// struct MyEvent {
34/// message: String,
35/// }
36///
37/// impl Event for MyEvent {
38/// fn as_any(&self) -> &dyn std::any::Any {
39/// self
40/// }
41/// }
42///
43/// let dispatcher = EventDispatcher::new();
44///
45/// dispatcher.on(|event: &MyEvent| {
46/// println!("Received: {}", event.message);
47/// });
48///
49/// dispatcher.emit(MyEvent {
50/// message: "Hello, World!".to_string(),
51/// });
52/// ```
53pub struct EventDispatcher {
54 listeners: Arc<RwLock<HashMap<TypeId, Vec<ListenerWrapper>>>>,
55 #[cfg(feature = "async")]
56 async_listeners: Arc<RwLock<HashMap<TypeId, Vec<AsyncListenerWrapper>>>>,
57 next_id: AtomicUsize,
58 // Per-event-type counters live behind an `Arc` so the dispatch hot
59 // path can clone the counter pointer under a read lock and increment
60 // its atomics without ever touching the outer map's write lock.
61 metrics: Arc<RwLock<HashMap<TypeId, Arc<EventMetricsCounters>>>>,
62 middleware: Arc<RwLock<MiddlewareManager>>,
63}
64
65impl EventDispatcher {
66 /// Create a new event dispatcher.
67 #[must_use]
68 pub fn new() -> Self {
69 Self {
70 listeners: Arc::new(RwLock::new(HashMap::new())),
71 #[cfg(feature = "async")]
72 async_listeners: Arc::new(RwLock::new(HashMap::new())),
73 next_id: AtomicUsize::new(0),
74 metrics: Arc::new(RwLock::new(HashMap::new())),
75 middleware: Arc::new(RwLock::new(MiddlewareManager::new())),
76 }
77 }
78
79 /// Subscribe to an event with a closure that can return errors.
80 ///
81 /// # Example
82 ///
83 /// ```rust
84 /// use mod_events::{EventDispatcher, Event};
85 ///
86 /// #[derive(Debug, Clone)]
87 /// struct MyEvent {
88 /// message: String,
89 /// }
90 ///
91 /// impl Event for MyEvent {
92 /// fn as_any(&self) -> &dyn std::any::Any {
93 /// self
94 /// }
95 /// }
96 ///
97 /// let dispatcher = EventDispatcher::new();
98 /// dispatcher.subscribe(|event: &MyEvent| {
99 /// if event.message.is_empty() {
100 /// return Err("Message cannot be empty".into());
101 /// }
102 /// println!("Message: {}", event.message);
103 /// Ok(())
104 /// });
105 /// ```
106 pub fn subscribe<T, F>(&self, listener: F) -> ListenerId
107 where
108 T: Event + 'static,
109 F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
110 {
111 self.subscribe_with_priority(listener, Priority::Normal)
112 }
113
114 /// Subscribe to an event with a specific priority.
115 pub fn subscribe_with_priority<T, F>(&self, listener: F, priority: Priority) -> ListenerId
116 where
117 T: Event + 'static,
118 F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
119 {
120 let type_id = TypeId::of::<T>();
121 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
122
123 let wrapper = ListenerWrapper::new(listener, priority, id);
124
125 {
126 let mut listeners = self.listeners.write();
127 let event_listeners = listeners.entry(type_id).or_default();
128 // Binary insertion preserves descending-priority order in O(n)
129 // (one shift), avoiding the O(n log n) full re-sort the
130 // previous push + sort_by_key combination performed.
131 // `partition_point` finds the first index whose listener has
132 // a strictly lower priority than the new one, which is also
133 // where to insert. Equal-priority listeners run in
134 // registration order (FIFO).
135 let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
136 event_listeners.insert(pos, wrapper);
137 }
138
139 // Make sure a metrics entry exists for this type so `metrics()`
140 // can report it even before the first dispatch.
141 let _counters = self.counters_for::<T>();
142
143 ListenerId::new(id, type_id)
144 }
145
146 /// Subscribe to an event with simple closure (no error handling).
147 ///
148 /// This is the most convenient method for simple event handling.
149 ///
150 /// # Example
151 ///
152 /// ```rust
153 /// use mod_events::{EventDispatcher, Event};
154 ///
155 /// #[derive(Debug, Clone)]
156 /// struct MyEvent {
157 /// message: String,
158 /// }
159 ///
160 /// impl Event for MyEvent {
161 /// fn as_any(&self) -> &dyn std::any::Any {
162 /// self
163 /// }
164 /// }
165 ///
166 /// let dispatcher = EventDispatcher::new();
167 /// dispatcher.on(|event: &MyEvent| {
168 /// println!("Received: {}", event.message);
169 /// });
170 /// ```
171 pub fn on<T, F>(&self, listener: F) -> ListenerId
172 where
173 T: Event + 'static,
174 F: Fn(&T) + Send + Sync + 'static,
175 {
176 self.subscribe(move |event: &T| {
177 listener(event);
178 Ok(())
179 })
180 }
181
182 /// Subscribe an async listener at [`Priority::Normal`] (requires the
183 /// `async` feature).
184 ///
185 /// The listener receives a borrowed event and returns a future that
186 /// resolves to `Result<(), ListenerError>`.
187 ///
188 /// # Example
189 ///
190 /// ```rust
191 /// # #[cfg(feature = "async")]
192 /// # {
193 /// use mod_events::{Event, EventDispatcher};
194 ///
195 /// #[derive(Debug, Clone)]
196 /// struct EmailSent { to: String }
197 ///
198 /// impl Event for EmailSent {
199 /// fn as_any(&self) -> &dyn std::any::Any { self }
200 /// }
201 ///
202 /// let dispatcher = EventDispatcher::new();
203 /// dispatcher.subscribe_async(|event: &EmailSent| {
204 /// let to = event.to.clone();
205 /// async move {
206 /// println!("delivered to {}", to);
207 /// Ok(())
208 /// }
209 /// });
210 /// # }
211 /// ```
212 #[cfg(feature = "async")]
213 pub fn subscribe_async<T, F, Fut>(&self, listener: F) -> ListenerId
214 where
215 T: Event + 'static,
216 F: Fn(&T) -> Fut + Send + Sync + 'static,
217 Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
218 {
219 self.subscribe_async_with_priority(listener, Priority::Normal)
220 }
221
222 /// Subscribe an async listener at a specific priority (requires the
223 /// `async` feature).
224 ///
225 /// Higher-priority listeners are awaited first within a single
226 /// [`Self::dispatch_async`] call.
227 ///
228 /// # Example
229 ///
230 /// ```rust
231 /// # #[cfg(feature = "async")]
232 /// # {
233 /// use mod_events::{Event, EventDispatcher, Priority};
234 ///
235 /// #[derive(Debug, Clone)]
236 /// struct EmailSent { to: String }
237 ///
238 /// impl Event for EmailSent {
239 /// fn as_any(&self) -> &dyn std::any::Any { self }
240 /// }
241 ///
242 /// let dispatcher = EventDispatcher::new();
243 /// dispatcher.subscribe_async_with_priority(
244 /// |_event: &EmailSent| async move {
245 /// // logged before any other listener
246 /// Ok(())
247 /// },
248 /// Priority::High,
249 /// );
250 /// # }
251 /// ```
252 #[cfg(feature = "async")]
253 pub fn subscribe_async_with_priority<T, F, Fut>(
254 &self,
255 listener: F,
256 priority: Priority,
257 ) -> ListenerId
258 where
259 T: Event + 'static,
260 F: Fn(&T) -> Fut + Send + Sync + 'static,
261 Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
262 {
263 let type_id = TypeId::of::<T>();
264 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
265
266 let wrapper = AsyncListenerWrapper::new(listener, priority, id);
267
268 {
269 let mut async_listeners = self.async_listeners.write();
270 let event_listeners = async_listeners.entry(type_id).or_default();
271 // Binary insertion preserves descending-priority order in O(n).
272 // See `subscribe_with_priority` for the rationale.
273 let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
274 event_listeners.insert(pos, wrapper);
275 }
276
277 // Make sure a metrics entry exists for this type so `metrics()`
278 // can report it even before the first dispatch.
279 let _counters = self.counters_for::<T>();
280
281 ListenerId::new(id, type_id)
282 }
283
284 /// Dispatch an event synchronously.
285 ///
286 /// Returns a [`DispatchResult`] containing per-listener outcomes.
287 ///
288 /// # Example
289 ///
290 /// ```rust
291 /// use mod_events::{EventDispatcher, Event};
292 ///
293 /// #[derive(Debug, Clone)]
294 /// struct MyEvent {
295 /// message: String,
296 /// }
297 ///
298 /// impl Event for MyEvent {
299 /// fn as_any(&self) -> &dyn std::any::Any {
300 /// self
301 /// }
302 /// }
303 ///
304 /// let dispatcher = EventDispatcher::new();
305 /// let result = dispatcher.dispatch(MyEvent {
306 /// message: "Hello".to_string(),
307 /// });
308 ///
309 /// if result.all_succeeded() {
310 /// println!("All listeners handled the event successfully");
311 /// }
312 /// ```
313 pub fn dispatch<T: Event>(&self, event: T) -> DispatchResult {
314 // Update metrics
315 self.update_metrics(&event);
316
317 // Check middleware
318 if !self.check_middleware(&event) {
319 return DispatchResult::blocked();
320 }
321
322 let type_id = TypeId::of::<T>();
323 let listeners = self.listeners.read();
324
325 // Most dispatches succeed. `errors` stays empty (and
326 // unallocated — `Vec::new()` does not allocate) on the
327 // success path; we only push when a listener returns `Err`
328 // or panics.
329 let mut errors: Vec<ListenerError> = Vec::new();
330 let mut listener_count = 0_usize;
331
332 if let Some(event_listeners) = listeners.get(&type_id) {
333 for listener in event_listeners {
334 listener_count += 1;
335 // `catch_unwind` keeps a panicking listener from
336 // unwinding the dispatch thread (and the read lock we
337 // are holding above) into the caller. Panics are
338 // converted into the same `ListenerError` shape a
339 // well-behaved listener would have returned, so the
340 // caller's `DispatchResult::errors()` is the single
341 // place to look for failures. `parking_lot::RwLock`
342 // does not poison, so dropping the guard after a
343 // caught panic leaves the registry in a usable state.
344 match catch_unwind(AssertUnwindSafe(|| (listener.handler)(&event))) {
345 Ok(Ok(())) => {}
346 Ok(Err(err)) => errors.push(err),
347 Err(payload) => errors.push(panic_payload_to_listener_error(payload)),
348 }
349 }
350 }
351
352 DispatchResult::new(listener_count, errors)
353 }
354
355 /// Dispatch an event asynchronously and await every async listener
356 /// in priority order (requires the `async` feature).
357 ///
358 /// Listeners are awaited **sequentially** in descending priority
359 /// order. This preserves the priority contract — a high-priority
360 /// listener completes (or errors) before the next listener is
361 /// polled. Concurrent execution would lose ordering. If you want
362 /// true concurrent execution, drive `spawn` (e.g. `tokio::spawn`,
363 /// `async_std::task::spawn`) from inside each listener and return
364 /// `Ok(())` immediately.
365 ///
366 /// Each listener future is wrapped in `catch_unwind`, mirroring
367 /// the panic-safety guarantee of the sync [`Self::dispatch`]
368 /// path. A panic during `.await` becomes a `ListenerError` in
369 /// [`DispatchResult::errors`] with the prefix
370 /// `"listener panicked: "`. Subsequent listeners still run.
371 ///
372 /// Sync listeners registered via [`Self::on`] / [`Self::subscribe`]
373 /// are not invoked here; only listeners registered through
374 /// [`Self::subscribe_async`] / [`Self::subscribe_async_with_priority`]
375 /// are awaited.
376 ///
377 /// # Example
378 ///
379 /// ```rust
380 /// # #[cfg(feature = "async")]
381 /// # async fn doc_example() {
382 /// use mod_events::{Event, EventDispatcher};
383 ///
384 /// #[derive(Debug, Clone)]
385 /// struct OrderShipped { order_id: u64 }
386 ///
387 /// impl Event for OrderShipped {
388 /// fn as_any(&self) -> &dyn std::any::Any { self }
389 /// }
390 ///
391 /// let dispatcher = EventDispatcher::new();
392 /// dispatcher.subscribe_async(|event: &OrderShipped| {
393 /// let id = event.order_id;
394 /// async move {
395 /// println!("notifying customer about order {}", id);
396 /// Ok(())
397 /// }
398 /// });
399 ///
400 /// let result = dispatcher
401 /// .dispatch_async(OrderShipped { order_id: 42 })
402 /// .await;
403 /// assert!(result.all_succeeded());
404 /// # }
405 /// ```
406 #[cfg(feature = "async")]
407 pub async fn dispatch_async<T: Event>(&self, event: T) -> DispatchResult {
408 // Update metrics
409 self.update_metrics(&event);
410
411 // Check middleware
412 if !self.check_middleware(&event) {
413 return DispatchResult::blocked();
414 }
415
416 let type_id = TypeId::of::<T>();
417
418 // Collect cloned handlers without holding the lock across await points.
419 let handlers: Vec<AsyncHandler> = {
420 let async_listeners = self.async_listeners.read();
421 async_listeners
422 .get(&type_id)
423 .map(|event_listeners| {
424 event_listeners
425 .iter()
426 .map(|listener| listener.handler.clone())
427 .collect()
428 })
429 .unwrap_or_default()
430 };
431
432 // Wrap each listener future in `catch_unwind` so a panicking
433 // listener becomes a `ListenerError` in `DispatchResult` rather
434 // than unwinding the dispatching task. Mirrors the panic-safety
435 // wrapping on the sync `dispatch` path.
436 //
437 // `AssertUnwindSafe` is required because the listener future
438 // closes over arbitrary user state that may not implement
439 // `UnwindSafe`. The contract is: if a listener panics, the
440 // dispatcher catches it and reports it; the listener author is
441 // responsible for not leaving captured state in a broken
442 // condition before panicking. Same contract as the sync path.
443 use futures_util::future::FutureExt;
444 use std::panic::AssertUnwindSafe;
445
446 // `errors` stays empty (and unallocated) on the success path.
447 let listener_count = handlers.len();
448 let mut errors: Vec<ListenerError> = Vec::new();
449 for handler in handlers {
450 match AssertUnwindSafe(handler(&event)).catch_unwind().await {
451 Ok(Ok(())) => {}
452 Ok(Err(err)) => errors.push(err),
453 Err(payload) => errors.push(panic_payload_to_listener_error(payload)),
454 }
455 }
456
457 DispatchResult::new(listener_count, errors)
458 }
459
460 /// Fire and forget — dispatch without inspecting the result.
461 ///
462 /// Use this when listeners' success or failure is not actionable at
463 /// the call site (logging, fanout to passive observers). The errors
464 /// returned by failing listeners are discarded; if you need them,
465 /// call [`Self::dispatch`] instead.
466 ///
467 /// # Example
468 ///
469 /// ```rust
470 /// use mod_events::{EventDispatcher, Event};
471 ///
472 /// #[derive(Debug, Clone)]
473 /// struct MyEvent {
474 /// message: String,
475 /// }
476 ///
477 /// impl Event for MyEvent {
478 /// fn as_any(&self) -> &dyn std::any::Any {
479 /// self
480 /// }
481 /// }
482 ///
483 /// let dispatcher = EventDispatcher::new();
484 /// dispatcher.emit(MyEvent {
485 /// message: "Fire and forget".to_string(),
486 /// });
487 /// ```
488 pub fn emit<T: Event>(&self, event: T) {
489 // Update metrics
490 self.update_metrics(&event);
491
492 // Check middleware
493 if !self.check_middleware(&event) {
494 return;
495 }
496
497 let type_id = TypeId::of::<T>();
498 let listeners = self.listeners.read();
499
500 if let Some(event_listeners) = listeners.get(&type_id) {
501 for listener in event_listeners {
502 // Same panic-safety contract as `dispatch`, but the
503 // outcome is intentionally discarded. This avoids
504 // allocating the per-call result vector that `dispatch`
505 // returns — the documented win of `emit` over
506 // `dispatch` for fire-and-forget callers.
507 let _ = catch_unwind(AssertUnwindSafe(|| (listener.handler)(&event)));
508 }
509 }
510 }
511
512 /// Add middleware that can block events.
513 ///
514 /// Middleware functions receive events and return `true` to allow
515 /// processing or `false` to block the event.
516 ///
517 /// # Example
518 ///
519 /// ```rust
520 /// use mod_events::{EventDispatcher, Event};
521 ///
522 /// let dispatcher = EventDispatcher::new();
523 /// dispatcher.add_middleware(|event: &dyn Event| {
524 /// println!("Processing event: {}", event.event_name());
525 /// true // Allow all events
526 /// });
527 /// ```
528 pub fn add_middleware<F>(&self, middleware: F)
529 where
530 F: Fn(&dyn Event) -> bool + Send + Sync + 'static,
531 {
532 self.middleware.write().add(middleware);
533 }
534
535 /// Remove a previously registered listener.
536 ///
537 /// Returns `true` if the listener was found and removed, `false`
538 /// if no listener with that id was registered (already removed,
539 /// never registered, or registered against a different event type).
540 ///
541 /// # Example
542 ///
543 /// ```rust
544 /// use mod_events::{Event, EventDispatcher};
545 ///
546 /// #[derive(Debug, Clone)]
547 /// struct Tick;
548 ///
549 /// impl Event for Tick {
550 /// fn as_any(&self) -> &dyn std::any::Any { self }
551 /// }
552 ///
553 /// let dispatcher = EventDispatcher::new();
554 /// let id = dispatcher.on(|_: &Tick| {});
555 /// assert!(dispatcher.unsubscribe(id));
556 /// // Subsequent removals of the same id return false.
557 /// assert!(!dispatcher.unsubscribe(id));
558 /// ```
559 pub fn unsubscribe(&self, listener_id: ListenerId) -> bool {
560 // Try sync listeners first
561 {
562 let mut listeners = self.listeners.write();
563 if let Some(event_listeners) = listeners.get_mut(&listener_id.type_id) {
564 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
565 let _removed = event_listeners.remove(pos);
566 return true;
567 }
568 }
569 }
570
571 // Try async listeners
572 #[cfg(feature = "async")]
573 {
574 let mut async_listeners = self.async_listeners.write();
575 if let Some(event_listeners) = async_listeners.get_mut(&listener_id.type_id) {
576 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
577 let _removed = event_listeners.remove(pos);
578 return true;
579 }
580 }
581 }
582
583 false
584 }
585
586 /// Get the total number of listeners (sync + async, when the
587 /// `async` feature is enabled) registered for an event type.
588 ///
589 /// # Example
590 ///
591 /// ```rust
592 /// use mod_events::{Event, EventDispatcher};
593 ///
594 /// #[derive(Debug, Clone)]
595 /// struct Tick;
596 ///
597 /// impl Event for Tick {
598 /// fn as_any(&self) -> &dyn std::any::Any { self }
599 /// }
600 ///
601 /// let dispatcher = EventDispatcher::new();
602 /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
603 /// let _ = dispatcher.on(|_: &Tick| {});
604 /// let _ = dispatcher.on(|_: &Tick| {});
605 /// assert_eq!(dispatcher.listener_count::<Tick>(), 2);
606 /// ```
607 #[must_use]
608 pub fn listener_count<T: Event + 'static>(&self) -> usize {
609 let type_id = TypeId::of::<T>();
610 let sync_count = self
611 .listeners
612 .read()
613 .get(&type_id)
614 .map(Vec::len)
615 .unwrap_or(0);
616
617 #[cfg(feature = "async")]
618 let async_count = self
619 .async_listeners
620 .read()
621 .get(&type_id)
622 .map(Vec::len)
623 .unwrap_or(0);
624
625 #[cfg(not(feature = "async"))]
626 let async_count = 0;
627
628 sync_count + async_count
629 }
630
631 /// Get a snapshot of per-event-type [`EventMetadata`] keyed by
632 /// `TypeId`.
633 ///
634 /// The returned map is a fresh snapshot. Subsequent dispatches do
635 /// not mutate it, but the snapshot may be slightly behind the live
636 /// counters because `dispatch_count` is read independently of
637 /// `last_dispatch`.
638 ///
639 /// # Example
640 ///
641 /// ```rust
642 /// use mod_events::{Event, EventDispatcher};
643 /// use std::any::TypeId;
644 ///
645 /// #[derive(Debug, Clone)]
646 /// struct Tick;
647 ///
648 /// impl Event for Tick {
649 /// fn as_any(&self) -> &dyn std::any::Any { self }
650 /// }
651 ///
652 /// let dispatcher = EventDispatcher::new();
653 /// let _ = dispatcher.on(|_: &Tick| {});
654 /// dispatcher.emit(Tick);
655 /// dispatcher.emit(Tick);
656 ///
657 /// let snapshot = dispatcher.metrics();
658 /// let meta = snapshot.get(&TypeId::of::<Tick>()).unwrap();
659 /// assert_eq!(meta.dispatch_count, 2);
660 /// ```
661 #[must_use]
662 pub fn metrics(&self) -> HashMap<TypeId, EventMetadata> {
663 // Snapshot the metric counters first, then enrich each entry
664 // with the live listener count derived from the registry. This
665 // guarantees `listener_count` cannot drift because it is never
666 // stored — it is computed at the moment of observation.
667 let counters_map = self.metrics.read();
668 let listeners_map = self.listeners.read();
669 #[cfg(feature = "async")]
670 let async_listeners_map = self.async_listeners.read();
671
672 counters_map
673 .iter()
674 .map(|(type_id, counters)| {
675 let mut snap = counters.snapshot();
676 let sync_count = listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
677 #[cfg(feature = "async")]
678 let async_count = async_listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
679 #[cfg(not(feature = "async"))]
680 let async_count = 0;
681 snap.listener_count = sync_count + async_count;
682 (*type_id, snap)
683 })
684 .collect()
685 }
686
687 /// Drop every registered listener, both sync and async.
688 ///
689 /// Middleware and accumulated metrics are unaffected. Use
690 /// [`Self::clear_middleware`] if you also need to drop the
691 /// middleware chain.
692 ///
693 /// # Example
694 ///
695 /// ```rust
696 /// use mod_events::{Event, EventDispatcher};
697 ///
698 /// #[derive(Debug, Clone)]
699 /// struct Tick;
700 ///
701 /// impl Event for Tick {
702 /// fn as_any(&self) -> &dyn std::any::Any { self }
703 /// }
704 ///
705 /// let dispatcher = EventDispatcher::new();
706 /// let _ = dispatcher.on(|_: &Tick| {});
707 /// dispatcher.clear();
708 /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
709 /// ```
710 pub fn clear(&self) {
711 self.listeners.write().clear();
712
713 #[cfg(feature = "async")]
714 self.async_listeners.write().clear();
715 }
716
717 /// Drop every registered middleware function.
718 ///
719 /// Listeners and accumulated metrics are unaffected. Useful in
720 /// test setup/teardown when you want to reset the middleware
721 /// chain between cases without rebuilding the dispatcher.
722 pub fn clear_middleware(&self) {
723 self.middleware.write().clear();
724 }
725
726 /// Hot-path metric update. Tries a read-only fast path first; only
727 /// promotes to a write lock if the entry doesn't exist yet.
728 fn update_metrics<T: Event>(&self, _event: &T) {
729 let counters = self.counters_for::<T>();
730 counters.record_dispatch();
731 }
732
733 /// Look up (or create) the per-type counters. The fast path holds
734 /// only a read lock; the slow path promotes to a write lock and
735 /// double-checks the entry to avoid a torn-creation race.
736 fn counters_for<T: Event + 'static>(&self) -> Arc<EventMetricsCounters> {
737 let type_id = TypeId::of::<T>();
738
739 if let Some(existing) = self.metrics.read().get(&type_id) {
740 return Arc::clone(existing);
741 }
742
743 let mut metrics = self.metrics.write();
744 Arc::clone(
745 metrics
746 .entry(type_id)
747 .or_insert_with(|| Arc::new(EventMetricsCounters::new::<T>())),
748 )
749 }
750
751 fn check_middleware(&self, event: &dyn Event) -> bool {
752 self.middleware.read().process(event)
753 }
754}
755
756impl Default for EventDispatcher {
757 fn default() -> Self {
758 Self::new()
759 }
760}