Skip to main content

reovim_kernel/ipc/event_bus/
mod.rs

1//! Event bus for type-erased event dispatch.
2//!
3//! The `EventBus` is the central hub for event-driven communication between
4//! kernel subsystems, drivers, and modules. It provides:
5//!
6//! - **Type-erased dispatch**: Single bus handles all event types
7//! - **Lock-free hot path**: Dispatch uses `ArcSwap` for zero-lock reads
8//! - **Priority ordering**: Handlers execute in priority order (lower = first)
9//! - **RAII subscriptions**: Handlers auto-unsubscribe when dropped
10//! - **Scoped events**: Track event lifecycle for synchronization
11//!
12//! # Design Philosophy
13//!
14//! Following the proven lock-free patterns:
15//! - **Lock-free dispatch**: `ArcSwap::load()` for handler lookup
16//! - **RCU for subscriptions**: Copy-on-write via `ArcSwap::rcu()`
17//! - **Fire-and-forget**: Handlers return `EventResult`, not `Result<T, E>`
18//!
19//! # Performance
20//!
21//! - Dispatch latency: ~100ns (0 handlers) to ~1µs (10 handlers)
22//! - Subscription: ~1µs (RCU clone + sort)
23//! - Handler lookup: O(1) `HashMap` + O(n) handler iteration
24//!
25//! # Example
26//!
27//! ```
28//! use reovim_kernel::api::v1::*;
29//!
30//! #[derive(Debug)]
31//! struct BufferChanged { buffer_id: u64 }
32//! impl Event for BufferChanged {}
33//!
34//! let bus = EventBus::new();
35//!
36//! // Subscribe with priority 100 (default)
37//! let _sub = bus.subscribe::<BufferChanged, _>(100, |event| {
38//!     println!("Buffer {} changed", event.buffer_id);
39//!     EventResult::Handled
40//! });
41//!
42//! // Emit event - handler is called synchronously
43//! bus.emit(BufferChanged { buffer_id: 1 });
44//! ```
45
46mod handler;
47mod sender;
48
49pub use sender::EventSender;
50
51use std::{any::TypeId, collections::HashMap, sync::Arc};
52
53use reovim_arch::sync::{ArcSwap, Mutex};
54
55use handler::{ContextHandlerFn, HandlerFn, HandlerType, RegisteredHandler};
56
57use super::{
58    channel::{BoundedReceiver, BoundedSender, bounded},
59    context::{DispatchResult, HandlerContext},
60    event::{DynEvent, Event, EventResult, TargetedEvent},
61    scope::EventScope,
62    subscription::{Subscription, SubscriptionId},
63};
64
65/// Optional channel for dedicated processor thread pattern.
66///
67/// Used when the `EventBus` is created with `new_with_channel()`.
68struct ChannelInner {
69    /// Sender side (cloneable)
70    tx: BoundedSender<DynEvent>,
71
72    /// Receiver side (only accessible once via `take_receiver()`)
73    rx: Mutex<Option<BoundedReceiver<DynEvent>>>,
74}
75
76/// Internal state for `EventBus`.
77struct EventBusInner {
78    /// Handler storage: `TypeId` -> sorted Vec of handlers.
79    /// Uses `ArcSwap` for lock-free reads.
80    handlers: ArcSwap<HashMap<TypeId, Vec<RegisteredHandler>>>,
81
82    /// Async event queue for `emit_async()`.
83    queue: Mutex<Vec<DynEvent>>,
84
85    /// Optional channel for dedicated processor thread.
86    channel: Option<ChannelInner>,
87}
88
89/// Type-erased event bus for pub/sub communication.
90///
91/// The `EventBus` routes events to registered handlers based on event type.
92/// It uses lock-free data structures for the hot dispatch path and
93/// copy-on-write for subscription updates.
94///
95/// # Thread Safety
96///
97/// `EventBus` is `Clone`, `Send`, and `Sync`. Cloning creates a new handle
98/// to the same underlying bus. Multiple threads can dispatch events
99/// concurrently without blocking.
100///
101/// # Handler Ordering
102///
103/// Handlers are called in priority order (lower priority number = earlier).
104/// For handlers with the same priority, registration order is preserved.
105///
106/// # Two Usage Patterns
107///
108/// 1. **Simple/Tests**: Use `new()` with `emit_async()` and `process_queue()`
109/// 2. **Runtime**: Use `new_with_channel()` with `sender()` and `take_receiver()`
110#[derive(Clone)]
111pub struct EventBus {
112    inner: Arc<EventBusInner>,
113}
114
115impl EventBus {
116    /// Create a new empty event bus.
117    ///
118    /// This creates an `EventBus` without a channel. Use `emit_async()` and
119    /// `process_queue()` for deferred event processing.
120    ///
121    /// For runtime integration with a dedicated processor thread, use
122    /// `new_with_channel()` instead.
123    #[must_use]
124    pub fn new() -> Self {
125        Self {
126            inner: Arc::new(EventBusInner {
127                handlers: ArcSwap::from_pointee(HashMap::new()),
128                queue: Mutex::new(Vec::new()),
129                channel: None,
130            }),
131        }
132    }
133
134    /// Create an event bus with a channel for dedicated processor thread.
135    ///
136    /// This is the preferred constructor for runtime integration where events
137    /// are processed by a dedicated OS thread using `blocking_recv()`.
138    ///
139    /// # Arguments
140    ///
141    /// * `capacity` - Bounded channel capacity (typically 1024)
142    ///
143    /// # Usage Pattern
144    ///
145    /// ```ignore
146    /// let bus = EventBus::new_with_channel(1024);
147    ///
148    /// // Get sender for emitting events
149    /// let sender = bus.sender().unwrap();
150    ///
151    /// // Take receiver for processor thread
152    /// let mut receiver = bus.take_receiver().unwrap();
153    ///
154    /// // Spawn processor thread
155    /// std::thread::spawn(move || {
156    ///     while let Some(event) = receiver.blocking_recv() {
157    ///         // Process event
158    ///     }
159    /// });
160    ///
161    /// // Emit events from anywhere
162    /// sender.send(MyEvent { ... });
163    /// ```
164    #[must_use]
165    pub fn new_with_channel(capacity: usize) -> Self {
166        let (tx, rx) = bounded(capacity);
167        Self {
168            inner: Arc::new(EventBusInner {
169                handlers: ArcSwap::from_pointee(HashMap::new()),
170                queue: Mutex::new(Vec::new()),
171                channel: Some(ChannelInner {
172                    tx,
173                    rx: Mutex::new(Some(rx)),
174                }),
175            }),
176        }
177    }
178
179    /// Get a cloneable sender for emitting events via channel.
180    ///
181    /// Returns `None` if the `EventBus` was created without a channel.
182    ///
183    /// # Example
184    ///
185    /// ```ignore
186    /// let bus = EventBus::new_with_channel(1024);
187    /// let sender = bus.sender().expect("bus has channel");
188    ///
189    /// sender.try_send(MyEvent { ... });
190    /// ```
191    #[must_use]
192    pub fn sender(&self) -> Option<EventSender> {
193        self.inner
194            .channel
195            .as_ref()
196            .map(|c| EventSender { tx: c.tx.clone() })
197    }
198
199    /// Take the receiver for the dedicated processor thread.
200    ///
201    /// Can only be called once. Subsequent calls return `None`.
202    ///
203    /// # Example
204    ///
205    /// ```ignore
206    /// let bus = EventBus::new_with_channel(1024);
207    /// let receiver = bus.take_receiver().expect("bus has channel");
208    ///
209    /// // Spawn processor thread
210    /// std::thread::spawn(move || {
211    ///     while let Some(event) = receiver.blocking_recv() {
212    ///         // Process event
213    ///     }
214    /// });
215    /// ```
216    #[must_use]
217    pub fn take_receiver(&self) -> Option<BoundedReceiver<DynEvent>> {
218        self.inner.channel.as_ref()?.rx.lock().take()
219    }
220
221    /// Subscribe a handler for events of type `E`.
222    ///
223    /// Returns a `Subscription` handle that unsubscribes when dropped.
224    ///
225    /// # Arguments
226    ///
227    /// * `priority` - Handler priority (lower = called earlier). Convention:
228    ///   - 0-50: Core/critical handlers
229    ///   - 100: Default priority
230    ///   - 200+: Low priority (cleanup, logging)
231    /// * `handler` - Function called for each event of type `E`
232    ///
233    /// # Example
234    ///
235    /// ```
236    /// use reovim_kernel::api::v1::*;
237    ///
238    /// #[derive(Debug)]
239    /// struct MyEvent { value: i32 }
240    /// impl Event for MyEvent {}
241    ///
242    /// let bus = EventBus::new();
243    ///
244    /// let sub = bus.subscribe::<MyEvent, _>(100, |event| {
245    ///     println!("Received: {:?}", event.value);
246    ///     EventResult::Handled
247    /// });
248    ///
249    /// // Handler is active while `sub` is alive
250    /// bus.emit(MyEvent { value: 42 });
251    ///
252    /// // Dropping `sub` removes the handler
253    /// drop(sub);
254    /// ```
255    #[cfg_attr(coverage_nightly, coverage(off))]
256    pub fn subscribe<E, F>(&self, priority: u32, handler: F) -> Subscription
257    where
258        E: Event,
259        F: Fn(&E) -> EventResult + Send + Sync + 'static,
260    {
261        let type_id = TypeId::of::<E>();
262        let sub_id = SubscriptionId::new();
263
264        // Wrap handler to downcast from DynEvent
265        let wrapped_handler: HandlerFn = Arc::new(move |dyn_event: &DynEvent| {
266            dyn_event
267                .downcast_ref::<E>()
268                .map_or(EventResult::NotHandled, &handler)
269        });
270
271        let registered = RegisteredHandler {
272            id: sub_id,
273            priority,
274            handler: HandlerType::Simple(wrapped_handler),
275        };
276
277        // RCU update: clone, modify, swap
278        // Note: rcu takes FnMut, so we need to clone registered
279        self.inner.handlers.rcu(|current| {
280            let mut new_map = (**current).clone();
281            let handlers = new_map.entry(type_id).or_default();
282            handlers.push(registered.clone());
283            // Sort by priority (stable sort preserves registration order for same priority)
284            handlers.sort_by_key(|h| h.priority);
285            new_map
286        });
287
288        // Create unsubscribe closure that captures the inner Arc
289        let inner = Arc::clone(&self.inner);
290        let unsubscribe = move || {
291            inner.handlers.rcu(|current| {
292                let mut new_map = (**current).clone();
293                if let Some(handlers) = new_map.get_mut(&type_id) {
294                    handlers.retain(|h| h.id != sub_id);
295                    if handlers.is_empty() {
296                        new_map.remove(&type_id);
297                    }
298                }
299                new_map
300            });
301        };
302
303        Subscription::new::<E>(sub_id, unsubscribe)
304    }
305
306    /// Subscribe a context-aware handler for events of type `E`.
307    ///
308    /// Similar to `subscribe`, but the handler receives a `HandlerContext`
309    /// that allows emitting new events, requesting renders, etc.
310    ///
311    /// # Arguments
312    ///
313    /// * `priority` - Handler priority (lower = called earlier)
314    /// * `handler` - Function called for each event, with access to context
315    ///
316    /// # Example
317    ///
318    /// ```
319    /// use reovim_kernel::api::v1::*;
320    ///
321    /// #[derive(Debug)]
322    /// struct MyEvent { value: i32 }
323    /// impl Event for MyEvent {}
324    ///
325    /// #[derive(Debug)]
326    /// struct FollowUpEvent;
327    /// impl Event for FollowUpEvent {}
328    ///
329    /// let bus = EventBus::new();
330    ///
331    /// let sub = bus.subscribe_with_context::<MyEvent, _>(100, |event, ctx| {
332    ///     // Emit a follow-up event
333    ///     ctx.emit(FollowUpEvent);
334    ///     // Request render
335    ///     ctx.request_render();
336    ///     EventResult::Handled
337    /// });
338    /// ```
339    #[cfg_attr(coverage_nightly, coverage(off))]
340    pub fn subscribe_with_context<E, F>(&self, priority: u32, handler: F) -> Subscription
341    where
342        E: Event,
343        F: Fn(&E, &mut HandlerContext) -> EventResult + Send + Sync + 'static,
344    {
345        let type_id = TypeId::of::<E>();
346        let sub_id = SubscriptionId::new();
347
348        // Wrap handler to downcast from DynEvent
349        let wrapped_handler: ContextHandlerFn = Arc::new(move |dyn_event: &DynEvent, ctx| {
350            dyn_event
351                .downcast_ref::<E>()
352                .map_or(EventResult::NotHandled, |e| handler(e, ctx))
353        });
354
355        let registered = RegisteredHandler {
356            id: sub_id,
357            priority,
358            handler: HandlerType::WithContext(wrapped_handler),
359        };
360
361        // RCU update: clone, modify, swap
362        self.inner.handlers.rcu(|current| {
363            let mut new_map = (**current).clone();
364            let handlers = new_map.entry(type_id).or_default();
365            handlers.push(registered.clone());
366            handlers.sort_by_key(|h| h.priority);
367            new_map
368        });
369
370        // Create unsubscribe closure
371        let inner = Arc::clone(&self.inner);
372        let unsubscribe = move || {
373            inner.handlers.rcu(|current| {
374                let mut new_map = (**current).clone();
375                if let Some(handlers) = new_map.get_mut(&type_id) {
376                    handlers.retain(|h| h.id != sub_id);
377                    if handlers.is_empty() {
378                        new_map.remove(&type_id);
379                    }
380                }
381                new_map
382            });
383        };
384
385        Subscription::new::<E>(sub_id, unsubscribe)
386    }
387
388    /// Subscribe a handler for targeted events, filtering by target.
389    ///
390    /// The handler is only called when `event.target()` matches the
391    /// specified target string.
392    ///
393    /// # Arguments
394    ///
395    /// * `target` - Target string to filter events
396    /// * `priority` - Handler priority (lower = called earlier)
397    /// * `handler` - Function called for matching events
398    ///
399    /// # Example
400    ///
401    /// ```
402    /// use reovim_kernel::api::v1::*;
403    ///
404    /// #[derive(Debug)]
405    /// struct PluginInput {
406    ///     target: &'static str,
407    ///     value: i32,
408    /// }
409    /// impl Event for PluginInput {}
410    /// impl TargetedEvent for PluginInput {
411    ///     fn target(&self) -> &str { self.target }
412    /// }
413    ///
414    /// let bus = EventBus::new();
415    ///
416    /// // Only receives events where target == "my_plugin"
417    /// let sub = bus.subscribe_targeted::<PluginInput, _>("my_plugin", 100, |event, ctx| {
418    ///     println!("My plugin received: {}", event.value);
419    ///     EventResult::Handled
420    /// });
421    /// ```
422    pub fn subscribe_targeted<E, F>(&self, target: &str, priority: u32, handler: F) -> Subscription
423    where
424        E: TargetedEvent,
425        F: Fn(&E, &mut HandlerContext) -> EventResult + Send + Sync + 'static,
426    {
427        let target = target.to_owned();
428        self.subscribe_with_context::<E, _>(priority, move |event, ctx| {
429            if event.target() == target {
430                handler(event, ctx)
431            } else {
432                EventResult::NotHandled
433            }
434        })
435    }
436
437    /// Emit an event synchronously.
438    ///
439    /// Dispatches the event to all registered handlers in priority order.
440    /// Returns the combined result of all handlers.
441    ///
442    /// # Handler Behavior
443    ///
444    /// - Handlers are called in priority order (lower = first)
445    /// - If any handler returns `Consumed`, dispatch stops
446    /// - Otherwise continues until all handlers have been called
447    ///
448    /// # Example
449    ///
450    /// ```
451    /// use reovim_kernel::api::v1::*;
452    ///
453    /// #[derive(Debug)]
454    /// struct MyEvent;
455    /// impl Event for MyEvent {}
456    ///
457    /// let bus = EventBus::new();
458    /// let result = bus.emit(MyEvent);
459    /// assert!(result.is_not_handled()); // No handlers registered
460    /// ```
461    pub fn emit<E: Event>(&self, event: E) -> EventResult {
462        let dyn_event = DynEvent::new(event);
463        self.dispatch(&dyn_event)
464    }
465
466    /// Emit an event with scope tracking.
467    ///
468    /// The scope's counter is incremented before dispatch and decremented
469    /// after all handlers complete. This allows callers to wait for all
470    /// effects of the event to complete.
471    ///
472    /// # Example
473    ///
474    /// ```
475    /// use reovim_kernel::api::v1::*;
476    ///
477    /// #[derive(Debug)]
478    /// struct MyEvent;
479    /// impl Event for MyEvent {}
480    ///
481    /// let bus = EventBus::new();
482    /// let scope = EventScope::new();
483    ///
484    /// scope.increment(); // Manually increment before emit_scoped
485    /// let result = bus.emit_scoped(MyEvent, &scope);
486    /// // Scope counter is now decremented
487    /// ```
488    pub fn emit_scoped<E: Event>(&self, event: E, scope: &EventScope) -> EventResult {
489        let dyn_event = DynEvent::new(event).with_scope(scope.clone());
490        let result = self.dispatch(&dyn_event);
491
492        // Decrement scope after dispatch completes
493        scope.decrement();
494
495        result
496    }
497
498    /// Queue an event for later processing.
499    ///
500    /// The event is stored in an internal queue and processed when
501    /// `process_queue()` is called. This is useful for deferring
502    /// event dispatch to avoid reentrancy issues.
503    ///
504    /// # Example
505    ///
506    /// ```
507    /// use reovim_kernel::api::v1::*;
508    ///
509    /// #[derive(Debug)]
510    /// struct MyEvent;
511    /// impl Event for MyEvent {}
512    ///
513    /// let bus = EventBus::new();
514    ///
515    /// bus.emit_async(MyEvent);
516    /// bus.emit_async(MyEvent);
517    ///
518    /// // Events are queued but not dispatched yet
519    /// let count = bus.process_queue();
520    /// assert_eq!(count, 2);
521    /// ```
522    pub fn emit_async<E: Event>(&self, event: E) {
523        let dyn_event = DynEvent::new(event);
524        self.inner.queue.lock().push(dyn_event);
525    }
526
527    /// Queue an event with scope tracking for later processing.
528    ///
529    /// Like `emit_async`, but attaches a scope for lifecycle tracking.
530    /// The scope is incremented when queued and decremented after dispatch.
531    pub fn emit_async_scoped<E: Event>(&self, event: E, scope: &EventScope) {
532        scope.increment();
533        let dyn_event = DynEvent::new(event).with_scope(scope.clone());
534        self.inner.queue.lock().push(dyn_event);
535    }
536
537    /// Process all queued events.
538    ///
539    /// Drains the queue and dispatches each event synchronously.
540    /// Returns the number of events processed.
541    ///
542    /// # Scope Handling
543    ///
544    /// For events with attached scopes, the scope is decremented after
545    /// each event is dispatched.
546    #[must_use]
547    pub fn process_queue(&self) -> usize {
548        let events: Vec<_> = {
549            let mut queue = self.inner.queue.lock();
550            std::mem::take(&mut *queue)
551        };
552
553        let count = events.len();
554
555        for mut event in events {
556            let scope = event.take_scope();
557            let _result = self.dispatch(&event);
558            // TODO: Replace with pr_trace!() once printk is implemented
559            // pr_trace!("processed queued event: {} -> {:?}", event.type_name(), result);
560            if let Some(scope) = scope {
561                scope.decrement();
562            }
563        }
564
565        count
566    }
567
568    /// Dispatch a type-erased event to registered handlers.
569    ///
570    /// This is the core dispatch logic, used by both `emit` and `process_queue`.
571    #[must_use]
572    pub fn dispatch(&self, event: &DynEvent) -> EventResult {
573        // Lock-free read of handlers
574        let handlers = self.inner.handlers.load();
575        let type_id = event.type_id();
576
577        let Some(handlers) = handlers.get(&type_id) else {
578            return EventResult::NotHandled;
579        };
580
581        let mut result = EventResult::NotHandled;
582        // Create a temporary context for context-aware handlers
583        let mut temp_ctx = HandlerContext::new();
584
585        for handler in handlers {
586            let handler_result = match &handler.handler {
587                HandlerType::Simple(h) => h(event),
588                HandlerType::WithContext(h) => h(event, &mut temp_ctx),
589            };
590
591            match handler_result {
592                EventResult::Consumed => {
593                    return EventResult::Consumed;
594                }
595                EventResult::Handled => {
596                    result = EventResult::Handled;
597                }
598                EventResult::NotHandled => {
599                    // Continue to next handler
600                }
601            }
602        }
603
604        result
605    }
606
607    /// Dispatch an event with handler context.
608    ///
609    /// This allows handlers to emit new events, request renders, etc.
610    /// Returns a `DispatchResult` containing the event result and any
611    /// side effects (emitted events, render requests).
612    ///
613    /// # Example
614    ///
615    /// ```ignore
616    /// let mut ctx = HandlerContext::new().with_scope(Some(scope));
617    /// let result = bus.dispatch_with_context(&event, &mut ctx);
618    ///
619    /// if result.render_requested {
620    ///     // Handle render request
621    /// }
622    ///
623    /// // Process emitted events
624    /// for event in result.emitted_events {
625    ///     bus.dispatch(&event);
626    /// }
627    /// ```
628    pub fn dispatch_with_context(
629        &self,
630        event: &DynEvent,
631        ctx: &mut HandlerContext,
632    ) -> DispatchResult {
633        // Lock-free read of handlers
634        let handlers = self.inner.handlers.load();
635        let type_id = event.type_id();
636
637        let Some(handlers) = handlers.get(&type_id) else {
638            return DispatchResult::not_handled();
639        };
640
641        let mut result = EventResult::NotHandled;
642
643        for handler in handlers {
644            let handler_result = match &handler.handler {
645                HandlerType::Simple(h) => h(event),
646                HandlerType::WithContext(h) => h(event, ctx),
647            };
648
649            match handler_result {
650                EventResult::Consumed => {
651                    return DispatchResult::new(EventResult::Consumed, ctx);
652                }
653                EventResult::Handled => {
654                    result = EventResult::Handled;
655                }
656                EventResult::NotHandled => {
657                    // Continue to next handler
658                }
659            }
660        }
661
662        DispatchResult::new(result, ctx)
663    }
664
665    /// Get the number of handlers registered for a specific event type.
666    ///
667    /// Useful for debugging and testing.
668    #[must_use]
669    pub fn handler_count<E: Event>(&self) -> usize {
670        let handlers = self.inner.handlers.load();
671        handlers.get(&TypeId::of::<E>()).map_or(0, Vec::len)
672    }
673
674    /// Get the total number of handlers registered across all event types.
675    #[must_use]
676    pub fn total_handler_count(&self) -> usize {
677        let handlers = self.inner.handlers.load();
678        handlers.values().map(Vec::len).sum()
679    }
680
681    /// Get the number of events in the async queue.
682    #[must_use]
683    pub fn queue_len(&self) -> usize {
684        self.inner.queue.lock().len()
685    }
686
687    /// Check if the async queue is empty.
688    #[must_use]
689    pub fn queue_is_empty(&self) -> bool {
690        self.inner.queue.lock().is_empty()
691    }
692}
693
694impl Default for EventBus {
695    fn default() -> Self {
696        Self::new()
697    }
698}
699
700impl std::fmt::Debug for EventBus {
701    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702        let handlers = self.inner.handlers.load();
703        f.debug_struct("EventBus")
704            .field("event_types", &handlers.len())
705            .field("total_handlers", &self.total_handler_count())
706            .field("queue_len", &self.queue_len())
707            .finish()
708    }
709}