Skip to main content

rustapi_core/
events.rs

1//! Event System and Lifecycle Hooks
2//!
3//! Provides an in-process publish/subscribe event bus and lifecycle hooks
4//! for the RustAPI application.
5//!
6//! # Lifecycle Hooks
7//!
8//! ```rust,ignore
9//! use rustapi_core::RustApi;
10//!
11//! RustApi::new()
12//!     .on_start(|| async {
13//!         println!("Server started!");
14//!     })
15//!     .on_shutdown(|| async {
16//!         println!("Server shutting down...");
17//!     })
18//!     .run("127.0.0.1:8080")
19//!     .await
20//! ```
21//!
22//! # Event Bus
23//!
24//! ```rust,ignore
25//! use rustapi_core::events::EventBus;
26//! use rustapi_core::State;
27//!
28//! let bus = EventBus::new();
29//!
30//! // Subscribe to events
31//! bus.on("user.created", |payload: &str| {
32//!     println!("User created: {}", payload);
33//! });
34//!
35//! // In a handler, emit events
36//! async fn create_user(State(bus): State<EventBus>) -> impl IntoResponse {
37//!     bus.emit("user.created", "user_123");
38//!     "created"
39//! }
40//! ```
41
42use std::collections::HashMap;
43use std::future::Future;
44use std::pin::Pin;
45use std::sync::{Arc, RwLock};
46
47// ─── Lifecycle Hooks ────────────────────────────────────────────────────────
48
/// A boxed, one-shot async callback used for lifecycle hooks.
///
/// Calling the closure consumes it and yields a pinned, `Send` future that
/// resolves to `()`.
pub(crate) type LifecycleHook =
    Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;

/// Stores registered lifecycle hooks.
///
/// `Default` is derived: both hook lists start out empty, which is also what
/// [`LifecycleHooks::new`] returns.
#[derive(Default)]
pub(crate) struct LifecycleHooks {
    /// Hooks for the `on_start` phase, in registration order.
    pub on_start: Vec<LifecycleHook>,
    /// Hooks for the `on_shutdown` phase, in registration order.
    pub on_shutdown: Vec<LifecycleHook>,
}

impl LifecycleHooks {
    /// Creates an empty hook set; equivalent to `LifecycleHooks::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if at least one start or shutdown hook is registered.
    // Within this file the method is only called from the unit tests, hence
    // the `dead_code` allowance for non-test builds.
    #[allow(dead_code)]
    pub fn has_hooks(&self) -> bool {
        !(self.on_start.is_empty() && self.on_shutdown.is_empty())
    }
}
78
79// ─── Event Bus ──────────────────────────────────────────────────────────────
80
/// Boxed, `Send` future returned by an asynchronous event handler.
type HandlerFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Shared synchronous event callback; it borrows the payload string.
type SyncHandler = Arc<dyn Fn(&str) + Send + Sync>;

/// Shared asynchronous event callback; it takes an owned payload and returns
/// the future that performs the work.
type AsyncHandler = Arc<dyn Fn(String) -> HandlerFuture + Send + Sync>;
86
87/// In-process publish/subscribe event bus
88///
89/// Supports both synchronous and asynchronous event handlers.
90/// Multiple handlers can be registered for the same event topic.
91///
92/// # Example
93///
94/// ```rust
95/// use rustapi_core::events::EventBus;
96///
97/// let bus = EventBus::new();
98///
99/// // Synchronous handler
100/// bus.on("user.created", |payload: &str| {
101///     println!("User created: {}", payload);
102/// });
103///
104/// // Emit events
105/// bus.emit("user.created", "user_123");
106/// ```
107#[derive(Clone)]
108pub struct EventBus {
109    sync_handlers: Arc<RwLock<HashMap<String, Vec<SyncHandler>>>>,
110    async_handlers: Arc<RwLock<HashMap<String, Vec<AsyncHandler>>>>,
111}
112
113impl EventBus {
114    /// Create a new EventBus instance
115    pub fn new() -> Self {
116        Self {
117            sync_handlers: Arc::new(RwLock::new(HashMap::new())),
118            async_handlers: Arc::new(RwLock::new(HashMap::new())),
119        }
120    }
121
122    /// Register a synchronous event handler for a topic
123    ///
124    /// The handler will be called with the event payload string whenever
125    /// the topic is emitted.
126    ///
127    /// # Example
128    ///
129    /// ```rust
130    /// use rustapi_core::events::EventBus;
131    ///
132    /// let bus = EventBus::new();
133    /// bus.on("order.completed", |payload: &str| {
134    ///     println!("Order completed: {}", payload);
135    /// });
136    /// ```
137    pub fn on<F>(&self, topic: &str, handler: F)
138    where
139        F: Fn(&str) + Send + Sync + 'static,
140    {
141        let mut handlers = self.sync_handlers.write().unwrap();
142        handlers
143            .entry(topic.to_string())
144            .or_default()
145            .push(Arc::new(handler));
146    }
147
148    /// Register an async event handler for a topic
149    ///
150    /// The handler will be spawned as a tokio task when the topic is emitted.
151    ///
152    /// # Example
153    ///
154    /// ```rust,ignore
155    /// use rustapi_core::events::EventBus;
156    ///
157    /// let bus = EventBus::new();
158    /// bus.on_async("email.send", |payload: String| {
159    ///     Box::pin(async move {
160    ///         send_email(&payload).await;
161    ///     })
162    /// });
163    /// ```
164    pub fn on_async<F>(&self, topic: &str, handler: F)
165    where
166        F: Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
167    {
168        let mut handlers = self.async_handlers.write().unwrap();
169        handlers
170            .entry(topic.to_string())
171            .or_default()
172            .push(Arc::new(handler));
173    }
174
175    /// Emit an event synchronously
176    ///
177    /// Calls all synchronous handlers for the topic in registration order.
178    /// Also spawns tokio tasks for any async handlers.
179    ///
180    /// # Example
181    ///
182    /// ```rust
183    /// use rustapi_core::events::EventBus;
184    ///
185    /// let bus = EventBus::new();
186    /// bus.on("log", |msg: &str| println!("{}", msg));
187    /// bus.emit("log", "Hello!");
188    /// ```
189    pub fn emit(&self, topic: &str, payload: &str) {
190        // Call sync handlers
191        if let Ok(handlers) = self.sync_handlers.read() {
192            if let Some(topic_handlers) = handlers.get(topic) {
193                for handler in topic_handlers {
194                    handler(payload);
195                }
196            }
197        }
198
199        // Spawn async handlers
200        if let Ok(handlers) = self.async_handlers.read() {
201            if let Some(topic_handlers) = handlers.get(topic) {
202                for handler in topic_handlers {
203                    let handler = handler.clone();
204                    let payload = payload.to_string();
205                    tokio::spawn(async move {
206                        handler(payload).await;
207                    });
208                }
209            }
210        }
211    }
212
213    /// Emit an event and await all async handlers
214    ///
215    /// Unlike `emit()`, this waits for all async handlers to complete.
216    pub async fn emit_await(&self, topic: &str, payload: &str) {
217        // Call sync handlers
218        {
219            let handlers = self.sync_handlers.read().unwrap();
220            if let Some(topic_handlers) = handlers.get(topic) {
221                for handler in topic_handlers {
222                    handler(payload);
223                }
224            }
225        }
226
227        // Await async handlers
228        let tasks = {
229            let handlers = self.async_handlers.read().unwrap();
230            if let Some(topic_handlers) = handlers.get(topic) {
231                topic_handlers
232                    .iter()
233                    .map(|handler| {
234                        let handler = handler.clone();
235                        let payload = payload.to_string();
236                        tokio::spawn(async move {
237                            handler(payload).await;
238                        })
239                    })
240                    .collect::<Vec<_>>()
241            } else {
242                Vec::new()
243            }
244        };
245        for task in tasks {
246            let _ = task.await;
247        }
248    }
249
250    /// Get the number of registered handlers for a topic (both sync and async)
251    pub fn handler_count(&self, topic: &str) -> usize {
252        let sync_count = self
253            .sync_handlers
254            .read()
255            .map(|h| h.get(topic).map_or(0, |v| v.len()))
256            .unwrap_or(0);
257        let async_count = self
258            .async_handlers
259            .read()
260            .map(|h| h.get(topic).map_or(0, |v| v.len()))
261            .unwrap_or(0);
262        sync_count + async_count
263    }
264
265    /// Get all registered topic names
266    pub fn topics(&self) -> Vec<String> {
267        let mut topics = Vec::new();
268        if let Ok(handlers) = self.sync_handlers.read() {
269            topics.extend(handlers.keys().cloned());
270        }
271        if let Ok(handlers) = self.async_handlers.read() {
272            for key in handlers.keys() {
273                if !topics.contains(key) {
274                    topics.push(key.clone());
275                }
276            }
277        }
278        topics
279    }
280}
281
282impl Default for EventBus {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// A sync handler runs once for every emit on its topic.
    #[test]
    fn test_sync_event_handler() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        let observed = Arc::clone(&hits);
        bus.on("test.event", move |_payload: &str| {
            observed.fetch_add(1, Ordering::SeqCst);
        });

        for payload in ["hello", "world"] {
            bus.emit("test.event", payload);
        }

        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// Every handler registered on a topic is invoked by a single emit.
    #[test]
    fn test_multiple_handlers() {
        let bus = EventBus::new();
        let total = Arc::new(AtomicUsize::new(0));

        // Distinct increments make each handler's contribution visible.
        for step in [1usize, 10] {
            let sink = Arc::clone(&total);
            bus.on("multi", move |_| {
                sink.fetch_add(step, Ordering::SeqCst);
            });
        }

        bus.emit("multi", "");
        assert_eq!(total.load(Ordering::SeqCst), 11);
    }

    /// `handler_count` grows by one with each registration.
    #[test]
    fn test_handler_count() {
        let bus = EventBus::new();
        assert_eq!(bus.handler_count("topic"), 0);

        for expected in 1..=2 {
            bus.on("topic", |_| {});
            assert_eq!(bus.handler_count("topic"), expected);
        }
    }

    /// `topics` reports every topic that has a handler.
    #[test]
    fn test_topics() {
        let bus = EventBus::new();
        for name in ["a", "b"] {
            bus.on(name, |_| {});
        }

        let registered = bus.topics();
        for name in ["a", "b"] {
            assert!(registered.contains(&name.to_string()));
        }
    }

    /// Emitting on a topic nobody subscribed to must not panic.
    #[test]
    fn test_unregistered_topic_is_noop() {
        EventBus::new().emit("nonexistent", "payload");
    }

    /// An async handler has finished by the time `emit_await` returns.
    #[tokio::test]
    async fn test_async_event_handler() {
        let bus = EventBus::new();
        let hits = Arc::new(AtomicUsize::new(0));

        let shared = Arc::clone(&hits);
        bus.on_async("async.event", move |_payload: String| {
            let hits = Arc::clone(&shared);
            Box::pin(async move {
                hits.fetch_add(1, Ordering::SeqCst);
            })
        });

        bus.emit_await("async.event", "hello").await;
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    /// `emit_await` waits for every async handler, even slow ones.
    #[tokio::test]
    async fn test_emit_await_waits_for_all() {
        let bus = EventBus::new();
        let completed = Arc::new(AtomicUsize::new(0));

        // Two identical handlers that each sleep briefly before counting.
        for _ in 0..2 {
            let shared = Arc::clone(&completed);
            bus.on_async("wait", move |_| {
                let done = Arc::clone(&shared);
                Box::pin(async move {
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    done.fetch_add(1, Ordering::SeqCst);
                })
            });
        }

        bus.emit_await("wait", "").await;
        // Both handlers should have completed
        assert_eq!(completed.load(Ordering::SeqCst), 2);
    }

    /// A freshly created hook set reports that it has no hooks.
    #[test]
    fn test_lifecycle_hooks_default() {
        assert!(!LifecycleHooks::new().has_hooks());
    }
}