cool_core/event/
mod.rs

1//! 事件模块
2//!
3//! 对应 TypeScript 版本的 `event/`
4
5use parking_lot::RwLock;
6use std::any::Any;
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11use tokio::sync::broadcast;
12
/// Well-known event names used across the framework.
pub mod events {
    /// Emitted when records are soft-deleted.
    pub const SOFT_DELETE: &str = "onSoftDelete";

    /// Emitted once the server has started successfully.
    pub const SERVER_READY: &str = "onServerReady";

    /// Emitted when the service is ready.
    pub const READY: &str = "onReady";

    /// Emitted when ES data changes.
    pub const ES_DATA_CHANGE: &str = "esDataChange";
}
24
/// Type-erased event payload that can be shared between handlers and threads.
///
/// Consumers recover the concrete value with `downcast_ref::<T>()`.
pub type EventData = Arc<dyn Any + Sync + Send>;

/// Shareable async event handler: receives an [`EventData`] payload and
/// returns a boxed, sendable future that resolves to `()`.
pub type EventHandler = Arc<
    dyn Fn(EventData) -> Pin<Box<dyn Future<Output = ()> + Send>>
        + Sync
        + Send,
>;
31
32/// 事件管理器
33pub struct EventManager {
34    /// 事件处理器映射
35    handlers: RwLock<HashMap<String, Vec<EventHandler>>>,
36    /// 广播发送器
37    sender: broadcast::Sender<(String, EventData)>,
38}
39
40impl EventManager {
41    /// 创建新的事件管理器
42    pub fn new() -> Self {
43        let (sender, _) = broadcast::channel(1024);
44        Self {
45            handlers: RwLock::new(HashMap::new()),
46            sender,
47        }
48    }
49
50    /// 注册事件处理器
51    pub fn on<F, Fut>(&self, event: &str, handler: F)
52    where
53        F: Fn(EventData) -> Fut + Send + Sync + 'static,
54        Fut: Future<Output = ()> + Send + 'static,
55    {
56        let handler: EventHandler = Arc::new(move |data: EventData| {
57            let fut = handler(data);
58            Box::pin(fut) as Pin<Box<dyn Future<Output = ()> + Send>>
59        });
60
61        let mut handlers = self.handlers.write();
62        handlers.entry(event.to_string()).or_default().push(handler);
63    }
64
65    /// 触发事件
66    pub async fn emit<T: Any + Send + Sync>(&self, event: &str, data: T) {
67        let data: EventData = Arc::new(data);
68
69        // 调用注册的处理器:先复制一份处理器列表,避免在持有锁的情况下 `.await`
70        let handlers_vec: Vec<EventHandler> = {
71            let handlers = self.handlers.read();
72            handlers.get(event).cloned().unwrap_or_default()
73        };
74        for handler in handlers_vec {
75            let data_clone = Arc::clone(&data);
76            handler(data_clone).await;
77        }
78
79        // 广播事件
80        let _ = self.sender.send((event.to_string(), data));
81    }
82
83    /// 订阅事件(用于跨模块监听)
84    pub fn subscribe(&self) -> broadcast::Receiver<(String, EventData)> {
85        self.sender.subscribe()
86    }
87
88    /// 移除事件处理器
89    pub fn off(&self, event: &str) {
90        let mut handlers = self.handlers.write();
91        handlers.remove(event);
92    }
93
94    /// 移除所有事件处理器
95    pub fn clear(&self) {
96        let mut handlers = self.handlers.write();
97        handlers.clear();
98    }
99}
100
101impl Default for EventManager {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107/// 全局事件管理器
108static GLOBAL_EVENT_MANAGER: once_cell::sync::Lazy<EventManager> =
109    once_cell::sync::Lazy::new(EventManager::new);
110
111/// 获取全局事件管理器
112pub fn global_event_manager() -> &'static EventManager {
113    &GLOBAL_EVENT_MANAGER
114}
115
/// Payload of the soft-delete event.
#[derive(Clone, Debug)]
pub struct SoftDeleteEvent {
    /// Name of the entity.
    pub entity: String,
    /// IDs of the deleted records.
    pub ids: Vec<i64>,
    /// Tenant ID, if any.
    pub tenant_id: Option<i64>,
}
126
/// Payload of the server-ready event.
#[derive(Clone, Debug)]
pub struct ServerReadyEvent {
    /// Server address.
    pub address: String,
    /// Port.
    pub port: u16,
}
135
/// Payload of the ES data-change event.
#[derive(Clone, Debug)]
pub struct EsDataChangeEvent {
    /// Index name.
    pub index: String,
    /// Operation type: "create" | "update" | "delete".
    pub operation: String,
    /// Document ID, if any.
    pub doc_id: Option<String>,
}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// A handler registered with `on` runs when `emit` fires the same event.
    #[tokio::test]
    async fn test_event_manager() {
        let fired = Arc::new(RwLock::new(false));
        let manager = EventManager::new();

        // The closure owns one handle; each invocation clones it into the
        // future so the closure stays callable more than once (`Fn`).
        let fired_for_handler = Arc::clone(&fired);
        manager.on("test", move |_data| {
            let fired = Arc::clone(&fired_for_handler);
            async move {
                *fired.write() = true;
            }
        });

        manager.emit("test", "hello").await;

        let was_fired = *fired.read();
        assert!(was_fired);
    }
}