// Source: orcs_runtime/engine/eventbus.rs

1//! EventBus - Unified Communication Between Components.
2//!
3//! The [`EventBus`] is the central message routing system in ORCS architecture.
4//! All Component communication flows through EventBus.
5//!
6//! # Message Types
7//!
8//! ```text
9//! ┌─────────────┐  Request   ┌─────────────┐
10//! │   Source    │ ─────────► │   Target    │
11//! │  Component  │            │  Component  │
12//! │             │ ◄───────── │             │
13//! └─────────────┘  Response  └─────────────┘
14//!
15//! ┌─────────────┐   Signal   ┌─────────────┐
16//! │   Human     │ ─────────► │  All        │
17//! │  (Source)   │            │  Components │
18//! └─────────────┘            └─────────────┘
19//! ```
20//!
21//! ## Request/Response
22//!
23//! Synchronous queries between Components:
24//! - Source sends Request with target ComponentId
25//! - Target processes and returns Response
26//! - Timeout if no response within deadline
27//!
28//! ## Signal
29//!
30//! Control interrupts (highest priority):
31//! - Broadcast to ALL registered Components
32//! - Components MUST handle or face forced abort
33//! - Veto signal stops everything immediately
34//!
35//! # Error Handling
36//!
37//! Operations return [`EngineError`] which implements [`orcs_types::ErrorCode`].
38//!
39//! | Error | Code | Recoverable |
40//! |-------|------|-------------|
41//! | Component not found | `ENGINE_COMPONENT_NOT_FOUND` | No |
42//! | No target | `ENGINE_NO_TARGET` | No |
43//! | Send failed | `ENGINE_SEND_FAILED` | Yes |
44//! | Timeout | `ENGINE_TIMEOUT` | Yes |
45
46use super::error::EngineError;
47use crate::channel::{ChannelHandle, Event};
48use orcs_event::{EventCategory, Request, Signal};
49use orcs_hook::SharedHookRegistry;
50use orcs_types::{ChannelId, ComponentId, RequestId};
51use parking_lot::RwLock;
52use serde_json::Value;
53use std::collections::{HashMap, HashSet};
54use std::sync::Arc;
55use std::time::Duration;
56use tokio::sync::{broadcast, mpsc, oneshot};
57
58/// Shared channel handles for event broadcasting.
59///
60/// This type allows multiple components (EventBus, ClientRunner) to
61/// access the same set of channel handles for event injection and broadcast.
62///
63/// Uses parking_lot::RwLock (no poison semantics) for sync access.
64pub type SharedChannelHandles = Arc<RwLock<HashMap<ChannelId, ChannelHandle>>>;
65
66/// Shared mapping from Component FQN to ChannelId for RPC routing.
67///
68/// Keys are FQN strings (`"namespace::name"`, e.g. `"skill::skill_manager"`).
69/// Used by both EventBus and EventEmitter to resolve which ChannelRunner
70/// hosts a given Component, enabling `orcs.request()` from Lua.
71pub type SharedComponentChannelMap = Arc<RwLock<HashMap<String, ChannelId>>>;
72
73/// EventBus - routes messages between components and channels.
74///
75/// The EventBus is responsible for:
76/// - Registering/unregistering Components
77/// - Routing Request messages to target Components
78/// - Broadcasting Signal messages to all Components
79/// - Injecting Events into specific Channels
80/// - Managing pending response channels
81///
82/// # Thread Safety
83///
84/// EventBus itself is not `Send`/`Sync`. Use it within a single async task
85/// or wrap with appropriate synchronization.
86pub struct EventBus {
87    /// Request senders per component (for standalone ComponentHandle, used in tests).
88    request_senders: HashMap<ComponentId, mpsc::Sender<Request>>,
89    /// Pending response receivers (for standalone ComponentHandle path).
90    pending_responses: HashMap<RequestId, oneshot::Sender<Result<Value, EngineError>>>,
91    /// Signal broadcaster (test-only path via `register()` / `ComponentHandle`).
92    ///
93    /// Engine uses its own `signal_tx` for ChannelRunner broadcast.
94    signal_tx: broadcast::Sender<Signal>,
95    /// Category subscriptions: category -> set of component IDs
96    subscriptions: HashMap<EventCategory, HashSet<ComponentId>>,
97    /// Shared channel handles for event injection and broadcast.
98    ///
99    /// This is shared with ClientRunner to enable UserInput broadcast.
100    channel_handles: SharedChannelHandles,
101    /// Maps ComponentId to ChannelId for routing RPC requests via ChannelHandle.
102    ///
103    /// Shared with EventEmitter instances to enable `orcs.request()` from Lua.
104    /// Populated by `register_component_channel()` when Engine spawns runners.
105    component_channel_map: SharedComponentChannelMap,
106    /// Shared hook registry for dispatching EventBus hooks.
107    hook_registry: Option<SharedHookRegistry>,
108}
109
110impl EventBus {
111    /// Create new EventBus
112    #[must_use]
113    pub fn new() -> Self {
114        let (signal_tx, _) = broadcast::channel(64);
115        Self {
116            request_senders: HashMap::new(),
117            pending_responses: HashMap::new(),
118            signal_tx,
119            subscriptions: HashMap::new(),
120            channel_handles: Arc::new(RwLock::new(HashMap::new())),
121            component_channel_map: Arc::new(RwLock::new(HashMap::new())),
122            hook_registry: None,
123        }
124    }
125
126    /// Sets the shared hook registry for EventBus hooks.
127    pub fn set_hook_registry(&mut self, registry: SharedHookRegistry) {
128        self.hook_registry = Some(registry);
129    }
130
131    /// Dispatches a hook through the shared hook registry.
132    ///
133    /// Uses `ComponentId::builtin("eventbus")` as the component identity.
134    fn dispatch_hook(
135        &self,
136        point: orcs_hook::HookPoint,
137        target_id: &ComponentId,
138        payload: serde_json::Value,
139    ) -> orcs_hook::HookAction {
140        let channel_id = ChannelId::new();
141
142        let ctx = orcs_hook::HookContext::new(
143            point,
144            target_id.clone(),
145            channel_id,
146            orcs_types::Principal::System,
147            0,
148            payload,
149        );
150
151        let Some(registry) = &self.hook_registry else {
152            return orcs_hook::HookAction::Continue(Box::new(ctx));
153        };
154
155        let guard = registry.read().unwrap_or_else(|poisoned| {
156            tracing::warn!("hook registry lock poisoned, using inner value");
157            poisoned.into_inner()
158        });
159        guard.dispatch(point, target_id, None, ctx)
160    }
161
162    /// Returns a clone of the shared channel handles.
163    ///
164    /// This can be used by ClientRunner to broadcast events
165    /// without holding a reference to the entire EventBus.
166    #[must_use]
167    pub fn shared_handles(&self) -> SharedChannelHandles {
168        Arc::clone(&self.channel_handles)
169    }
170
171    /// Returns a clone of the shared component-to-channel mapping.
172    ///
173    /// Used by EventEmitter to resolve ComponentId → ChannelId for
174    /// `orcs.request()` RPC routing.
175    #[must_use]
176    pub fn shared_component_channel_map(&self) -> SharedComponentChannelMap {
177        Arc::clone(&self.component_channel_map)
178    }
179
180    /// Register component with subscriptions.
181    ///
182    /// The component will receive requests matching any of the specified
183    /// categories when using `publish` for category-based routing.
184    ///
185    /// # Arguments
186    ///
187    /// * `id` - Component identifier
188    /// * `subscriptions` - Event categories this component subscribes to
189    ///
190    /// # Example
191    ///
192    /// ```ignore
193    /// let mut bus = EventBus::new();
194    /// let handle = bus.register(
195    ///     ComponentId::builtin("hil"),
196    ///     vec![EventCategory::Hil, EventCategory::Lifecycle],
197    /// );
198    /// ```
199    pub fn register(
200        &mut self,
201        id: ComponentId,
202        subscriptions: Vec<EventCategory>,
203    ) -> ComponentHandle {
204        let (req_tx, req_rx) = mpsc::channel(32);
205        let signal_rx = self.signal_tx.subscribe();
206
207        self.request_senders.insert(id.clone(), req_tx);
208
209        // Register subscriptions
210        let sub_strings: Vec<String> = subscriptions.iter().map(|c| format!("{c:?}")).collect();
211        for category in subscriptions {
212            self.subscriptions
213                .entry(category)
214                .or_default()
215                .insert(id.clone());
216        }
217
218        // -- BusOnRegister hook (event) --
219        let payload = serde_json::json!({
220            "component_id": id.fqn(),
221            "subscriptions": sub_strings,
222        });
223        let _ = self.dispatch_hook(orcs_hook::HookPoint::BusOnRegister, &id, payload);
224
225        ComponentHandle {
226            component_id: id,
227            request_rx: req_rx,
228            signal_rx,
229        }
230    }
231
232    /// Returns subscribers for a given category.
233    #[must_use]
234    pub fn subscribers(&self, category: &EventCategory) -> Vec<&ComponentId> {
235        self.subscriptions
236            .get(category)
237            .map(|set| set.iter().collect())
238            .unwrap_or_default()
239    }
240
241    /// Unregister component
242    pub fn unregister(&mut self, id: &ComponentId) {
243        // -- BusOnUnregister hook (event) — fire before cleanup --
244        let payload = serde_json::json!({
245            "component_id": id.fqn(),
246        });
247        let _ = self.dispatch_hook(orcs_hook::HookPoint::BusOnUnregister, id, payload);
248
249        self.request_senders.remove(id);
250        // Remove from all subscription lists
251        for subscribers in self.subscriptions.values_mut() {
252            subscribers.remove(id);
253        }
254    }
255
256    /// Send request to target component.
257    ///
258    /// Routes the request to the specified target Component and waits for response.
259    /// Target is required - use [`Request::with_target`] to set it.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`EngineError`] if:
264    /// - No target specified ([`EngineError::NoTarget`])
265    /// - Target component not found ([`EngineError::ComponentNotFound`])
266    /// - Send failed ([`EngineError::SendFailed`])
267    /// - Channel closed ([`EngineError::ChannelClosed`])
268    /// - Request timed out ([`EngineError::Timeout`])
269    pub async fn request(&mut self, req: Request) -> Result<Value, EngineError> {
270        let request_id = req.id;
271        let timeout_ms = req.timeout_ms;
272
273        let Some(target) = &req.target else {
274            return Err(EngineError::NoTarget);
275        };
276
277        // Try ChannelRunner-backed routing first (production path).
278        //
279        // If the target Component is hosted by a ChannelRunner with a
280        // request channel enabled, route directly via ChannelHandle.
281        // The reply comes back through a oneshot in the RequestEnvelope,
282        // bypassing pending_responses entirely.
283        let resolved_channel_id = self
284            .component_channel_map
285            .read()
286            .get(&target.fqn())
287            .copied();
288        if let Some(channel_id) = resolved_channel_id {
289            let handle = {
290                let handles = self.channel_handles.read();
291                handles.get(&channel_id).cloned()
292            };
293
294            if let Some(handle) = handle {
295                if handle.accepts_requests() {
296                    let (reply_tx, reply_rx) = oneshot::channel();
297                    if handle.send_request(req, reply_tx).await.is_err() {
298                        return Err(EngineError::SendFailed("request channel closed".into()));
299                    }
300
301                    let timeout_duration = Duration::from_millis(timeout_ms);
302                    return match tokio::time::timeout(timeout_duration, reply_rx).await {
303                        Ok(Ok(result)) => result.map_err(EngineError::ComponentFailed),
304                        Ok(Err(_)) => Err(EngineError::ChannelClosed),
305                        Err(_) => Err(EngineError::Timeout(request_id)),
306                    };
307                }
308            }
309        }
310
311        // Fallback: standalone ComponentHandle path (tests, non-channel components).
312        let Some(sender) = self.request_senders.get(target) else {
313            return Err(EngineError::ComponentNotFound(target.clone()));
314        };
315
316        let (tx, rx) = oneshot::channel();
317        self.pending_responses.insert(request_id, tx);
318
319        if sender.send(req).await.is_err() {
320            self.pending_responses.remove(&request_id);
321            return Err(EngineError::SendFailed("channel closed".into()));
322        }
323
324        let timeout_duration = Duration::from_millis(timeout_ms);
325        match tokio::time::timeout(timeout_duration, rx).await {
326            Ok(Ok(result)) => result,
327            Ok(Err(_)) => Err(EngineError::ChannelClosed),
328            Err(_) => {
329                self.pending_responses.remove(&request_id);
330                Err(EngineError::Timeout(request_id))
331            }
332        }
333    }
334
335    /// Publish request to subscribers of the request's category.
336    ///
337    /// Routes the request to the first Component that subscribes to the
338    /// request's [`EventCategory`]. This enables loose coupling where
339    /// the sender doesn't need to know the specific target component.
340    ///
341    /// # Category-Based Routing
342    ///
343    /// ```text
344    /// Request { category: Hil, operation: "submit" }
345    ///     │
346    ///     ▼ (lookup subscribers for Hil)
347    /// EventBus::subscriptions[Hil] = [HilComponent]
348    ///     │
349    ///     ▼ (route to first subscriber)
350    /// HilComponent::on_request()
351    /// ```
352    ///
353    /// # Errors
354    ///
355    /// Returns [`EngineError`] if:
356    /// - No subscribers for the category ([`EngineError::NoSubscriber`])
357    /// - Send failed ([`EngineError::SendFailed`])
358    /// - Request timed out ([`EngineError::Timeout`])
359    pub async fn publish(&mut self, req: Request) -> Result<Value, EngineError> {
360        let category = req.category.clone();
361
362        // Find first subscriber for this category
363        let target = self
364            .subscriptions
365            .get(&category)
366            .and_then(|set| set.iter().next().cloned())
367            .ok_or(EngineError::NoSubscriber(category))?;
368
369        // Route to the target
370        let req_with_target = req.with_target(target);
371        self.request(req_with_target).await
372    }
373
374    /// Respond to a pending request.
375    ///
376    /// Components call this to complete a request with either a successful
377    /// value or an error message. The error is wrapped as [`EngineError::ComponentFailed`].
378    ///
379    /// If the request_id is not found (already timed out or responded),
380    /// this is a no-op.
381    pub fn respond(&mut self, request_id: RequestId, result: Result<Value, String>) {
382        if let Some(tx) = self.pending_responses.remove(&request_id) {
383            let mapped = result.map_err(EngineError::ComponentFailed);
384            let _ = tx.send(mapped);
385        }
386    }
387
388    /// Get number of registered components
389    #[must_use]
390    pub fn component_count(&self) -> usize {
391        self.request_senders.len()
392    }
393
394    // === Channel Event Injection ===
395
396    /// Registers a channel handle for event injection.
397    ///
398    /// Call this when a new [`ChannelRunner`](crate::channel::ChannelRunner)
399    /// is created to enable event injection to that channel.
400    pub fn register_channel(&mut self, handle: ChannelHandle) {
401        let mut handles = self.channel_handles.write();
402        handles.insert(handle.id, handle);
403    }
404
405    /// Registers a Component FQN → ChannelId mapping for RPC routing.
406    ///
407    /// After registration, `request()` will route requests targeting
408    /// this Component through the ChannelHandle's request channel
409    /// instead of the standalone ComponentHandle path.
410    ///
411    /// # Arguments
412    ///
413    /// * `component_id` - The Component's identifier (FQN is extracted)
414    /// * `channel_id` - The Channel hosting the Component
415    pub fn register_component_channel(
416        &mut self,
417        component_id: &ComponentId,
418        channel_id: ChannelId,
419    ) {
420        self.component_channel_map
421            .write()
422            .insert(component_id.fqn(), channel_id);
423    }
424
425    /// Unregisters a channel handle.
426    ///
427    /// Call this when a channel is killed or completed.
428    pub fn unregister_channel(&mut self, id: &ChannelId) {
429        let mut handles = self.channel_handles.write();
430        handles.remove(id);
431        // Clean up component → channel mapping
432        self.component_channel_map
433            .write()
434            .retain(|_, cid| cid != id);
435    }
436
437    /// Injects a direct event into a specific channel (subscription filter bypassed).
438    ///
439    /// This enables targeted event injection (e.g., `@component` routing)
440    /// at any time. The receiving ChannelRunner will process the event
441    /// regardless of its subscription list.
442    ///
443    /// # Arguments
444    ///
445    /// * `channel_id` - Target channel
446    /// * `event` - Event to inject
447    ///
448    /// # Errors
449    ///
450    /// Returns [`EngineError::ChannelNotFound`] if the channel is not registered.
451    /// Returns [`EngineError::SendFailed`] if the channel's buffer is full or closed.
452    pub async fn inject(&self, channel_id: ChannelId, event: Event) -> Result<(), EngineError> {
453        let handle = {
454            let handles = self.channel_handles.read();
455            handles
456                .get(&channel_id)
457                .cloned()
458                .ok_or(EngineError::ChannelNotFound(channel_id))?
459        };
460
461        handle
462            .inject_direct(event)
463            .await
464            .map_err(|_| EngineError::SendFailed("channel closed".into()))
465    }
466
467    /// Try to inject a direct event without blocking (subscription filter bypassed).
468    ///
469    /// Returns immediately if the buffer is full.
470    ///
471    /// # Errors
472    ///
473    /// Returns error if channel not found, buffer full, or channel closed.
474    pub fn try_inject(&self, channel_id: ChannelId, event: Event) -> Result<(), EngineError> {
475        let handles = self.channel_handles.read();
476        let handle = handles
477            .get(&channel_id)
478            .ok_or(EngineError::ChannelNotFound(channel_id))?;
479
480        handle
481            .try_inject_direct(event)
482            .map_err(|e| EngineError::SendFailed(e.to_string()))
483    }
484
485    /// Broadcasts an event to all registered channels (subscription filter applies).
486    ///
487    /// This is used for data-plane events (e.g., UserInput) that need to reach
488    /// all channels. Unlike Signal broadcast (control-plane, high priority),
489    /// this operates on the data-plane via channel event queues.
490    ///
491    /// Each receiving ChannelRunner will apply its subscription filter —
492    /// only channels subscribed to the event's category will process it.
493    ///
494    /// # Arguments
495    ///
496    /// * `event` - Event to broadcast to all channels
497    ///
498    /// # Returns
499    ///
500    /// Number of channels that successfully received the event.
501    /// Channels with full buffers or closed handles are skipped.
502    pub fn broadcast(&self, event: Event) -> usize {
503        let bus_id = ComponentId::builtin("eventbus");
504
505        // -- BusPreBroadcast hook --
506        let pre_payload = serde_json::json!({
507            "category": format!("{:?}", event.category),
508            "operation": event.operation,
509            "source": event.source.fqn(),
510        });
511        let pre_action =
512            self.dispatch_hook(orcs_hook::HookPoint::BusPreBroadcast, &bus_id, pre_payload);
513        match &pre_action {
514            orcs_hook::HookAction::Abort { .. } | orcs_hook::HookAction::Skip(_) => {
515                return 0;
516            }
517            _ => {} // Continue
518        }
519
520        let handles = self.channel_handles.read();
521        let mut delivered = 0;
522        for handle in handles.values() {
523            if handle.try_inject(event.clone()).is_ok() {
524                delivered += 1;
525            }
526        }
527
528        // -- BusPostBroadcast hook (observe-only) --
529        let post_payload = serde_json::json!({
530            "category": format!("{:?}", event.category),
531            "operation": event.operation,
532            "delivered": delivered,
533        });
534        let _ = self.dispatch_hook(
535            orcs_hook::HookPoint::BusPostBroadcast,
536            &bus_id,
537            post_payload,
538        );
539
540        delivered
541    }
542
543    /// Broadcasts an event to all registered channels (async version).
544    ///
545    /// Waits for each channel to accept the event. Use this when delivery
546    /// guarantee is more important than latency. Subscription filter applies.
547    ///
548    /// # Arguments
549    ///
550    /// * `event` - Event to broadcast to all channels
551    ///
552    /// # Returns
553    ///
554    /// Number of channels that successfully received the event.
555    pub async fn broadcast_async(&self, event: Event) -> usize {
556        let bus_id = ComponentId::builtin("eventbus");
557
558        // -- BusPreBroadcast hook --
559        let pre_payload = serde_json::json!({
560            "category": format!("{:?}", event.category),
561            "operation": event.operation,
562            "source": event.source.fqn(),
563        });
564        let pre_action =
565            self.dispatch_hook(orcs_hook::HookPoint::BusPreBroadcast, &bus_id, pre_payload);
566        match &pre_action {
567            orcs_hook::HookAction::Abort { .. } | orcs_hook::HookAction::Skip(_) => {
568                return 0;
569            }
570            _ => {}
571        }
572
573        // Collect handles first to avoid holding lock during async operations
574        let handles: Vec<_> = {
575            let h = self.channel_handles.read();
576            h.values().cloned().collect()
577        };
578
579        let mut delivered = 0;
580        for handle in handles {
581            if handle.inject(event.clone()).await.is_ok() {
582                delivered += 1;
583            }
584        }
585
586        // -- BusPostBroadcast hook (observe-only) --
587        let post_payload = serde_json::json!({
588            "category": format!("{:?}", event.category),
589            "operation": event.operation,
590            "delivered": delivered,
591        });
592        let _ = self.dispatch_hook(
593            orcs_hook::HookPoint::BusPostBroadcast,
594            &bus_id,
595            post_payload,
596        );
597
598        delivered
599    }
600
601    /// Returns the number of registered channels.
602    #[must_use]
603    pub fn channel_count(&self) -> usize {
604        let handles = self.channel_handles.read();
605        handles.len()
606    }
607}
608
// Test-only helpers for the standalone `ComponentHandle` signal path.
#[cfg(test)]
impl EventBus {
    /// Sends `signal` to every handle obtained through `register()`.
    ///
    /// A send error (reported when no receiver exists) is deliberately ignored.
    pub fn signal(&self, signal: Signal) {
        self.signal_tx.send(signal).ok();
    }
}
617
618impl Default for EventBus {
619    fn default() -> Self {
620        Self::new()
621    }
622}
623
624/// Handle for component to receive messages
625pub struct ComponentHandle {
626    component_id: ComponentId,
627    request_rx: mpsc::Receiver<Request>,
628    signal_rx: broadcast::Receiver<Signal>,
629}
630
631impl ComponentHandle {
632    /// Get component ID
633    #[must_use]
634    pub fn component_id(&self) -> &ComponentId {
635        &self.component_id
636    }
637
638    /// Try to receive a request (non-blocking)
639    pub fn try_recv_request(&mut self) -> Option<Request> {
640        self.request_rx.try_recv().ok()
641    }
642
643    /// Try to receive a signal (non-blocking)
644    pub fn try_recv_signal(&mut self) -> Option<Signal> {
645        self.signal_rx.try_recv().ok()
646    }
647
648    /// Receive a request (async, waits until available).
649    ///
650    /// Returns `None` if the channel is closed.
651    pub async fn recv_request(&mut self) -> Option<Request> {
652        self.request_rx.recv().await
653    }
654
655    /// Receive a signal (async, waits until available).
656    ///
657    /// # Errors
658    ///
659    /// Returns error if the channel is closed or lagged.
660    pub async fn recv_signal(&mut self) -> Result<Signal, broadcast::error::RecvError> {
661        self.signal_rx.recv().await
662    }
663}
664
665#[cfg(test)]
666mod tests {
667    use super::*;
668    use crate::Principal;
669    use orcs_component::EventCategory;
670    use orcs_types::ChannelId;
671
672    #[test]
673    fn eventbus_creation() {
674        let bus = EventBus::new();
675        assert_eq!(bus.component_count(), 0);
676    }
677
678    #[test]
679    fn register_component() {
680        let mut bus = EventBus::new();
681        let id = ComponentId::builtin("test");
682        let _handle = bus.register(id, vec![EventCategory::Lifecycle]);
683
684        assert_eq!(bus.component_count(), 1);
685    }
686
687    #[test]
688    fn unregister_component() {
689        let mut bus = EventBus::new();
690        let id = ComponentId::builtin("test");
691        let _handle = bus.register(id.clone(), vec![EventCategory::Lifecycle]);
692
693        bus.unregister(&id);
694        assert_eq!(bus.component_count(), 0);
695    }
696
697    #[tokio::test]
698    async fn signal_broadcast() {
699        let mut bus = EventBus::new();
700        let id = ComponentId::builtin("test");
701        let mut handle = bus.register(id, vec![EventCategory::Lifecycle]);
702
703        let principal = Principal::System;
704        bus.signal(Signal::veto(principal));
705        tokio::task::yield_now().await;
706
707        let signal = handle.try_recv_signal();
708        assert!(signal.is_some());
709        assert!(signal.expect("should receive veto signal").is_veto());
710    }
711
712    #[tokio::test]
713    async fn request_to_nonexistent_target() {
714        use orcs_types::ErrorCode;
715
716        let mut bus = EventBus::new();
717        let source = ComponentId::builtin("source");
718        let target = ComponentId::builtin("nonexistent");
719        let channel = ChannelId::new();
720
721        let req = Request::new(EventCategory::Echo, "test", source, channel, Value::Null)
722            .with_target(target);
723
724        let result = bus.request(req).await;
725        assert!(result.is_err());
726        assert_eq!(
727            result
728                .expect_err("request to nonexistent target should fail")
729                .code(),
730            "ENGINE_COMPONENT_NOT_FOUND"
731        );
732    }
733
734    #[tokio::test]
735    async fn request_without_target() {
736        use orcs_types::ErrorCode;
737
738        let mut bus = EventBus::new();
739        let source = ComponentId::builtin("source");
740        let channel = ChannelId::new();
741
742        let req = Request::new(EventCategory::Echo, "test", source, channel, Value::Null);
743
744        let result = bus.request(req).await;
745        assert!(result.is_err());
746        assert_eq!(
747            result
748                .expect_err("request without target should fail")
749                .code(),
750            "ENGINE_NO_TARGET"
751        );
752    }
753
754    #[tokio::test]
755    async fn request_respond_flow() {
756        let mut bus = EventBus::new();
757        let source = ComponentId::builtin("source");
758        let target = ComponentId::builtin("target");
759        let channel = ChannelId::new();
760
761        let mut handle = bus.register(target.clone(), vec![EventCategory::Echo]);
762
763        let req = Request::new(
764            EventCategory::Echo,
765            "echo",
766            source,
767            channel,
768            Value::String("hello".into()),
769        )
770        .with_target(target);
771        let request_id = req.id;
772
773        let (tx, rx) = tokio::sync::oneshot::channel();
774
775        tokio::spawn(async move {
776            if let Some(req) = handle.recv_request().await {
777                tx.send(req).ok();
778            }
779        });
780
781        let _response_task = tokio::spawn(async move {
782            let mut bus = bus;
783            bus.request(req).await
784        });
785
786        let received_req = rx.await.expect("should receive request via oneshot");
787        assert_eq!(received_req.id, request_id);
788    }
789
790    #[tokio::test]
791    async fn respond_completes_request() {
792        let mut bus = EventBus::new();
793        let source = ComponentId::builtin("source");
794        let target = ComponentId::builtin("target");
795        let channel = ChannelId::new();
796
797        let mut handle = bus.register(target.clone(), vec![EventCategory::Echo]);
798
799        let req = Request::new(
800            EventCategory::Echo,
801            "echo",
802            source,
803            channel,
804            Value::String("test".into()),
805        )
806        .with_target(target.clone());
807        let request_id = req.id;
808
809        let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<Result<Value, String>>();
810        tokio::spawn(async move {
811            if let Some(req) = handle.recv_request().await {
812                resp_tx.send(Ok(req.payload)).ok();
813            }
814        });
815
816        let (tx, rx) = tokio::sync::oneshot::channel();
817        bus.pending_responses.insert(request_id, tx);
818
819        if let Some(sender) = bus.request_senders.get(&target) {
820            sender
821                .send(req)
822                .await
823                .expect("send request to target should succeed");
824        }
825
826        let result = resp_rx.await.expect("should receive response from handler");
827        bus.respond(request_id, result);
828
829        let received = rx
830            .await
831            .expect("should receive completed response via oneshot");
832        assert!(received.is_ok());
833    }
834
835    #[test]
836    fn respond_to_pending_request() {
837        let mut bus = EventBus::new();
838        let request_id = RequestId::new();
839
840        let (tx, mut rx) = tokio::sync::oneshot::channel();
841        bus.pending_responses.insert(request_id, tx);
842
843        bus.respond(request_id, Ok(Value::String("result".into())));
844
845        let received = rx.try_recv();
846        assert!(received.is_ok());
847        assert!(received.expect("should receive response value").is_ok());
848    }
849
850    #[test]
851    fn respond_to_unknown_request_is_noop() {
852        let mut bus = EventBus::new();
853        let request_id = RequestId::new();
854
855        bus.respond(request_id, Ok(Value::Null));
856    }
857
858    #[tokio::test]
859    async fn multiple_signal_receivers() {
860        let mut bus = EventBus::new();
861        let id1 = ComponentId::builtin("comp1");
862        let id2 = ComponentId::builtin("comp2");
863
864        let mut handle1 = bus.register(id1, vec![EventCategory::Lifecycle]);
865        let mut handle2 = bus.register(id2, vec![EventCategory::Lifecycle]);
866
867        let principal = Principal::System;
868        bus.signal(Signal::veto(principal));
869        tokio::task::yield_now().await;
870
871        assert!(handle1.try_recv_signal().is_some());
872        assert!(handle2.try_recv_signal().is_some());
873    }
874
875    #[test]
876    fn component_handle_getters() {
877        let mut bus = EventBus::new();
878        let id = ComponentId::builtin("test");
879        let handle = bus.register(id.clone(), vec![EventCategory::Lifecycle]);
880
881        assert_eq!(handle.component_id(), &id);
882    }
883
884    #[tokio::test]
885    async fn request_timeout() {
886        use orcs_types::ErrorCode;
887
888        let mut bus = EventBus::new();
889        let source = ComponentId::builtin("source");
890        let target = ComponentId::builtin("target");
891        let channel = ChannelId::new();
892
893        let _handle = bus.register(target.clone(), vec![EventCategory::Echo]);
894
895        let req = Request::new(EventCategory::Echo, "slow_op", source, channel, Value::Null)
896            .with_target(target)
897            .with_timeout(10);
898
899        let result = bus.request(req).await;
900
901        assert!(result.is_err());
902        let err = result.expect_err("request should timeout");
903        assert_eq!(err.code(), "ENGINE_TIMEOUT");
904        assert!(err.is_recoverable());
905    }
906
907    #[test]
908    fn register_with_multiple_subscriptions() {
909        let mut bus = EventBus::new();
910        let id = ComponentId::builtin("hil");
911
912        let _handle = bus.register(
913            id.clone(),
914            vec![EventCategory::Hil, EventCategory::Lifecycle],
915        );
916
917        assert_eq!(bus.component_count(), 1);
918        assert_eq!(bus.subscribers(&EventCategory::Hil).len(), 1);
919        assert_eq!(bus.subscribers(&EventCategory::Lifecycle).len(), 1);
920        assert_eq!(bus.subscribers(&EventCategory::Echo).len(), 0);
921    }
922
923    #[test]
924    fn unregister_removes_subscriptions() {
925        let mut bus = EventBus::new();
926        let id = ComponentId::builtin("hil");
927
928        let _handle = bus.register(id.clone(), vec![EventCategory::Hil, EventCategory::Echo]);
929
930        assert_eq!(bus.subscribers(&EventCategory::Hil).len(), 1);
931
932        bus.unregister(&id);
933
934        assert_eq!(bus.subscribers(&EventCategory::Hil).len(), 0);
935        assert_eq!(bus.subscribers(&EventCategory::Echo).len(), 0);
936    }
937
938    #[tokio::test]
939    async fn publish_no_subscriber_error() {
940        use orcs_types::ErrorCode;
941
942        let mut bus = EventBus::new();
943        let source = ComponentId::builtin("source");
944        let channel = ChannelId::new();
945
946        // No HIL subscriber registered
947        let req = Request::new(EventCategory::Hil, "submit", source, channel, Value::Null);
948
949        let result = bus.publish(req).await;
950
951        assert!(result.is_err());
952        let err = result.expect_err("publish without subscriber should fail");
953        assert_eq!(err.code(), "ENGINE_NO_SUBSCRIBER");
954    }
955
956    #[tokio::test]
957    async fn publish_routes_to_subscriber() {
958        let mut bus = EventBus::new();
959        let source = ComponentId::builtin("source");
960        let hil_id = ComponentId::builtin("hil");
961        let channel = ChannelId::new();
962
963        // Register HIL component with Hil subscription
964        let mut handle = bus.register(hil_id, vec![EventCategory::Hil]);
965
966        let req = Request::new(EventCategory::Hil, "submit", source, channel, Value::Null);
967        let request_id = req.id;
968
969        // Spawn a task to receive the request
970        let (tx, rx) = tokio::sync::oneshot::channel();
971        tokio::spawn(async move {
972            if let Some(received) = handle.recv_request().await {
973                tx.send(received).ok();
974            }
975        });
976
977        // Publish should route to the HIL subscriber
978        let _publish_task = tokio::spawn(async move {
979            let mut bus = bus;
980            bus.publish(req).await
981        });
982
983        let received = rx.await.expect("should receive published request");
984        assert_eq!(received.id, request_id);
985        assert_eq!(received.operation, "submit");
986    }
987
988    // === Broadcast tests ===
989
990    #[test]
991    fn broadcast_to_no_channels() {
992        let bus = EventBus::new();
993        let source = ComponentId::builtin("source");
994
995        let event = Event {
996            category: EventCategory::UserInput,
997            operation: "input".to_string(),
998            source,
999            payload: serde_json::json!({"message": "hello"}),
1000        };
1001
1002        let delivered = bus.broadcast(event);
1003        assert_eq!(delivered, 0);
1004    }
1005
1006    #[tokio::test]
1007    async fn broadcast_to_multiple_channels() {
1008        use crate::channel::{ChannelConfig, World, WorldManager};
1009
1010        let mut world = World::new();
1011        let ch1 = world.create_channel(ChannelConfig::default());
1012        let ch2 = world.create_channel(ChannelConfig::default());
1013
1014        let (manager, _world_tx) = WorldManager::with_world(world);
1015        let _manager_task = tokio::spawn(manager.run());
1016
1017        let mut bus = EventBus::new();
1018
1019        // Create channel handles manually for test
1020        let (tx1, mut rx1) = tokio::sync::mpsc::channel(32);
1021        let (tx2, mut rx2) = tokio::sync::mpsc::channel(32);
1022
1023        let handle1 = ChannelHandle::new(ch1, tx1);
1024        let handle2 = ChannelHandle::new(ch2, tx2);
1025
1026        bus.register_channel(handle1);
1027        bus.register_channel(handle2);
1028
1029        let source = ComponentId::builtin("source");
1030        let event = Event {
1031            category: EventCategory::UserInput,
1032            operation: "input".to_string(),
1033            source,
1034            payload: serde_json::json!({"message": "broadcast test"}),
1035        };
1036
1037        let delivered = bus.broadcast(event);
1038        assert_eq!(delivered, 2);
1039
1040        // Both channels should receive the event (as Broadcast InboundEvent)
1041        let evt1 = rx1.try_recv();
1042        let evt2 = rx2.try_recv();
1043        assert!(evt1.is_ok());
1044        assert!(evt2.is_ok());
1045        assert_eq!(
1046            evt1.expect("channel 1 should receive broadcast")
1047                .into_event()
1048                .operation,
1049            "input"
1050        );
1051        assert_eq!(
1052            evt2.expect("channel 2 should receive broadcast")
1053                .into_event()
1054                .operation,
1055            "input"
1056        );
1057    }
1058
1059    #[tokio::test]
1060    async fn broadcast_async_to_channels() {
1061        let mut bus = EventBus::new();
1062
1063        let ch1 = ChannelId::new();
1064        let (tx1, mut rx1) = tokio::sync::mpsc::channel(32);
1065        let handle1 = ChannelHandle::new(ch1, tx1);
1066        bus.register_channel(handle1);
1067
1068        let source = ComponentId::builtin("source");
1069        let event = Event {
1070            category: EventCategory::UserInput,
1071            operation: "async_input".to_string(),
1072            source,
1073            payload: serde_json::json!({"message": "async broadcast"}),
1074        };
1075
1076        let delivered = bus.broadcast_async(event).await;
1077        assert_eq!(delivered, 1);
1078
1079        let evt = rx1.try_recv();
1080        assert!(evt.is_ok());
1081        assert_eq!(
1082            evt.expect("channel should receive async broadcast")
1083                .into_event()
1084                .operation,
1085            "async_input"
1086        );
1087    }
1088
1089    // === Component-to-Component RPC via ChannelHandle Tests ===
1090
1091    /// Helper: creates a ChannelRunner with request channel enabled,
1092    /// returning the handle (with request_tx) and the runner task.
1093    /// The runner echoes on_request payloads, so RPC callers get their payload back.
1094    async fn setup_runner_with_request_channel() -> (
1095        ChannelHandle,
1096        tokio::task::JoinHandle<crate::channel::RunnerResult>,
1097        ChannelId,
1098        tokio::sync::broadcast::Sender<orcs_event::Signal>,
1099        tokio::sync::mpsc::Sender<crate::channel::WorldCommand>,
1100        tokio::task::JoinHandle<()>,
1101    ) {
1102        use crate::channel::ChannelRunner;
1103        use crate::channel::{ChannelConfig, World, WorldManager};
1104        use orcs_component::{ComponentError, Status};
1105        use orcs_event::SignalResponse;
1106        use tokio::sync::broadcast;
1107
1108        struct EchoRpcComponent {
1109            id: ComponentId,
1110        }
1111        impl orcs_component::Component for EchoRpcComponent {
1112            fn id(&self) -> &ComponentId {
1113                &self.id
1114            }
1115            fn status(&self) -> Status {
1116                Status::Idle
1117            }
1118            fn on_request(
1119                &mut self,
1120                request: &orcs_event::Request,
1121            ) -> Result<Value, ComponentError> {
1122                Ok(request.payload.clone())
1123            }
1124            fn on_signal(&mut self, signal: &orcs_event::Signal) -> SignalResponse {
1125                if signal.is_veto() {
1126                    SignalResponse::Abort
1127                } else {
1128                    SignalResponse::Handled
1129                }
1130            }
1131            fn abort(&mut self) {}
1132        }
1133
1134        let mut world = World::new();
1135        let channel_id = world.create_channel(ChannelConfig::interactive());
1136        let (manager, world_tx) = WorldManager::with_world(world);
1137        let world_handle = manager.world();
1138        let manager_task = tokio::spawn(manager.run());
1139        let (signal_tx, _) = broadcast::channel(64);
1140
1141        let component: Box<dyn orcs_component::Component> = Box::new(EchoRpcComponent {
1142            id: ComponentId::builtin("rpc_target"),
1143        });
1144        let signal_rx = signal_tx.subscribe();
1145        let (runner, handle) = ChannelRunner::builder(
1146            channel_id,
1147            world_tx.clone(),
1148            world_handle,
1149            signal_rx,
1150            component,
1151        )
1152        .with_request_channel()
1153        .build();
1154
1155        let runner_task = tokio::spawn(runner.run());
1156
1157        (
1158            handle,
1159            runner_task,
1160            channel_id,
1161            signal_tx,
1162            world_tx,
1163            manager_task,
1164        )
1165    }
1166
1167    #[tokio::test]
1168    async fn request_via_channel_handle_routes_and_responds() {
1169        let (handle, runner_task, channel_id, signal_tx, world_tx, manager_task) =
1170            setup_runner_with_request_channel().await;
1171
1172        let mut bus = EventBus::new();
1173        let source = ComponentId::builtin("source");
1174        let target = ComponentId::builtin("rpc_target");
1175
1176        bus.register_channel(handle);
1177        bus.register_component_channel(&target, channel_id);
1178
1179        let req = Request::new(
1180            EventCategory::Echo,
1181            "echo",
1182            source,
1183            channel_id,
1184            Value::String("rpc_test".into()),
1185        )
1186        .with_target(target);
1187
1188        let result = bus.request(req).await;
1189        assert!(result.is_ok());
1190        assert_eq!(
1191            result.expect("RPC request should succeed"),
1192            Value::String("rpc_test".into())
1193        );
1194
1195        // Cleanup
1196        signal_tx
1197            .send(orcs_event::Signal::cancel(
1198                channel_id,
1199                crate::Principal::System,
1200            ))
1201            .expect("send cancel signal for cleanup should succeed");
1202        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
1203        let _ = world_tx.send(crate::channel::WorldCommand::Shutdown).await;
1204        let _ = manager_task.await;
1205    }
1206
1207    #[tokio::test]
1208    async fn request_via_channel_handle_timeout() {
1209        use orcs_types::ErrorCode;
1210
1211        // Use a component that never responds quickly enough
1212        let (handle, runner_task, channel_id, signal_tx, world_tx, manager_task) =
1213            setup_runner_with_request_channel().await;
1214
1215        let mut bus = EventBus::new();
1216        let source = ComponentId::builtin("source");
1217        let target = ComponentId::builtin("rpc_target");
1218
1219        bus.register_channel(handle);
1220        bus.register_component_channel(&target, channel_id);
1221
1222        // Stop the runner first so reply never comes
1223        signal_tx
1224            .send(orcs_event::Signal::veto(crate::Principal::System))
1225            .expect("send veto signal to stop runner should succeed");
1226        let _ = tokio::time::timeout(std::time::Duration::from_millis(200), runner_task).await;
1227
1228        let req = Request::new(
1229            EventCategory::Echo,
1230            "slow_op",
1231            source,
1232            channel_id,
1233            Value::Null,
1234        )
1235        .with_target(target)
1236        .with_timeout(50);
1237
1238        let result = bus.request(req).await;
1239
1240        // After runner stops, request_rx is dropped, so send_request fails
1241        // or the oneshot reply_tx is dropped → ChannelClosed
1242        assert!(result.is_err());
1243        let err = result.expect_err("request after runner stop should fail");
1244        let code = err.code();
1245        assert!(
1246            code == "ENGINE_SEND_FAILED" || code == "ENGINE_CHANNEL_CLOSED",
1247            "expected send_failed or channel_closed, got: {}",
1248            code,
1249        );
1250
1251        let _ = world_tx.send(crate::channel::WorldCommand::Shutdown).await;
1252        let _ = manager_task.await;
1253    }
1254
1255    #[tokio::test]
1256    async fn request_falls_back_to_component_handle_when_no_channel_map() {
1257        let mut bus = EventBus::new();
1258        let source = ComponentId::builtin("source");
1259        let target = ComponentId::builtin("target");
1260        let channel = ChannelId::new();
1261
1262        // Register via ComponentHandle (standalone path), not ChannelHandle
1263        let mut handle = bus.register(target.clone(), vec![EventCategory::Echo]);
1264
1265        let req = Request::new(
1266            EventCategory::Echo,
1267            "echo",
1268            source,
1269            channel,
1270            Value::String("fallback".into()),
1271        )
1272        .with_target(target);
1273        let request_id = req.id;
1274
1275        // Spawn receiver on ComponentHandle path
1276        let (tx, rx) = tokio::sync::oneshot::channel();
1277        tokio::spawn(async move {
1278            if let Some(req) = handle.recv_request().await {
1279                tx.send(req).ok();
1280            }
1281        });
1282
1283        let _response_task = tokio::spawn(async move {
1284            let mut bus = bus;
1285            bus.request(req).await
1286        });
1287
1288        let received = rx
1289            .await
1290            .expect("should receive request via fallback ComponentHandle path");
1291        assert_eq!(received.id, request_id);
1292    }
1293
1294    #[tokio::test]
1295    async fn unregister_channel_cleans_up_component_channel_map() {
1296        let mut bus = EventBus::new();
1297        let target = ComponentId::builtin("target");
1298        let channel_id = ChannelId::new();
1299
1300        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(32);
1301        let handle = ChannelHandle::new(channel_id, event_tx);
1302
1303        bus.register_channel(handle);
1304        bus.register_component_channel(&target, channel_id);
1305
1306        // Verify mapping exists
1307        assert!(bus.component_channel_map.read().contains_key(&target.fqn()));
1308
1309        // Unregister should clean up
1310        bus.unregister_channel(&channel_id);
1311        assert!(!bus.component_channel_map.read().contains_key(&target.fqn()));
1312    }
1313
1314    // --- EventBus Hook integration tests (Step 4.3) ---
1315
1316    mod hook_tests {
1317        use super::*;
1318        use orcs_hook::testing::MockHook;
1319        use orcs_hook::HookPoint;
1320        use serde_json::json;
1321
1322        fn setup_with_hooks() -> (EventBus, orcs_hook::SharedHookRegistry) {
1323            let registry = orcs_hook::shared_hook_registry();
1324            let mut bus = EventBus::new();
1325            bus.set_hook_registry(Arc::clone(&registry));
1326            (bus, registry)
1327        }
1328
1329        #[test]
1330        fn bus_on_register_fires() {
1331            let (mut bus, registry) = setup_with_hooks();
1332
1333            let hook = MockHook::pass_through("reg-observer", "*::*", HookPoint::BusOnRegister);
1334            let counter = hook.call_count.clone();
1335            registry
1336                .write()
1337                .expect("hook registry write lock should succeed")
1338                .register(Box::new(hook));
1339
1340            let _handle =
1341                bus.register(ComponentId::builtin("test"), vec![EventCategory::Lifecycle]);
1342            assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1343        }
1344
1345        #[test]
1346        fn bus_on_register_payload_contains_component_info() {
1347            let (mut bus, registry) = setup_with_hooks();
1348
1349            let hook = MockHook::modifier("reg-checker", "*::*", HookPoint::BusOnRegister, |ctx| {
1350                assert_eq!(ctx.payload["component_id"], "builtin::test-comp");
1351                let subs = ctx.payload["subscriptions"]
1352                    .as_array()
1353                    .expect("subscriptions should be an array");
1354                assert!(!subs.is_empty());
1355            });
1356            let counter = hook.call_count.clone();
1357            registry
1358                .write()
1359                .expect("hook registry write lock should succeed")
1360                .register(Box::new(hook));
1361
1362            let _handle = bus.register(
1363                ComponentId::builtin("test-comp"),
1364                vec![EventCategory::Lifecycle],
1365            );
1366            assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1367        }
1368
1369        #[test]
1370        fn bus_on_unregister_fires() {
1371            let (mut bus, registry) = setup_with_hooks();
1372
1373            let id = ComponentId::builtin("test");
1374            let _handle = bus.register(id.clone(), vec![EventCategory::Lifecycle]);
1375
1376            let hook = MockHook::pass_through("unreg-observer", "*::*", HookPoint::BusOnUnregister);
1377            let counter = hook.call_count.clone();
1378            registry
1379                .write()
1380                .expect("hook registry write lock should succeed")
1381                .register(Box::new(hook));
1382
1383            bus.unregister(&id);
1384            assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1385        }
1386
1387        #[test]
1388        fn bus_on_unregister_payload_contains_component_id() {
1389            let (mut bus, registry) = setup_with_hooks();
1390
1391            let id = ComponentId::builtin("my-comp");
1392            let _handle = bus.register(id.clone(), vec![EventCategory::Echo]);
1393
1394            let hook =
1395                MockHook::modifier("unreg-checker", "*::*", HookPoint::BusOnUnregister, |ctx| {
1396                    assert_eq!(ctx.payload["component_id"], "builtin::my-comp");
1397                });
1398            let counter = hook.call_count.clone();
1399            registry
1400                .write()
1401                .expect("hook registry write lock should succeed")
1402                .register(Box::new(hook));
1403
1404            bus.unregister(&id);
1405            assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1406        }
1407
1408        #[test]
1409        fn bus_pre_broadcast_abort_prevents_delivery() {
1410            let (mut bus, registry) = setup_with_hooks();
1411
1412            let ch = ChannelId::new();
1413            let (tx, _rx) = tokio::sync::mpsc::channel(32);
1414            bus.register_channel(ChannelHandle::new(ch, tx));
1415
1416            let hook = MockHook::aborter(
1417                "block-broadcast",
1418                "*::*",
1419                HookPoint::BusPreBroadcast,
1420                "policy",
1421            );
1422            registry
1423                .write()
1424                .expect("hook registry write lock should succeed")
1425                .register(Box::new(hook));
1426
1427            let event = Event {
1428                category: EventCategory::UserInput,
1429                operation: "input".to_string(),
1430                source: ComponentId::builtin("source"),
1431                payload: json!({"msg": "hello"}),
1432            };
1433
1434            let delivered = bus.broadcast(event);
1435            assert_eq!(delivered, 0);
1436        }
1437
1438        #[test]
1439        fn bus_pre_broadcast_skip_prevents_delivery() {
1440            let (mut bus, registry) = setup_with_hooks();
1441
1442            let ch = ChannelId::new();
1443            let (tx, _rx) = tokio::sync::mpsc::channel(32);
1444            bus.register_channel(ChannelHandle::new(ch, tx));
1445
1446            let hook = MockHook::skipper(
1447                "skip-broadcast",
1448                "*::*",
1449                HookPoint::BusPreBroadcast,
1450                json!(null),
1451            );
1452            registry
1453                .write()
1454                .expect("hook registry write lock should succeed")
1455                .register(Box::new(hook));
1456
1457            let event = Event {
1458                category: EventCategory::UserInput,
1459                operation: "input".to_string(),
1460                source: ComponentId::builtin("source"),
1461                payload: json!({}),
1462            };
1463
1464            let delivered = bus.broadcast(event);
1465            assert_eq!(delivered, 0);
1466        }
1467
1468        #[test]
1469        fn bus_pre_broadcast_continue_allows_delivery() {
1470            let (mut bus, registry) = setup_with_hooks();
1471
1472            let ch = ChannelId::new();
1473            let (tx, mut rx) = tokio::sync::mpsc::channel(32);
1474            bus.register_channel(ChannelHandle::new(ch, tx));
1475
1476            let hook =
1477                MockHook::pass_through("allow-broadcast", "*::*", HookPoint::BusPreBroadcast);
1478            let counter = hook.call_count.clone();
1479            registry
1480                .write()
1481                .expect("hook registry write lock should succeed")
1482                .register(Box::new(hook));
1483
1484            let event = Event {
1485                category: EventCategory::UserInput,
1486                operation: "input".to_string(),
1487                source: ComponentId::builtin("source"),
1488                payload: json!({"msg": "ok"}),
1489            };
1490
1491            let delivered = bus.broadcast(event);
1492            assert_eq!(delivered, 1);
1493            assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1494            assert!(rx.try_recv().is_ok());
1495        }
1496
1497        #[test]
1498        fn bus_post_broadcast_fires_with_delivered_count() {
1499            let (mut bus, registry) = setup_with_hooks();
1500
1501            let ch = ChannelId::new();
1502            let (tx, _rx) = tokio::sync::mpsc::channel(32);
1503            bus.register_channel(ChannelHandle::new(ch, tx));
1504
1505            let hook =
1506                MockHook::modifier("post-checker", "*::*", HookPoint::BusPostBroadcast, |ctx| {
1507                    assert_eq!(ctx.payload["delivered"], 1);
1508                    assert_eq!(ctx.payload["operation"], "input");
1509                });
1510            let counter = hook.call_count.clone();
1511            registry
1512                .write()
1513                .expect("hook registry write lock should succeed")
1514                .register(Box::new(hook));
1515
1516            let event = Event {
1517                category: EventCategory::UserInput,
1518                operation: "input".to_string(),
1519                source: ComponentId::builtin("source"),
1520                payload: json!({}),
1521            };
1522
1523            let _ = bus.broadcast(event);
1524            assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1525        }
1526
1527        #[tokio::test]
1528        async fn bus_pre_broadcast_abort_prevents_async_delivery() {
1529            let (mut bus, registry) = setup_with_hooks();
1530
1531            let ch = ChannelId::new();
1532            let (tx, _rx) = tokio::sync::mpsc::channel(32);
1533            bus.register_channel(ChannelHandle::new(ch, tx));
1534
1535            let hook =
1536                MockHook::aborter("block-async", "*::*", HookPoint::BusPreBroadcast, "blocked");
1537            registry
1538                .write()
1539                .expect("hook registry write lock should succeed")
1540                .register(Box::new(hook));
1541
1542            let event = Event {
1543                category: EventCategory::UserInput,
1544                operation: "input".to_string(),
1545                source: ComponentId::builtin("source"),
1546                payload: json!({}),
1547            };
1548
1549            let delivered = bus.broadcast_async(event).await;
1550            assert_eq!(delivered, 0);
1551        }
1552
1553        #[test]
1554        fn no_hook_registry_passthrough() {
1555            // Without hook_registry, everything should work normally
1556            let mut bus = EventBus::new();
1557
1558            let ch = ChannelId::new();
1559            let (tx, mut rx) = tokio::sync::mpsc::channel(32);
1560            bus.register_channel(ChannelHandle::new(ch, tx));
1561
1562            let event = Event {
1563                category: EventCategory::UserInput,
1564                operation: "input".to_string(),
1565                source: ComponentId::builtin("source"),
1566                payload: json!({"msg": "normal"}),
1567            };
1568
1569            let delivered = bus.broadcast(event);
1570            assert_eq!(delivered, 1);
1571            assert!(rx.try_recv().is_ok());
1572        }
1573    }
1574}