Skip to main content

hypen_server/
events.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4/// A unique subscription ID for unsubscribing.
5#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
6pub struct SubscriptionId(u64);
7
8type Listener = Box<dyn Fn(&serde_json::Value) + Send + Sync>;
9type ListenerList = Vec<(SubscriptionId, Arc<Listener>)>;
10
11/// A thread-safe, typed event emitter for pub/sub messaging.
12///
13/// # Example
14///
15/// ```rust
16/// use hypen_server::events::EventEmitter;
17/// use serde_json::json;
18/// use std::sync::{Arc, Mutex};
19///
20/// let emitter = EventEmitter::new();
21///
22/// let received = Arc::new(Mutex::new(Vec::new()));
23/// let received_clone = received.clone();
24///
25/// let _sub = emitter.on("user:login", move |payload| {
26///     received_clone.lock().unwrap().push(payload.clone());
27/// });
28///
29/// emitter.emit("user:login", &json!({"name": "Alice"}));
30///
31/// assert_eq!(received.lock().unwrap().len(), 1);
32/// ```
33pub struct EventEmitter {
34    next_id: Mutex<u64>,
35    listeners: Mutex<HashMap<String, ListenerList>>,
36    once_ids: Mutex<Vec<SubscriptionId>>,
37}
38
39impl EventEmitter {
40    pub fn new() -> Self {
41        Self {
42            next_id: Mutex::new(0),
43            listeners: Mutex::new(HashMap::new()),
44            once_ids: Mutex::new(Vec::new()),
45        }
46    }
47
48    /// Subscribe to an event. Returns a `SubscriptionId` for unsubscribing.
49    pub fn on<F>(&self, event: impl Into<String>, handler: F) -> SubscriptionId
50    where
51        F: Fn(&serde_json::Value) + Send + Sync + 'static,
52    {
53        let id = self.next_sub_id();
54        let mut listeners = self.listeners.lock().unwrap();
55        listeners
56            .entry(event.into())
57            .or_default()
58            .push((id, Arc::new(Box::new(handler))));
59        id
60    }
61
62    /// Subscribe to an event once. The handler is removed after the first call.
63    pub fn once<F>(&self, event: impl Into<String>, handler: F) -> SubscriptionId
64    where
65        F: Fn(&serde_json::Value) + Send + Sync + 'static,
66    {
67        let id = self.on(event, handler);
68        self.once_ids.lock().unwrap().push(id);
69        id
70    }
71
72    /// Emit an event to all listeners.
73    pub fn emit(&self, event: &str, payload: &serde_json::Value) {
74        // Collect listeners to call (clone Arcs to avoid holding lock during calls)
75        let to_call: Vec<Arc<Listener>> = {
76            let listeners = self.listeners.lock().unwrap();
77            listeners
78                .get(event)
79                .map(|list| list.iter().map(|(_, h)| Arc::clone(h)).collect())
80                .unwrap_or_default()
81        };
82
83        for handler in &to_call {
84            handler(payload);
85        }
86
87        // Remove once-listeners
88        let once_ids: Vec<SubscriptionId> = {
89            let mut once = self.once_ids.lock().unwrap();
90            std::mem::take(&mut *once)
91        };
92        for id in once_ids {
93            self.off(id);
94        }
95    }
96
97    /// Unsubscribe a specific listener by its `SubscriptionId`.
98    pub fn off(&self, id: SubscriptionId) {
99        let mut listeners = self.listeners.lock().unwrap();
100        for list in listeners.values_mut() {
101            list.retain(|(sub_id, _)| *sub_id != id);
102        }
103    }
104
105    /// Remove all listeners for a specific event.
106    pub fn remove_all(&self, event: &str) {
107        let mut listeners = self.listeners.lock().unwrap();
108        listeners.remove(event);
109    }
110
111    /// Clear all listeners for all events.
112    pub fn clear(&self) {
113        let mut listeners = self.listeners.lock().unwrap();
114        listeners.clear();
115    }
116
117    /// Get the number of listeners for an event.
118    pub fn listener_count(&self, event: &str) -> usize {
119        let listeners = self.listeners.lock().unwrap();
120        listeners.get(event).map(|l| l.len()).unwrap_or(0)
121    }
122
123    /// Get all event names that have at least one listener.
124    pub fn event_names(&self) -> Vec<String> {
125        let listeners = self.listeners.lock().unwrap();
126        listeners
127            .iter()
128            .filter(|(_, v)| !v.is_empty())
129            .map(|(k, _)| k.clone())
130            .collect()
131    }
132
133    fn next_sub_id(&self) -> SubscriptionId {
134        let mut id = self.next_id.lock().unwrap();
135        let sub = SubscriptionId(*id);
136        *id += 1;
137        sub
138    }
139}
140
141impl Default for EventEmitter {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147/// Well-known framework event names.
148pub mod framework {
149    pub const MODULE_CREATED: &str = "module:created";
150    pub const MODULE_DESTROYED: &str = "module:destroyed";
151    pub const ROUTE_CHANGED: &str = "route:changed";
152    pub const STATE_UPDATED: &str = "state:updated";
153    pub const ACTION_DISPATCHED: &str = "action:dispatched";
154    pub const ERROR: &str = "error";
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160    use serde_json::json;
161    use std::sync::atomic::{AtomicI32, Ordering};
162
163    #[test]
164    fn test_on_and_emit() {
165        let emitter = EventEmitter::new();
166        let count = Arc::new(AtomicI32::new(0));
167        let count_clone = count.clone();
168
169        emitter.on("test", move |_| {
170            count_clone.fetch_add(1, Ordering::SeqCst);
171        });
172
173        emitter.emit("test", &json!(null));
174        emitter.emit("test", &json!(null));
175
176        assert_eq!(count.load(Ordering::SeqCst), 2);
177    }
178
179    #[test]
180    fn test_once() {
181        let emitter = EventEmitter::new();
182        let count = Arc::new(AtomicI32::new(0));
183        let count_clone = count.clone();
184
185        emitter.once("test", move |_| {
186            count_clone.fetch_add(1, Ordering::SeqCst);
187        });
188
189        emitter.emit("test", &json!(null));
190        emitter.emit("test", &json!(null));
191
192        assert_eq!(count.load(Ordering::SeqCst), 1);
193    }
194
195    #[test]
196    fn test_off() {
197        let emitter = EventEmitter::new();
198        let count = Arc::new(AtomicI32::new(0));
199        let count_clone = count.clone();
200
201        let id = emitter.on("test", move |_| {
202            count_clone.fetch_add(1, Ordering::SeqCst);
203        });
204
205        emitter.emit("test", &json!(null));
206        emitter.off(id);
207        emitter.emit("test", &json!(null));
208
209        assert_eq!(count.load(Ordering::SeqCst), 1);
210    }
211
212    #[test]
213    fn test_payload() {
214        let emitter = EventEmitter::new();
215        let received = Arc::new(Mutex::new(None));
216        let received_clone = received.clone();
217
218        emitter.on("data", move |payload| {
219            *received_clone.lock().unwrap() = Some(payload.clone());
220        });
221
222        emitter.emit("data", &json!({"name": "Alice"}));
223
224        let val = received.lock().unwrap().take().unwrap();
225        assert_eq!(val["name"], "Alice");
226    }
227
228    #[test]
229    fn test_listener_count() {
230        let emitter = EventEmitter::new();
231
232        assert_eq!(emitter.listener_count("test"), 0);
233
234        let id = emitter.on("test", |_| {});
235        assert_eq!(emitter.listener_count("test"), 1);
236
237        emitter.on("test", |_| {});
238        assert_eq!(emitter.listener_count("test"), 2);
239
240        emitter.off(id);
241        assert_eq!(emitter.listener_count("test"), 1);
242    }
243
244    #[test]
245    fn test_event_names() {
246        let emitter = EventEmitter::new();
247        emitter.on("a", |_| {});
248        emitter.on("b", |_| {});
249
250        let mut names = emitter.event_names();
251        names.sort();
252        assert_eq!(names, vec!["a", "b"]);
253    }
254
255    #[test]
256    fn test_remove_all_and_clear() {
257        let emitter = EventEmitter::new();
258        emitter.on("a", |_| {});
259        emitter.on("a", |_| {});
260        emitter.on("b", |_| {});
261
262        emitter.remove_all("a");
263        assert_eq!(emitter.listener_count("a"), 0);
264        assert_eq!(emitter.listener_count("b"), 1);
265
266        emitter.clear();
267        assert_eq!(emitter.listener_count("b"), 0);
268    }
269}