mod_events/
dispatcher.rs

1//! Main event dispatcher implementation
2
3use crate::{
4    DispatchResult, Event, EventMetadata, ListenerId, ListenerWrapper, MiddlewareManager, Priority,
5};
6use std::any::TypeId;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::{Arc, RwLock};
10
11#[cfg(feature = "async")]
12use crate::AsyncListenerWrapper;
13#[cfg(feature = "async")]
14use std::future::Future;
15#[cfg(feature = "async")]
16use std::pin::Pin;
17
18// Type aliases for complex types
19#[cfg(feature = "async")]
20type AsyncResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
21#[cfg(feature = "async")]
22type AsyncHandler = Arc<
23    dyn for<'a> Fn(&'a dyn Event) -> Pin<Box<dyn Future<Output = AsyncResult> + Send + 'a>>
24        + Send
25        + Sync,
26>;
27
28/// High-performance event dispatcher
29///
30/// The main component of the Mod Events system. Thread-safe and optimized
31/// for high-performance event dispatch with minimal overhead.
32///
33/// # Example
34///
35/// ```rust
36/// use mod_events::{EventDispatcher, Event};
37///
38/// #[derive(Debug, Clone)]
39/// struct MyEvent {
40///     message: String,
41/// }
42///
43/// impl Event for MyEvent {
44///     fn as_any(&self) -> &dyn std::any::Any {
45///         self
46///     }
47/// }
48///
49/// let dispatcher = EventDispatcher::new();
50///
51/// dispatcher.on(|event: &MyEvent| {
52///     println!("Received: {}", event.message);
53/// });
54///
55/// dispatcher.emit(MyEvent {
56///     message: "Hello, World!".to_string(),
57/// });
58/// ```
59pub struct EventDispatcher {
60    listeners: Arc<RwLock<HashMap<TypeId, Vec<ListenerWrapper>>>>,
61    #[cfg(feature = "async")]
62    async_listeners: Arc<RwLock<HashMap<TypeId, Vec<AsyncListenerWrapper>>>>,
63    next_id: AtomicUsize,
64    metrics: Arc<RwLock<HashMap<TypeId, EventMetadata>>>,
65    middleware: Arc<RwLock<MiddlewareManager>>,
66}
67
68impl EventDispatcher {
69    /// Create a new event dispatcher
70    pub fn new() -> Self {
71        Self {
72            listeners: Arc::new(RwLock::new(HashMap::new())),
73            #[cfg(feature = "async")]
74            async_listeners: Arc::new(RwLock::new(HashMap::new())),
75            next_id: AtomicUsize::new(0),
76            metrics: Arc::new(RwLock::new(HashMap::new())),
77            middleware: Arc::new(RwLock::new(MiddlewareManager::new())),
78        }
79    }
80
81    // Subscribe to an event with a closure that can return errors
82    ///
83    /// # Example
84    ///
85    /// ```rust
86    /// use mod_events::{EventDispatcher, Event};
87    ///
88    /// #[derive(Debug, Clone)]
89    /// struct MyEvent {
90    ///     message: String,
91    /// }
92    ///
93    /// impl Event for MyEvent {
94    ///     fn as_any(&self) -> &dyn std::any::Any {
95    ///         self
96    ///     }
97    /// }
98    ///
99    /// let dispatcher = EventDispatcher::new();
100    /// dispatcher.subscribe(|event: &MyEvent| {
101    ///     // Handle event, can return errors
102    ///     if event.message.is_empty() {
103    ///         return Err("Message cannot be empty".into());
104    ///     }
105    ///     println!("Message: {}", event.message);
106    ///     Ok(())
107    /// });
108    /// ```
109    pub fn subscribe<T, F>(&self, listener: F) -> ListenerId
110    where
111        T: Event + 'static,
112        F: Fn(&T) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync + 'static,
113    {
114        self.subscribe_with_priority(listener, Priority::Normal)
115    }
116
117    /// Subscribe to an event with a specific priority
118    pub fn subscribe_with_priority<T, F>(&self, listener: F, priority: Priority) -> ListenerId
119    where
120        T: Event + 'static,
121        F: Fn(&T) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync + 'static,
122    {
123        let type_id = TypeId::of::<T>();
124        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
125
126        let wrapper = ListenerWrapper::new(listener, priority, id);
127
128        let mut listeners = self.listeners.write().unwrap();
129        let event_listeners = listeners.entry(type_id).or_default();
130        event_listeners.push(wrapper);
131
132        // Sort by priority (highest first)
133        event_listeners.sort_by(|a, b| b.priority.cmp(&a.priority));
134
135        // Update metrics
136        drop(listeners); // Drop the lock before calling update_listener_count
137        self.update_listener_count::<T>();
138
139        ListenerId::new(id, type_id)
140    }
141
142    /// Subscribe to an event with simple closure (no error handling)
143    ///
144    /// This is the most convenient method for simple event handling.
145    ///
146    /// # Example
147    ///
148    /// ```rust
149    /// use mod_events::{EventDispatcher, Event};
150    ///
151    /// #[derive(Debug, Clone)]
152    /// struct MyEvent {
153    ///     message: String,
154    /// }
155    ///
156    /// impl Event for MyEvent {
157    ///     fn as_any(&self) -> &dyn std::any::Any {
158    ///         self
159    ///     }
160    /// }
161    ///
162    /// let dispatcher = EventDispatcher::new();
163    /// dispatcher.on(|event: &MyEvent| {
164    ///     println!("Received: {}", event.message);
165    /// });
166    /// ```
167    pub fn on<T, F>(&self, listener: F) -> ListenerId
168    where
169        T: Event + 'static,
170        F: Fn(&T) + Send + Sync + 'static,
171    {
172        self.subscribe(move |event: &T| {
173            listener(event);
174            Ok(())
175        })
176    }
177
178    /// Subscribe to an async event (requires "async" feature)
179    #[cfg(feature = "async")]
180    pub fn subscribe_async<T, F, Fut>(&self, listener: F) -> ListenerId
181    where
182        T: Event + 'static,
183        F: Fn(&T) -> Fut + Send + Sync + 'static,
184        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
185            + Send
186            + 'static,
187    {
188        self.subscribe_async_with_priority(listener, Priority::Normal)
189    }
190
191    /// Subscribe to an async event with priority (requires "async" feature)
192    #[cfg(feature = "async")]
193    pub fn subscribe_async_with_priority<T, F, Fut>(
194        &self,
195        listener: F,
196        priority: Priority,
197    ) -> ListenerId
198    where
199        T: Event + 'static,
200        F: Fn(&T) -> Fut + Send + Sync + 'static,
201        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
202            + Send
203            + 'static,
204    {
205        let type_id = TypeId::of::<T>();
206        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
207
208        let wrapper = AsyncListenerWrapper::new(listener, priority, id);
209
210        let mut async_listeners = self.async_listeners.write().unwrap();
211        let event_listeners = async_listeners.entry(type_id).or_default();
212        event_listeners.push(wrapper);
213
214        // Sort by priority (highest first)
215        event_listeners.sort_by(|a, b| b.priority.cmp(&a.priority));
216
217        // Update metrics
218        drop(async_listeners); // Drop the lock before calling update_listener_count
219        self.update_listener_count::<T>();
220
221        ListenerId::new(id, type_id)
222    }
223
224    /// Dispatch an event synchronously
225    ///
226    /// Returns a `DispatchResult` containing information about the dispatch.
227    ///
228    /// # Example
229    ///
230    /// ```rust
231    /// use mod_events::{EventDispatcher, Event};
232    ///
233    /// #[derive(Debug, Clone)]
234    /// struct MyEvent {
235    ///     message: String,
236    /// }
237    ///
238    /// impl Event for MyEvent {
239    ///     fn as_any(&self) -> &dyn std::any::Any {
240    ///         self
241    ///     }
242    /// }
243    ///
244    /// let dispatcher = EventDispatcher::new();
245    /// let result = dispatcher.dispatch(MyEvent {
246    ///     message: "Hello".to_string(),
247    /// });
248    ///
249    /// if result.all_succeeded() {
250    ///     println!("All listeners handled the event successfully");
251    /// }
252    /// ```
253    pub fn dispatch<T: Event>(&self, event: T) -> DispatchResult {
254        // Update metrics
255        self.update_metrics(&event);
256
257        // Check middleware
258        if !self.check_middleware(&event) {
259            return DispatchResult::blocked();
260        }
261
262        let type_id = TypeId::of::<T>();
263        let listeners = self.listeners.read().unwrap();
264        let mut results = Vec::new();
265
266        if let Some(event_listeners) = listeners.get(&type_id) {
267            results.reserve(event_listeners.len());
268            for listener in event_listeners {
269                results.push((listener.handler)(&event));
270            }
271        }
272
273        DispatchResult::new(results)
274    }
275
276    /// Dispatch an event asynchronously (requires "async" feature)
277    #[cfg(feature = "async")]
278    pub async fn dispatch_async<T: Event>(&self, event: T) -> DispatchResult {
279        // Update metrics
280        self.update_metrics(&event);
281
282        // Check middleware
283        if !self.check_middleware(&event) {
284            return DispatchResult::blocked();
285        }
286
287        let type_id = TypeId::of::<T>();
288
289        // Collect cloned handlers without holding the lock
290        let handlers: Vec<AsyncHandler> = {
291            let async_listeners = self.async_listeners.read().unwrap();
292            if let Some(event_listeners) = async_listeners.get(&type_id) {
293                event_listeners
294                    .iter()
295                    .map(|listener| listener.handler.clone())
296                    .collect()
297            } else {
298                Vec::new()
299            }
300        }; // Lock is dropped here
301
302        // Now execute all handlers without holding any locks
303        let mut results = Vec::with_capacity(handlers.len());
304
305        for handler in handlers {
306            let future = handler(&event);
307            results.push(future.await);
308        }
309
310        DispatchResult::new(results)
311    }
312
313    /// Fire and forget - dispatch without waiting for results
314    ///
315    /// This is the most efficient way to dispatch events when you don't
316    /// need to check the results.
317    ///
318    /// # Example
319    ///
320    /// ```rust
321    /// use mod_events::{EventDispatcher, Event};
322    ///
323    /// #[derive(Debug, Clone)]
324    /// struct MyEvent {
325    ///     message: String,
326    /// }
327    ///
328    /// impl Event for MyEvent {
329    ///     fn as_any(&self) -> &dyn std::any::Any {
330    ///         self
331    ///     }
332    /// }
333    ///
334    /// let dispatcher = EventDispatcher::new();
335    /// dispatcher.emit(MyEvent {
336    ///     message: "Fire and forget".to_string(),
337    /// });
338    /// ```
339    pub fn emit<T: Event>(&self, event: T) {
340        let _ = self.dispatch(event);
341    }
342
343    /// Add middleware that can block events
344    ///
345    /// Middleware functions receive events and return `true` to allow
346    /// processing or `false` to block the event.
347    ///
348    /// # Example
349    ///
350    /// ```rust
351    /// use mod_events::{EventDispatcher, Event};
352    ///
353    /// let dispatcher = EventDispatcher::new();
354    /// dispatcher.add_middleware(|event: &dyn Event| {
355    ///     println!("Processing event: {}", event.event_name());
356    ///     true // Allow all events
357    /// });
358    /// ```
359    pub fn add_middleware<F>(&self, middleware: F)
360    where
361        F: Fn(&dyn Event) -> bool + Send + Sync + 'static,
362    {
363        let mut middleware_manager = self.middleware.write().unwrap();
364        middleware_manager.add(middleware);
365    }
366
367    /// Remove a listener
368    ///
369    /// Returns `true` if the listener was found and removed, `false` otherwise.
370    pub fn unsubscribe(&self, listener_id: ListenerId) -> bool {
371        // Try sync listeners first
372        {
373            let mut listeners = self.listeners.write().unwrap();
374            if let Some(event_listeners) = listeners.get_mut(&listener_id.type_id) {
375                if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
376                    event_listeners.remove(pos);
377                    return true;
378                }
379            }
380        }
381
382        // Try async listeners
383        #[cfg(feature = "async")]
384        {
385            let mut async_listeners = self.async_listeners.write().unwrap();
386            if let Some(event_listeners) = async_listeners.get_mut(&listener_id.type_id) {
387                if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
388                    event_listeners.remove(pos);
389                    return true;
390                }
391            }
392        }
393
394        false
395    }
396
397    /// Get the number of listeners for an event type
398    pub fn listener_count<T: Event + 'static>(&self) -> usize {
399        let type_id = TypeId::of::<T>();
400        let sync_count = self
401            .listeners
402            .read()
403            .unwrap()
404            .get(&type_id)
405            .map(|v| v.len())
406            .unwrap_or(0);
407
408        #[cfg(feature = "async")]
409        let async_count = self
410            .async_listeners
411            .read()
412            .unwrap()
413            .get(&type_id)
414            .map(|v| v.len())
415            .unwrap_or(0);
416
417        #[cfg(not(feature = "async"))]
418        let async_count = 0;
419
420        sync_count + async_count
421    }
422
423    /// Get event metrics
424    pub fn metrics(&self) -> HashMap<TypeId, EventMetadata> {
425        self.metrics.read().unwrap().clone()
426    }
427
428    /// Clear all listeners
429    pub fn clear(&self) {
430        self.listeners.write().unwrap().clear();
431
432        #[cfg(feature = "async")]
433        self.async_listeners.write().unwrap().clear();
434    }
435
436    fn update_metrics<T: Event>(&self, _event: &T) {
437        let mut metrics = self.metrics.write().unwrap();
438        let type_id = TypeId::of::<T>();
439
440        match metrics.get_mut(&type_id) {
441            Some(meta) => {
442                meta.increment_dispatch();
443            }
444            None => {
445                let mut meta = EventMetadata::new::<T>();
446                meta.increment_dispatch();
447                metrics.insert(type_id, meta);
448            }
449        }
450    }
451
452    fn update_listener_count<T: Event + 'static>(&self) {
453        let mut metrics = self.metrics.write().unwrap();
454        let type_id = TypeId::of::<T>();
455        let count = self.listener_count::<T>();
456
457        match metrics.get_mut(&type_id) {
458            Some(meta) => {
459                meta.update_listener_count(count);
460            }
461            None => {
462                let mut meta = EventMetadata::new::<T>();
463                meta.update_listener_count(count);
464                metrics.insert(type_id, meta);
465            }
466        }
467    }
468
469    fn check_middleware(&self, event: &dyn Event) -> bool {
470        let middleware = self.middleware.read().unwrap();
471        middleware.process(event)
472    }
473}
474
475impl Default for EventDispatcher {
476    fn default() -> Self {
477        Self::new()
478    }
479}
480
481unsafe impl Send for EventDispatcher {}
482unsafe impl Sync for EventDispatcher {}