1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
/// Opaque handle identifying a single listener registration.
///
/// The wrapped `u64` is private, so code outside this module can only copy,
/// compare, hash, or debug-print the handle.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct SubscriptionId(u64);
7
8type Listener = Box<dyn Fn(&serde_json::Value) + Send + Sync>;
9type ListenerList = Vec<(SubscriptionId, Arc<Listener>)>;
10
11pub struct EventEmitter {
34 next_id: Mutex<u64>,
35 listeners: Mutex<HashMap<String, ListenerList>>,
36 once_ids: Mutex<Vec<SubscriptionId>>,
37}
38
39impl EventEmitter {
40 pub fn new() -> Self {
41 Self {
42 next_id: Mutex::new(0),
43 listeners: Mutex::new(HashMap::new()),
44 once_ids: Mutex::new(Vec::new()),
45 }
46 }
47
48 pub fn on<F>(&self, event: impl Into<String>, handler: F) -> SubscriptionId
50 where
51 F: Fn(&serde_json::Value) + Send + Sync + 'static,
52 {
53 let id = self.next_sub_id();
54 let mut listeners = self.listeners.lock().unwrap();
55 listeners
56 .entry(event.into())
57 .or_default()
58 .push((id, Arc::new(Box::new(handler))));
59 id
60 }
61
62 pub fn once<F>(&self, event: impl Into<String>, handler: F) -> SubscriptionId
64 where
65 F: Fn(&serde_json::Value) + Send + Sync + 'static,
66 {
67 let id = self.on(event, handler);
68 self.once_ids.lock().unwrap().push(id);
69 id
70 }
71
72 pub fn emit(&self, event: &str, payload: &serde_json::Value) {
74 let to_call: Vec<Arc<Listener>> = {
76 let listeners = self.listeners.lock().unwrap();
77 listeners
78 .get(event)
79 .map(|list| list.iter().map(|(_, h)| Arc::clone(h)).collect())
80 .unwrap_or_default()
81 };
82
83 for handler in &to_call {
84 handler(payload);
85 }
86
87 let once_ids: Vec<SubscriptionId> = {
89 let mut once = self.once_ids.lock().unwrap();
90 std::mem::take(&mut *once)
91 };
92 for id in once_ids {
93 self.off(id);
94 }
95 }
96
97 pub fn off(&self, id: SubscriptionId) {
99 let mut listeners = self.listeners.lock().unwrap();
100 for list in listeners.values_mut() {
101 list.retain(|(sub_id, _)| *sub_id != id);
102 }
103 }
104
105 pub fn remove_all(&self, event: &str) {
107 let mut listeners = self.listeners.lock().unwrap();
108 listeners.remove(event);
109 }
110
111 pub fn clear(&self) {
113 let mut listeners = self.listeners.lock().unwrap();
114 listeners.clear();
115 }
116
117 pub fn listener_count(&self, event: &str) -> usize {
119 let listeners = self.listeners.lock().unwrap();
120 listeners.get(event).map(|l| l.len()).unwrap_or(0)
121 }
122
123 pub fn event_names(&self) -> Vec<String> {
125 let listeners = self.listeners.lock().unwrap();
126 listeners
127 .iter()
128 .filter(|(_, v)| !v.is_empty())
129 .map(|(k, _)| k.clone())
130 .collect()
131 }
132
133 fn next_sub_id(&self) -> SubscriptionId {
134 let mut id = self.next_id.lock().unwrap();
135 let sub = SubscriptionId(*id);
136 *id += 1;
137 sub
138 }
139}
140
141impl Default for EventEmitter {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
/// Predefined string constants, presumably meant to be used as event names
/// with `EventEmitter` (nothing in this file references them; confirm against
/// the callers).
pub mod framework {
    /// The string `"action:dispatched"`.
    pub const ACTION_DISPATCHED: &str = "action:dispatched";
    /// The string `"error"`.
    pub const ERROR: &str = "error";
    /// The string `"module:created"`.
    pub const MODULE_CREATED: &str = "module:created";
    /// The string `"module:destroyed"`.
    pub const MODULE_DESTROYED: &str = "module:destroyed";
    /// The string `"route:changed"`.
    pub const ROUTE_CHANGED: &str = "route:changed";
    /// The string `"state:updated"`.
    pub const STATE_UPDATED: &str = "state:updated";
}
156
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicI32, Ordering};

    /// Builds a shared call counter together with a listener that increments
    /// it on every invocation.
    fn counting_listener() -> (
        Arc<AtomicI32>,
        impl Fn(&serde_json::Value) + Send + Sync + 'static,
    ) {
        let calls = Arc::new(AtomicI32::new(0));
        let shared = Arc::clone(&calls);
        let listener = move |_: &serde_json::Value| {
            shared.fetch_add(1, Ordering::SeqCst);
        };
        (calls, listener)
    }

    /// A plain listener runs once per emit.
    #[test]
    fn test_on_and_emit() {
        let bus = EventEmitter::new();
        let (calls, listener) = counting_listener();

        bus.on("test", listener);

        for _ in 0..2 {
            bus.emit("test", &json!(null));
        }

        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// A `once` listener runs for the first emit only.
    #[test]
    fn test_once() {
        let bus = EventEmitter::new();
        let (calls, listener) = counting_listener();

        bus.once("test", listener);

        for _ in 0..2 {
            bus.emit("test", &json!(null));
        }

        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// After `off`, the listener no longer runs.
    #[test]
    fn test_off() {
        let bus = EventEmitter::new();
        let (calls, listener) = counting_listener();

        let subscription = bus.on("test", listener);

        bus.emit("test", &json!(null));
        bus.off(subscription);
        bus.emit("test", &json!(null));

        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// The listener receives the emitted payload.
    #[test]
    fn test_payload() {
        let bus = EventEmitter::new();
        let slot: Arc<Mutex<Option<serde_json::Value>>> = Arc::new(Mutex::new(None));
        let writer = Arc::clone(&slot);

        bus.on("data", move |payload| {
            *writer.lock().unwrap() = Some(payload.clone());
        });

        bus.emit("data", &json!({"name": "Alice"}));

        let seen = slot.lock().unwrap().take().unwrap();
        assert_eq!(seen["name"], "Alice");
    }

    /// `listener_count` follows registrations and removals.
    #[test]
    fn test_listener_count() {
        let bus = EventEmitter::new();
        assert_eq!(bus.listener_count("test"), 0);

        let first = bus.on("test", |_| {});
        assert_eq!(bus.listener_count("test"), 1);

        bus.on("test", |_| {});
        assert_eq!(bus.listener_count("test"), 2);

        bus.off(first);
        assert_eq!(bus.listener_count("test"), 1);
    }

    /// `event_names` lists every event that has a listener.
    #[test]
    fn test_event_names() {
        let bus = EventEmitter::new();
        for name in ["a", "b"] {
            bus.on(name, |_| {});
        }

        let mut names = bus.event_names();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    /// `remove_all` empties one event; `clear` empties all of them.
    #[test]
    fn test_remove_all_and_clear() {
        let bus = EventEmitter::new();
        for name in ["a", "a", "b"] {
            bus.on(name, |_| {});
        }

        bus.remove_all("a");
        assert_eq!(bus.listener_count("a"), 0);
        assert_eq!(bus.listener_count("b"), 1);

        bus.clear();
        assert_eq!(bus.listener_count("b"), 0);
    }
}