reovim_kernel/ipc/event_bus/mod.rs
1//! Event bus for type-erased event dispatch.
2//!
3//! The `EventBus` is the central hub for event-driven communication between
4//! kernel subsystems, drivers, and modules. It provides:
5//!
6//! - **Type-erased dispatch**: Single bus handles all event types
7//! - **Lock-free hot path**: Dispatch uses `ArcSwap` for zero-lock reads
8//! - **Priority ordering**: Handlers execute in priority order (lower = first)
9//! - **RAII subscriptions**: Handlers auto-unsubscribe when dropped
10//! - **Scoped events**: Track event lifecycle for synchronization
11//!
12//! # Design Philosophy
13//!
14//! Following the proven lock-free patterns:
15//! - **Lock-free dispatch**: `ArcSwap::load()` for handler lookup
16//! - **RCU for subscriptions**: Copy-on-write via `ArcSwap::rcu()`
17//! - **Fire-and-forget**: Handlers return `EventResult`, not `Result<T, E>`
18//!
19//! # Performance
20//!
21//! - Dispatch latency: ~100ns (0 handlers) to ~1µs (10 handlers)
22//! - Subscription: ~1µs (RCU clone + sort)
23//! - Handler lookup: O(1) `HashMap` + O(n) handler iteration
24//!
25//! # Example
26//!
27//! ```
28//! use reovim_kernel::api::v1::*;
29//!
30//! #[derive(Debug)]
31//! struct BufferChanged { buffer_id: u64 }
32//! impl Event for BufferChanged {}
33//!
34//! let bus = EventBus::new();
35//!
36//! // Subscribe with priority 100 (default)
37//! let _sub = bus.subscribe::<BufferChanged, _>(100, |event| {
38//! println!("Buffer {} changed", event.buffer_id);
39//! EventResult::Handled
40//! });
41//!
42//! // Emit event - handler is called synchronously
43//! bus.emit(BufferChanged { buffer_id: 1 });
44//! ```
45
46mod handler;
47mod sender;
48
49pub use sender::EventSender;
50
51use std::{any::TypeId, collections::HashMap, sync::Arc};
52
53use reovim_arch::sync::{ArcSwap, Mutex};
54
55use handler::{ContextHandlerFn, HandlerFn, HandlerType, RegisteredHandler};
56
57use super::{
58 channel::{BoundedReceiver, BoundedSender, bounded},
59 context::{DispatchResult, HandlerContext},
60 event::{DynEvent, Event, EventResult, TargetedEvent},
61 scope::EventScope,
62 subscription::{Subscription, SubscriptionId},
63};
64
65/// Optional channel for dedicated processor thread pattern.
66///
67/// Used when the `EventBus` is created with `new_with_channel()`.
68struct ChannelInner {
69 /// Sender side (cloneable)
70 tx: BoundedSender<DynEvent>,
71
72 /// Receiver side (only accessible once via `take_receiver()`)
73 rx: Mutex<Option<BoundedReceiver<DynEvent>>>,
74}
75
76/// Internal state for `EventBus`.
77struct EventBusInner {
78 /// Handler storage: `TypeId` -> sorted Vec of handlers.
79 /// Uses `ArcSwap` for lock-free reads.
80 handlers: ArcSwap<HashMap<TypeId, Vec<RegisteredHandler>>>,
81
82 /// Async event queue for `emit_async()`.
83 queue: Mutex<Vec<DynEvent>>,
84
85 /// Optional channel for dedicated processor thread.
86 channel: Option<ChannelInner>,
87}
88
89/// Type-erased event bus for pub/sub communication.
90///
91/// The `EventBus` routes events to registered handlers based on event type.
92/// It uses lock-free data structures for the hot dispatch path and
93/// copy-on-write for subscription updates.
94///
95/// # Thread Safety
96///
97/// `EventBus` is `Clone`, `Send`, and `Sync`. Cloning creates a new handle
98/// to the same underlying bus. Multiple threads can dispatch events
99/// concurrently without blocking.
100///
101/// # Handler Ordering
102///
103/// Handlers are called in priority order (lower priority number = earlier).
104/// For handlers with the same priority, registration order is preserved.
105///
106/// # Two Usage Patterns
107///
108/// 1. **Simple/Tests**: Use `new()` with `emit_async()` and `process_queue()`
109/// 2. **Runtime**: Use `new_with_channel()` with `sender()` and `take_receiver()`
110#[derive(Clone)]
111pub struct EventBus {
112 inner: Arc<EventBusInner>,
113}
114
115impl EventBus {
116 /// Create a new empty event bus.
117 ///
118 /// This creates an `EventBus` without a channel. Use `emit_async()` and
119 /// `process_queue()` for deferred event processing.
120 ///
121 /// For runtime integration with a dedicated processor thread, use
122 /// `new_with_channel()` instead.
123 #[must_use]
124 pub fn new() -> Self {
125 Self {
126 inner: Arc::new(EventBusInner {
127 handlers: ArcSwap::from_pointee(HashMap::new()),
128 queue: Mutex::new(Vec::new()),
129 channel: None,
130 }),
131 }
132 }
133
134 /// Create an event bus with a channel for dedicated processor thread.
135 ///
136 /// This is the preferred constructor for runtime integration where events
137 /// are processed by a dedicated OS thread using `blocking_recv()`.
138 ///
139 /// # Arguments
140 ///
141 /// * `capacity` - Bounded channel capacity (typically 1024)
142 ///
143 /// # Usage Pattern
144 ///
145 /// ```ignore
146 /// let bus = EventBus::new_with_channel(1024);
147 ///
148 /// // Get sender for emitting events
149 /// let sender = bus.sender().unwrap();
150 ///
151 /// // Take receiver for processor thread
152 /// let mut receiver = bus.take_receiver().unwrap();
153 ///
154 /// // Spawn processor thread
155 /// std::thread::spawn(move || {
156 /// while let Some(event) = receiver.blocking_recv() {
157 /// // Process event
158 /// }
159 /// });
160 ///
161 /// // Emit events from anywhere
162 /// sender.send(MyEvent { ... });
163 /// ```
164 #[must_use]
165 pub fn new_with_channel(capacity: usize) -> Self {
166 let (tx, rx) = bounded(capacity);
167 Self {
168 inner: Arc::new(EventBusInner {
169 handlers: ArcSwap::from_pointee(HashMap::new()),
170 queue: Mutex::new(Vec::new()),
171 channel: Some(ChannelInner {
172 tx,
173 rx: Mutex::new(Some(rx)),
174 }),
175 }),
176 }
177 }
178
179 /// Get a cloneable sender for emitting events via channel.
180 ///
181 /// Returns `None` if the `EventBus` was created without a channel.
182 ///
183 /// # Example
184 ///
185 /// ```ignore
186 /// let bus = EventBus::new_with_channel(1024);
187 /// let sender = bus.sender().expect("bus has channel");
188 ///
189 /// sender.try_send(MyEvent { ... });
190 /// ```
191 #[must_use]
192 pub fn sender(&self) -> Option<EventSender> {
193 self.inner
194 .channel
195 .as_ref()
196 .map(|c| EventSender { tx: c.tx.clone() })
197 }
198
199 /// Take the receiver for the dedicated processor thread.
200 ///
201 /// Can only be called once. Subsequent calls return `None`.
202 ///
203 /// # Example
204 ///
205 /// ```ignore
206 /// let bus = EventBus::new_with_channel(1024);
207 /// let receiver = bus.take_receiver().expect("bus has channel");
208 ///
209 /// // Spawn processor thread
210 /// std::thread::spawn(move || {
211 /// while let Some(event) = receiver.blocking_recv() {
212 /// // Process event
213 /// }
214 /// });
215 /// ```
216 #[must_use]
217 pub fn take_receiver(&self) -> Option<BoundedReceiver<DynEvent>> {
218 self.inner.channel.as_ref()?.rx.lock().take()
219 }
220
221 /// Subscribe a handler for events of type `E`.
222 ///
223 /// Returns a `Subscription` handle that unsubscribes when dropped.
224 ///
225 /// # Arguments
226 ///
227 /// * `priority` - Handler priority (lower = called earlier). Convention:
228 /// - 0-50: Core/critical handlers
229 /// - 100: Default priority
230 /// - 200+: Low priority (cleanup, logging)
231 /// * `handler` - Function called for each event of type `E`
232 ///
233 /// # Example
234 ///
235 /// ```
236 /// use reovim_kernel::api::v1::*;
237 ///
238 /// #[derive(Debug)]
239 /// struct MyEvent { value: i32 }
240 /// impl Event for MyEvent {}
241 ///
242 /// let bus = EventBus::new();
243 ///
244 /// let sub = bus.subscribe::<MyEvent, _>(100, |event| {
245 /// println!("Received: {:?}", event.value);
246 /// EventResult::Handled
247 /// });
248 ///
249 /// // Handler is active while `sub` is alive
250 /// bus.emit(MyEvent { value: 42 });
251 ///
252 /// // Dropping `sub` removes the handler
253 /// drop(sub);
254 /// ```
255 #[cfg_attr(coverage_nightly, coverage(off))]
256 pub fn subscribe<E, F>(&self, priority: u32, handler: F) -> Subscription
257 where
258 E: Event,
259 F: Fn(&E) -> EventResult + Send + Sync + 'static,
260 {
261 let type_id = TypeId::of::<E>();
262 let sub_id = SubscriptionId::new();
263
264 // Wrap handler to downcast from DynEvent
265 let wrapped_handler: HandlerFn = Arc::new(move |dyn_event: &DynEvent| {
266 dyn_event
267 .downcast_ref::<E>()
268 .map_or(EventResult::NotHandled, &handler)
269 });
270
271 let registered = RegisteredHandler {
272 id: sub_id,
273 priority,
274 handler: HandlerType::Simple(wrapped_handler),
275 };
276
277 // RCU update: clone, modify, swap
278 // Note: rcu takes FnMut, so we need to clone registered
279 self.inner.handlers.rcu(|current| {
280 let mut new_map = (**current).clone();
281 let handlers = new_map.entry(type_id).or_default();
282 handlers.push(registered.clone());
283 // Sort by priority (stable sort preserves registration order for same priority)
284 handlers.sort_by_key(|h| h.priority);
285 new_map
286 });
287
288 // Create unsubscribe closure that captures the inner Arc
289 let inner = Arc::clone(&self.inner);
290 let unsubscribe = move || {
291 inner.handlers.rcu(|current| {
292 let mut new_map = (**current).clone();
293 if let Some(handlers) = new_map.get_mut(&type_id) {
294 handlers.retain(|h| h.id != sub_id);
295 if handlers.is_empty() {
296 new_map.remove(&type_id);
297 }
298 }
299 new_map
300 });
301 };
302
303 Subscription::new::<E>(sub_id, unsubscribe)
304 }
305
306 /// Subscribe a context-aware handler for events of type `E`.
307 ///
308 /// Similar to `subscribe`, but the handler receives a `HandlerContext`
309 /// that allows emitting new events, requesting renders, etc.
310 ///
311 /// # Arguments
312 ///
313 /// * `priority` - Handler priority (lower = called earlier)
314 /// * `handler` - Function called for each event, with access to context
315 ///
316 /// # Example
317 ///
318 /// ```
319 /// use reovim_kernel::api::v1::*;
320 ///
321 /// #[derive(Debug)]
322 /// struct MyEvent { value: i32 }
323 /// impl Event for MyEvent {}
324 ///
325 /// #[derive(Debug)]
326 /// struct FollowUpEvent;
327 /// impl Event for FollowUpEvent {}
328 ///
329 /// let bus = EventBus::new();
330 ///
331 /// let sub = bus.subscribe_with_context::<MyEvent, _>(100, |event, ctx| {
332 /// // Emit a follow-up event
333 /// ctx.emit(FollowUpEvent);
334 /// // Request render
335 /// ctx.request_render();
336 /// EventResult::Handled
337 /// });
338 /// ```
339 #[cfg_attr(coverage_nightly, coverage(off))]
340 pub fn subscribe_with_context<E, F>(&self, priority: u32, handler: F) -> Subscription
341 where
342 E: Event,
343 F: Fn(&E, &mut HandlerContext) -> EventResult + Send + Sync + 'static,
344 {
345 let type_id = TypeId::of::<E>();
346 let sub_id = SubscriptionId::new();
347
348 // Wrap handler to downcast from DynEvent
349 let wrapped_handler: ContextHandlerFn = Arc::new(move |dyn_event: &DynEvent, ctx| {
350 dyn_event
351 .downcast_ref::<E>()
352 .map_or(EventResult::NotHandled, |e| handler(e, ctx))
353 });
354
355 let registered = RegisteredHandler {
356 id: sub_id,
357 priority,
358 handler: HandlerType::WithContext(wrapped_handler),
359 };
360
361 // RCU update: clone, modify, swap
362 self.inner.handlers.rcu(|current| {
363 let mut new_map = (**current).clone();
364 let handlers = new_map.entry(type_id).or_default();
365 handlers.push(registered.clone());
366 handlers.sort_by_key(|h| h.priority);
367 new_map
368 });
369
370 // Create unsubscribe closure
371 let inner = Arc::clone(&self.inner);
372 let unsubscribe = move || {
373 inner.handlers.rcu(|current| {
374 let mut new_map = (**current).clone();
375 if let Some(handlers) = new_map.get_mut(&type_id) {
376 handlers.retain(|h| h.id != sub_id);
377 if handlers.is_empty() {
378 new_map.remove(&type_id);
379 }
380 }
381 new_map
382 });
383 };
384
385 Subscription::new::<E>(sub_id, unsubscribe)
386 }
387
388 /// Subscribe a handler for targeted events, filtering by target.
389 ///
390 /// The handler is only called when `event.target()` matches the
391 /// specified target string.
392 ///
393 /// # Arguments
394 ///
395 /// * `target` - Target string to filter events
396 /// * `priority` - Handler priority (lower = called earlier)
397 /// * `handler` - Function called for matching events
398 ///
399 /// # Example
400 ///
401 /// ```
402 /// use reovim_kernel::api::v1::*;
403 ///
404 /// #[derive(Debug)]
405 /// struct PluginInput {
406 /// target: &'static str,
407 /// value: i32,
408 /// }
409 /// impl Event for PluginInput {}
410 /// impl TargetedEvent for PluginInput {
411 /// fn target(&self) -> &str { self.target }
412 /// }
413 ///
414 /// let bus = EventBus::new();
415 ///
416 /// // Only receives events where target == "my_plugin"
417 /// let sub = bus.subscribe_targeted::<PluginInput, _>("my_plugin", 100, |event, ctx| {
418 /// println!("My plugin received: {}", event.value);
419 /// EventResult::Handled
420 /// });
421 /// ```
422 pub fn subscribe_targeted<E, F>(&self, target: &str, priority: u32, handler: F) -> Subscription
423 where
424 E: TargetedEvent,
425 F: Fn(&E, &mut HandlerContext) -> EventResult + Send + Sync + 'static,
426 {
427 let target = target.to_owned();
428 self.subscribe_with_context::<E, _>(priority, move |event, ctx| {
429 if event.target() == target {
430 handler(event, ctx)
431 } else {
432 EventResult::NotHandled
433 }
434 })
435 }
436
437 /// Emit an event synchronously.
438 ///
439 /// Dispatches the event to all registered handlers in priority order.
440 /// Returns the combined result of all handlers.
441 ///
442 /// # Handler Behavior
443 ///
444 /// - Handlers are called in priority order (lower = first)
445 /// - If any handler returns `Consumed`, dispatch stops
446 /// - Otherwise continues until all handlers have been called
447 ///
448 /// # Example
449 ///
450 /// ```
451 /// use reovim_kernel::api::v1::*;
452 ///
453 /// #[derive(Debug)]
454 /// struct MyEvent;
455 /// impl Event for MyEvent {}
456 ///
457 /// let bus = EventBus::new();
458 /// let result = bus.emit(MyEvent);
459 /// assert!(result.is_not_handled()); // No handlers registered
460 /// ```
461 pub fn emit<E: Event>(&self, event: E) -> EventResult {
462 let dyn_event = DynEvent::new(event);
463 self.dispatch(&dyn_event)
464 }
465
466 /// Emit an event with scope tracking.
467 ///
468 /// The scope's counter is incremented before dispatch and decremented
469 /// after all handlers complete. This allows callers to wait for all
470 /// effects of the event to complete.
471 ///
472 /// # Example
473 ///
474 /// ```
475 /// use reovim_kernel::api::v1::*;
476 ///
477 /// #[derive(Debug)]
478 /// struct MyEvent;
479 /// impl Event for MyEvent {}
480 ///
481 /// let bus = EventBus::new();
482 /// let scope = EventScope::new();
483 ///
484 /// scope.increment(); // Manually increment before emit_scoped
485 /// let result = bus.emit_scoped(MyEvent, &scope);
486 /// // Scope counter is now decremented
487 /// ```
488 pub fn emit_scoped<E: Event>(&self, event: E, scope: &EventScope) -> EventResult {
489 let dyn_event = DynEvent::new(event).with_scope(scope.clone());
490 let result = self.dispatch(&dyn_event);
491
492 // Decrement scope after dispatch completes
493 scope.decrement();
494
495 result
496 }
497
498 /// Queue an event for later processing.
499 ///
500 /// The event is stored in an internal queue and processed when
501 /// `process_queue()` is called. This is useful for deferring
502 /// event dispatch to avoid reentrancy issues.
503 ///
504 /// # Example
505 ///
506 /// ```
507 /// use reovim_kernel::api::v1::*;
508 ///
509 /// #[derive(Debug)]
510 /// struct MyEvent;
511 /// impl Event for MyEvent {}
512 ///
513 /// let bus = EventBus::new();
514 ///
515 /// bus.emit_async(MyEvent);
516 /// bus.emit_async(MyEvent);
517 ///
518 /// // Events are queued but not dispatched yet
519 /// let count = bus.process_queue();
520 /// assert_eq!(count, 2);
521 /// ```
522 pub fn emit_async<E: Event>(&self, event: E) {
523 let dyn_event = DynEvent::new(event);
524 self.inner.queue.lock().push(dyn_event);
525 }
526
527 /// Queue an event with scope tracking for later processing.
528 ///
529 /// Like `emit_async`, but attaches a scope for lifecycle tracking.
530 /// The scope is incremented when queued and decremented after dispatch.
531 pub fn emit_async_scoped<E: Event>(&self, event: E, scope: &EventScope) {
532 scope.increment();
533 let dyn_event = DynEvent::new(event).with_scope(scope.clone());
534 self.inner.queue.lock().push(dyn_event);
535 }
536
537 /// Process all queued events.
538 ///
539 /// Drains the queue and dispatches each event synchronously.
540 /// Returns the number of events processed.
541 ///
542 /// # Scope Handling
543 ///
544 /// For events with attached scopes, the scope is decremented after
545 /// each event is dispatched.
546 #[must_use]
547 pub fn process_queue(&self) -> usize {
548 let events: Vec<_> = {
549 let mut queue = self.inner.queue.lock();
550 std::mem::take(&mut *queue)
551 };
552
553 let count = events.len();
554
555 for mut event in events {
556 let scope = event.take_scope();
557 let _result = self.dispatch(&event);
558 // TODO: Replace with pr_trace!() once printk is implemented
559 // pr_trace!("processed queued event: {} -> {:?}", event.type_name(), result);
560 if let Some(scope) = scope {
561 scope.decrement();
562 }
563 }
564
565 count
566 }
567
568 /// Dispatch a type-erased event to registered handlers.
569 ///
570 /// This is the core dispatch logic, used by both `emit` and `process_queue`.
571 #[must_use]
572 pub fn dispatch(&self, event: &DynEvent) -> EventResult {
573 // Lock-free read of handlers
574 let handlers = self.inner.handlers.load();
575 let type_id = event.type_id();
576
577 let Some(handlers) = handlers.get(&type_id) else {
578 return EventResult::NotHandled;
579 };
580
581 let mut result = EventResult::NotHandled;
582 // Create a temporary context for context-aware handlers
583 let mut temp_ctx = HandlerContext::new();
584
585 for handler in handlers {
586 let handler_result = match &handler.handler {
587 HandlerType::Simple(h) => h(event),
588 HandlerType::WithContext(h) => h(event, &mut temp_ctx),
589 };
590
591 match handler_result {
592 EventResult::Consumed => {
593 return EventResult::Consumed;
594 }
595 EventResult::Handled => {
596 result = EventResult::Handled;
597 }
598 EventResult::NotHandled => {
599 // Continue to next handler
600 }
601 }
602 }
603
604 result
605 }
606
607 /// Dispatch an event with handler context.
608 ///
609 /// This allows handlers to emit new events, request renders, etc.
610 /// Returns a `DispatchResult` containing the event result and any
611 /// side effects (emitted events, render requests).
612 ///
613 /// # Example
614 ///
615 /// ```ignore
616 /// let mut ctx = HandlerContext::new().with_scope(Some(scope));
617 /// let result = bus.dispatch_with_context(&event, &mut ctx);
618 ///
619 /// if result.render_requested {
620 /// // Handle render request
621 /// }
622 ///
623 /// // Process emitted events
624 /// for event in result.emitted_events {
625 /// bus.dispatch(&event);
626 /// }
627 /// ```
628 pub fn dispatch_with_context(
629 &self,
630 event: &DynEvent,
631 ctx: &mut HandlerContext,
632 ) -> DispatchResult {
633 // Lock-free read of handlers
634 let handlers = self.inner.handlers.load();
635 let type_id = event.type_id();
636
637 let Some(handlers) = handlers.get(&type_id) else {
638 return DispatchResult::not_handled();
639 };
640
641 let mut result = EventResult::NotHandled;
642
643 for handler in handlers {
644 let handler_result = match &handler.handler {
645 HandlerType::Simple(h) => h(event),
646 HandlerType::WithContext(h) => h(event, ctx),
647 };
648
649 match handler_result {
650 EventResult::Consumed => {
651 return DispatchResult::new(EventResult::Consumed, ctx);
652 }
653 EventResult::Handled => {
654 result = EventResult::Handled;
655 }
656 EventResult::NotHandled => {
657 // Continue to next handler
658 }
659 }
660 }
661
662 DispatchResult::new(result, ctx)
663 }
664
665 /// Get the number of handlers registered for a specific event type.
666 ///
667 /// Useful for debugging and testing.
668 #[must_use]
669 pub fn handler_count<E: Event>(&self) -> usize {
670 let handlers = self.inner.handlers.load();
671 handlers.get(&TypeId::of::<E>()).map_or(0, Vec::len)
672 }
673
674 /// Get the total number of handlers registered across all event types.
675 #[must_use]
676 pub fn total_handler_count(&self) -> usize {
677 let handlers = self.inner.handlers.load();
678 handlers.values().map(Vec::len).sum()
679 }
680
681 /// Get the number of events in the async queue.
682 #[must_use]
683 pub fn queue_len(&self) -> usize {
684 self.inner.queue.lock().len()
685 }
686
687 /// Check if the async queue is empty.
688 #[must_use]
689 pub fn queue_is_empty(&self) -> bool {
690 self.inner.queue.lock().is_empty()
691 }
692}
693
694impl Default for EventBus {
695 fn default() -> Self {
696 Self::new()
697 }
698}
699
700impl std::fmt::Debug for EventBus {
701 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702 let handlers = self.inner.handlers.load();
703 f.debug_struct("EventBus")
704 .field("event_types", &handlers.len())
705 .field("total_handlers", &self.total_handler_count())
706 .field("queue_len", &self.queue_len())
707 .finish()
708 }
709}