next_web_dev/event/
default_application_event_multicaster.rs

1use super::key::Key;
2use super::{
3    application_event::ApplicationEvent,
4    application_event_multicaster::ApplicationEventMulticaster,
5    application_listener::ApplicationListener,
6};
7
8use flume::{Receiver, Sender};
9use hashbrown::HashMap;
10use parking_lot::Mutex;
11use std::any::TypeId;
12use std::borrow::Cow;
13use std::sync::Arc;
14use tracing::{debug, info};
15
16/// 默认的事件多播器实现
17/// Default implementation of event multicaster
18///
19#[derive(Clone)]
20pub struct DefaultApplicationEventMulticaster {
21    // 使用 TypeId 存储不同类型事件的监听器
22    // Use TypeId to store listeners for different event types
23    listeners: Arc<Mutex<HashMap<Key, Sender<Box<dyn ApplicationEvent>>>>>,
24    listeners_type: Arc<Mutex<HashMap<TypeId, Sender<Box<dyn ApplicationEvent>>>>>,
25
26    // 事件通道
27    // Event channel
28    event_channel: Option<Receiver<(Cow<'static, str>, Box<dyn ApplicationEvent>)>>,
29}
30
31impl DefaultApplicationEventMulticaster {
32    /// 创建新的事件多播器实例
33    /// Create a new event multicaster instance
34    pub fn new() -> Self {
35        DefaultApplicationEventMulticaster {
36            listeners: Arc::new(Mutex::new(HashMap::new())),
37            listeners_type: Arc::new(Mutex::new(HashMap::new())),
38
39            event_channel: None,
40        }
41    }
42
43    /// 设置事件通道
44    /// Set event channel
45    pub fn set_event_channel(
46        &mut self,
47        channel: Receiver<(Cow<'static, str>, Box<dyn ApplicationEvent>)>,
48    ) {
49        self.event_channel = Some(channel);
50    }
51
52    /// 运行事件多播器
53    /// Run event multicaster
54    pub fn run(&self) {
55        let channel = self.event_channel.clone().unwrap();
56        let listeners = self.listeners.clone();
57        let listeners_type = self.listeners_type.clone();
58        tokio::spawn(async move {
59            while let Ok(event) = channel.recv() {
60                if event.0.is_empty() {
61                    if let Some(listeners) = listeners_type.lock().get(&event.1.tid()) {
62                        let _ = listeners.send(event.1);
63                    }
64                } else {
65                    if let Some(listeners) = listeners.lock().get(&Key::new(event.0, event.1.tid()))
66                    {
67                        let _ = listeners.send(event.1);
68                    }
69                }
70            }
71        });
72    }
73}
74
75impl ApplicationEventMulticaster for DefaultApplicationEventMulticaster {
76    fn add_application_listener(&mut self, mut listener: Box<dyn ApplicationListener>) {
77        let id = listener.id().clone();
78        let tid = listener.tid().clone();
79
80        let (sender, receiver) = flume::unbounded();
81        if id.is_empty() {
82            if let None = self.listeners_type.lock().get(&tid) {
83                self.listeners_type.lock().insert(tid, sender);
84            };
85        } else {
86            let key = Key::new(listener.id(), listener.tid());
87            if let None = self.listeners_type.lock().get(&tid) {
88                self.listeners.lock().insert(key, sender);
89            };
90        }
91        tokio::spawn(async move {
92            while let Ok(event) = receiver.recv() {
93                listener.on_application_event(&event).await;
94                info!("Received event: {:?}", event.source());
95            }
96        });
97        debug!("Added listener for event type: {:?}, id: {}", tid, id);
98    }
99
100    fn remove_application_listener(&mut self, key: &Key) {
101        // 使用监听器的唯一ID进行匹配删除
102        // Use listener's unique ID to match and remove
103        if let Some(_) = self.listeners.lock().remove(key) {
104            debug!("Removed listener for event type: {}", key);
105        };
106    }
107
108    // async fn multicast_event(&mut self, id: &Cow<'static, str>, event: &dyn ApplicationEvent) {
109
110    //     if id.is_empty() {
111    //         // 调用所有监听器处理事件
112    //         // Invoke all listeners to handle the event
113    //         if let Some(listeners) = self.listeners_type.get_mut(&event.type_id()) {
114    //             for listener in listeners.iter_mut() {
115    //                 listener.on_application_event(event).await;
116    //             }
117    //         }
118    //     } else {
119    //         // 获取该事件类型的所有监听器
120    //         // Get all listeners for this event type
121    //         if let Some(listeners) = self
122    //             .listeners
123    //             .get_mut(&Key::new(id.clone(), event.type_id()))
124    //         {
125    //             for listener in listeners.iter_mut() {
126    //                 listener.on_application_event(event).await;
127    //             }
128    //         }
129    //     }
130    // }
131}