Skip to main content

orcs_runtime/channel/runner/
base.rs

1//! ChannelRunner - Parallel execution context for a Channel.
2//!
3//! Each [`ChannelRunner`] runs in its own tokio task, enabling true
4//! parallel execution of Channels. It receives Events via mpsc and
5//! sends World modifications via the command queue.
6//!
7//! # Architecture
8//!
9//! ```text
10//!                           ┌──────────────────────────┐
11//!                           │     ChannelRunner        │
12//!                           │                          │
13//! EventBus ───inject()────► │  event_rx ◄── mpsc       │
14//!                           │                          │
15//! Human ────signal()──────► │  signal_rx ◄── broadcast │
16//!                           │                          │
17//!                           │         │                │
18//!                           │         ▼                │
19//!                           │  process_event()         │
20//!                           │         │                │
21//!                           │         ▼                │
22//!                           │  world_tx ───► WorldManager
23//!                           │                          │
24//!                           └──────────────────────────┘
25//! ```
26//!
27//! # Lifecycle
28//!
29//! 1. Created via [`ChannelRunner::builder()`]
//! 2. Driven by awaiting [`ChannelRunner::run()`] (typically inside a spawned tokio task)
//! 3. Processes Events until channel completes or is killed
//! 4. On exit, `run()` captures a snapshot (if supported) and calls `Component::shutdown()`
33
34use super::child_context::{ChildContextImpl, LuaChildLoader};
35use super::child_spawner::ChildSpawner;
36use super::common::{
37    determine_channel_action, dispatch_signal_to_component, is_channel_active, is_channel_paused,
38    send_abort, send_transition, SignalAction,
39};
40use super::paused_queue::PausedEventQueue;
41use super::EventEmitter;
42use crate::auth::PermissionChecker;
43use crate::auth::Session;
44use crate::channel::command::{StateTransition, WorldCommand};
45use crate::channel::config::ChannelConfig;
46use crate::channel::World;
47use crate::engine::SharedChannelHandles;
48use orcs_component::{
49    AsyncChildContext, ChildContext, Component, ComponentError, ComponentLoader, ComponentSnapshot,
50    SnapshotError,
51};
52use orcs_event::{EventCategory, Request, Signal, SubscriptionEntry};
53use orcs_hook::SharedHookRegistry;
54use orcs_types::ChannelId;
55use serde::{Deserialize, Serialize};
56use serde_json::Value;
57use std::borrow::Cow;
58use std::sync::{Arc, Mutex as StdMutex};
59use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
60use tracing::{debug, info, warn};
61
62/// An event that can be injected into a channel.
63///
64/// Events are the primary means of external input to a running channel.
65/// Unlike Signals (which are control messages), Events represent work
66/// to be processed by the bound Component.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct Event {
69    /// Category of this event (for routing).
70    pub category: EventCategory,
71    /// Operation to perform (maps to Component.on_request operation).
72    pub operation: String,
73    /// Source component that sent this event.
74    pub source: orcs_types::ComponentId,
75    /// Event payload data.
76    pub payload: serde_json::Value,
77}
78
79/// Internal routing wrapper that distinguishes broadcast from direct event injection.
80///
81/// - [`Broadcast`](InboundEvent::Broadcast): Events sent to all channels
82///   (e.g., UserInput from ClientRunner). The subscription filter is applied —
83///   only channels subscribed to the event's category will process it.
84/// - [`Direct`](InboundEvent::Direct): Events targeted at a specific channel
85///   (e.g., `@component` routing via Engine). The subscription filter is
86///   bypassed — the channel always processes the event.
87///
88/// This enum is internal to the runner system and does not affect the
89/// [`Event`] struct itself.
90#[derive(Debug, Clone)]
91pub(crate) enum InboundEvent {
92    /// Broadcast event — subscription filter applies.
93    Broadcast(Event),
94    /// Direct event — subscription filter bypassed.
95    Direct(Event),
96}
97
98impl InboundEvent {
99    /// Extracts the inner [`Event`], consuming the wrapper.
100    pub(crate) fn into_event(self) -> Event {
101        match self {
102            Self::Broadcast(e) | Self::Direct(e) => e,
103        }
104    }
105
106    /// Returns `true` if this is a direct (filter-bypassing) event.
107    pub(crate) fn is_direct(&self) -> bool {
108        matches!(self, Self::Direct(_))
109    }
110}
111
112/// Opaque handle for sending events into a channel as `InboundEvent::Direct`.
113///
114/// All events sent through `OutputSender` bypass the subscription filter
115/// on the receiving [`ChannelRunner`]. Use [`OutputSender::channel()`] to
116/// create a matched sender/receiver pair for testing or external use.
117///
118/// Internally wraps `mpsc::Sender<InboundEvent>` without exposing
119/// the `InboundEvent` type to external crates.
120#[derive(Clone, Debug)]
121pub struct OutputSender {
122    inner: mpsc::Sender<InboundEvent>,
123}
124
125impl OutputSender {
126    /// Creates a matched (`OutputSender`, [`OutputReceiver`]) pair.
127    ///
128    /// This is the public way to create a channel for use with
129    /// [`ChildSpawner`] and [`ChildContextImpl`] in integration tests
130    /// or external code.
131    #[must_use]
132    pub fn channel(buffer: usize) -> (Self, OutputReceiver) {
133        let (tx, rx) = mpsc::channel(buffer);
134        (Self { inner: tx }, OutputReceiver { inner: rx })
135    }
136
137    /// Creates a new OutputSender from an InboundEvent sender (crate-internal).
138    pub(crate) fn new(tx: mpsc::Sender<InboundEvent>) -> Self {
139        Self { inner: tx }
140    }
141
142    /// Returns the inner sender (crate-internal).
143    #[allow(dead_code)]
144    pub(crate) fn into_inner(self) -> mpsc::Sender<InboundEvent> {
145        self.inner
146    }
147
148    /// Sends an event as [`InboundEvent::Direct`] (non-blocking).
149    #[allow(clippy::result_large_err)]
150    pub(crate) fn try_send_direct(
151        &self,
152        event: Event,
153    ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
154        self.inner.try_send(InboundEvent::Direct(event))
155    }
156
157    /// Sends an event as [`InboundEvent::Direct`] (async, waits for capacity).
158    #[allow(dead_code)]
159    pub(crate) async fn send_direct(
160        &self,
161        event: Event,
162    ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
163        self.inner.send(InboundEvent::Direct(event)).await
164    }
165}
166
167/// Receiver end of an [`OutputSender`] channel.
168///
169/// Unwraps `InboundEvent` internally, returning plain [`Event`] values.
170/// Created via [`OutputSender::channel()`].
171pub struct OutputReceiver {
172    inner: mpsc::Receiver<InboundEvent>,
173}
174
175impl OutputReceiver {
176    /// Receives the next event, waiting until one is available.
177    ///
178    /// Returns `None` when all senders have been dropped.
179    pub async fn recv(&mut self) -> Option<Event> {
180        self.inner.recv().await.map(InboundEvent::into_event)
181    }
182
183    /// Attempts to receive an event without blocking.
184    pub fn try_recv(&mut self) -> Result<Event, mpsc::error::TryRecvError> {
185        self.inner.try_recv().map(InboundEvent::into_event)
186    }
187}
188
189/// Result of a ChannelRunner's execution.
190///
191/// Returned by [`ChannelRunner::run()`] after the event loop exits and
192/// the shutdown sequence completes. Contains the channel ID, component
193/// FQN, and an optional snapshot captured during graceful shutdown.
194#[derive(Debug)]
195pub struct RunnerResult {
196    /// Channel ID this runner was bound to.
197    pub channel_id: ChannelId,
198    /// Fully qualified name of the bound Component.
199    ///
200    /// `Cow::Borrowed` for static FQNs (e.g. ClientRunner),
201    /// `Cow::Owned` for dynamic FQNs (e.g. ChannelRunner components).
202    pub component_fqn: Cow<'static, str>,
203    /// Snapshot captured during shutdown (None if component doesn't support snapshots).
204    pub snapshot: Option<ComponentSnapshot>,
205}
206
207/// An RPC request paired with its reply channel.
208///
209/// Created by [`EventBus::request()`] and delivered to [`ChannelRunner`]
210/// via its `request_rx` channel. The runner calls `Component::on_request()`
211/// and sends the result back through `reply_tx`.
212pub(crate) struct RequestEnvelope {
213    /// The incoming request.
214    pub request: Request,
215    /// One-shot channel to send the response back to the caller.
216    pub reply_tx: oneshot::Sender<Result<Value, String>>,
217}
218
/// Capacity of the request channel used for Component-to-Component RPC.
const REQUEST_BUFFER_SIZE: usize = 32;

/// Default capacity of each channel's event buffer.
const EVENT_BUFFER_SIZE: usize = 64;
224
225/// Handle for injecting events into a [`ChannelRunner`].
226#[derive(Clone, Debug)]
227pub struct ChannelHandle {
228    /// Channel ID.
229    pub id: ChannelId,
230    /// Event sender (carries [`InboundEvent`] internally).
231    event_tx: mpsc::Sender<InboundEvent>,
232    /// Request sender for Component-to-Component RPC (None if not enabled).
233    request_tx: Option<mpsc::Sender<RequestEnvelope>>,
234}
235
236impl ChannelHandle {
237    /// Creates a new handle with the given sender.
238    #[must_use]
239    pub(crate) fn new(id: ChannelId, event_tx: mpsc::Sender<InboundEvent>) -> Self {
240        Self {
241            id,
242            event_tx,
243            request_tx: None,
244        }
245    }
246
247    /// Returns `true` if this handle accepts RPC requests.
248    #[must_use]
249    pub fn accepts_requests(&self) -> bool {
250        self.request_tx.is_some()
251    }
252
253    /// Sends an RPC request to the bound Component.
254    ///
255    /// The request is delivered to the ChannelRunner's `request_rx` and
256    /// processed by `Component::on_request()`. The result is sent back
257    /// through `reply_tx`.
258    ///
259    /// # Errors
260    ///
261    /// Returns `Err` if the request channel is not enabled or is closed.
262    pub(crate) async fn send_request(
263        &self,
264        request: Request,
265        reply_tx: oneshot::Sender<Result<Value, String>>,
266    ) -> Result<(), mpsc::error::SendError<RequestEnvelope>> {
267        match &self.request_tx {
268            Some(tx) => tx.send(RequestEnvelope { request, reply_tx }).await,
269            None => Err(mpsc::error::SendError(RequestEnvelope {
270                request,
271                reply_tx,
272            })),
273        }
274    }
275
276    /// Injects a broadcast event into the channel (subscription filter applies).
277    ///
278    /// Returns an error if the channel has been dropped.
279    pub(crate) async fn inject(
280        &self,
281        event: Event,
282    ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
283        self.event_tx.send(InboundEvent::Broadcast(event)).await
284    }
285
286    /// Try to inject a broadcast event without blocking.
287    ///
288    /// Returns an error if the buffer is full or channel is dropped.
289    #[allow(clippy::result_large_err)]
290    pub(crate) fn try_inject(
291        &self,
292        event: Event,
293    ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
294        self.event_tx.try_send(InboundEvent::Broadcast(event))
295    }
296
297    /// Injects a direct event into the channel (subscription filter bypassed).
298    ///
299    /// Use this for targeted delivery (e.g., `@component` routing).
300    pub(crate) async fn inject_direct(
301        &self,
302        event: Event,
303    ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
304        self.event_tx.send(InboundEvent::Direct(event)).await
305    }
306
307    /// Try to inject a direct event without blocking.
308    ///
309    /// Bypasses the subscription filter on the receiving ChannelRunner.
310    #[allow(clippy::result_large_err)]
311    pub(crate) fn try_inject_direct(
312        &self,
313        event: Event,
314    ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
315        self.event_tx.try_send(InboundEvent::Direct(event))
316    }
317}
318
319/// State stored when a Component returns [`ComponentError::Suspended`].
320///
321/// Holds the original request and approval metadata so that the ChannelRunner
322/// can re-dispatch the request after the user approves.
323struct PendingApproval {
324    /// Unique ID for this approval (matches Signal routing).
325    approval_id: String,
326    /// Permission pattern to grant on approval (e.g. `"shell:*"`).
327    grant_pattern: String,
328    /// The original request to re-dispatch after approval.
329    original_request: Request,
330}
331
332/// Execution context for a single Channel.
333///
334/// `ChannelRunner` provides the runtime environment for parallel
335/// channel execution. Each runner:
336///
337/// - Binds 1:1 with a [`Component`]
338/// - Receives Events via an mpsc channel and delivers to Component
339/// - Receives Signals via broadcast and delivers to Component
340/// - Sends World modifications via command queue
341/// - Has read access to the World for queries
342/// - Manages spawned children via ChildSpawner
343pub struct ChannelRunner {
344    /// This channel's ID.
345    id: ChannelId,
346    /// Bound component's ID (cached at build time to avoid locking for hook dispatch).
347    component_id: orcs_types::ComponentId,
348    /// Receiver for incoming events (wrapped as [`InboundEvent`]).
349    event_rx: mpsc::Receiver<InboundEvent>,
350    /// Receiver for signals (broadcast).
351    signal_rx: broadcast::Receiver<Signal>,
352    /// Sender for World commands.
353    world_tx: mpsc::Sender<WorldCommand>,
354    /// Read-only World access.
355    world: Arc<RwLock<World>>,
356    /// Bound Component (1:1 relationship).
357    component: Arc<Mutex<Box<dyn Component>>>,
358    /// Cached subscription entries from Component.
359    ///
360    /// Populated at build time to avoid locking Component on every event.
361    /// Broadcast events whose category (and optionally operation) do not
362    /// match any entry are silently skipped.
363    /// Direct events bypass this filter entirely.
364    subscriptions: Vec<SubscriptionEntry>,
365    /// Queue for events received while paused.
366    paused_queue: PausedEventQueue,
367    /// Child spawner for managing spawned children (optional).
368    child_spawner: Option<Arc<StdMutex<ChildSpawner>>>,
369    /// Event sender for child context (kept for context creation).
370    event_tx: Option<mpsc::Sender<InboundEvent>>,
371    /// Receiver for incoming RPC requests from other Components.
372    ///
373    /// When enabled via [`ChannelRunnerBuilder::with_request_channel()`],
374    /// the runner accepts [`RequestEnvelope`]s, calls `Component::on_request()`,
375    /// and sends the result back through the envelope's `reply_tx`.
376    request_rx: Option<mpsc::Receiver<RequestEnvelope>>,
377    /// Initial snapshot to restore before init (session resume).
378    initial_snapshot: Option<ComponentSnapshot>,
379    /// Shared channel handles for RPC from children.
380    shared_handles: Option<SharedChannelHandles>,
381    /// Shared FQN → ChannelId map for RPC from children.
382    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
383    /// Shared hook registry for lifecycle hook dispatch.
384    hook_registry: Option<SharedHookRegistry>,
385    /// Per-component configuration from `[components.settings.<name>]`.
386    component_config: serde_json::Value,
387    /// Dynamic command grants for granting permissions on approval.
388    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
389    /// IO output sender for routing approval requests to the ClientRunner.
390    io_output_tx: Option<OutputSender>,
391    /// Pending approval state: stored when Component returns Suspended.
392    pending_approval: Option<PendingApproval>,
393}
394
395/// Helper for `tokio::select!`: receives from an optional request channel.
396///
397/// Returns `None` immediately if the receiver is `None` (request channel
398/// not enabled), allowing the select! branch to be skipped.
399async fn recv_request(rx: &mut Option<mpsc::Receiver<RequestEnvelope>>) -> Option<RequestEnvelope> {
400    match rx {
401        Some(rx) => rx.recv().await,
402        None => std::future::pending().await,
403    }
404}
405
406impl ChannelRunner {
407    /// Dispatches hooks for the given point with the given payload.
408    ///
409    /// Returns `HookAction::Continue` with the (possibly modified) context
410    /// if no registry is configured.
411    fn dispatch_hook(
412        &self,
413        point: orcs_hook::HookPoint,
414        payload: serde_json::Value,
415    ) -> orcs_hook::HookAction {
416        let Some(registry) = &self.hook_registry else {
417            let ctx = orcs_hook::HookContext::new(
418                point,
419                self.component_id.clone(),
420                self.id,
421                orcs_types::Principal::System,
422                0,
423                payload,
424            );
425            return orcs_hook::HookAction::Continue(Box::new(ctx));
426        };
427
428        let ctx = orcs_hook::HookContext::new(
429            point,
430            self.component_id.clone(),
431            self.id,
432            orcs_types::Principal::System,
433            0,
434            payload,
435        );
436
437        let guard = registry.read().unwrap_or_else(|poisoned| {
438            warn!("hook registry lock poisoned, using inner value");
439            poisoned.into_inner()
440        });
441        guard.dispatch(point, &self.component_id, None, ctx)
442    }
443
444    /// Returns a reference to the shared hook registry, if configured.
445    #[must_use]
446    pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
447        self.hook_registry.as_ref()
448    }
449
450    /// Creates a ChildContext for use by managed children.
451    ///
452    /// The returned context can be injected into LuaChild instances
453    /// to enable them to spawn sub-children.
454    ///
455    /// Returns None if child spawning was not enabled for this runner.
456    #[must_use]
457    pub fn create_child_context(&self, child_id: &str) -> Option<Box<dyn ChildContext>> {
458        let spawner = self.child_spawner.as_ref()?;
459        let event_tx = self.event_tx.as_ref()?;
460
461        let mut ctx = ChildContextImpl::new(
462            child_id,
463            OutputSender::new(event_tx.clone()),
464            Arc::clone(spawner),
465        );
466        ctx = self.inject_rpc(ctx);
467
468        Some(Box::new(ctx))
469    }
470
471    /// Creates a ChildContext with a LuaChildLoader for spawning Lua children.
472    ///
473    /// # Arguments
474    ///
475    /// * `child_id` - ID of the child that will use this context
476    /// * `loader` - Loader for creating LuaChild instances from configs
477    #[must_use]
478    pub fn create_child_context_with_loader(
479        &self,
480        child_id: &str,
481        loader: Arc<dyn LuaChildLoader>,
482    ) -> Option<Box<dyn ChildContext>> {
483        let spawner = self.child_spawner.as_ref()?;
484        let event_tx = self.event_tx.as_ref()?;
485
486        let mut ctx = ChildContextImpl::new(
487            child_id,
488            OutputSender::new(event_tx.clone()),
489            Arc::clone(spawner),
490        )
491        .with_lua_loader(loader);
492        ctx = self.inject_rpc(ctx);
493
494        Some(Box::new(ctx))
495    }
496
497    /// Creates an AsyncChildContext for use by async children.
498    ///
499    /// The returned context can be used with async spawn operations.
500    ///
501    /// Returns None if child spawning was not enabled for this runner.
502    #[must_use]
503    pub fn create_async_child_context(&self, child_id: &str) -> Option<Box<dyn AsyncChildContext>> {
504        let spawner = self.child_spawner.as_ref()?;
505        let event_tx = self.event_tx.as_ref()?;
506
507        let mut ctx = ChildContextImpl::new(
508            child_id,
509            OutputSender::new(event_tx.clone()),
510            Arc::clone(spawner),
511        );
512        ctx = self.inject_rpc(ctx);
513
514        Some(Box::new(ctx))
515    }
516
517    /// Creates an AsyncChildContext with a LuaChildLoader.
518    ///
519    /// # Arguments
520    ///
521    /// * `child_id` - ID of the child that will use this context
522    /// * `loader` - Loader for creating LuaChild instances from configs
523    #[must_use]
524    pub fn create_async_child_context_with_loader(
525        &self,
526        child_id: &str,
527        loader: Arc<dyn LuaChildLoader>,
528    ) -> Option<Box<dyn AsyncChildContext>> {
529        let spawner = self.child_spawner.as_ref()?;
530        let event_tx = self.event_tx.as_ref()?;
531
532        let mut ctx = ChildContextImpl::new(
533            child_id,
534            OutputSender::new(event_tx.clone()),
535            Arc::clone(spawner),
536        )
537        .with_lua_loader(loader);
538        ctx = self.inject_rpc(ctx);
539
540        Some(Box::new(ctx))
541    }
542
543    /// Injects RPC support (shared handles + channel map + channel ID)
544    /// into a ChildContextImpl if the runner was built with RPC resources.
545    fn inject_rpc(&self, ctx: ChildContextImpl) -> ChildContextImpl {
546        if let (Some(handles), Some(map)) = (&self.shared_handles, &self.component_channel_map) {
547            ctx.with_rpc_support(handles.clone(), map.clone(), self.id)
548        } else {
549            ctx
550        }
551    }
552
553    /// Returns a reference to the child spawner, if enabled.
554    #[must_use]
555    pub fn child_spawner(&self) -> Option<&Arc<StdMutex<ChildSpawner>>> {
556        self.child_spawner.as_ref()
557    }
558
559    /// Returns this channel's ID.
560    #[must_use]
561    pub fn id(&self) -> ChannelId {
562        self.id
563    }
564
565    /// Returns a reference to the world_tx sender.
566    #[must_use]
567    pub fn world_tx(&self) -> &mpsc::Sender<WorldCommand> {
568        &self.world_tx
569    }
570
571    /// Returns a reference to the world.
572    #[must_use]
573    pub fn world(&self) -> &Arc<RwLock<World>> {
574        &self.world
575    }
576
577    /// Runs the channel's event loop.
578    ///
579    /// This method consumes the runner and processes events until:
580    /// - The channel is completed or killed
581    /// - A Veto signal is received
582    /// - All event senders are dropped
583    ///
584    /// After the event loop exits, executes the shutdown sequence:
585    /// 1. Capture component snapshot (if supported)
586    /// 2. Call `component.shutdown()` for cleanup
587    /// 3. Return [`RunnerResult`] with snapshot
588    ///
589    /// If an initial snapshot was provided via
590    /// [`ChannelRunnerBuilder::with_initial_snapshot()`], it is restored
591    /// before `init()` is called.
592    #[tracing::instrument(skip_all, level = "info", fields(channel_id = %self.id))]
593    pub async fn run(mut self) -> RunnerResult {
594        info!("ChannelRunner started");
595
596        // Restore + Initialize component
597        {
598            let mut comp = self.component.lock().await;
599
600            // Restore from initial snapshot (session resume)
601            if let Some(snapshot) = self.initial_snapshot.take() {
602                match comp.restore(&snapshot) {
603                    Ok(()) => info!("restored component from initial snapshot"),
604                    Err(SnapshotError::NotSupported(_)) => {
605                        debug!("component does not support snapshot restore");
606                    }
607                    Err(e) => {
608                        warn!(error = %e, "failed to restore initial snapshot");
609                    }
610                }
611            }
612
613            // --- Pre-init hook ---
614            self.dispatch_hook(
615                orcs_hook::HookPoint::ComponentPreInit,
616                serde_json::json!({ "component": comp.id().fqn() }),
617            );
618
619            if let Err(e) = comp.init(&self.component_config) {
620                warn!(error = %e, "component init failed");
621            }
622
623            // --- Post-init hook ---
624            self.dispatch_hook(
625                orcs_hook::HookPoint::ComponentPostInit,
626                serde_json::json!({ "component": comp.id().fqn() }),
627            );
628        }
629
630        loop {
631            tokio::select! {
632                // Priority: signals > requests > events
633                biased;
634
635                signal = self.signal_rx.recv() => {
636                    match signal {
637                        Ok(sig) => {
638                            if !self.handle_signal(sig).await {
639                                break;
640                            }
641                        }
642                        Err(broadcast::error::RecvError::Closed) => {
643                            info!("signal channel closed");
644                            break;
645                        }
646                        Err(broadcast::error::RecvError::Lagged(n)) => {
647                            warn!(lagged = n, "signal receiver lagged");
648                        }
649                    }
650                }
651
652                // RPC requests from other Components (higher priority than events).
653                // When request_rx is None, recv_request() returns pending (never resolves),
654                // effectively disabling this branch in select!.
655                Some(envelope) = recv_request(&mut self.request_rx) => {
656                    self.handle_rpc_request(envelope).await;
657                }
658
659                event = self.event_rx.recv() => {
660                    match event {
661                        Some(evt) => {
662                            if !self.handle_event(evt).await {
663                                break;
664                            }
665                        }
666                        None => {
667                            info!("event channel closed");
668                            break;
669                        }
670                    }
671                }
672            }
673
674            // Check if channel is still running
675            if !is_channel_active(&self.world, self.id).await {
676                debug!("channel no longer active");
677                break;
678            }
679        }
680
681        // === Shutdown Sequence ===
682        let (component_fqn, snapshot) = {
683            let mut comp = self.component.lock().await;
684            let fqn = comp.id().fqn();
685
686            // 1. Capture snapshot
687            let snapshot = match comp.snapshot() {
688                Ok(s) => {
689                    debug!(component = %fqn, "captured shutdown snapshot");
690                    Some(s)
691                }
692                Err(SnapshotError::NotSupported(_)) => None,
693                Err(e) => {
694                    warn!(component = %fqn, error = %e, "snapshot failed during shutdown");
695                    None
696                }
697            };
698
699            // 2. Shutdown
700            self.dispatch_hook(
701                orcs_hook::HookPoint::ComponentPreShutdown,
702                serde_json::json!({ "component": &fqn }),
703            );
704
705            comp.shutdown();
706
707            self.dispatch_hook(
708                orcs_hook::HookPoint::ComponentPostShutdown,
709                serde_json::json!({ "component": &fqn }),
710            );
711            debug!(component = %fqn, "component shutdown complete");
712
713            (fqn, snapshot)
714        };
715
716        info!("ChannelRunner stopped");
717
718        RunnerResult {
719            channel_id: self.id,
720            component_fqn: Cow::Owned(component_fqn),
721            snapshot,
722        }
723    }
724
725    /// Handles an incoming signal.
726    ///
727    /// Returns `false` if the runner should stop.
728    async fn handle_signal(&mut self, signal: Signal) -> bool {
729        debug!(signal_kind = ?signal.kind, "received signal");
730
731        // Check if signal affects this channel
732        if !signal.affects_channel(self.id) {
733            return true;
734        }
735
736        // --- Pre-dispatch hook ---
737        let pre_payload = serde_json::json!({
738            "signal_kind": format!("{:?}", signal.kind),
739            "signal_scope": format!("{:?}", signal.scope),
740        });
741        let pre_action = self.dispatch_hook(orcs_hook::HookPoint::SignalPreDispatch, pre_payload);
742        match pre_action {
743            orcs_hook::HookAction::Skip(_) => {
744                debug!("signal skipped by pre-dispatch hook");
745                return true;
746            }
747            orcs_hook::HookAction::Abort { reason } => {
748                warn!(reason = %reason, "signal aborted by pre-dispatch hook");
749                return true;
750            }
751            orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
752        }
753
754        // Propagate signal to spawned children
755        if let Some(spawner) = &self.child_spawner {
756            if let Ok(mut s) = spawner.lock() {
757                s.propagate_signal(&signal);
758            }
759        }
760
761        // Dispatch to component first
762        let component_action = dispatch_signal_to_component(&signal, &self.component).await;
763        if let SignalAction::Stop { reason } = component_action {
764            info!(reason = %reason, "component requested stop");
765            // Abort all children before stopping
766            self.abort_all_children();
767            send_abort(&self.world_tx, self.id, &reason).await;
768            return false;
769        }
770
771        // Determine channel-level action
772        let action = determine_channel_action(&signal.kind);
773        match action {
774            SignalAction::Stop { reason } => {
775                info!(reason = %reason, "stopping channel");
776                // Abort all children before stopping
777                self.abort_all_children();
778                send_abort(&self.world_tx, self.id, &reason).await;
779                return false;
780            }
781            SignalAction::Transition(transition) => {
782                let is_resolve = matches!(transition, StateTransition::ResolveApproval { .. });
783                let accepted = send_transition(&self.world_tx, self.id, transition.clone()).await;
784
785                // Drain paused queue on resume
786                if matches!(transition, StateTransition::Resume) {
787                    self.drain_paused_queue().await;
788                }
789
790                // On successful approval resolution, grant pattern and re-dispatch.
791                if is_resolve && accepted {
792                    self.handle_approval_resolved().await;
793                }
794            }
795            SignalAction::Continue => {
796                // Handle Reject: clear pending approval and notify user.
797                if let orcs_event::SignalKind::Reject {
798                    approval_id,
799                    reason,
800                } = &signal.kind
801                {
802                    if let Some(pending) = &self.pending_approval {
803                        if pending.approval_id == *approval_id {
804                            info!(
805                                approval_id = %approval_id,
806                                "approval rejected, clearing pending"
807                            );
808                            // Abort the channel to exit AwaitingApproval.
809                            send_abort(
810                                &self.world_tx,
811                                self.id,
812                                reason.as_deref().unwrap_or("rejected by user"),
813                            )
814                            .await;
815                            self.pending_approval = None;
816
817                            // Notify user.
818                            if let Some(io_tx) = &self.io_output_tx {
819                                let event = Event {
820                                    category: EventCategory::Output,
821                                    operation: "output".to_string(),
822                                    source: self.component_id.clone(),
823                                    payload: serde_json::json!({
824                                        "message": format!(
825                                            "Rejected: {}",
826                                            reason.as_deref().unwrap_or("no reason")
827                                        ),
828                                        "level": "warn",
829                                    }),
830                                };
831                                let _ = io_tx.try_send_direct(event);
832                            }
833                        }
834                    }
835                }
836            }
837        }
838
839        // --- Post-dispatch hook ---
840        let post_payload = serde_json::json!({
841            "signal_kind": format!("{:?}", signal.kind),
842            "handled": true,
843        });
844        let _post_action =
845            self.dispatch_hook(orcs_hook::HookPoint::SignalPostDispatch, post_payload);
846
847        true
848    }
849
850    /// Handles post-approval: grants the permission pattern and re-dispatches the request.
851    ///
852    /// Called after a `ResolveApproval` transition succeeds (channel returns to Running).
853    /// Consumes the stored [`PendingApproval`] and:
854    /// 1. Grants the `grant_pattern` via the shared `GrantPolicy`
855    /// 2. Re-dispatches the original request to the Component (which will now pass permission check)
856    /// 3. Sends `ShowApproved` notification to the user
857    async fn handle_approval_resolved(&mut self) {
858        let pending = match self.pending_approval.take() {
859            Some(p) => p,
860            None => {
861                debug!("ResolveApproval accepted but no pending approval stored");
862                return;
863            }
864        };
865
866        info!(
867            approval_id = %pending.approval_id,
868            grant_pattern = %pending.grant_pattern,
869            "approval resolved, granting pattern and re-dispatching"
870        );
871
872        // 1. Grant the permission pattern.
873        if let Some(grants) = &self.grants {
874            if let Err(e) =
875                grants.grant(orcs_auth::CommandGrant::persistent(&pending.grant_pattern))
876            {
877                warn!(
878                    error = %e,
879                    pattern = %pending.grant_pattern,
880                    "failed to grant pattern after approval"
881                );
882            }
883        } else {
884            warn!("no GrantPolicy configured, cannot grant pattern");
885        }
886
887        // 2. Re-dispatch the original request.
888        let result = {
889            let mut comp = self.component.lock().await;
890            comp.on_request(&pending.original_request)
891        };
892
893        match result {
894            Ok(response) => {
895                debug!(response = ?response, "re-dispatched request succeeded after approval");
896            }
897            Err(e) => {
898                warn!(error = %e, "re-dispatched request failed after approval");
899            }
900        }
901
902        // 3. Notify user that approval was accepted.
903        if let Some(io_tx) = &self.io_output_tx {
904            let event = Event {
905                category: EventCategory::Output,
906                operation: "output".to_string(),
907                source: self.component_id.clone(),
908                payload: serde_json::json!({
909                    "message": format!("Approved: {}", pending.approval_id),
910                    "level": "info",
911                }),
912            };
913            let _ = io_tx.try_send_direct(event);
914        }
915    }
916
917    /// Aborts all spawned children.
918    fn abort_all_children(&self) {
919        if let Some(spawner) = &self.child_spawner {
920            if let Ok(mut s) = spawner.lock() {
921                s.abort_all();
922                debug!("aborted all children");
923            }
924        }
925    }
926
927    /// Handles an incoming event.
928    ///
929    /// Subscription filter is applied first (for broadcast events),
930    /// then paused events are queued for later processing.
931    /// Direct events bypass the subscription filter entirely.
932    async fn handle_event(&mut self, inbound: InboundEvent) -> bool {
933        let is_direct = inbound.is_direct();
934        let event = inbound.into_event();
935
936        debug!(
937            category = ?event.category,
938            operation = %event.operation,
939            direct = is_direct,
940            "received event"
941        );
942
943        // Subscription filter first: drop broadcast events we don't subscribe to.
944        // This runs BEFORE the pause check so unsubscribed events never enter the queue.
945        // Checks both category AND operation (if operation filter is set).
946        if !is_direct
947            && !self
948                .subscriptions
949                .iter()
950                .any(|s| s.matches(&event.category, &event.operation))
951        {
952            debug!(category = ?event.category, operation = %event.operation, "skipping event (not subscribed)");
953            return true;
954        }
955
956        // Queue events while paused
957        if is_channel_paused(&self.world, self.id).await {
958            self.paused_queue
959                .try_enqueue(event, "ChannelRunner", self.id);
960            return true;
961        }
962
963        self.process_event(event, is_direct).await;
964        true
965    }
966
967    /// Processes a single event by delivering it to the Component.
968    ///
969    /// Subscription filtering is already applied by `handle_event()` before
970    /// this method is called. Direct events and paused-queue drains bypass
971    /// the filter by design.
972    async fn process_event(&mut self, event: Event, _is_direct: bool) {
973        // --- Pre-dispatch hook ---
974        let pre_payload = serde_json::json!({
975            "category": format!("{:?}", event.category),
976            "operation": &event.operation,
977            "source": event.source.fqn(),
978            "payload": &event.payload,
979        });
980        let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
981        let request_payload = match pre_action {
982            orcs_hook::HookAction::Continue(ctx) => {
983                // Use potentially modified payload from hook chain
984                ctx.payload.get("payload").cloned().unwrap_or(event.payload)
985            }
986            orcs_hook::HookAction::Skip(value) => {
987                debug!(value = ?value, "request skipped by pre-dispatch hook");
988                return;
989            }
990            orcs_hook::HookAction::Abort { reason } => {
991                warn!(reason = %reason, "request aborted by pre-dispatch hook");
992                return;
993            }
994            orcs_hook::HookAction::Replace(_) => {
995                // Replace is invalid for pre-hooks (registry already warns)
996                event.payload
997            }
998        };
999
1000        // Capture before move into Request::new.
1001        let is_user_input = event.category == EventCategory::UserInput;
1002        let event_operation = event.operation.clone();
1003
1004        let request = Request::new(
1005            event.category,
1006            &event_operation,
1007            event.source,
1008            self.id,
1009            request_payload,
1010        );
1011
1012        // Notify user that processing has started (UserInput only).
1013        if is_user_input {
1014            if let Some(io_tx) = &self.io_output_tx {
1015                let notify = Event {
1016                    category: EventCategory::Output,
1017                    operation: "processing".to_string(),
1018                    source: self.component_id.clone(),
1019                    payload: serde_json::json!({
1020                        "type": "processing",
1021                        "component": &self.component_id.name,
1022                        "operation": &event_operation,
1023                    }),
1024                };
1025                let _ = io_tx.try_send_direct(notify);
1026            }
1027        }
1028
1029        let result = {
1030            let mut comp = self.component.lock().await;
1031            comp.on_request(&request)
1032        };
1033
1034        // --- Post-dispatch hook ---
1035        let post_payload = match &result {
1036            Ok(response) => serde_json::json!({
1037                "operation": &event_operation,
1038                "response": response,
1039                "success": true,
1040            }),
1041            Err(e) => serde_json::json!({
1042                "operation": &event_operation,
1043                "error": e.to_string(),
1044                "success": false,
1045            }),
1046        };
1047        let _post_action =
1048            self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);
1049
1050        match result {
1051            Ok(response) => {
1052                debug!(response = ?response, "component returned success");
1053            }
1054            Err(ComponentError::Suspended {
1055                approval_id,
1056                grant_pattern,
1057                pending_request,
1058            }) => {
1059                info!(
1060                    approval_id = %approval_id,
1061                    grant_pattern = %grant_pattern,
1062                    "component suspended pending approval"
1063                );
1064
1065                // Store the pending approval for re-dispatch after approval.
1066                self.pending_approval = Some(PendingApproval {
1067                    approval_id: approval_id.clone(),
1068                    grant_pattern,
1069                    original_request: request,
1070                });
1071
1072                // Transition channel to AwaitingApproval state.
1073                send_transition(
1074                    &self.world_tx,
1075                    self.id,
1076                    StateTransition::AwaitApproval {
1077                        request_id: approval_id.clone(),
1078                    },
1079                )
1080                .await;
1081
1082                // Notify user via IOOutput (ClientRunner → IOBridge → console).
1083                let description = pending_request
1084                    .get("description")
1085                    .and_then(|v| v.as_str())
1086                    .unwrap_or("command execution")
1087                    .to_string();
1088                let command = pending_request
1089                    .get("command")
1090                    .and_then(|v| v.as_str())
1091                    .unwrap_or("")
1092                    .to_string();
1093
1094                let output_event = Event {
1095                    category: EventCategory::Output,
1096                    operation: "approval_request".to_string(),
1097                    source: self.component_id.clone(),
1098                    payload: serde_json::json!({
1099                        "type": "approval_request",
1100                        "approval_id": approval_id,
1101                        "operation": "exec",
1102                        "description": format!("{}: {}", description, command),
1103                        "source": self.component_id.fqn(),
1104                    }),
1105                };
1106                if let Some(io_tx) = &self.io_output_tx {
1107                    let _ = io_tx.try_send_direct(output_event);
1108                }
1109            }
1110            Err(e) => {
1111                warn!(error = %e, "component returned error");
1112            }
1113        }
1114    }
1115
1116    /// Drains the paused queue and processes all queued events.
1117    ///
1118    /// Queued events have already passed the subscription filter in
1119    /// `handle_event()`, so they are processed without re-filtering.
1120    async fn drain_paused_queue(&mut self) {
1121        // Collect events first to avoid borrow issues with async process_event
1122        let events: Vec<_> = self.paused_queue.drain("ChannelRunner", self.id).collect();
1123
1124        for event in events {
1125            self.process_event(event, true).await;
1126        }
1127    }
1128
1129    /// Handles an incoming RPC request from another Component.
1130    ///
1131    /// Calls `Component::on_request()` and sends the result back through
1132    /// the envelope's `reply_tx`.
1133    async fn handle_rpc_request(&self, envelope: RequestEnvelope) {
1134        debug!(
1135            request_id = %envelope.request.id,
1136            operation = %envelope.request.operation,
1137            source = %envelope.request.source,
1138            "handling RPC request"
1139        );
1140
1141        // --- Pre-dispatch hook ---
1142        let pre_payload = serde_json::json!({
1143            "request_id": envelope.request.id.to_string(),
1144            "operation": &envelope.request.operation,
1145            "source": envelope.request.source.fqn(),
1146            "payload": &envelope.request.payload,
1147        });
1148        let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
1149        match &pre_action {
1150            orcs_hook::HookAction::Skip(value) => {
1151                debug!(value = ?value, "RPC request skipped by pre-dispatch hook");
1152                let response = Ok(value.clone());
1153                if envelope.reply_tx.send(response).is_err() {
1154                    debug!("RPC reply dropped (caller cancelled)");
1155                }
1156                return;
1157            }
1158            orcs_hook::HookAction::Abort { reason } => {
1159                warn!(reason = %reason, "RPC request aborted by pre-dispatch hook");
1160                let response = Err(reason.clone());
1161                if envelope.reply_tx.send(response).is_err() {
1162                    debug!("RPC reply dropped (caller cancelled)");
1163                }
1164                return;
1165            }
1166            orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
1167        }
1168
1169        let result = {
1170            let mut comp = self.component.lock().await;
1171            comp.on_request(&envelope.request)
1172        };
1173
1174        // --- Post-dispatch hook ---
1175        let post_payload = match &result {
1176            Ok(response) => serde_json::json!({
1177                "operation": &envelope.request.operation,
1178                "response": response,
1179                "success": true,
1180            }),
1181            Err(e) => serde_json::json!({
1182                "operation": &envelope.request.operation,
1183                "error": e.to_string(),
1184                "success": false,
1185            }),
1186        };
1187        let post_action =
1188            self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);
1189
1190        // Apply Replace from post-hook if present
1191        let final_result = match post_action {
1192            orcs_hook::HookAction::Replace(value) => Ok(value),
1193            _ => result.map_err(|e| e.to_string()),
1194        };
1195
1196        if envelope.reply_tx.send(final_result).is_err() {
1197            debug!("RPC reply dropped (caller cancelled)");
1198        }
1199    }
1200
1201    /// Spawns a child channel with a bound Component.
1202    ///
1203    /// Returns the new channel's ID and handle, or None if spawn failed.
1204    pub async fn spawn_child(
1205        &self,
1206        config: ChannelConfig,
1207        signal_rx: broadcast::Receiver<Signal>,
1208        component: Box<dyn Component>,
1209    ) -> Option<(ChannelRunner, ChannelHandle)> {
1210        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1211        let cmd = WorldCommand::Spawn {
1212            parent: self.id,
1213            config,
1214            reply: reply_tx,
1215        };
1216
1217        if self.world_tx.send(cmd).await.is_err() {
1218            return None;
1219        }
1220
1221        let child_id = reply_rx.await.ok()??;
1222        let (runner, handle) = ChannelRunner::builder(
1223            child_id,
1224            self.world_tx.clone(),
1225            Arc::clone(&self.world),
1226            signal_rx,
1227            component,
1228        )
1229        .build();
1230
1231        Some((runner, handle))
1232    }
1233}
1234
1235/// Builder for creating [`ChannelRunner`] instances.
1236///
1237/// This consolidates the various constructor patterns into a fluent API:
1238///
1239/// ```ignore
1240/// // Basic runner (no emitter, no child spawner)
1241/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1242///     .build();
1243///
1244/// // With emitter only
1245/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1246///     .with_emitter(signal_tx)
1247///     .build();
1248///
1249/// // With child spawner only
1250/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1251///     .with_child_spawner(signal_tx)
1252///     .build();
1253///
1254/// // With both emitter and child spawner
1255/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1256///     .with_emitter(signal_tx.clone())
1257///     .with_child_spawner(signal_tx)
1258///     .build();
1259/// ```
1260pub struct ChannelRunnerBuilder {
1261    id: ChannelId,
1262    world_tx: mpsc::Sender<WorldCommand>,
1263    world: Arc<RwLock<World>>,
1264    signal_rx: broadcast::Receiver<Signal>,
1265    component: Box<dyn Component>,
1266    /// Signal sender for emitter (if enabled).
1267    emitter_signal_tx: Option<broadcast::Sender<Signal>>,
1268    /// Output channel for routing Output events to IO channel.
1269    output_tx: Option<OutputSender>,
1270    /// Enable child spawner.
1271    enable_child_spawner: bool,
1272    /// Lua child loader for spawning Lua children.
1273    lua_loader: Option<Arc<dyn LuaChildLoader>>,
1274    /// Component loader for spawning components as runners.
1275    component_loader: Option<Arc<dyn ComponentLoader>>,
1276    /// Session for permission checking (identity + privilege level).
1277    session: Option<Arc<Session>>,
1278    /// Permission checker policy.
1279    checker: Option<Arc<dyn PermissionChecker>>,
1280    /// Dynamic command grants.
1281    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
1282    /// Shared channel handles for event broadcasting via Emitter.
1283    shared_handles: Option<SharedChannelHandles>,
1284    /// Shared Board for auto-recording emitted events.
1285    board: Option<crate::board::SharedBoard>,
1286    /// Initial snapshot to restore before init (session resume).
1287    initial_snapshot: Option<ComponentSnapshot>,
1288    /// Enable request channel for Component-to-Component RPC.
1289    enable_request_channel: bool,
1290    /// Shared ComponentId → ChannelId mapping for RPC routing via Emitter.
1291    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
1292    /// Shared hook registry for lifecycle hook dispatch.
1293    hook_registry: Option<SharedHookRegistry>,
1294    /// Per-component configuration from config file.
1295    component_config: serde_json::Value,
1296}
1297
1298impl ChannelRunnerBuilder {
1299    /// Creates a new builder with required parameters.
1300    #[must_use]
1301    pub fn new(
1302        id: ChannelId,
1303        world_tx: mpsc::Sender<WorldCommand>,
1304        world: Arc<RwLock<World>>,
1305        signal_rx: broadcast::Receiver<Signal>,
1306        component: Box<dyn Component>,
1307    ) -> Self {
1308        Self {
1309            id,
1310            world_tx,
1311            world,
1312            signal_rx,
1313            component,
1314            emitter_signal_tx: None,
1315            output_tx: None,
1316            enable_child_spawner: false,
1317            lua_loader: None,
1318            component_loader: None,
1319            session: None,
1320            checker: None,
1321            grants: None,
1322            shared_handles: None,
1323            board: None,
1324            initial_snapshot: None,
1325            enable_request_channel: false,
1326            component_channel_map: None,
1327            hook_registry: None,
1328            component_config: serde_json::Value::Object(serde_json::Map::new()),
1329        }
1330    }
1331
1332    /// Enables the EventEmitter for this runner.
1333    ///
1334    /// The emitter allows the Component to emit output events via `orcs.output()`.
1335    #[must_use]
1336    pub fn with_emitter(mut self, signal_tx: broadcast::Sender<Signal>) -> Self {
1337        self.emitter_signal_tx = Some(signal_tx);
1338        self
1339    }
1340
1341    /// Sets the output channel for routing Output events.
1342    ///
1343    /// When set, the Component's `emit_output()` calls will send events
1344    /// to this channel instead of the runner's own event channel.
1345    /// This enables ChannelRunner components to display output via
1346    /// ClientRunner's IOBridge.
1347    ///
1348    /// # Arguments
1349    ///
1350    /// * `output_tx` - Sender for the IO channel's event_rx
1351    #[must_use]
1352    pub fn with_output_channel(mut self, output_tx: OutputSender) -> Self {
1353        self.output_tx = Some(output_tx);
1354        self
1355    }
1356
1357    /// Enables the ChildSpawner for this runner.
1358    ///
1359    /// This allows the Component and its Children to spawn sub-children
1360    /// via ChildContext.
1361    ///
1362    /// # Arguments
1363    ///
1364    /// * `loader` - Optional Lua child loader for spawning Lua children
1365    #[must_use]
1366    pub fn with_child_spawner(mut self, loader: Option<Arc<dyn LuaChildLoader>>) -> Self {
1367        self.enable_child_spawner = true;
1368        self.lua_loader = loader;
1369        self
1370    }
1371
1372    /// Sets the component loader for spawning components as runners.
1373    ///
1374    /// This enables the ChildContext::spawn_runner_from_script() functionality.
1375    ///
1376    /// # Arguments
1377    ///
1378    /// * `loader` - Component loader for creating components from scripts
1379    #[must_use]
1380    pub fn with_component_loader(mut self, loader: Arc<dyn ComponentLoader>) -> Self {
1381        self.component_loader = Some(loader);
1382        self
1383    }
1384
1385    /// Sets the session for permission checking.
1386    ///
1387    /// Takes ownership of a Session and wraps it in Arc.
1388    /// For sharing a Session across multiple runners, use [`Self::with_session_arc`].
1389    ///
1390    /// When set, operations like `orcs.exec()`, `spawn_child()`, and `spawn_runner()`
1391    /// will be checked against the session's privilege level.
1392    ///
1393    /// # Arguments
1394    ///
1395    /// * `session` - Session to use for permission checks
1396    #[must_use]
1397    pub fn with_session(mut self, session: Session) -> Self {
1398        self.session = Some(Arc::new(session));
1399        self
1400    }
1401
1402    /// Sets the session for permission checking (Arc version).
1403    ///
1404    /// Use this when sharing a Session across multiple runners,
1405    /// so that dynamic grants (via HIL) are shared.
1406    ///
1407    /// # Arguments
1408    ///
1409    /// * `session` - Arc-wrapped Session for shared access
1410    #[must_use]
1411    pub fn with_session_arc(mut self, session: Arc<Session>) -> Self {
1412        self.session = Some(session);
1413        self
1414    }
1415
1416    /// Sets the permission checker policy.
1417    ///
1418    /// # Arguments
1419    ///
1420    /// * `checker` - Permission checker implementation
1421    #[must_use]
1422    pub fn with_checker(mut self, checker: Arc<dyn PermissionChecker>) -> Self {
1423        self.checker = Some(checker);
1424        self
1425    }
1426
1427    /// Sets the dynamic command grant store.
1428    ///
1429    /// # Arguments
1430    ///
1431    /// * `grants` - Grant policy implementation for dynamic command permissions
1432    #[must_use]
1433    pub fn with_grants(mut self, grants: Arc<dyn orcs_auth::GrantPolicy>) -> Self {
1434        self.grants = Some(grants);
1435        self
1436    }
1437
1438    /// Sets the shared channel handles for event broadcasting.
1439    ///
1440    /// When set, the EventEmitter's `emit_event()` will broadcast
1441    /// Extension events to all registered channels.
1442    #[must_use]
1443    pub fn with_shared_handles(mut self, handles: SharedChannelHandles) -> Self {
1444        self.shared_handles = Some(handles);
1445        self
1446    }
1447
1448    /// Sets the shared component-to-channel mapping for RPC routing.
1449    ///
1450    /// When set, the EventEmitter's `request()` can resolve target
1451    /// ComponentId to ChannelId and route RPC via ChannelHandle.
1452    #[must_use]
1453    pub fn with_component_channel_map(
1454        mut self,
1455        map: crate::engine::SharedComponentChannelMap,
1456    ) -> Self {
1457        self.component_channel_map = Some(map);
1458        self
1459    }
1460
1461    /// Sets the shared Board for auto-recording emitted events.
1462    ///
1463    /// When set, the EventEmitter will automatically append entries
1464    /// to the Board on `emit_output()` and `emit_event()`.
1465    #[must_use]
1466    pub fn with_board(mut self, board: crate::board::SharedBoard) -> Self {
1467        self.board = Some(board);
1468        self
1469    }
1470
1471    /// Sets the initial snapshot to restore before `init()`.
1472    ///
1473    /// When set, the Component's `restore()` method is called with
1474    /// this snapshot before `init()` during `run()`. Used for
1475    /// session resume.
1476    ///
1477    /// # Arguments
1478    ///
1479    /// * `snapshot` - Snapshot to restore from
1480    #[must_use]
1481    pub fn with_initial_snapshot(mut self, snapshot: ComponentSnapshot) -> Self {
1482        self.initial_snapshot = Some(snapshot);
1483        self
1484    }
1485
1486    /// Enables the request channel for Component-to-Component RPC.
1487    ///
1488    /// When enabled, the runner's [`ChannelHandle`] will accept
1489    /// `RequestEnvelope`s via `send_request()`, allowing other
1490    /// Components to call this Component's `on_request()` and receive
1491    /// a response.
1492    #[must_use]
1493    pub fn with_request_channel(mut self) -> Self {
1494        self.enable_request_channel = true;
1495        self
1496    }
1497
1498    /// Sets the shared hook registry for lifecycle hook dispatch.
1499    ///
1500    /// When set, the runner will dispatch hooks at lifecycle points
1501    /// (e.g., pre/post request, signal, component init/shutdown).
1502    #[must_use]
1503    pub fn with_hook_registry(mut self, registry: SharedHookRegistry) -> Self {
1504        self.hook_registry = Some(registry);
1505        self
1506    }
1507
1508    /// Sets per-component configuration passed to `Component::init()`.
1509    ///
1510    /// The value comes from `[components.settings.<name>]` in the config file.
1511    #[must_use]
1512    pub fn with_component_config(mut self, config: serde_json::Value) -> Self {
1513        self.component_config = config;
1514        self
1515    }
1516
1517    /// Configures auth (session/checker/grants) and IO output routing on a [`ChildContextImpl`].
1518    ///
1519    /// Extracted from `build()` to eliminate duplication between the child-spawner
1520    /// branch and the auth-only branch.
1521    fn configure_context(
1522        &mut self,
1523        mut ctx: ChildContextImpl,
1524        io_output_tx: &Option<OutputSender>,
1525        component_id: &str,
1526        rpc_handles: &Option<SharedChannelHandles>,
1527        rpc_map: &Option<crate::engine::SharedComponentChannelMap>,
1528        channel_id: ChannelId,
1529    ) -> ChildContextImpl {
1530        if let Some(session) = self.session.take() {
1531            ctx = ctx.with_session_arc(session);
1532            info!("ChannelRunnerBuilder: enabled session for {}", component_id);
1533        }
1534        if let Some(checker) = self.checker.take() {
1535            ctx = ctx.with_checker(checker);
1536            info!(
1537                "ChannelRunnerBuilder: enabled permission checker for {}",
1538                component_id
1539            );
1540        }
1541        if let Some(grants) = self.grants.take() {
1542            ctx = ctx.with_grants(grants);
1543            info!(
1544                "ChannelRunnerBuilder: enabled grant store for {}",
1545                component_id
1546            );
1547        }
1548        if let Some(io_tx) = io_output_tx.clone() {
1549            ctx = ctx.with_io_output_channel(io_tx);
1550            info!(
1551                "ChannelRunnerBuilder: enabled IO output routing for {}",
1552                component_id
1553            );
1554        }
1555        if let (Some(handles), Some(map)) = (rpc_handles.clone(), rpc_map.clone()) {
1556            ctx = ctx.with_rpc_support(handles, map, channel_id);
1557        }
1558        if let Some(reg) = &self.hook_registry {
1559            ctx = ctx.with_hook_registry(Arc::clone(reg));
1560        }
1561        ctx
1562    }
1563
1564    /// Builds the ChannelRunner and returns it with a ChannelHandle.
1565    #[must_use]
1566    pub fn build(mut self) -> (ChannelRunner, ChannelHandle) {
1567        let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE);
1568
1569        // Cache component ID before moving component into Arc
1570        let component_id = self.component.id().clone();
1571
1572        // Clone output_tx for ChildContext IO routing (before emitter takes it)
1573        let io_output_tx = self.output_tx.as_ref().cloned();
1574
1575        // Clone RPC resources for ChildContext (before emitter takes them)
1576        let rpc_handles = self.shared_handles.clone();
1577        let rpc_map = self.component_channel_map.clone();
1578
1579        // Set up emitter if enabled
1580        if let Some(signal_tx) = &self.emitter_signal_tx {
1581            let component_id = self.component.id().clone();
1582            let mut emitter = EventEmitter::new(
1583                OutputSender::new(event_tx.clone()),
1584                signal_tx.clone(),
1585                component_id.clone(),
1586            );
1587
1588            // Route Output events to IO channel if configured
1589            if let Some(output_tx) = self.output_tx.take() {
1590                emitter = emitter.with_output_channel(output_tx);
1591                info!(
1592                    "ChannelRunnerBuilder: routing output to IO channel for {}",
1593                    component_id.fqn()
1594                );
1595            }
1596
1597            // Enable event broadcasting if shared handles are provided
1598            if let Some(handles) = self.shared_handles.take() {
1599                emitter = emitter.with_shared_handles(handles);
1600                info!(
1601                    "ChannelRunnerBuilder: enabled event broadcast for {}",
1602                    component_id.fqn()
1603                );
1604            }
1605
1606            // Enable RPC routing if component channel map is provided
1607            if let Some(map) = self.component_channel_map.take() {
1608                emitter = emitter.with_component_channel_map(map, self.id);
1609            }
1610
1611            // Attach Board for auto-recording
1612            if let Some(board) = self.board.take() {
1613                emitter = emitter.with_board(board);
1614            }
1615
1616            self.component.set_emitter(Box::new(emitter));
1617
1618            info!(
1619                "ChannelRunnerBuilder: injected emitter for {}",
1620                component_id.fqn()
1621            );
1622        }
1623
1624        // Set up child spawner if enabled
1625        let child_spawner = if self.enable_child_spawner {
1626            let component_id = self.component.id().fqn();
1627            let output_sender = OutputSender::new(event_tx.clone());
1628            let spawner = ChildSpawner::new(&component_id, output_sender.clone());
1629            let spawner_arc = Arc::new(StdMutex::new(spawner));
1630
1631            // Create ChildContext and inject into Component
1632            let mut ctx =
1633                ChildContextImpl::new(&component_id, output_sender, Arc::clone(&spawner_arc));
1634
1635            // Add Lua loader if provided
1636            if let Some(loader) = self.lua_loader.take() {
1637                ctx = ctx.with_lua_loader(loader);
1638                info!(
1639                    "ChannelRunnerBuilder: created spawner with Lua loader for {}",
1640                    component_id
1641                );
1642            } else {
1643                info!(
1644                    "ChannelRunnerBuilder: created spawner (no Lua loader) for {}",
1645                    component_id
1646                );
1647            }
1648
1649            // Enable runner spawning if signal emitter is available
1650            if let Some(signal_tx) = &self.emitter_signal_tx {
1651                ctx = ctx.with_runner_support(
1652                    self.world_tx.clone(),
1653                    Arc::clone(&self.world),
1654                    signal_tx.clone(),
1655                );
1656                info!(
1657                    "ChannelRunnerBuilder: enabled runner spawning for {}",
1658                    component_id
1659                );
1660            }
1661
1662            // Add component loader for spawn_runner_from_script
1663            if let Some(loader) = self.component_loader.take() {
1664                ctx = ctx.with_component_loader(loader);
1665                info!(
1666                    "ChannelRunnerBuilder: enabled component loader for {}",
1667                    component_id
1668                );
1669            }
1670
1671            // Add session, checker, grants, and IO output routing
1672            ctx = self.configure_context(
1673                ctx,
1674                &io_output_tx,
1675                &component_id,
1676                &rpc_handles,
1677                &rpc_map,
1678                self.id,
1679            );
1680
1681            self.component.set_child_context(Box::new(ctx));
1682
1683            Some(spawner_arc)
1684        } else if self.session.is_some() || self.checker.is_some() || self.grants.is_some() {
1685            // No child spawner, but auth context is needed for permission-checked orcs.exec()
1686            let component_id = self.component.id().fqn();
1687            let dummy_output = OutputSender::new(event_tx.clone());
1688            let dummy_spawner = ChildSpawner::new(&component_id, dummy_output.clone());
1689            let dummy_arc = Arc::new(StdMutex::new(dummy_spawner));
1690            let mut ctx =
1691                ChildContextImpl::new(&component_id, dummy_output, Arc::clone(&dummy_arc));
1692
1693            ctx = self.configure_context(
1694                ctx,
1695                &io_output_tx,
1696                &component_id,
1697                &rpc_handles,
1698                &rpc_map,
1699                self.id,
1700            );
1701
1702            self.component.set_child_context(Box::new(ctx));
1703            info!(
1704                "ChannelRunnerBuilder: auth-only context injected for {}",
1705                component_id
1706            );
1707            None
1708        } else {
1709            None
1710        };
1711
1712        // Determine if we need to keep event_tx for child context
1713        let event_tx_for_context = if self.enable_child_spawner || self.emitter_signal_tx.is_some()
1714        {
1715            Some(event_tx.clone())
1716        } else {
1717            None
1718        };
1719
1720        // Cache subscription entries from Component to avoid locking on every event.
1721        // Uses subscription_entries() which includes operation-level filtering.
1722        let subscriptions = self.component.subscription_entries();
1723
1724        // Create request channel if enabled
1725        let (request_tx, request_rx) = if self.enable_request_channel {
1726            let (tx, rx) = mpsc::channel(REQUEST_BUFFER_SIZE);
1727            (Some(tx), Some(rx))
1728        } else {
1729            (None, None)
1730        };
1731
1732        let runner = ChannelRunner {
1733            id: self.id,
1734            component_id,
1735            event_rx,
1736            signal_rx: self.signal_rx,
1737            world_tx: self.world_tx,
1738            world: self.world,
1739            component: Arc::new(Mutex::new(self.component)),
1740            subscriptions,
1741            paused_queue: PausedEventQueue::new(),
1742            child_spawner,
1743            event_tx: event_tx_for_context,
1744            request_rx,
1745            initial_snapshot: self.initial_snapshot,
1746            shared_handles: rpc_handles,
1747            component_channel_map: rpc_map,
1748            hook_registry: self.hook_registry,
1749            component_config: self.component_config,
1750            grants: self.grants.clone(),
1751            io_output_tx: io_output_tx.clone(),
1752            pending_approval: None,
1753        };
1754
1755        let mut handle = ChannelHandle::new(self.id, event_tx);
1756        handle.request_tx = request_tx;
1757
1758        (runner, handle)
1759    }
1760}
1761
1762impl ChannelRunner {
1763    /// Creates a new builder for constructing a ChannelRunner.
1764    ///
1765    /// This is the recommended way to create runners with optional features.
1766    ///
1767    /// # Example
1768    ///
1769    /// ```ignore
1770    /// let (runner, handle) = ChannelRunner::builder(id, world_tx, world, signal_rx, component)
1771    ///     .with_emitter(signal_tx)
1772    ///     .with_child_spawner(None)
1773    ///     .build();
1774    /// ```
1775    #[must_use]
1776    pub fn builder(
1777        id: ChannelId,
1778        world_tx: mpsc::Sender<WorldCommand>,
1779        world: Arc<RwLock<World>>,
1780        signal_rx: broadcast::Receiver<Signal>,
1781        component: Box<dyn Component>,
1782    ) -> ChannelRunnerBuilder {
1783        ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1784    }
1785}
1786
1787#[cfg(test)]
1788mod tests {
1789    use super::*;
1790    use crate::channel::manager::WorldManager;
1791    use crate::channel::ChannelConfig;
1792    use orcs_component::{ComponentError, Status};
1793    use orcs_event::{EventCategory, SignalResponse};
1794    use orcs_types::{ComponentId, Principal};
1795    use serde_json::Value;
1796
1797    /// Test mock component that echoes requests.
1798    struct MockComponent {
1799        id: ComponentId,
1800        status: Status,
1801    }
1802
1803    impl MockComponent {
1804        fn new(name: &str) -> Self {
1805            Self {
1806                id: ComponentId::builtin(name),
1807                status: Status::Idle,
1808            }
1809        }
1810    }
1811
1812    impl Component for MockComponent {
1813        fn id(&self) -> &ComponentId {
1814            &self.id
1815        }
1816
1817        fn status(&self) -> Status {
1818            self.status
1819        }
1820
1821        fn subscriptions(&self) -> &[EventCategory] {
1822            &[EventCategory::Echo, EventCategory::Lifecycle]
1823        }
1824
1825        fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
1826            Ok(request.payload.clone())
1827        }
1828
1829        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
1830            if signal.is_veto() {
1831                self.status = Status::Aborted;
1832                SignalResponse::Abort
1833            } else {
1834                SignalResponse::Handled
1835            }
1836        }
1837
1838        fn abort(&mut self) {
1839            self.status = Status::Aborted;
1840        }
1841    }
1842
1843    fn mock_component() -> Box<dyn Component> {
1844        Box::new(MockComponent::new("test"))
1845    }
1846
1847    async fn setup() -> (
1848        tokio::task::JoinHandle<()>,
1849        mpsc::Sender<WorldCommand>,
1850        Arc<RwLock<World>>,
1851        broadcast::Sender<Signal>,
1852        ChannelId,
1853    ) {
1854        let mut world = World::new();
1855        let io = world.create_channel(ChannelConfig::interactive());
1856
1857        let (manager, world_tx) = WorldManager::with_world(world);
1858        let world_handle = manager.world();
1859
1860        let manager_task = tokio::spawn(manager.run());
1861
1862        let (signal_tx, _) = broadcast::channel(64);
1863
1864        (manager_task, world_tx, world_handle, signal_tx, io)
1865    }
1866
1867    async fn teardown(
1868        manager_task: tokio::task::JoinHandle<()>,
1869        world_tx: mpsc::Sender<WorldCommand>,
1870    ) {
1871        let _ = world_tx.send(WorldCommand::Shutdown).await;
1872        let _ = manager_task.await;
1873    }
1874
1875    #[tokio::test]
1876    async fn runner_creation() {
1877        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1878
1879        let signal_rx = signal_tx.subscribe();
1880        let (runner, handle) = ChannelRunner::builder(
1881            primary,
1882            world_tx.clone(),
1883            world,
1884            signal_rx,
1885            mock_component(),
1886        )
1887        .build();
1888
1889        assert_eq!(runner.id(), primary);
1890        assert_eq!(handle.id, primary);
1891
1892        teardown(manager_task, world_tx).await;
1893    }
1894
1895    #[tokio::test]
1896    async fn runner_receives_events() {
1897        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1898
1899        let signal_rx = signal_tx.subscribe();
1900        let (runner, handle) = ChannelRunner::builder(
1901            primary,
1902            world_tx.clone(),
1903            world,
1904            signal_rx,
1905            mock_component(),
1906        )
1907        .build();
1908
1909        let runner_task = tokio::spawn(runner.run());
1910
1911        let event = Event {
1912            category: EventCategory::Echo,
1913            operation: "echo".to_string(),
1914            source: ComponentId::builtin("test"),
1915            payload: serde_json::json!({"test": true}),
1916        };
1917        handle
1918            .inject(event)
1919            .await
1920            .expect("inject echo event into runner");
1921
1922        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1923
1924        signal_tx
1925            .send(Signal::cancel(primary, Principal::System))
1926            .expect("send cancel signal");
1927
1928        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
1929
1930        teardown(manager_task, world_tx).await;
1931    }
1932
1933    #[tokio::test]
1934    async fn runner_handles_veto() {
1935        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1936
1937        let signal_rx = signal_tx.subscribe();
1938        let (runner, _handle) = ChannelRunner::builder(
1939            primary,
1940            world_tx.clone(),
1941            world.clone(),
1942            signal_rx,
1943            mock_component(),
1944        )
1945        .build();
1946
1947        let runner_task = tokio::spawn(runner.run());
1948
1949        tokio::task::yield_now().await;
1950
1951        signal_tx
1952            .send(Signal::veto(Principal::System))
1953            .expect("send veto signal");
1954
1955        let result = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
1956        assert!(result.is_ok());
1957
1958        teardown(manager_task, world_tx).await;
1959    }
1960
1961    #[tokio::test]
1962    async fn channel_handle_clone() {
1963        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1964
1965        let signal_rx = signal_tx.subscribe();
1966        let (_runner, handle) = ChannelRunner::builder(
1967            primary,
1968            world_tx.clone(),
1969            world,
1970            signal_rx,
1971            mock_component(),
1972        )
1973        .build();
1974
1975        let handle2 = handle.clone();
1976        assert_eq!(handle.id, handle2.id);
1977
1978        teardown(manager_task, world_tx).await;
1979    }
1980
1981    #[tokio::test]
1982    async fn runner_with_emitter_creation() {
1983        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1984
1985        let signal_rx = signal_tx.subscribe();
1986        let (runner, handle) = ChannelRunner::builder(
1987            primary,
1988            world_tx.clone(),
1989            world,
1990            signal_rx,
1991            mock_component(),
1992        )
1993        .with_emitter(signal_tx.clone())
1994        .build();
1995
1996        assert_eq!(runner.id(), primary);
1997        assert_eq!(handle.id, primary);
1998
1999        teardown(manager_task, world_tx).await;
2000    }
2001
2002    #[tokio::test]
2003    async fn runner_with_emitter_receives_emitted_events() {
2004        use std::sync::atomic::{AtomicUsize, Ordering};
2005        use std::sync::Arc as StdArc;
2006
2007        // Component that uses emitter to emit output
2008        struct EmittingComponent {
2009            id: ComponentId,
2010            emitter: Option<Box<dyn orcs_component::Emitter>>,
2011            call_count: StdArc<AtomicUsize>,
2012        }
2013
2014        impl EmittingComponent {
2015            fn new(call_count: StdArc<AtomicUsize>) -> Self {
2016                Self {
2017                    id: ComponentId::builtin("emitting"),
2018                    emitter: None,
2019                    call_count,
2020                }
2021            }
2022        }
2023
2024        impl Component for EmittingComponent {
2025            fn id(&self) -> &ComponentId {
2026                &self.id
2027            }
2028
2029            fn status(&self) -> Status {
2030                Status::Idle
2031            }
2032
2033            fn subscriptions(&self) -> &[EventCategory] {
2034                &[EventCategory::Echo]
2035            }
2036
2037            fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
2038                self.call_count.fetch_add(1, Ordering::SeqCst);
2039                // Emit output via emitter when receiving a request
2040                if let Some(emitter) = &self.emitter {
2041                    emitter.emit_output("Response from component");
2042                }
2043                Ok(request.payload.clone())
2044            }
2045
2046            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2047                if signal.is_veto() {
2048                    SignalResponse::Abort
2049                } else {
2050                    SignalResponse::Handled
2051                }
2052            }
2053
2054            fn abort(&mut self) {}
2055
2056            fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2057                self.emitter = Some(emitter);
2058            }
2059        }
2060
2061        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2062
2063        let call_count = StdArc::new(AtomicUsize::new(0));
2064        let component = Box::new(EmittingComponent::new(StdArc::clone(&call_count)));
2065
2066        let signal_rx = signal_tx.subscribe();
2067        let (runner, handle) =
2068            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2069                .with_emitter(signal_tx.clone())
2070                .build();
2071
2072        let runner_task = tokio::spawn(runner.run());
2073
2074        // Inject an event
2075        let event = Event {
2076            category: EventCategory::Echo,
2077            operation: "test".to_string(),
2078            source: ComponentId::builtin("test"),
2079            payload: serde_json::json!({"trigger": true}),
2080        };
2081        handle
2082            .inject(event)
2083            .await
2084            .expect("inject event into emitting runner");
2085
2086        // Wait for processing
2087        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2088
2089        // Component should have been called
2090        assert!(
2091            call_count.load(Ordering::SeqCst) >= 1,
2092            "Component should have received the event"
2093        );
2094
2095        // Stop runner
2096        signal_tx
2097            .send(Signal::cancel(primary, Principal::System))
2098            .expect("send cancel signal to emitting runner");
2099
2100        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2101
2102        teardown(manager_task, world_tx).await;
2103    }
2104
2105    // Builder pattern tests
2106
2107    #[tokio::test]
2108    async fn builder_basic() {
2109        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2110
2111        let signal_rx = signal_tx.subscribe();
2112        let (runner, handle) = ChannelRunner::builder(
2113            primary,
2114            world_tx.clone(),
2115            world,
2116            signal_rx,
2117            mock_component(),
2118        )
2119        .build();
2120
2121        assert_eq!(runner.id(), primary);
2122        assert_eq!(handle.id, primary);
2123        assert!(runner.child_spawner().is_none());
2124
2125        teardown(manager_task, world_tx).await;
2126    }
2127
2128    #[tokio::test]
2129    async fn builder_with_emitter() {
2130        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2131
2132        let signal_rx = signal_tx.subscribe();
2133        let (runner, handle) = ChannelRunner::builder(
2134            primary,
2135            world_tx.clone(),
2136            world,
2137            signal_rx,
2138            mock_component(),
2139        )
2140        .with_emitter(signal_tx.clone())
2141        .build();
2142
2143        assert_eq!(runner.id(), primary);
2144        assert_eq!(handle.id, primary);
2145
2146        teardown(manager_task, world_tx).await;
2147    }
2148
2149    #[tokio::test]
2150    async fn builder_with_child_spawner() {
2151        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2152
2153        let signal_rx = signal_tx.subscribe();
2154        let (runner, handle) = ChannelRunner::builder(
2155            primary,
2156            world_tx.clone(),
2157            world,
2158            signal_rx,
2159            mock_component(),
2160        )
2161        .with_child_spawner(None)
2162        .build();
2163
2164        assert_eq!(runner.id(), primary);
2165        assert_eq!(handle.id, primary);
2166        assert!(runner.child_spawner().is_some());
2167
2168        teardown(manager_task, world_tx).await;
2169    }
2170
2171    #[tokio::test]
2172    async fn builder_with_full_support() {
2173        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2174
2175        let signal_rx = signal_tx.subscribe();
2176        let (runner, handle) = ChannelRunner::builder(
2177            primary,
2178            world_tx.clone(),
2179            world,
2180            signal_rx,
2181            mock_component(),
2182        )
2183        .with_emitter(signal_tx.clone())
2184        .with_child_spawner(None)
2185        .build();
2186
2187        assert_eq!(runner.id(), primary);
2188        assert_eq!(handle.id, primary);
2189        assert!(runner.child_spawner().is_some());
2190
2191        teardown(manager_task, world_tx).await;
2192    }
2193
2194    #[tokio::test]
2195    async fn builder_creates_child_context() {
2196        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2197
2198        let signal_rx = signal_tx.subscribe();
2199        let (runner, _handle) = ChannelRunner::builder(
2200            primary,
2201            world_tx.clone(),
2202            world,
2203            signal_rx,
2204            mock_component(),
2205        )
2206        .with_child_spawner(None)
2207        .build();
2208
2209        // Should be able to create child context when spawner is enabled
2210        let ctx = runner.create_child_context("child-1");
2211        assert!(ctx.is_some());
2212
2213        teardown(manager_task, world_tx).await;
2214    }
2215
2216    #[tokio::test]
2217    async fn builder_no_child_context_without_spawner() {
2218        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2219
2220        let signal_rx = signal_tx.subscribe();
2221        let (runner, _handle) = ChannelRunner::builder(
2222            primary,
2223            world_tx.clone(),
2224            world,
2225            signal_rx,
2226            mock_component(),
2227        )
2228        .build();
2229
2230        // Should NOT be able to create child context when spawner is disabled
2231        let ctx = runner.create_child_context("child-1");
2232        assert!(ctx.is_none());
2233
2234        teardown(manager_task, world_tx).await;
2235    }
2236
2237    // === Request Channel (Component-to-Component RPC) Tests ===
2238
2239    #[tokio::test]
2240    async fn builder_with_request_channel_enables_accepts_requests() {
2241        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2242
2243        let signal_rx = signal_tx.subscribe();
2244        let (_runner, handle) = ChannelRunner::builder(
2245            primary,
2246            world_tx.clone(),
2247            world,
2248            signal_rx,
2249            mock_component(),
2250        )
2251        .with_request_channel()
2252        .build();
2253
2254        assert!(handle.accepts_requests());
2255
2256        teardown(manager_task, world_tx).await;
2257    }
2258
2259    #[tokio::test]
2260    async fn builder_without_request_channel_rejects_requests() {
2261        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2262
2263        let signal_rx = signal_tx.subscribe();
2264        let (_runner, handle) = ChannelRunner::builder(
2265            primary,
2266            world_tx.clone(),
2267            world,
2268            signal_rx,
2269            mock_component(),
2270        )
2271        .build();
2272
2273        assert!(!handle.accepts_requests());
2274
2275        teardown(manager_task, world_tx).await;
2276    }
2277
2278    #[tokio::test]
2279    async fn rpc_request_routed_to_runner_and_responded() {
2280        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2281
2282        let signal_rx = signal_tx.subscribe();
2283        let (runner, handle) = ChannelRunner::builder(
2284            primary,
2285            world_tx.clone(),
2286            world,
2287            signal_rx,
2288            mock_component(),
2289        )
2290        .with_request_channel()
2291        .build();
2292
2293        let runner_task = tokio::spawn(runner.run());
2294
2295        // Send RPC request via handle
2296        let source = ComponentId::builtin("caller");
2297        let target = ComponentId::builtin("test");
2298        let req = Request::new(
2299            EventCategory::Echo,
2300            "echo",
2301            source,
2302            primary,
2303            Value::String("rpc_payload".into()),
2304        )
2305        .with_target(target);
2306
2307        let (reply_tx, reply_rx) = oneshot::channel();
2308        handle
2309            .send_request(req, reply_tx)
2310            .await
2311            .expect("send RPC request to runner");
2312
2313        // Should receive the echoed payload back
2314        let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2315            .await
2316            .expect("reply should arrive within timeout")
2317            .expect("reply channel should not be dropped");
2318
2319        assert_eq!(
2320            result.expect("RPC response should be Ok"),
2321            Value::String("rpc_payload".into())
2322        );
2323
2324        // Cleanup
2325        signal_tx
2326            .send(Signal::cancel(primary, Principal::System))
2327            .expect("send cancel signal after RPC test");
2328        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2329        teardown(manager_task, world_tx).await;
2330    }
2331
2332    #[tokio::test]
2333    async fn rpc_request_component_error_returned_as_string() {
2334        /// Component that always returns error from on_request.
2335        struct FailingComponent {
2336            id: ComponentId,
2337        }
2338
2339        impl Component for FailingComponent {
2340            fn id(&self) -> &ComponentId {
2341                &self.id
2342            }
2343            fn status(&self) -> Status {
2344                Status::Idle
2345            }
2346            fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2347                Err(ComponentError::ExecutionFailed(
2348                    "deliberate test failure".into(),
2349                ))
2350            }
2351            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2352                if signal.is_veto() {
2353                    SignalResponse::Abort
2354                } else {
2355                    SignalResponse::Handled
2356                }
2357            }
2358            fn abort(&mut self) {}
2359        }
2360
2361        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2362
2363        let component: Box<dyn Component> = Box::new(FailingComponent {
2364            id: ComponentId::builtin("failing"),
2365        });
2366        let signal_rx = signal_tx.subscribe();
2367        let (runner, handle) =
2368            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2369                .with_request_channel()
2370                .build();
2371
2372        let runner_task = tokio::spawn(runner.run());
2373
2374        let source = ComponentId::builtin("caller");
2375        let target = ComponentId::builtin("failing");
2376        let req = Request::new(EventCategory::Echo, "op", source, primary, Value::Null)
2377            .with_target(target);
2378
2379        let (reply_tx, reply_rx) = oneshot::channel();
2380        handle
2381            .send_request(req, reply_tx)
2382            .await
2383            .expect("send RPC request to failing runner");
2384
2385        let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2386            .await
2387            .expect("reply should arrive")
2388            .expect("channel should not be dropped");
2389
2390        assert!(result.is_err(), "should return Err for component failure");
2391        let err_msg = result.expect_err("expected Err variant for component failure");
2392        assert!(
2393            err_msg.contains("deliberate test failure"),
2394            "error message should contain original error, got: {err_msg}"
2395        );
2396
2397        signal_tx
2398            .send(Signal::cancel(primary, Principal::System))
2399            .expect("send cancel signal after failing RPC test");
2400        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2401        teardown(manager_task, world_tx).await;
2402    }
2403
2404    // === Output Channel Routing Tests ===
2405    //
2406    // Verify that when `with_output_channel(output_tx)` is configured on the
2407    // ChannelRunnerBuilder, Output events emitted by the Component's emitter
2408    // are routed to the output channel (ClientRunner/IOPort path) rather than
2409    // the runner's own event channel.
2410
2411    #[tokio::test]
2412    async fn emitter_output_routes_to_output_channel_via_builder() {
2413        use std::sync::atomic::{AtomicUsize, Ordering};
2414        use std::sync::Arc as StdArc;
2415
2416        /// Component that emits output via its emitter during on_request.
2417        struct OutputRoutingComponent {
2418            id: ComponentId,
2419            emitter: Option<Box<dyn orcs_component::Emitter>>,
2420            call_count: StdArc<AtomicUsize>,
2421        }
2422
2423        impl OutputRoutingComponent {
2424            fn new(call_count: StdArc<AtomicUsize>) -> Self {
2425                Self {
2426                    id: ComponentId::builtin("output-routing"),
2427                    emitter: None,
2428                    call_count,
2429                }
2430            }
2431        }
2432
2433        impl Component for OutputRoutingComponent {
2434            fn id(&self) -> &ComponentId {
2435                &self.id
2436            }
2437            fn status(&self) -> Status {
2438                Status::Idle
2439            }
2440            fn subscriptions(&self) -> &[EventCategory] {
2441                &[EventCategory::Echo]
2442            }
2443            fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2444                self.call_count.fetch_add(1, Ordering::SeqCst);
2445                if let Some(emitter) = &self.emitter {
2446                    emitter.emit_output("routed output message");
2447                }
2448                Ok(serde_json::json!({"success": true}))
2449            }
2450            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2451                if signal.is_veto() {
2452                    SignalResponse::Abort
2453                } else {
2454                    SignalResponse::Handled
2455                }
2456            }
2457            fn abort(&mut self) {}
2458            fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2459                self.emitter = Some(emitter);
2460            }
2461        }
2462
2463        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2464
2465        let call_count = StdArc::new(AtomicUsize::new(0));
2466        let component = Box::new(OutputRoutingComponent::new(StdArc::clone(&call_count)));
2467
2468        // Create a separate output channel to receive routed Output events
2469        let (output_tx, mut output_rx) = OutputSender::channel(64);
2470
2471        let signal_rx = signal_tx.subscribe();
2472        let (runner, handle) =
2473            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2474                .with_emitter(signal_tx.clone())
2475                .with_output_channel(output_tx)
2476                .build();
2477
2478        let runner_task = tokio::spawn(runner.run());
2479
2480        // Inject an event to trigger on_request → emit_output
2481        let event = Event {
2482            category: EventCategory::Echo,
2483            operation: "test".to_string(),
2484            source: ComponentId::builtin("test"),
2485            payload: serde_json::json!({"trigger": true}),
2486        };
2487        handle.inject(event).await.expect("inject event");
2488
2489        // Wait for processing
2490        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2491
2492        // Verify the component was called
2493        assert!(
2494            call_count.load(Ordering::SeqCst) >= 1,
2495            "Component should have received the event"
2496        );
2497
2498        // Verify the Output event arrived on the output channel
2499        let output_event = output_rx
2500            .try_recv()
2501            .expect("Output event should arrive on output channel");
2502        assert_eq!(
2503            output_event.category,
2504            EventCategory::Output,
2505            "Event category should be Output"
2506        );
2507        assert_eq!(
2508            output_event.operation, "display",
2509            "Event operation should be 'display'"
2510        );
2511        assert_eq!(
2512            output_event.payload["message"], "routed output message",
2513            "Event payload message should match what the component emitted"
2514        );
2515        assert_eq!(
2516            output_event.payload["level"], "info",
2517            "Event payload level should be 'info'"
2518        );
2519
2520        // Cleanup
2521        signal_tx
2522            .send(Signal::cancel(primary, Principal::System))
2523            .expect("send cancel");
2524        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2525        teardown(manager_task, world_tx).await;
2526    }
2527
2528    #[tokio::test]
2529    async fn emitter_output_records_to_board_and_routes_to_output_channel() {
2530        use std::sync::atomic::{AtomicUsize, Ordering};
2531        use std::sync::Arc as StdArc;
2532
2533        struct BoardOutputComponent {
2534            id: ComponentId,
2535            emitter: Option<Box<dyn orcs_component::Emitter>>,
2536            call_count: StdArc<AtomicUsize>,
2537        }
2538
2539        impl BoardOutputComponent {
2540            fn new(call_count: StdArc<AtomicUsize>) -> Self {
2541                Self {
2542                    id: ComponentId::builtin("board-output"),
2543                    emitter: None,
2544                    call_count,
2545                }
2546            }
2547        }
2548
2549        impl Component for BoardOutputComponent {
2550            fn id(&self) -> &ComponentId {
2551                &self.id
2552            }
2553            fn status(&self) -> Status {
2554                Status::Idle
2555            }
2556            fn subscriptions(&self) -> &[EventCategory] {
2557                &[EventCategory::Echo]
2558            }
2559            fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2560                self.call_count.fetch_add(1, Ordering::SeqCst);
2561                if let Some(emitter) = &self.emitter {
2562                    emitter.emit_output("board and io message");
2563                }
2564                Ok(serde_json::json!({"success": true}))
2565            }
2566            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2567                if signal.is_veto() {
2568                    SignalResponse::Abort
2569                } else {
2570                    SignalResponse::Handled
2571                }
2572            }
2573            fn abort(&mut self) {}
2574            fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2575                self.emitter = Some(emitter);
2576            }
2577        }
2578
2579        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2580
2581        let call_count = StdArc::new(AtomicUsize::new(0));
2582        let component = Box::new(BoardOutputComponent::new(StdArc::clone(&call_count)));
2583
2584        let (output_tx, mut output_rx) = OutputSender::channel(64);
2585        let board = crate::board::shared_board();
2586
2587        let signal_rx = signal_tx.subscribe();
2588        let (runner, handle) =
2589            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2590                .with_emitter(signal_tx.clone())
2591                .with_output_channel(output_tx)
2592                .with_board(Arc::clone(&board))
2593                .build();
2594
2595        let runner_task = tokio::spawn(runner.run());
2596
2597        // Inject event to trigger on_request → emit_output
2598        let event = Event {
2599            category: EventCategory::Echo,
2600            operation: "test".to_string(),
2601            source: ComponentId::builtin("test"),
2602            payload: serde_json::json!({"trigger": true}),
2603        };
2604        handle.inject(event).await.expect("inject event");
2605
2606        // Wait for processing
2607        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2608
2609        // Verify component was called
2610        assert!(call_count.load(Ordering::SeqCst) >= 1);
2611
2612        // Verify Output event on output channel
2613        let output_event = output_rx
2614            .try_recv()
2615            .expect("Output event should arrive on output channel");
2616        assert_eq!(output_event.payload["message"], "board and io message");
2617
2618        // Verify Board also recorded the entry
2619        let b = board.read();
2620        assert!(b.len() >= 1, "Board should have at least 1 entry");
2621        let entries = b.recent(1);
2622        assert_eq!(
2623            entries[0].payload["message"], "board and io message",
2624            "Board entry should match the emitted message"
2625        );
2626
2627        // Cleanup
2628        signal_tx
2629            .send(Signal::cancel(primary, Principal::System))
2630            .expect("send cancel");
2631        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2632        teardown(manager_task, world_tx).await;
2633    }
2634}