1use crate::{
4 DispatchResult, Event, EventMetadata, ListenerId, ListenerWrapper, MiddlewareManager, Priority,
5};
6use std::any::TypeId;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::{Arc, RwLock};
10
11#[cfg(feature = "async")]
12use crate::AsyncListenerWrapper;
13#[cfg(feature = "async")]
14use std::future::Future;
15#[cfg(feature = "async")]
16use std::pin::Pin;
17
18#[cfg(feature = "async")]
20type AsyncResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
21#[cfg(feature = "async")]
22type AsyncHandler = Arc<
23 dyn for<'a> Fn(&'a dyn Event) -> Pin<Box<dyn Future<Output = AsyncResult> + Send + 'a>>
24 + Send
25 + Sync,
26>;
27
28pub struct EventDispatcher {
60 listeners: Arc<RwLock<HashMap<TypeId, Vec<ListenerWrapper>>>>,
61 #[cfg(feature = "async")]
62 async_listeners: Arc<RwLock<HashMap<TypeId, Vec<AsyncListenerWrapper>>>>,
63 next_id: AtomicUsize,
64 metrics: Arc<RwLock<HashMap<TypeId, EventMetadata>>>,
65 middleware: Arc<RwLock<MiddlewareManager>>,
66}
67
68impl EventDispatcher {
69 pub fn new() -> Self {
71 Self {
72 listeners: Arc::new(RwLock::new(HashMap::new())),
73 #[cfg(feature = "async")]
74 async_listeners: Arc::new(RwLock::new(HashMap::new())),
75 next_id: AtomicUsize::new(0),
76 metrics: Arc::new(RwLock::new(HashMap::new())),
77 middleware: Arc::new(RwLock::new(MiddlewareManager::new())),
78 }
79 }
80
81 pub fn subscribe<T, F>(&self, listener: F) -> ListenerId
110 where
111 T: Event + 'static,
112 F: Fn(&T) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync + 'static,
113 {
114 self.subscribe_with_priority(listener, Priority::Normal)
115 }
116
117 pub fn subscribe_with_priority<T, F>(&self, listener: F, priority: Priority) -> ListenerId
119 where
120 T: Event + 'static,
121 F: Fn(&T) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync + 'static,
122 {
123 let type_id = TypeId::of::<T>();
124 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
125
126 let wrapper = ListenerWrapper::new(listener, priority, id);
127
128 let mut listeners = self.listeners.write().unwrap();
129 let event_listeners = listeners.entry(type_id).or_default();
130 event_listeners.push(wrapper);
131
132 event_listeners.sort_by(|a, b| b.priority.cmp(&a.priority));
134
135 drop(listeners); self.update_listener_count::<T>();
138
139 ListenerId::new(id, type_id)
140 }
141
142 pub fn on<T, F>(&self, listener: F) -> ListenerId
168 where
169 T: Event + 'static,
170 F: Fn(&T) + Send + Sync + 'static,
171 {
172 self.subscribe(move |event: &T| {
173 listener(event);
174 Ok(())
175 })
176 }
177
178 #[cfg(feature = "async")]
180 pub fn subscribe_async<T, F, Fut>(&self, listener: F) -> ListenerId
181 where
182 T: Event + 'static,
183 F: Fn(&T) -> Fut + Send + Sync + 'static,
184 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
185 + Send
186 + 'static,
187 {
188 self.subscribe_async_with_priority(listener, Priority::Normal)
189 }
190
191 #[cfg(feature = "async")]
193 pub fn subscribe_async_with_priority<T, F, Fut>(
194 &self,
195 listener: F,
196 priority: Priority,
197 ) -> ListenerId
198 where
199 T: Event + 'static,
200 F: Fn(&T) -> Fut + Send + Sync + 'static,
201 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
202 + Send
203 + 'static,
204 {
205 let type_id = TypeId::of::<T>();
206 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
207
208 let wrapper = AsyncListenerWrapper::new(listener, priority, id);
209
210 let mut async_listeners = self.async_listeners.write().unwrap();
211 let event_listeners = async_listeners.entry(type_id).or_default();
212 event_listeners.push(wrapper);
213
214 event_listeners.sort_by(|a, b| b.priority.cmp(&a.priority));
216
217 drop(async_listeners); self.update_listener_count::<T>();
220
221 ListenerId::new(id, type_id)
222 }
223
224 pub fn dispatch<T: Event>(&self, event: T) -> DispatchResult {
254 self.update_metrics(&event);
256
257 if !self.check_middleware(&event) {
259 return DispatchResult::blocked();
260 }
261
262 let type_id = TypeId::of::<T>();
263 let listeners = self.listeners.read().unwrap();
264 let mut results = Vec::new();
265
266 if let Some(event_listeners) = listeners.get(&type_id) {
267 results.reserve(event_listeners.len());
268 for listener in event_listeners {
269 results.push((listener.handler)(&event));
270 }
271 }
272
273 DispatchResult::new(results)
274 }
275
276 #[cfg(feature = "async")]
278 pub async fn dispatch_async<T: Event>(&self, event: T) -> DispatchResult {
279 self.update_metrics(&event);
281
282 if !self.check_middleware(&event) {
284 return DispatchResult::blocked();
285 }
286
287 let type_id = TypeId::of::<T>();
288
289 let handlers: Vec<AsyncHandler> = {
291 let async_listeners = self.async_listeners.read().unwrap();
292 if let Some(event_listeners) = async_listeners.get(&type_id) {
293 event_listeners
294 .iter()
295 .map(|listener| listener.handler.clone())
296 .collect()
297 } else {
298 Vec::new()
299 }
300 }; let mut results = Vec::with_capacity(handlers.len());
304
305 for handler in handlers {
306 let future = handler(&event);
307 results.push(future.await);
308 }
309
310 DispatchResult::new(results)
311 }
312
313 pub fn emit<T: Event>(&self, event: T) {
340 let _ = self.dispatch(event);
341 }
342
343 pub fn add_middleware<F>(&self, middleware: F)
360 where
361 F: Fn(&dyn Event) -> bool + Send + Sync + 'static,
362 {
363 let mut middleware_manager = self.middleware.write().unwrap();
364 middleware_manager.add(middleware);
365 }
366
367 pub fn unsubscribe(&self, listener_id: ListenerId) -> bool {
371 {
373 let mut listeners = self.listeners.write().unwrap();
374 if let Some(event_listeners) = listeners.get_mut(&listener_id.type_id) {
375 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
376 event_listeners.remove(pos);
377 return true;
378 }
379 }
380 }
381
382 #[cfg(feature = "async")]
384 {
385 let mut async_listeners = self.async_listeners.write().unwrap();
386 if let Some(event_listeners) = async_listeners.get_mut(&listener_id.type_id) {
387 if let Some(pos) = event_listeners.iter().position(|l| l.id == listener_id.id) {
388 event_listeners.remove(pos);
389 return true;
390 }
391 }
392 }
393
394 false
395 }
396
397 pub fn listener_count<T: Event + 'static>(&self) -> usize {
399 let type_id = TypeId::of::<T>();
400 let sync_count = self
401 .listeners
402 .read()
403 .unwrap()
404 .get(&type_id)
405 .map(|v| v.len())
406 .unwrap_or(0);
407
408 #[cfg(feature = "async")]
409 let async_count = self
410 .async_listeners
411 .read()
412 .unwrap()
413 .get(&type_id)
414 .map(|v| v.len())
415 .unwrap_or(0);
416
417 #[cfg(not(feature = "async"))]
418 let async_count = 0;
419
420 sync_count + async_count
421 }
422
423 pub fn metrics(&self) -> HashMap<TypeId, EventMetadata> {
425 self.metrics.read().unwrap().clone()
426 }
427
428 pub fn clear(&self) {
430 self.listeners.write().unwrap().clear();
431
432 #[cfg(feature = "async")]
433 self.async_listeners.write().unwrap().clear();
434 }
435
436 fn update_metrics<T: Event>(&self, _event: &T) {
437 let mut metrics = self.metrics.write().unwrap();
438 let type_id = TypeId::of::<T>();
439
440 match metrics.get_mut(&type_id) {
441 Some(meta) => {
442 meta.increment_dispatch();
443 }
444 None => {
445 let mut meta = EventMetadata::new::<T>();
446 meta.increment_dispatch();
447 metrics.insert(type_id, meta);
448 }
449 }
450 }
451
452 fn update_listener_count<T: Event + 'static>(&self) {
453 let mut metrics = self.metrics.write().unwrap();
454 let type_id = TypeId::of::<T>();
455 let count = self.listener_count::<T>();
456
457 match metrics.get_mut(&type_id) {
458 Some(meta) => {
459 meta.update_listener_count(count);
460 }
461 None => {
462 let mut meta = EventMetadata::new::<T>();
463 meta.update_listener_count(count);
464 metrics.insert(type_id, meta);
465 }
466 }
467 }
468
469 fn check_middleware(&self, event: &dyn Event) -> bool {
470 let middleware = self.middleware.read().unwrap();
471 middleware.process(event)
472 }
473}
474
475impl Default for EventDispatcher {
476 fn default() -> Self {
477 Self::new()
478 }
479}
480
481unsafe impl Send for EventDispatcher {}
482unsafe impl Sync for EventDispatcher {}