1use std::collections::HashMap;
43use std::future::Future;
44use std::pin::Pin;
45use std::sync::{Arc, RwLock};
46
/// A boxed, one-shot lifecycle callback.
///
/// Calling the closure consumes it and yields a pinned, boxed future with no
/// output. Both the closure and the future it produces must be `Send`.
pub(crate) type LifecycleHook =
    Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;

/// Lifecycle callbacks, grouped by the phase they were registered for.
///
/// This type only stores the hooks; when and how they are executed is up to
/// whoever owns the collection.
#[derive(Default)]
pub(crate) struct LifecycleHooks {
    /// Hooks registered for the start phase.
    pub on_start: Vec<LifecycleHook>,
    /// Hooks registered for the shutdown phase.
    pub on_shutdown: Vec<LifecycleHook>,
}

impl LifecycleHooks {
    /// Creates a collection with no hooks in either phase.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` when at least one start or shutdown hook is present.
    #[allow(dead_code)]
    pub fn has_hooks(&self) -> bool {
        !(self.on_start.is_empty() && self.on_shutdown.is_empty())
    }
}
78
79type SyncHandler = Arc<dyn Fn(&str) + Send + Sync>;
83
84type AsyncHandler = Arc<dyn Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
86
87#[derive(Clone)]
108pub struct EventBus {
109 sync_handlers: Arc<RwLock<HashMap<String, Vec<SyncHandler>>>>,
110 async_handlers: Arc<RwLock<HashMap<String, Vec<AsyncHandler>>>>,
111}
112
113impl EventBus {
114 pub fn new() -> Self {
116 Self {
117 sync_handlers: Arc::new(RwLock::new(HashMap::new())),
118 async_handlers: Arc::new(RwLock::new(HashMap::new())),
119 }
120 }
121
122 pub fn on<F>(&self, topic: &str, handler: F)
138 where
139 F: Fn(&str) + Send + Sync + 'static,
140 {
141 let mut handlers = self.sync_handlers.write().unwrap();
142 handlers
143 .entry(topic.to_string())
144 .or_default()
145 .push(Arc::new(handler));
146 }
147
148 pub fn on_async<F>(&self, topic: &str, handler: F)
165 where
166 F: Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
167 {
168 let mut handlers = self.async_handlers.write().unwrap();
169 handlers
170 .entry(topic.to_string())
171 .or_default()
172 .push(Arc::new(handler));
173 }
174
175 pub fn emit(&self, topic: &str, payload: &str) {
190 if let Ok(handlers) = self.sync_handlers.read() {
192 if let Some(topic_handlers) = handlers.get(topic) {
193 for handler in topic_handlers {
194 handler(payload);
195 }
196 }
197 }
198
199 if let Ok(handlers) = self.async_handlers.read() {
201 if let Some(topic_handlers) = handlers.get(topic) {
202 for handler in topic_handlers {
203 let handler = handler.clone();
204 let payload = payload.to_string();
205 tokio::spawn(async move {
206 handler(payload).await;
207 });
208 }
209 }
210 }
211 }
212
213 pub async fn emit_await(&self, topic: &str, payload: &str) {
217 {
219 let handlers = self.sync_handlers.read().unwrap();
220 if let Some(topic_handlers) = handlers.get(topic) {
221 for handler in topic_handlers {
222 handler(payload);
223 }
224 }
225 }
226
227 let tasks = {
229 let handlers = self.async_handlers.read().unwrap();
230 if let Some(topic_handlers) = handlers.get(topic) {
231 topic_handlers
232 .iter()
233 .map(|handler| {
234 let handler = handler.clone();
235 let payload = payload.to_string();
236 tokio::spawn(async move {
237 handler(payload).await;
238 })
239 })
240 .collect::<Vec<_>>()
241 } else {
242 Vec::new()
243 }
244 };
245 for task in tasks {
246 let _ = task.await;
247 }
248 }
249
250 pub fn handler_count(&self, topic: &str) -> usize {
252 let sync_count = self
253 .sync_handlers
254 .read()
255 .map(|h| h.get(topic).map_or(0, |v| v.len()))
256 .unwrap_or(0);
257 let async_count = self
258 .async_handlers
259 .read()
260 .map(|h| h.get(topic).map_or(0, |v| v.len()))
261 .unwrap_or(0);
262 sync_count + async_count
263 }
264
265 pub fn topics(&self) -> Vec<String> {
267 let mut topics = Vec::new();
268 if let Ok(handlers) = self.sync_handlers.read() {
269 topics.extend(handlers.keys().cloned());
270 }
271 if let Ok(handlers) = self.async_handlers.read() {
272 for key in handlers.keys() {
273 if !topics.contains(key) {
274 topics.push(key.clone());
275 }
276 }
277 }
278 topics
279 }
280}
281
282impl Default for EventBus {
283 fn default() -> Self {
284 Self::new()
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// A sync handler runs once for every emit on its topic.
    #[test]
    fn test_sync_event_handler() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        let seen = Arc::clone(&hits);
        bus.on("test.event", move |_payload: &str| {
            seen.fetch_add(1, Ordering::SeqCst);
        });

        for payload in ["hello", "world"] {
            bus.emit("test.event", payload);
        }

        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// Every handler registered for a topic is invoked by a single emit.
    #[test]
    fn test_multiple_handlers() {
        let bus = EventBus::new();
        let total = Arc::new(AtomicUsize::new(0));

        // Distinct increments make it visible that both handlers ran.
        for step in [1usize, 10] {
            let total = Arc::clone(&total);
            bus.on("multi", move |_| {
                total.fetch_add(step, Ordering::SeqCst);
            });
        }

        bus.emit("multi", "");
        assert_eq!(total.load(Ordering::SeqCst), 11);
    }

    /// `handler_count` grows by one with each registration.
    #[test]
    fn test_handler_count() {
        let bus = EventBus::new();
        assert_eq!(bus.handler_count("topic"), 0);

        for expected in 1..=2 {
            bus.on("topic", |_| {});
            assert_eq!(bus.handler_count("topic"), expected);
        }
    }

    /// `topics` reports every topic that has a handler.
    #[test]
    fn test_topics() {
        let bus = EventBus::new();
        for name in ["a", "b"] {
            bus.on(name, |_| {});
        }

        let topics = bus.topics();
        for name in ["a", "b"] {
            assert!(topics.iter().any(|topic| topic == name));
        }
    }

    /// Emitting on a topic nobody listens to must simply return.
    #[test]
    fn test_unregistered_topic_is_noop() {
        EventBus::new().emit("nonexistent", "payload");
    }

    /// `emit_await` drives an async handler to completion before returning.
    #[tokio::test]
    async fn test_async_event_handler() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        let shared = Arc::clone(&hits);
        bus.on_async("async.event", move |_payload: String| {
            let hits = Arc::clone(&shared);
            Box::pin(async move {
                hits.fetch_add(1, Ordering::SeqCst);
            })
        });

        bus.emit_await("async.event", "hello").await;
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    /// `emit_await` returns only after all async handlers have finished.
    #[tokio::test]
    async fn test_emit_await_waits_for_all() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        // Two identical handlers that each finish after a short delay.
        for _ in 0..2 {
            let shared = Arc::clone(&hits);
            bus.on_async("wait", move |_| {
                let hits = Arc::clone(&shared);
                Box::pin(async move {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    hits.fetch_add(1, Ordering::SeqCst);
                })
            });
        }

        bus.emit_await("wait", "").await;
        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// A freshly created hook collection reports that it is empty.
    #[test]
    fn test_lifecycle_hooks_default() {
        assert!(!LifecycleHooks::new().has_hooks());
    }
}