1use parking_lot::RwLock;
6use std::any::Any;
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11use tokio::sync::broadcast;
12
/// Well-known event names used as keys when registering handlers and emitting events.
pub mod events {
    /// Event name `"onServerReady"`; presumably paired with a `ServerReadyEvent` payload
    /// (confirm against the emitters).
    pub const SERVER_READY: &str = "onServerReady";

    /// Event name `"onReady"`.
    pub const READY: &str = "onReady";

    /// Event name `"onSoftDelete"`; presumably paired with a `SoftDeleteEvent` payload
    /// (confirm against the emitters).
    pub const SOFT_DELETE: &str = "onSoftDelete";

    /// Event name `"esDataChange"`; presumably paired with an `EsDataChangeEvent` payload
    /// (confirm against the emitters).
    pub const ES_DATA_CHANGE: &str = "esDataChange";
}
24
/// Type-erased, cheaply clonable event payload.
///
/// Receivers recover the concrete value with `downcast_ref::<T>()`.
pub type EventData = Arc<dyn Any + Send + Sync>;

/// Shared, type-erased asynchronous event callback.
///
/// Takes the event payload and returns a boxed future that resolves to `()`
/// once the handler has finished.
pub type EventHandler = Arc<
    dyn Fn(EventData) -> Pin<Box<dyn Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
31
32pub struct EventManager {
34 handlers: RwLock<HashMap<String, Vec<EventHandler>>>,
36 sender: broadcast::Sender<(String, EventData)>,
38}
39
40impl EventManager {
41 pub fn new() -> Self {
43 let (sender, _) = broadcast::channel(1024);
44 Self {
45 handlers: RwLock::new(HashMap::new()),
46 sender,
47 }
48 }
49
50 pub fn on<F, Fut>(&self, event: &str, handler: F)
52 where
53 F: Fn(EventData) -> Fut + Send + Sync + 'static,
54 Fut: Future<Output = ()> + Send + 'static,
55 {
56 let handler: EventHandler = Arc::new(move |data: EventData| {
57 let fut = handler(data);
58 Box::pin(fut) as Pin<Box<dyn Future<Output = ()> + Send>>
59 });
60
61 let mut handlers = self.handlers.write();
62 handlers.entry(event.to_string()).or_default().push(handler);
63 }
64
65 pub async fn emit<T: Any + Send + Sync>(&self, event: &str, data: T) {
67 let data: EventData = Arc::new(data);
68
69 let handlers_vec: Vec<EventHandler> = {
71 let handlers = self.handlers.read();
72 handlers.get(event).cloned().unwrap_or_default()
73 };
74 for handler in handlers_vec {
75 let data_clone = Arc::clone(&data);
76 handler(data_clone).await;
77 }
78
79 let _ = self.sender.send((event.to_string(), data));
81 }
82
83 pub fn subscribe(&self) -> broadcast::Receiver<(String, EventData)> {
85 self.sender.subscribe()
86 }
87
88 pub fn off(&self, event: &str) {
90 let mut handlers = self.handlers.write();
91 handlers.remove(event);
92 }
93
94 pub fn clear(&self) {
96 let mut handlers = self.handlers.write();
97 handlers.clear();
98 }
99}
100
101impl Default for EventManager {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107static GLOBAL_EVENT_MANAGER: once_cell::sync::Lazy<EventManager> =
109 once_cell::sync::Lazy::new(EventManager::new);
110
111pub fn global_event_manager() -> &'static EventManager {
113 &GLOBAL_EVENT_MANAGER
114}
115
/// Payload describing a soft-delete of one or more records.
///
/// Implements `PartialEq`/`Eq` so payloads can be compared directly in
/// handlers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SoftDeleteEvent {
    /// Name of the affected entity (presumably the model/table name; confirm with emitters).
    pub entity: String,
    /// Identifiers of the affected records.
    pub ids: Vec<i64>,
    /// Tenant identifier, when one is supplied.
    pub tenant_id: Option<i64>,
}
126
/// Payload carrying a server address and port.
///
/// Implements `PartialEq`/`Eq` so payloads can be compared directly in
/// handlers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerReadyEvent {
    /// Server address (presumably the bound host/IP; confirm with emitters).
    pub address: String,
    /// Server port.
    pub port: u16,
}
135
/// Payload describing a data change on an index.
///
/// Implements `PartialEq`/`Eq` so payloads can be compared directly in
/// handlers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EsDataChangeEvent {
    /// Name of the affected index.
    pub index: String,
    /// Operation that was performed (free-form string; the accepted values are not visible here).
    pub operation: String,
    /// Identifier of the affected document, when one is supplied.
    pub doc_id: Option<String>,
}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// A handler registered with `on` must have run by the time `emit` returns.
    #[tokio::test]
    async fn test_event_manager() {
        let manager = EventManager::new();
        let flag = Arc::new(RwLock::new(false));

        {
            // Scoped clone: the closure takes ownership of this handle while
            // the outer `flag` stays available for the final assertion.
            let flag = Arc::clone(&flag);
            manager.on("test", move |_payload| {
                let flag = Arc::clone(&flag);
                async move {
                    *flag.write() = true;
                }
            });
        }

        manager.emit("test", "hello").await;

        let handler_ran = *flag.read();
        assert!(handler_ran);
    }
}