Skip to main content

mod_events/
dispatcher.rs

1//! Main event dispatcher implementation
2
3use crate::metrics::EventMetricsCounters;
4use crate::{
5    DispatchResult, Event, EventMetadata, ListenerError, ListenerId, ListenerWrapper,
6    MiddlewareManager, Priority,
7};
8use parking_lot::RwLock;
9use std::any::TypeId;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::sync::Arc;
13
14#[cfg(feature = "async")]
15use crate::{AsyncEventResult, AsyncListenerWrapper};
16
/// Type-erased async listener callback.
///
/// Stored behind an `Arc` so `dispatch_async` can clone the handlers out
/// of the registry and release the read lock before awaiting any of them.
#[cfg(feature = "async")]
type AsyncHandler =
    Arc<dyn for<'evt> Fn(&'evt dyn Event) -> AsyncEventResult<'evt> + Send + Sync>;
19
20/// High-performance event dispatcher
21///
22/// The main component of the Mod Events system. Thread-safe and optimized
23/// for high-performance event dispatch with minimal overhead.
24///
25/// # Example
26///
27/// ```rust
28/// use mod_events::{EventDispatcher, Event};
29///
30/// #[derive(Debug, Clone)]
31/// struct MyEvent {
32///     message: String,
33/// }
34///
35/// impl Event for MyEvent {
36///     fn as_any(&self) -> &dyn std::any::Any {
37///         self
38///     }
39/// }
40///
41/// let dispatcher = EventDispatcher::new();
42///
43/// dispatcher.on(|event: &MyEvent| {
44///     println!("Received: {}", event.message);
45/// });
46///
47/// dispatcher.emit(MyEvent {
48///     message: "Hello, World!".to_string(),
49/// });
50/// ```
51pub struct EventDispatcher {
52    listeners: Arc<RwLock<HashMap<TypeId, Vec<ListenerWrapper>>>>,
53    #[cfg(feature = "async")]
54    async_listeners: Arc<RwLock<HashMap<TypeId, Vec<AsyncListenerWrapper>>>>,
55    next_id: AtomicUsize,
56    // Per-event-type counters live behind an `Arc` so the dispatch hot
57    // path can clone the counter pointer under a read lock and increment
58    // its atomics without ever touching the outer map's write lock.
59    metrics: Arc<RwLock<HashMap<TypeId, Arc<EventMetricsCounters>>>>,
60    middleware: Arc<RwLock<MiddlewareManager>>,
61}
62
63impl EventDispatcher {
64    /// Create a new event dispatcher.
65    #[must_use]
66    pub fn new() -> Self {
67        Self {
68            listeners: Arc::new(RwLock::new(HashMap::new())),
69            #[cfg(feature = "async")]
70            async_listeners: Arc::new(RwLock::new(HashMap::new())),
71            next_id: AtomicUsize::new(0),
72            metrics: Arc::new(RwLock::new(HashMap::new())),
73            middleware: Arc::new(RwLock::new(MiddlewareManager::new())),
74        }
75    }
76
77    /// Subscribe to an event with a closure that can return errors.
78    ///
79    /// # Example
80    ///
81    /// ```rust
82    /// use mod_events::{EventDispatcher, Event};
83    ///
84    /// #[derive(Debug, Clone)]
85    /// struct MyEvent {
86    ///     message: String,
87    /// }
88    ///
89    /// impl Event for MyEvent {
90    ///     fn as_any(&self) -> &dyn std::any::Any {
91    ///         self
92    ///     }
93    /// }
94    ///
95    /// let dispatcher = EventDispatcher::new();
96    /// dispatcher.subscribe(|event: &MyEvent| {
97    ///     if event.message.is_empty() {
98    ///         return Err("Message cannot be empty".into());
99    ///     }
100    ///     println!("Message: {}", event.message);
101    ///     Ok(())
102    /// });
103    /// ```
104    pub fn subscribe<T, F>(&self, listener: F) -> ListenerId
105    where
106        T: Event + 'static,
107        F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
108    {
109        self.subscribe_with_priority(listener, Priority::Normal)
110    }
111
112    /// Subscribe to an event with a specific priority.
113    pub fn subscribe_with_priority<T, F>(&self, listener: F, priority: Priority) -> ListenerId
114    where
115        T: Event + 'static,
116        F: Fn(&T) -> Result<(), ListenerError> + Send + Sync + 'static,
117    {
118        let type_id = TypeId::of::<T>();
119        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
120
121        let wrapper = ListenerWrapper::new(listener, priority, id);
122
123        {
124            let mut listeners = self.listeners.write();
125            let event_listeners = listeners.entry(type_id).or_default();
126            // Binary insertion preserves descending-priority order in O(n)
127            // (one shift), avoiding the O(n log n) full re-sort the
128            // previous push + sort_by_key combination performed.
129            // `partition_point` finds the first index whose listener has
130            // a strictly lower priority than the new one, which is also
131            // where to insert. Equal-priority listeners run in
132            // registration order (FIFO).
133            let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
134            event_listeners.insert(pos, wrapper);
135        }
136
137        // Make sure a metrics entry exists for this type so `metrics()`
138        // can report it even before the first dispatch.
139        let _counters = self.counters_for::<T>();
140
141        ListenerId::new(id, type_id)
142    }
143
144    /// Subscribe to an event with simple closure (no error handling).
145    ///
146    /// This is the most convenient method for simple event handling.
147    ///
148    /// # Example
149    ///
150    /// ```rust
151    /// use mod_events::{EventDispatcher, Event};
152    ///
153    /// #[derive(Debug, Clone)]
154    /// struct MyEvent {
155    ///     message: String,
156    /// }
157    ///
158    /// impl Event for MyEvent {
159    ///     fn as_any(&self) -> &dyn std::any::Any {
160    ///         self
161    ///     }
162    /// }
163    ///
164    /// let dispatcher = EventDispatcher::new();
165    /// dispatcher.on(|event: &MyEvent| {
166    ///     println!("Received: {}", event.message);
167    /// });
168    /// ```
169    pub fn on<T, F>(&self, listener: F) -> ListenerId
170    where
171        T: Event + 'static,
172        F: Fn(&T) + Send + Sync + 'static,
173    {
174        self.subscribe(move |event: &T| {
175            listener(event);
176            Ok(())
177        })
178    }
179
180    /// Subscribe an async listener at [`Priority::Normal`] (requires the
181    /// `async` feature).
182    ///
183    /// The listener receives a borrowed event and returns a future that
184    /// resolves to `Result<(), ListenerError>`.
185    ///
186    /// # Example
187    ///
188    /// ```rust
189    /// # #[cfg(feature = "async")]
190    /// # {
191    /// use mod_events::{Event, EventDispatcher};
192    ///
193    /// #[derive(Debug, Clone)]
194    /// struct EmailSent { to: String }
195    ///
196    /// impl Event for EmailSent {
197    ///     fn as_any(&self) -> &dyn std::any::Any { self }
198    /// }
199    ///
200    /// let dispatcher = EventDispatcher::new();
201    /// dispatcher.subscribe_async(|event: &EmailSent| {
202    ///     let to = event.to.clone();
203    ///     async move {
204    ///         println!("delivered to {}", to);
205    ///         Ok(())
206    ///     }
207    /// });
208    /// # }
209    /// ```
210    #[cfg(feature = "async")]
211    pub fn subscribe_async<T, F, Fut>(&self, listener: F) -> ListenerId
212    where
213        T: Event + 'static,
214        F: Fn(&T) -> Fut + Send + Sync + 'static,
215        Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
216    {
217        self.subscribe_async_with_priority(listener, Priority::Normal)
218    }
219
220    /// Subscribe an async listener at a specific priority (requires the
221    /// `async` feature).
222    ///
223    /// Higher-priority listeners are awaited first within a single
224    /// [`Self::dispatch_async`] call.
225    ///
226    /// # Example
227    ///
228    /// ```rust
229    /// # #[cfg(feature = "async")]
230    /// # {
231    /// use mod_events::{Event, EventDispatcher, Priority};
232    ///
233    /// #[derive(Debug, Clone)]
234    /// struct EmailSent { to: String }
235    ///
236    /// impl Event for EmailSent {
237    ///     fn as_any(&self) -> &dyn std::any::Any { self }
238    /// }
239    ///
240    /// let dispatcher = EventDispatcher::new();
241    /// dispatcher.subscribe_async_with_priority(
242    ///     |_event: &EmailSent| async move {
243    ///         // logged before any other listener
244    ///         Ok(())
245    ///     },
246    ///     Priority::High,
247    /// );
248    /// # }
249    /// ```
250    #[cfg(feature = "async")]
251    pub fn subscribe_async_with_priority<T, F, Fut>(
252        &self,
253        listener: F,
254        priority: Priority,
255    ) -> ListenerId
256    where
257        T: Event + 'static,
258        F: Fn(&T) -> Fut + Send + Sync + 'static,
259        Fut: std::future::Future<Output = Result<(), ListenerError>> + Send + 'static,
260    {
261        let type_id = TypeId::of::<T>();
262        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
263
264        let wrapper = AsyncListenerWrapper::new(listener, priority, id);
265
266        {
267            let mut async_listeners = self.async_listeners.write();
268            let event_listeners = async_listeners.entry(type_id).or_default();
269            // Binary insertion preserves descending-priority order in O(n).
270            // See `subscribe_with_priority` for the rationale.
271            let pos = event_listeners.partition_point(|existing| existing.priority >= priority);
272            event_listeners.insert(pos, wrapper);
273        }
274
275        // Make sure a metrics entry exists for this type so `metrics()`
276        // can report it even before the first dispatch.
277        let _counters = self.counters_for::<T>();
278
279        ListenerId::new(id, type_id)
280    }
281
282    /// Dispatch an event synchronously.
283    ///
284    /// Returns a [`DispatchResult`] containing per-listener outcomes.
285    ///
286    /// # Example
287    ///
288    /// ```rust
289    /// use mod_events::{EventDispatcher, Event};
290    ///
291    /// #[derive(Debug, Clone)]
292    /// struct MyEvent {
293    ///     message: String,
294    /// }
295    ///
296    /// impl Event for MyEvent {
297    ///     fn as_any(&self) -> &dyn std::any::Any {
298    ///         self
299    ///     }
300    /// }
301    ///
302    /// let dispatcher = EventDispatcher::new();
303    /// let result = dispatcher.dispatch(MyEvent {
304    ///     message: "Hello".to_string(),
305    /// });
306    ///
307    /// if result.all_succeeded() {
308    ///     println!("All listeners handled the event successfully");
309    /// }
310    /// ```
311    pub fn dispatch<T: Event>(&self, event: T) -> DispatchResult {
312        // Update metrics
313        self.update_metrics(&event);
314
315        // Check middleware
316        if !self.check_middleware(&event) {
317            return DispatchResult::blocked();
318        }
319
320        let type_id = TypeId::of::<T>();
321        let listeners = self.listeners.read();
322        let mut results = Vec::new();
323
324        if let Some(event_listeners) = listeners.get(&type_id) {
325            results.reserve(event_listeners.len());
326            for listener in event_listeners {
327                results.push((listener.handler)(&event));
328            }
329        }
330
331        DispatchResult::new(results)
332    }
333
334    /// Dispatch an event asynchronously and await every async listener
335    /// in priority order (requires the `async` feature).
336    ///
337    /// Sync listeners registered via [`Self::on`] / [`Self::subscribe`]
338    /// are not invoked here; only listeners registered through
339    /// [`Self::subscribe_async`] / [`Self::subscribe_async_with_priority`]
340    /// are awaited.
341    ///
342    /// # Example
343    ///
344    /// ```rust
345    /// # #[cfg(feature = "async")]
346    /// # async fn doc_example() {
347    /// use mod_events::{Event, EventDispatcher};
348    ///
349    /// #[derive(Debug, Clone)]
350    /// struct OrderShipped { order_id: u64 }
351    ///
352    /// impl Event for OrderShipped {
353    ///     fn as_any(&self) -> &dyn std::any::Any { self }
354    /// }
355    ///
356    /// let dispatcher = EventDispatcher::new();
357    /// dispatcher.subscribe_async(|event: &OrderShipped| {
358    ///     let id = event.order_id;
359    ///     async move {
360    ///         println!("notifying customer about order {}", id);
361    ///         Ok(())
362    ///     }
363    /// });
364    ///
365    /// let result = dispatcher
366    ///     .dispatch_async(OrderShipped { order_id: 42 })
367    ///     .await;
368    /// assert!(result.all_succeeded());
369    /// # }
370    /// ```
371    #[cfg(feature = "async")]
372    pub async fn dispatch_async<T: Event>(&self, event: T) -> DispatchResult {
373        // Update metrics
374        self.update_metrics(&event);
375
376        // Check middleware
377        if !self.check_middleware(&event) {
378            return DispatchResult::blocked();
379        }
380
381        let type_id = TypeId::of::<T>();
382
383        // Collect cloned handlers without holding the lock across await points.
384        let handlers: Vec<AsyncHandler> = {
385            let async_listeners = self.async_listeners.read();
386            async_listeners
387                .get(&type_id)
388                .map(|event_listeners| {
389                    event_listeners
390                        .iter()
391                        .map(|listener| listener.handler.clone())
392                        .collect()
393                })
394                .unwrap_or_default()
395        };
396
397        let mut results = Vec::with_capacity(handlers.len());
398        for handler in handlers {
399            results.push(handler(&event).await);
400        }
401
402        DispatchResult::new(results)
403    }
404
405    /// Fire and forget — dispatch without inspecting the result.
406    ///
407    /// Use this when listeners' success or failure is not actionable at
408    /// the call site (logging, fanout to passive observers). The errors
409    /// returned by failing listeners are discarded; if you need them,
410    /// call [`Self::dispatch`] instead.
411    ///
412    /// # Example
413    ///
414    /// ```rust
415    /// use mod_events::{EventDispatcher, Event};
416    ///
417    /// #[derive(Debug, Clone)]
418    /// struct MyEvent {
419    ///     message: String,
420    /// }
421    ///
422    /// impl Event for MyEvent {
423    ///     fn as_any(&self) -> &dyn std::any::Any {
424    ///         self
425    ///     }
426    /// }
427    ///
428    /// let dispatcher = EventDispatcher::new();
429    /// dispatcher.emit(MyEvent {
430    ///     message: "Fire and forget".to_string(),
431    /// });
432    /// ```
433    pub fn emit<T: Event>(&self, event: T) {
434        // Intentional discard: emit is the documented fire-and-forget entry
435        // point. Callers that need per-listener outcomes use `dispatch`.
436        drop(self.dispatch(event));
437    }
438
439    /// Add middleware that can block events.
440    ///
441    /// Middleware functions receive events and return `true` to allow
442    /// processing or `false` to block the event.
443    ///
444    /// # Example
445    ///
446    /// ```rust
447    /// use mod_events::{EventDispatcher, Event};
448    ///
449    /// let dispatcher = EventDispatcher::new();
450    /// dispatcher.add_middleware(|event: &dyn Event| {
451    ///     println!("Processing event: {}", event.event_name());
452    ///     true // Allow all events
453    /// });
454    /// ```
455    pub fn add_middleware<F>(&self, middleware: F)
456    where
457        F: Fn(&dyn Event) -> bool + Send + Sync + 'static,
458    {
459        self.middleware.write().add(middleware);
460    }
461
462    /// Remove a previously registered listener.
463    ///
464    /// Returns `true` if the listener was found and removed, `false`
465    /// if no listener with that id was registered (already removed,
466    /// never registered, or registered against a different event type).
467    ///
468    /// # Example
469    ///
470    /// ```rust
471    /// use mod_events::{Event, EventDispatcher};
472    ///
473    /// #[derive(Debug, Clone)]
474    /// struct Tick;
475    ///
476    /// impl Event for Tick {
477    ///     fn as_any(&self) -> &dyn std::any::Any { self }
478    /// }
479    ///
480    /// let dispatcher = EventDispatcher::new();
481    /// let id = dispatcher.on(|_: &Tick| {});
482    /// assert!(dispatcher.unsubscribe(id));
483    /// // Subsequent removals of the same id return false.
484    /// assert!(!dispatcher.unsubscribe(id));
485    /// ```
486    pub fn unsubscribe(&self, listener_id: ListenerId) -> bool {
487        // Try sync listeners first
488        {
489            let mut listeners = self.listeners.write();
490            if let Some(event_listeners) = listeners.get_mut(&listener_id.type_id) {
491                if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
492                    let _removed = event_listeners.remove(pos);
493                    return true;
494                }
495            }
496        }
497
498        // Try async listeners
499        #[cfg(feature = "async")]
500        {
501            let mut async_listeners = self.async_listeners.write();
502            if let Some(event_listeners) = async_listeners.get_mut(&listener_id.type_id) {
503                if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
504                    let _removed = event_listeners.remove(pos);
505                    return true;
506                }
507            }
508        }
509
510        false
511    }
512
513    /// Get the total number of listeners (sync + async, when the
514    /// `async` feature is enabled) registered for an event type.
515    ///
516    /// # Example
517    ///
518    /// ```rust
519    /// use mod_events::{Event, EventDispatcher};
520    ///
521    /// #[derive(Debug, Clone)]
522    /// struct Tick;
523    ///
524    /// impl Event for Tick {
525    ///     fn as_any(&self) -> &dyn std::any::Any { self }
526    /// }
527    ///
528    /// let dispatcher = EventDispatcher::new();
529    /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
530    /// let _ = dispatcher.on(|_: &Tick| {});
531    /// let _ = dispatcher.on(|_: &Tick| {});
532    /// assert_eq!(dispatcher.listener_count::<Tick>(), 2);
533    /// ```
534    #[must_use]
535    pub fn listener_count<T: Event + 'static>(&self) -> usize {
536        let type_id = TypeId::of::<T>();
537        let sync_count = self
538            .listeners
539            .read()
540            .get(&type_id)
541            .map(Vec::len)
542            .unwrap_or(0);
543
544        #[cfg(feature = "async")]
545        let async_count = self
546            .async_listeners
547            .read()
548            .get(&type_id)
549            .map(Vec::len)
550            .unwrap_or(0);
551
552        #[cfg(not(feature = "async"))]
553        let async_count = 0;
554
555        sync_count + async_count
556    }
557
558    /// Get a snapshot of per-event-type [`EventMetadata`] keyed by
559    /// `TypeId`.
560    ///
561    /// The returned map is a fresh snapshot. Subsequent dispatches do
562    /// not mutate it, but the snapshot may be slightly behind the live
563    /// counters because `dispatch_count` is read independently of
564    /// `last_dispatch`.
565    ///
566    /// # Example
567    ///
568    /// ```rust
569    /// use mod_events::{Event, EventDispatcher};
570    /// use std::any::TypeId;
571    ///
572    /// #[derive(Debug, Clone)]
573    /// struct Tick;
574    ///
575    /// impl Event for Tick {
576    ///     fn as_any(&self) -> &dyn std::any::Any { self }
577    /// }
578    ///
579    /// let dispatcher = EventDispatcher::new();
580    /// let _ = dispatcher.on(|_: &Tick| {});
581    /// dispatcher.emit(Tick);
582    /// dispatcher.emit(Tick);
583    ///
584    /// let snapshot = dispatcher.metrics();
585    /// let meta = snapshot.get(&TypeId::of::<Tick>()).unwrap();
586    /// assert_eq!(meta.dispatch_count, 2);
587    /// ```
588    #[must_use]
589    pub fn metrics(&self) -> HashMap<TypeId, EventMetadata> {
590        // Snapshot the metric counters first, then enrich each entry
591        // with the live listener count derived from the registry. This
592        // guarantees `listener_count` cannot drift because it is never
593        // stored — it is computed at the moment of observation.
594        let counters_map = self.metrics.read();
595        let listeners_map = self.listeners.read();
596        #[cfg(feature = "async")]
597        let async_listeners_map = self.async_listeners.read();
598
599        counters_map
600            .iter()
601            .map(|(type_id, counters)| {
602                let mut snap = counters.snapshot();
603                let sync_count = listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
604                #[cfg(feature = "async")]
605                let async_count = async_listeners_map.get(type_id).map(Vec::len).unwrap_or(0);
606                #[cfg(not(feature = "async"))]
607                let async_count = 0;
608                snap.listener_count = sync_count + async_count;
609                (*type_id, snap)
610            })
611            .collect()
612    }
613
614    /// Drop every registered listener, both sync and async.
615    ///
616    /// Middleware and accumulated metrics are unaffected.
617    ///
618    /// # Example
619    ///
620    /// ```rust
621    /// use mod_events::{Event, EventDispatcher};
622    ///
623    /// #[derive(Debug, Clone)]
624    /// struct Tick;
625    ///
626    /// impl Event for Tick {
627    ///     fn as_any(&self) -> &dyn std::any::Any { self }
628    /// }
629    ///
630    /// let dispatcher = EventDispatcher::new();
631    /// let _ = dispatcher.on(|_: &Tick| {});
632    /// dispatcher.clear();
633    /// assert_eq!(dispatcher.listener_count::<Tick>(), 0);
634    /// ```
635    pub fn clear(&self) {
636        self.listeners.write().clear();
637
638        #[cfg(feature = "async")]
639        self.async_listeners.write().clear();
640    }
641
642    /// Hot-path metric update. Tries a read-only fast path first; only
643    /// promotes to a write lock if the entry doesn't exist yet.
644    fn update_metrics<T: Event>(&self, _event: &T) {
645        let counters = self.counters_for::<T>();
646        counters.record_dispatch();
647    }
648
649    /// Look up (or create) the per-type counters. The fast path holds
650    /// only a read lock; the slow path promotes to a write lock and
651    /// double-checks the entry to avoid a torn-creation race.
652    fn counters_for<T: Event + 'static>(&self) -> Arc<EventMetricsCounters> {
653        let type_id = TypeId::of::<T>();
654
655        if let Some(existing) = self.metrics.read().get(&type_id) {
656            return Arc::clone(existing);
657        }
658
659        let mut metrics = self.metrics.write();
660        Arc::clone(
661            metrics
662                .entry(type_id)
663                .or_insert_with(|| Arc::new(EventMetricsCounters::new::<T>())),
664        )
665    }
666
667    fn check_middleware(&self, event: &dyn Event) -> bool {
668        self.middleware.read().process(event)
669    }
670}
671
672impl Default for EventDispatcher {
673    fn default() -> Self {
674        Self::new()
675    }
676}