Skip to main content

orcs_runtime/channel/runner/
base.rs

1//! ChannelRunner - Parallel execution context for a Channel.
2//!
3//! Each [`ChannelRunner`] runs in its own tokio task, enabling true
4//! parallel execution of Channels. It receives Events via mpsc and
5//! sends World modifications via the command queue.
6//!
7//! # Architecture
8//!
9//! ```text
10//!                           ┌──────────────────────────┐
11//!                           │     ChannelRunner        │
12//!                           │                          │
13//! EventBus ───inject()────► │  event_rx ◄── mpsc       │
14//!                           │                          │
15//! Human ────signal()──────► │  signal_rx ◄── broadcast │
16//!                           │                          │
17//!                           │         │                │
18//!                           │         ▼                │
19//!                           │  process_event()         │
20//!                           │         │                │
21//!                           │         ▼                │
22//!                           │  world_tx ───► WorldManager
23//!                           │                          │
24//!                           └──────────────────────────┘
25//! ```
26//!
27//! # Lifecycle
28//!
29//! 1. Created via [`ChannelRunner::builder()`]
30//! 2. Started with [`ChannelRunner::run()`] (spawns tokio task)
31//! 3. Processes Events until channel completes or is killed
32//! 4. Cleanup on drop
33
34use super::child_context::{ChildContextImpl, LuaChildLoader};
35use super::child_spawner::ChildSpawner;
36use super::common::{
37    determine_channel_action, dispatch_signal_to_component, is_channel_active, is_channel_paused,
38    send_abort, send_transition, SignalAction,
39};
40use super::paused_queue::PausedEventQueue;
41use super::EventEmitter;
42use crate::auth::PermissionChecker;
43use crate::auth::Session;
44use crate::channel::command::{StateTransition, WorldCommand};
45use crate::channel::config::ChannelConfig;
46use crate::channel::World;
47use crate::engine::SharedChannelHandles;
48use orcs_component::{
49    AsyncChildContext, ChildContext, Component, ComponentError, ComponentLoader, ComponentSnapshot,
50    SnapshotError,
51};
52use orcs_event::{EventCategory, Request, Signal, SubscriptionEntry};
53use orcs_hook::SharedHookRegistry;
54use orcs_types::ChannelId;
55use serde::{Deserialize, Serialize};
56use serde_json::Value;
57use std::borrow::Cow;
58use std::sync::{Arc, Mutex as StdMutex};
59use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
60use tracing::{debug, error, info, warn};
61
62/// An event that can be injected into a channel.
63///
64/// Events are the primary means of external input to a running channel.
65/// Unlike Signals (which are control messages), Events represent work
66/// to be processed by the bound Component.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct Event {
69    /// Category of this event (for routing).
70    pub category: EventCategory,
71    /// Operation to perform (maps to Component.on_request operation).
72    pub operation: String,
73    /// Source component that sent this event.
74    pub source: orcs_types::ComponentId,
75    /// Event payload data.
76    pub payload: serde_json::Value,
77}
78
79/// Internal routing wrapper that distinguishes broadcast from direct event injection.
80///
81/// - [`Broadcast`](InboundEvent::Broadcast): Events sent to all channels
82///   (e.g., UserInput from ClientRunner). The subscription filter is applied —
83///   only channels subscribed to the event's category will process it.
84/// - [`Direct`](InboundEvent::Direct): Events targeted at a specific channel
85///   (e.g., `@component` routing via Engine). The subscription filter is
86///   bypassed — the channel always processes the event.
87///
88/// This enum is internal to the runner system and does not affect the
89/// [`Event`] struct itself.
90#[derive(Debug, Clone)]
91pub(crate) enum InboundEvent {
92    /// Broadcast event — subscription filter applies.
93    Broadcast(Event),
94    /// Direct event — subscription filter bypassed.
95    Direct(Event),
96}
97
98impl InboundEvent {
99    /// Extracts the inner [`Event`], consuming the wrapper.
100    pub(crate) fn into_event(self) -> Event {
101        match self {
102            Self::Broadcast(e) | Self::Direct(e) => e,
103        }
104    }
105
106    /// Returns `true` if this is a direct (filter-bypassing) event.
107    pub(crate) fn is_direct(&self) -> bool {
108        matches!(self, Self::Direct(_))
109    }
110}
111
112/// Opaque handle for sending events into a channel as `InboundEvent::Direct`.
113///
114/// All events sent through `OutputSender` bypass the subscription filter
115/// on the receiving [`ChannelRunner`]. Use [`OutputSender::channel()`] to
116/// create a matched sender/receiver pair for testing or external use.
117///
118/// Internally wraps `mpsc::Sender<InboundEvent>` without exposing
119/// the `InboundEvent` type to external crates.
120#[derive(Clone, Debug)]
121pub struct OutputSender {
122    inner: mpsc::Sender<InboundEvent>,
123}
124
125impl OutputSender {
126    /// Creates a matched (`OutputSender`, [`OutputReceiver`]) pair.
127    ///
128    /// This is the public way to create a channel for use with
129    /// [`ChildSpawner`] and [`ChildContextImpl`] in integration tests
130    /// or external code.
131    #[must_use]
132    pub fn channel(buffer: usize) -> (Self, OutputReceiver) {
133        let (tx, rx) = mpsc::channel(buffer);
134        (Self { inner: tx }, OutputReceiver { inner: rx })
135    }
136
137    /// Creates a new OutputSender from an InboundEvent sender (crate-internal).
138    pub(crate) fn new(tx: mpsc::Sender<InboundEvent>) -> Self {
139        Self { inner: tx }
140    }
141
142    /// Returns the inner sender (crate-internal).
143    #[allow(dead_code)]
144    pub(crate) fn into_inner(self) -> mpsc::Sender<InboundEvent> {
145        self.inner
146    }
147
148    /// Sends an event as [`InboundEvent::Direct`] (non-blocking).
149    #[allow(clippy::result_large_err)]
150    pub(crate) fn try_send_direct(
151        &self,
152        event: Event,
153    ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
154        self.inner.try_send(InboundEvent::Direct(event))
155    }
156
157    /// Sends an event as [`InboundEvent::Direct`] (async, waits for capacity).
158    #[allow(dead_code)]
159    pub(crate) async fn send_direct(
160        &self,
161        event: Event,
162    ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
163        self.inner.send(InboundEvent::Direct(event)).await
164    }
165}
166
167/// Receiver end of an [`OutputSender`] channel.
168///
169/// Unwraps `InboundEvent` internally, returning plain [`Event`] values.
170/// Created via [`OutputSender::channel()`].
171pub struct OutputReceiver {
172    inner: mpsc::Receiver<InboundEvent>,
173}
174
175impl OutputReceiver {
176    /// Receives the next event, waiting until one is available.
177    ///
178    /// Returns `None` when all senders have been dropped.
179    pub async fn recv(&mut self) -> Option<Event> {
180        self.inner.recv().await.map(InboundEvent::into_event)
181    }
182
183    /// Attempts to receive an event without blocking.
184    pub fn try_recv(&mut self) -> Result<Event, mpsc::error::TryRecvError> {
185        self.inner.try_recv().map(InboundEvent::into_event)
186    }
187}
188
/// Why a [`ChannelRunner`] exited its event loop.
///
/// The reason travels inside [`RunnerResult`] and is forwarded to the
/// engine monitor via
/// [`RunnerExitNotice`](super::super::engine::RunnerExitNotice)
/// so that Lua components can react to unexpected terminations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExitReason {
    /// Stop initiated by a signal (Veto, Cancel, etc.).
    Signal,
    /// The event channel closed; no further events can arrive.
    EventChannelClosed,
    /// The signal channel closed; the engine is shutting down.
    SignalChannelClosed,
    /// The channel became inactive in World (parent stopped, etc.).
    ChannelInactive,
    /// The component's `handle_event` returned `false` (self-stop).
    ComponentStopped,
    /// The IO bridge closed — user terminal disconnected (ClientRunner only).
    IoChannelClosed,
    /// The user issued a quit command via IO (ClientRunner only).
    UserQuit,
}

impl ExitReason {
    /// Returns the short machine-readable tag used for JSON serialization.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ExitReason::Signal => "signal",
            ExitReason::EventChannelClosed => "event_channel_closed",
            ExitReason::SignalChannelClosed => "signal_channel_closed",
            ExitReason::ChannelInactive => "channel_inactive",
            ExitReason::ComponentStopped => "component_stopped",
            ExitReason::IoChannelClosed => "io_channel_closed",
            ExitReason::UserQuit => "user_quit",
        }
    }
}

impl std::fmt::Display for ExitReason {
    /// Writes the same tag that [`ExitReason::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tag: &'static str = self.as_str();
        f.write_str(tag)
    }
}
234
235/// Returned by [`ChannelRunner::run()`] after the event loop exits and
236/// the shutdown sequence completes. Contains the channel ID, component
237/// FQN, and an optional snapshot captured during graceful shutdown.
238#[derive(Debug)]
239pub struct RunnerResult {
240    /// Channel ID this runner was bound to.
241    pub channel_id: ChannelId,
242    /// Fully qualified name of the bound Component.
243    ///
244    /// `Cow::Borrowed` for static FQNs (e.g. ClientRunner),
245    /// `Cow::Owned` for dynamic FQNs (e.g. ChannelRunner components).
246    pub component_fqn: Cow<'static, str>,
247    /// Snapshot captured during shutdown (None if component doesn't support snapshots).
248    pub snapshot: Option<ComponentSnapshot>,
249    /// Why the runner exited its event loop.
250    pub exit_reason: ExitReason,
251}
252
253/// An RPC request paired with its reply channel.
254///
255/// Created by [`EventBus::request()`] and delivered to [`ChannelRunner`]
256/// via its `request_rx` channel. The runner calls `Component::on_request()`
257/// and sends the result back through `reply_tx`.
258pub(crate) struct RequestEnvelope {
259    /// The incoming request.
260    pub request: Request,
261    /// One-shot channel to send the response back to the caller.
262    pub reply_tx: oneshot::Sender<Result<Value, String>>,
263}
264
/// Capacity of the request channel used for Component-to-Component RPC.
const REQUEST_BUFFER_SIZE: usize = 32;

/// Default capacity of each channel's event buffer.
const EVENT_BUFFER_SIZE: usize = 64;
270
271/// Handle for injecting events into a [`ChannelRunner`].
272#[derive(Clone, Debug)]
273pub struct ChannelHandle {
274    /// Channel ID.
275    pub id: ChannelId,
276    /// Event sender (carries [`InboundEvent`] internally).
277    event_tx: mpsc::Sender<InboundEvent>,
278    /// Request sender for Component-to-Component RPC (None if not enabled).
279    request_tx: Option<mpsc::Sender<RequestEnvelope>>,
280}
281
282impl ChannelHandle {
283    /// Creates a new handle with the given sender.
284    #[must_use]
285    pub(crate) fn new(id: ChannelId, event_tx: mpsc::Sender<InboundEvent>) -> Self {
286        Self {
287            id,
288            event_tx,
289            request_tx: None,
290        }
291    }
292
293    /// Returns `true` if this handle accepts RPC requests.
294    #[must_use]
295    pub fn accepts_requests(&self) -> bool {
296        self.request_tx.is_some()
297    }
298
299    /// Returns `true` if the runner behind this handle is still alive.
300    ///
301    /// Checks `mpsc::Sender::is_closed()` on the event channel. The channel
302    /// is closed when the receiver (ChannelRunner) is dropped, indicating
303    /// that the runner has stopped.
304    #[must_use]
305    pub fn is_alive(&self) -> bool {
306        !self.event_tx.is_closed()
307    }
308
309    /// Sends an RPC request to the bound Component.
310    ///
311    /// The request is delivered to the ChannelRunner's `request_rx` and
312    /// processed by `Component::on_request()`. The result is sent back
313    /// through `reply_tx`.
314    ///
315    /// # Errors
316    ///
317    /// Returns `Err` if the request channel is not enabled or is closed.
318    pub(crate) async fn send_request(
319        &self,
320        request: Request,
321        reply_tx: oneshot::Sender<Result<Value, String>>,
322    ) -> Result<(), mpsc::error::SendError<RequestEnvelope>> {
323        match &self.request_tx {
324            Some(tx) => tx.send(RequestEnvelope { request, reply_tx }).await,
325            None => Err(mpsc::error::SendError(RequestEnvelope {
326                request,
327                reply_tx,
328            })),
329        }
330    }
331
332    /// Injects a broadcast event into the channel (subscription filter applies).
333    ///
334    /// Returns an error if the channel has been dropped.
335    pub(crate) async fn inject(
336        &self,
337        event: Event,
338    ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
339        self.event_tx.send(InboundEvent::Broadcast(event)).await
340    }
341
342    /// Try to inject a broadcast event without blocking.
343    ///
344    /// Returns an error if the buffer is full or channel is dropped.
345    #[allow(clippy::result_large_err)]
346    pub(crate) fn try_inject(
347        &self,
348        event: Event,
349    ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
350        self.event_tx.try_send(InboundEvent::Broadcast(event))
351    }
352
353    /// Injects a direct event into the channel (subscription filter bypassed).
354    ///
355    /// Use this for targeted delivery (e.g., `@component` routing).
356    pub(crate) async fn inject_direct(
357        &self,
358        event: Event,
359    ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
360        self.event_tx.send(InboundEvent::Direct(event)).await
361    }
362
363    /// Try to inject a direct event without blocking.
364    ///
365    /// Bypasses the subscription filter on the receiving ChannelRunner.
366    #[allow(clippy::result_large_err)]
367    pub(crate) fn try_inject_direct(
368        &self,
369        event: Event,
370    ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
371        self.event_tx.try_send(InboundEvent::Direct(event))
372    }
373}
374
375/// State stored when a Component returns [`ComponentError::Suspended`].
376///
377/// Holds the original request and approval metadata so that the ChannelRunner
378/// can re-dispatch the request after the user approves.
379struct PendingApproval {
380    /// Unique ID for this approval (matches Signal routing).
381    approval_id: String,
382    /// Permission pattern to grant on approval (e.g. `"shell:*"`).
383    grant_pattern: String,
384    /// The original request to re-dispatch after approval.
385    original_request: Request,
386}
387
388/// Execution context for a single Channel.
389///
390/// `ChannelRunner` provides the runtime environment for parallel
391/// channel execution. Each runner:
392///
393/// - Binds 1:1 with a [`Component`]
394/// - Receives Events via an mpsc channel and delivers to Component
395/// - Receives Signals via broadcast and delivers to Component
396/// - Sends World modifications via command queue
397/// - Has read access to the World for queries
398/// - Manages spawned children via ChildSpawner
399pub struct ChannelRunner {
400    /// This channel's ID.
401    id: ChannelId,
402    /// Bound component's ID (cached at build time to avoid locking for hook dispatch).
403    component_id: orcs_types::ComponentId,
404    /// Receiver for incoming events (wrapped as [`InboundEvent`]).
405    event_rx: mpsc::Receiver<InboundEvent>,
406    /// Receiver for signals (broadcast).
407    signal_rx: broadcast::Receiver<Signal>,
408    /// Sender for World commands.
409    world_tx: mpsc::Sender<WorldCommand>,
410    /// Read-only World access.
411    world: Arc<RwLock<World>>,
412    /// Bound Component (1:1 relationship).
413    component: Arc<Mutex<Box<dyn Component>>>,
414    /// Cached subscription entries from Component.
415    ///
416    /// Populated at build time to avoid locking Component on every event.
417    /// Broadcast events whose category (and optionally operation) do not
418    /// match any entry are silently skipped.
419    /// Direct events bypass this filter entirely.
420    subscriptions: Vec<SubscriptionEntry>,
421    /// Queue for events received while paused.
422    paused_queue: PausedEventQueue,
423    /// Child spawner for managing spawned children (optional).
424    child_spawner: Option<Arc<StdMutex<ChildSpawner>>>,
425    /// Event sender for child context (kept for context creation).
426    event_tx: Option<mpsc::Sender<InboundEvent>>,
427    /// Receiver for incoming RPC requests from other Components.
428    ///
429    /// When enabled via [`ChannelRunnerBuilder::with_request_channel()`],
430    /// the runner accepts [`RequestEnvelope`]s, calls `Component::on_request()`,
431    /// and sends the result back through the envelope's `reply_tx`.
432    request_rx: Option<mpsc::Receiver<RequestEnvelope>>,
433    /// Initial snapshot to restore before init (session resume).
434    initial_snapshot: Option<ComponentSnapshot>,
435    /// Shared channel handles for RPC from children.
436    shared_handles: Option<SharedChannelHandles>,
437    /// Shared FQN → ChannelId map for RPC from children.
438    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
439    /// Shared hook registry for lifecycle hook dispatch.
440    hook_registry: Option<SharedHookRegistry>,
441    /// Per-component configuration from `[components.settings.<name>]`.
442    component_config: serde_json::Value,
443    /// Dynamic command grants for granting permissions on approval.
444    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
445    /// IO output sender for routing approval requests to the ClientRunner.
446    io_output_tx: Option<OutputSender>,
447    /// Pending approval state: stored when Component returns Suspended.
448    pending_approval: Option<PendingApproval>,
449}
450
451/// Helper for `tokio::select!`: receives from an optional request channel.
452///
453/// Returns `None` immediately if the receiver is `None` (request channel
454/// not enabled), allowing the select! branch to be skipped.
455async fn recv_request(rx: &mut Option<mpsc::Receiver<RequestEnvelope>>) -> Option<RequestEnvelope> {
456    match rx {
457        Some(rx) => rx.recv().await,
458        None => std::future::pending().await,
459    }
460}
461
462impl ChannelRunner {
463    /// Dispatches hooks for the given point with the given payload.
464    ///
465    /// Returns `HookAction::Continue` with the (possibly modified) context
466    /// if no registry is configured.
467    fn dispatch_hook(
468        &self,
469        point: orcs_hook::HookPoint,
470        payload: serde_json::Value,
471    ) -> orcs_hook::HookAction {
472        let Some(registry) = &self.hook_registry else {
473            let ctx = orcs_hook::HookContext::new(
474                point,
475                self.component_id.clone(),
476                self.id,
477                orcs_types::Principal::System,
478                0,
479                payload,
480            );
481            return orcs_hook::HookAction::Continue(Box::new(ctx));
482        };
483
484        let ctx = orcs_hook::HookContext::new(
485            point,
486            self.component_id.clone(),
487            self.id,
488            orcs_types::Principal::System,
489            0,
490            payload,
491        );
492
493        let guard = registry.read().unwrap_or_else(|poisoned| {
494            warn!("hook registry lock poisoned, using inner value");
495            poisoned.into_inner()
496        });
497        guard.dispatch(point, &self.component_id, None, ctx)
498    }
499
500    /// Returns a reference to the shared hook registry, if configured.
501    #[must_use]
502    pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
503        self.hook_registry.as_ref()
504    }
505
506    /// Creates a ChildContext for use by managed children.
507    ///
508    /// The returned context can be injected into LuaChild instances
509    /// to enable them to spawn sub-children.
510    ///
511    /// Returns None if child spawning was not enabled for this runner.
512    #[must_use]
513    pub fn create_child_context(&self, child_id: &str) -> Option<Box<dyn ChildContext>> {
514        let spawner = self.child_spawner.as_ref()?;
515        let event_tx = self.event_tx.as_ref()?;
516
517        let mut ctx = ChildContextImpl::new(
518            child_id,
519            OutputSender::new(event_tx.clone()),
520            Arc::clone(spawner),
521        );
522        ctx = self.inject_rpc(ctx);
523
524        Some(Box::new(ctx))
525    }
526
527    /// Creates a ChildContext with a LuaChildLoader for spawning Lua children.
528    ///
529    /// # Arguments
530    ///
531    /// * `child_id` - ID of the child that will use this context
532    /// * `loader` - Loader for creating LuaChild instances from configs
533    #[must_use]
534    pub fn create_child_context_with_loader(
535        &self,
536        child_id: &str,
537        loader: Arc<dyn LuaChildLoader>,
538    ) -> Option<Box<dyn ChildContext>> {
539        let spawner = self.child_spawner.as_ref()?;
540        let event_tx = self.event_tx.as_ref()?;
541
542        let mut ctx = ChildContextImpl::new(
543            child_id,
544            OutputSender::new(event_tx.clone()),
545            Arc::clone(spawner),
546        )
547        .with_lua_loader(loader);
548        ctx = self.inject_rpc(ctx);
549
550        Some(Box::new(ctx))
551    }
552
553    /// Creates an AsyncChildContext for use by async children.
554    ///
555    /// The returned context can be used with async spawn operations.
556    ///
557    /// Returns None if child spawning was not enabled for this runner.
558    #[must_use]
559    pub fn create_async_child_context(&self, child_id: &str) -> Option<Box<dyn AsyncChildContext>> {
560        let spawner = self.child_spawner.as_ref()?;
561        let event_tx = self.event_tx.as_ref()?;
562
563        let mut ctx = ChildContextImpl::new(
564            child_id,
565            OutputSender::new(event_tx.clone()),
566            Arc::clone(spawner),
567        );
568        ctx = self.inject_rpc(ctx);
569
570        Some(Box::new(ctx))
571    }
572
573    /// Creates an AsyncChildContext with a LuaChildLoader.
574    ///
575    /// # Arguments
576    ///
577    /// * `child_id` - ID of the child that will use this context
578    /// * `loader` - Loader for creating LuaChild instances from configs
579    #[must_use]
580    pub fn create_async_child_context_with_loader(
581        &self,
582        child_id: &str,
583        loader: Arc<dyn LuaChildLoader>,
584    ) -> Option<Box<dyn AsyncChildContext>> {
585        let spawner = self.child_spawner.as_ref()?;
586        let event_tx = self.event_tx.as_ref()?;
587
588        let mut ctx = ChildContextImpl::new(
589            child_id,
590            OutputSender::new(event_tx.clone()),
591            Arc::clone(spawner),
592        )
593        .with_lua_loader(loader);
594        ctx = self.inject_rpc(ctx);
595
596        Some(Box::new(ctx))
597    }
598
599    /// Injects RPC support (shared handles + channel map + channel ID)
600    /// into a ChildContextImpl if the runner was built with RPC resources.
601    fn inject_rpc(&self, ctx: ChildContextImpl) -> ChildContextImpl {
602        if let (Some(handles), Some(map)) = (&self.shared_handles, &self.component_channel_map) {
603            ctx.with_rpc_support(handles.clone(), map.clone(), self.id)
604        } else {
605            ctx
606        }
607    }
608
609    /// Returns a reference to the child spawner, if enabled.
610    #[must_use]
611    pub fn child_spawner(&self) -> Option<&Arc<StdMutex<ChildSpawner>>> {
612        self.child_spawner.as_ref()
613    }
614
615    /// Returns this channel's ID.
616    #[must_use]
617    pub fn id(&self) -> ChannelId {
618        self.id
619    }
620
621    /// Returns a reference to the world_tx sender.
622    #[must_use]
623    pub fn world_tx(&self) -> &mpsc::Sender<WorldCommand> {
624        &self.world_tx
625    }
626
627    /// Returns a reference to the world.
628    #[must_use]
629    pub fn world(&self) -> &Arc<RwLock<World>> {
630        &self.world
631    }
632
633    /// Runs the channel's event loop.
634    ///
635    /// This method consumes the runner and processes events until:
636    /// - The channel is completed or killed
637    /// - A Veto signal is received
638    /// - All event senders are dropped
639    ///
640    /// After the event loop exits, executes the shutdown sequence:
641    /// 1. Capture component snapshot (if supported)
642    /// 2. Call `component.shutdown()` for cleanup
643    /// 3. Return [`RunnerResult`] with snapshot
644    ///
645    /// If an initial snapshot was provided via
646    /// [`ChannelRunnerBuilder::with_initial_snapshot()`], it is restored
647    /// before `init()` is called.
648    #[tracing::instrument(skip_all, level = "info", fields(channel_id = %self.id))]
649    pub async fn run(mut self) -> RunnerResult {
650        info!("ChannelRunner started");
651
652        // Restore + Initialize component
653        {
654            let mut comp = self.component.lock().await;
655
656            // Restore from initial snapshot (session resume)
657            if let Some(snapshot) = self.initial_snapshot.take() {
658                match comp.restore(&snapshot) {
659                    Ok(()) => info!("restored component from initial snapshot"),
660                    Err(SnapshotError::NotSupported(_)) => {
661                        debug!("component does not support snapshot restore");
662                    }
663                    Err(e) => {
664                        warn!(error = %e, "failed to restore initial snapshot");
665                    }
666                }
667            }
668
669            // --- Pre-init hook ---
670            self.dispatch_hook(
671                orcs_hook::HookPoint::ComponentPreInit,
672                serde_json::json!({ "component": comp.id().fqn() }),
673            );
674
675            if let Err(e) = comp.init(&self.component_config) {
676                warn!(error = %e, "component init failed");
677            }
678
679            // --- Post-init hook ---
680            self.dispatch_hook(
681                orcs_hook::HookPoint::ComponentPostInit,
682                serde_json::json!({ "component": comp.id().fqn() }),
683            );
684        }
685
686        // Default fallback: overwritten by each break-point in the loop.
687        // The initial value is a safety net for edge cases (e.g. immediate break).
688        #[allow(unused_assignments)]
689        let mut exit_reason = ExitReason::Signal;
690        loop {
691            tokio::select! {
692                // Priority: signals > requests > events
693                biased;
694
695                signal = self.signal_rx.recv() => {
696                    match signal {
697                        Ok(sig) => {
698                            if !self.handle_signal(sig).await {
699                                exit_reason = ExitReason::Signal;
700                                break;
701                            }
702                        }
703                        Err(broadcast::error::RecvError::Closed) => {
704                            info!("signal channel closed");
705                            exit_reason = ExitReason::SignalChannelClosed;
706                            break;
707                        }
708                        Err(broadcast::error::RecvError::Lagged(n)) => {
709                            warn!(lagged = n, "signal receiver lagged");
710                        }
711                    }
712                }
713
714                // RPC requests from other Components (higher priority than events).
715                // When request_rx is None, recv_request() returns pending (never resolves),
716                // effectively disabling this branch in select!.
717                Some(envelope) = recv_request(&mut self.request_rx) => {
718                    self.handle_rpc_request(envelope).await;
719                }
720
721                event = self.event_rx.recv() => {
722                    match event {
723                        Some(evt) => {
724                            if !self.handle_event(evt).await {
725                                exit_reason = ExitReason::ComponentStopped;
726                                break;
727                            }
728                        }
729                        None => {
730                            info!("event channel closed");
731                            exit_reason = ExitReason::EventChannelClosed;
732                            break;
733                        }
734                    }
735                }
736            }
737
738            // Check if channel is still running
739            if !is_channel_active(&self.world, self.id).await {
740                debug!("channel no longer active");
741                exit_reason = ExitReason::ChannelInactive;
742                break;
743            }
744        }
745
746        // === Shutdown Sequence ===
747        let (component_fqn, snapshot) = {
748            let mut comp = self.component.lock().await;
749            let fqn = comp.id().fqn();
750
751            // 1. Capture snapshot
752            let snapshot = match comp.snapshot() {
753                Ok(s) => {
754                    debug!(component = %fqn, "captured shutdown snapshot");
755                    Some(s)
756                }
757                Err(SnapshotError::NotSupported(_)) => None,
758                Err(e) => {
759                    warn!(component = %fqn, error = %e, "snapshot failed during shutdown");
760                    None
761                }
762            };
763
764            // 2. Shutdown
765            self.dispatch_hook(
766                orcs_hook::HookPoint::ComponentPreShutdown,
767                serde_json::json!({ "component": &fqn }),
768            );
769
770            comp.shutdown();
771
772            self.dispatch_hook(
773                orcs_hook::HookPoint::ComponentPostShutdown,
774                serde_json::json!({ "component": &fqn }),
775            );
776            debug!(component = %fqn, "component shutdown complete");
777
778            (fqn, snapshot)
779        };
780
781        info!(exit_reason = %exit_reason, "ChannelRunner stopped");
782
783        RunnerResult {
784            channel_id: self.id,
785            component_fqn: Cow::Owned(component_fqn),
786            snapshot,
787            exit_reason,
788        }
789    }
790
791    /// Handles an incoming signal.
792    ///
793    /// Returns `false` if the runner should stop.
794    async fn handle_signal(&mut self, signal: Signal) -> bool {
795        debug!(signal_kind = ?signal.kind, "received signal");
796
797        // Check if signal affects this channel
798        if !signal.affects_channel(self.id) {
799            return true;
800        }
801
802        // --- Pre-dispatch hook ---
803        let pre_payload = serde_json::json!({
804            "signal_kind": format!("{:?}", signal.kind),
805            "signal_scope": format!("{:?}", signal.scope),
806        });
807        let pre_action = self.dispatch_hook(orcs_hook::HookPoint::SignalPreDispatch, pre_payload);
808        match pre_action {
809            orcs_hook::HookAction::Skip(_) => {
810                debug!("signal skipped by pre-dispatch hook");
811                return true;
812            }
813            orcs_hook::HookAction::Abort { reason } => {
814                warn!(reason = %reason, "signal aborted by pre-dispatch hook");
815                return true;
816            }
817            orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
818        }
819
820        // Propagate signal to spawned children
821        if let Some(spawner) = &self.child_spawner {
822            if let Ok(mut s) = spawner.lock() {
823                s.propagate_signal(&signal);
824            }
825        }
826
827        // Dispatch to component first
828        let component_action = dispatch_signal_to_component(&signal, &self.component).await;
829        if let SignalAction::Stop { reason } = component_action {
830            info!(reason = %reason, "component requested stop");
831            // Abort all children before stopping
832            self.abort_all_children();
833            send_abort(&self.world_tx, self.id, &reason).await;
834            return false;
835        }
836
837        // Determine channel-level action
838        let action = determine_channel_action(&signal.kind);
839        match action {
840            SignalAction::Stop { reason } => {
841                info!(reason = %reason, "stopping channel");
842                // Abort all children before stopping
843                self.abort_all_children();
844                send_abort(&self.world_tx, self.id, &reason).await;
845                return false;
846            }
847            SignalAction::Transition(transition) => {
848                let is_resolve = matches!(transition, StateTransition::ResolveApproval { .. });
849                let accepted = send_transition(&self.world_tx, self.id, transition.clone()).await;
850
851                // Drain paused queue on resume
852                if matches!(transition, StateTransition::Resume) {
853                    self.drain_paused_queue().await;
854                }
855
856                // On successful approval resolution, grant pattern and re-dispatch.
857                if is_resolve && accepted {
858                    self.handle_approval_resolved().await;
859                }
860            }
861            SignalAction::Continue => {
862                // Handle Reject: clear pending approval and notify user.
863                if let orcs_event::SignalKind::Reject {
864                    approval_id,
865                    reason,
866                } = &signal.kind
867                {
868                    if let Some(pending) = &self.pending_approval {
869                        if pending.approval_id == *approval_id {
870                            info!(
871                                approval_id = %approval_id,
872                                "approval rejected, clearing pending"
873                            );
874                            // Abort the channel to exit AwaitingApproval.
875                            send_abort(
876                                &self.world_tx,
877                                self.id,
878                                reason.as_deref().unwrap_or("rejected by user"),
879                            )
880                            .await;
881                            self.pending_approval = None;
882
883                            // Notify user.
884                            if let Some(io_tx) = &self.io_output_tx {
885                                let event = Event {
886                                    category: EventCategory::Output,
887                                    operation: "output".to_string(),
888                                    source: self.component_id.clone(),
889                                    payload: serde_json::json!({
890                                        "message": format!(
891                                            "Rejected: {}",
892                                            reason.as_deref().unwrap_or("no reason")
893                                        ),
894                                        "level": "warn",
895                                    }),
896                                };
897                                let _ = io_tx.try_send_direct(event);
898                            }
899                        }
900                    }
901                }
902            }
903        }
904
905        // --- Post-dispatch hook ---
906        let post_payload = serde_json::json!({
907            "signal_kind": format!("{:?}", signal.kind),
908            "handled": true,
909        });
910        let _post_action =
911            self.dispatch_hook(orcs_hook::HookPoint::SignalPostDispatch, post_payload);
912
913        true
914    }
915
916    /// Handles post-approval: grants the permission pattern and re-dispatches the request.
917    ///
918    /// Called after a `ResolveApproval` transition succeeds (channel returns to Running).
919    /// Consumes the stored [`PendingApproval`] and:
920    /// 1. Grants the `grant_pattern` via the shared `GrantPolicy`
921    /// 2. Re-dispatches the original request to the Component (which will now pass permission check)
922    /// 3. Sends `ShowApproved` notification to the user
923    async fn handle_approval_resolved(&mut self) {
924        let pending = match self.pending_approval.take() {
925            Some(p) => p,
926            None => {
927                debug!("ResolveApproval accepted but no pending approval stored");
928                return;
929            }
930        };
931
932        info!(
933            approval_id = %pending.approval_id,
934            grant_pattern = %pending.grant_pattern,
935            "approval resolved, granting pattern and re-dispatching"
936        );
937
938        // 1. Grant the permission pattern.
939        if let Some(grants) = &self.grants {
940            if let Err(e) =
941                grants.grant(orcs_auth::CommandGrant::persistent(&pending.grant_pattern))
942            {
943                warn!(
944                    error = %e,
945                    pattern = %pending.grant_pattern,
946                    "failed to grant pattern after approval"
947                );
948            }
949        } else {
950            warn!("no GrantPolicy configured, cannot grant pattern");
951        }
952
953        // 2. Notify user that the approval was accepted (before re-dispatch so
954        //    the confirmation appears before any output from the re-dispatched request).
955        if let Some(io_tx) = &self.io_output_tx {
956            let event = Event {
957                category: EventCategory::Output,
958                operation: "output".to_string(),
959                source: self.component_id.clone(),
960                payload: serde_json::json!({
961                    "message": format!("Approved: {}", pending.approval_id),
962                    "level": "info",
963                }),
964            };
965            let _ = io_tx.try_send_direct(event);
966        }
967
968        // 3. Re-dispatch the original request.
969        let result = {
970            let mut comp = self.component.lock().await;
971            comp.on_request(&pending.original_request)
972        };
973
974        match result {
975            Ok(response) => {
976                debug!(response = ?response, "re-dispatched request succeeded after approval");
977            }
978            Err(ComponentError::Suspended {
979                approval_id: new_approval_id,
980                grant_pattern: new_grant_pattern,
981                pending_request: new_pending_request,
982            }) => {
983                // Cascading approval: re-dispatch hit a different permission gate.
984                // Enter a new approval cycle with the same original request so that
985                // accumulated grants will all be in effect on the next re-dispatch.
986                info!(
987                    approval_id = %new_approval_id,
988                    grant_pattern = %new_grant_pattern,
989                    "re-dispatch triggered cascading approval"
990                );
991
992                self.pending_approval = Some(PendingApproval {
993                    approval_id: new_approval_id.clone(),
994                    grant_pattern: new_grant_pattern,
995                    original_request: pending.original_request,
996                });
997
998                send_transition(
999                    &self.world_tx,
1000                    self.id,
1001                    StateTransition::AwaitApproval {
1002                        request_id: new_approval_id.clone(),
1003                    },
1004                )
1005                .await;
1006
1007                let description = new_pending_request
1008                    .get("description")
1009                    .and_then(|v| v.as_str())
1010                    .unwrap_or("command execution")
1011                    .to_string();
1012                let command = new_pending_request
1013                    .get("command")
1014                    .and_then(|v| v.as_str())
1015                    .unwrap_or("")
1016                    .to_string();
1017
1018                if let Some(io_tx) = &self.io_output_tx {
1019                    let output_event = Event {
1020                        category: EventCategory::Output,
1021                        operation: "approval_request".to_string(),
1022                        source: self.component_id.clone(),
1023                        payload: serde_json::json!({
1024                            "type": "approval_request",
1025                            "approval_id": new_approval_id,
1026                            "operation": "exec",
1027                            "description": format!("{}: {}", description, command),
1028                            "source": self.component_id.fqn(),
1029                        }),
1030                    };
1031                    let _ = io_tx.try_send_direct(output_event);
1032                }
1033            }
1034            Err(e) => {
1035                warn!(error = %e, "re-dispatched request failed after approval");
1036            }
1037        }
1038    }
1039
1040    /// Aborts all spawned children.
1041    fn abort_all_children(&self) {
1042        if let Some(spawner) = &self.child_spawner {
1043            if let Ok(mut s) = spawner.lock() {
1044                s.abort_all();
1045                debug!("aborted all children");
1046            }
1047        }
1048    }
1049
1050    /// Handles an incoming event.
1051    ///
1052    /// Subscription filter is applied first (for broadcast events),
1053    /// then paused events are queued for later processing.
1054    /// Direct events bypass the subscription filter entirely.
1055    async fn handle_event(&mut self, inbound: InboundEvent) -> bool {
1056        let is_direct = inbound.is_direct();
1057        let event = inbound.into_event();
1058
1059        debug!(
1060            category = ?event.category,
1061            operation = %event.operation,
1062            direct = is_direct,
1063            "received event"
1064        );
1065
1066        // Subscription filter first: drop broadcast events we don't subscribe to.
1067        // This runs BEFORE the pause check so unsubscribed events never enter the queue.
1068        // Checks both category AND operation (if operation filter is set).
1069        if !is_direct
1070            && !self
1071                .subscriptions
1072                .iter()
1073                .any(|s| s.matches(&event.category, &event.operation))
1074        {
1075            debug!(category = ?event.category, operation = %event.operation, "skipping event (not subscribed)");
1076            return true;
1077        }
1078
1079        // Queue events while paused
1080        if is_channel_paused(&self.world, self.id).await {
1081            self.paused_queue
1082                .try_enqueue(event, "ChannelRunner", self.id);
1083            return true;
1084        }
1085
1086        self.process_event(event, is_direct).await;
1087        true
1088    }
1089
1090    /// Processes a single event by delivering it to the Component.
1091    ///
1092    /// Subscription filtering is already applied by `handle_event()` before
1093    /// this method is called. Direct events and paused-queue drains bypass
1094    /// the filter by design.
1095    async fn process_event(&mut self, event: Event, _is_direct: bool) {
1096        // --- Pre-dispatch hook ---
1097        let pre_payload = serde_json::json!({
1098            "category": format!("{:?}", event.category),
1099            "operation": &event.operation,
1100            "source": event.source.fqn(),
1101            "payload": &event.payload,
1102        });
1103        let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
1104        let request_payload = match pre_action {
1105            orcs_hook::HookAction::Continue(ctx) => {
1106                // Use potentially modified payload from hook chain
1107                ctx.payload.get("payload").cloned().unwrap_or(event.payload)
1108            }
1109            orcs_hook::HookAction::Skip(value) => {
1110                debug!(value = ?value, "request skipped by pre-dispatch hook");
1111                return;
1112            }
1113            orcs_hook::HookAction::Abort { reason } => {
1114                warn!(reason = %reason, "request aborted by pre-dispatch hook");
1115                return;
1116            }
1117            orcs_hook::HookAction::Replace(_) => {
1118                // Replace is invalid for pre-hooks (registry already warns)
1119                event.payload
1120            }
1121        };
1122
1123        // Capture before move into Request::new.
1124        let is_user_input = event.category == EventCategory::UserInput;
1125        let event_operation = event.operation.clone();
1126
1127        let request = Request::new(
1128            event.category,
1129            &event_operation,
1130            event.source,
1131            self.id,
1132            request_payload,
1133        );
1134
1135        // Notify user that processing has started (UserInput only).
1136        if is_user_input {
1137            if let Some(io_tx) = &self.io_output_tx {
1138                let notify = Event {
1139                    category: EventCategory::Output,
1140                    operation: "processing".to_string(),
1141                    source: self.component_id.clone(),
1142                    payload: serde_json::json!({
1143                        "type": "processing",
1144                        "component": &self.component_id.name,
1145                        "operation": &event_operation,
1146                    }),
1147                };
1148                let _ = io_tx.try_send_direct(notify);
1149            }
1150        }
1151
1152        let result = {
1153            let mut comp = self.component.lock().await;
1154            comp.on_request(&request)
1155        };
1156
1157        // --- Post-dispatch hook ---
1158        let post_payload = match &result {
1159            Ok(response) => serde_json::json!({
1160                "operation": &event_operation,
1161                "response": response,
1162                "success": true,
1163            }),
1164            Err(e) => serde_json::json!({
1165                "operation": &event_operation,
1166                "error": e.to_string(),
1167                "success": false,
1168            }),
1169        };
1170        let _post_action =
1171            self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);
1172
1173        match result {
1174            Ok(response) => {
1175                debug!(response = ?response, "component returned success");
1176            }
1177            Err(ComponentError::Suspended {
1178                approval_id,
1179                grant_pattern,
1180                pending_request,
1181            }) => {
1182                info!(
1183                    approval_id = %approval_id,
1184                    grant_pattern = %grant_pattern,
1185                    "component suspended pending approval"
1186                );
1187
1188                // Store the pending approval for re-dispatch after approval.
1189                self.pending_approval = Some(PendingApproval {
1190                    approval_id: approval_id.clone(),
1191                    grant_pattern,
1192                    original_request: request,
1193                });
1194
1195                // Transition channel to AwaitingApproval state.
1196                send_transition(
1197                    &self.world_tx,
1198                    self.id,
1199                    StateTransition::AwaitApproval {
1200                        request_id: approval_id.clone(),
1201                    },
1202                )
1203                .await;
1204
1205                // Notify user via IOOutput (ClientRunner → IOBridge → console).
1206                let description = pending_request
1207                    .get("description")
1208                    .and_then(|v| v.as_str())
1209                    .unwrap_or("command execution")
1210                    .to_string();
1211                let command = pending_request
1212                    .get("command")
1213                    .and_then(|v| v.as_str())
1214                    .unwrap_or("")
1215                    .to_string();
1216
1217                let output_event = Event {
1218                    category: EventCategory::Output,
1219                    operation: "approval_request".to_string(),
1220                    source: self.component_id.clone(),
1221                    payload: serde_json::json!({
1222                        "type": "approval_request",
1223                        "approval_id": approval_id,
1224                        "operation": "exec",
1225                        "description": format!("{}: {}", description, command),
1226                        "source": self.component_id.fqn(),
1227                    }),
1228                };
1229                if let Some(io_tx) = &self.io_output_tx {
1230                    let _ = io_tx.try_send_direct(output_event);
1231                }
1232            }
1233            Err(ComponentError::Aborted) => {
1234                info!("component aborted");
1235            }
1236            Err(e) => {
1237                error!(error = %e, "component returned error");
1238            }
1239        }
1240    }
1241
1242    /// Drains the paused queue and processes all queued events.
1243    ///
1244    /// Queued events have already passed the subscription filter in
1245    /// `handle_event()`, so they are processed without re-filtering.
1246    async fn drain_paused_queue(&mut self) {
1247        // Collect events first to avoid borrow issues with async process_event
1248        let events: Vec<_> = self.paused_queue.drain("ChannelRunner", self.id).collect();
1249
1250        for event in events {
1251            self.process_event(event, true).await;
1252        }
1253    }
1254
1255    /// Handles an incoming RPC request from another Component.
1256    ///
1257    /// Calls `Component::on_request()` and sends the result back through
1258    /// the envelope's `reply_tx`.
1259    async fn handle_rpc_request(&self, envelope: RequestEnvelope) {
1260        debug!(
1261            request_id = %envelope.request.id,
1262            operation = %envelope.request.operation,
1263            source = %envelope.request.source,
1264            "handling RPC request"
1265        );
1266
1267        // --- Pre-dispatch hook ---
1268        let pre_payload = serde_json::json!({
1269            "request_id": envelope.request.id.to_string(),
1270            "operation": &envelope.request.operation,
1271            "source": envelope.request.source.fqn(),
1272            "payload": &envelope.request.payload,
1273        });
1274        let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
1275        match &pre_action {
1276            orcs_hook::HookAction::Skip(value) => {
1277                debug!(value = ?value, "RPC request skipped by pre-dispatch hook");
1278                let response = Ok(value.clone());
1279                if envelope.reply_tx.send(response).is_err() {
1280                    debug!("RPC reply dropped (caller cancelled)");
1281                }
1282                return;
1283            }
1284            orcs_hook::HookAction::Abort { reason } => {
1285                warn!(reason = %reason, "RPC request aborted by pre-dispatch hook");
1286                let response = Err(reason.clone());
1287                if envelope.reply_tx.send(response).is_err() {
1288                    debug!("RPC reply dropped (caller cancelled)");
1289                }
1290                return;
1291            }
1292            orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
1293        }
1294
1295        let result = {
1296            let mut comp = self.component.lock().await;
1297            comp.on_request(&envelope.request)
1298        };
1299
1300        // --- Post-dispatch hook ---
1301        let post_payload = match &result {
1302            Ok(response) => serde_json::json!({
1303                "operation": &envelope.request.operation,
1304                "response": response,
1305                "success": true,
1306            }),
1307            Err(e) => serde_json::json!({
1308                "operation": &envelope.request.operation,
1309                "error": e.to_string(),
1310                "success": false,
1311            }),
1312        };
1313        let post_action =
1314            self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);
1315
1316        // Apply Replace from post-hook if present
1317        let final_result = match post_action {
1318            orcs_hook::HookAction::Replace(value) => Ok(value),
1319            _ => result.map_err(|e| e.to_string()),
1320        };
1321
1322        if envelope.reply_tx.send(final_result).is_err() {
1323            debug!("RPC reply dropped (caller cancelled)");
1324        }
1325    }
1326
1327    /// Spawns a child channel with a bound Component.
1328    ///
1329    /// Returns the new channel's ID and handle, or None if spawn failed.
1330    pub async fn spawn_child(
1331        &self,
1332        config: ChannelConfig,
1333        signal_rx: broadcast::Receiver<Signal>,
1334        component: Box<dyn Component>,
1335    ) -> Option<(ChannelRunner, ChannelHandle)> {
1336        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1337        let cmd = WorldCommand::Spawn {
1338            parent: self.id,
1339            config,
1340            reply: reply_tx,
1341        };
1342
1343        if self.world_tx.send(cmd).await.is_err() {
1344            return None;
1345        }
1346
1347        let child_id = reply_rx.await.ok()??;
1348        let (runner, handle) = ChannelRunner::builder(
1349            child_id,
1350            self.world_tx.clone(),
1351            Arc::clone(&self.world),
1352            signal_rx,
1353            component,
1354        )
1355        .build();
1356
1357        Some((runner, handle))
1358    }
1359}
1360
1361/// Builder for creating [`ChannelRunner`] instances.
1362///
1363/// This consolidates the various constructor patterns into a fluent API:
1364///
1365/// ```ignore
1366/// // Basic runner (no emitter, no child spawner)
1367/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1368///     .build();
1369///
1370/// // With emitter only
1371/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1372///     .with_emitter(signal_tx)
1373///     .build();
1374///
1375/// // With child spawner only
1376/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1377///     .with_child_spawner(signal_tx)
1378///     .build();
1379///
1380/// // With both emitter and child spawner
1381/// let (runner, handle) = ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1382///     .with_emitter(signal_tx.clone())
1383///     .with_child_spawner(signal_tx)
1384///     .build();
1385/// ```
1386pub struct ChannelRunnerBuilder {
1387    id: ChannelId,
1388    world_tx: mpsc::Sender<WorldCommand>,
1389    world: Arc<RwLock<World>>,
1390    signal_rx: broadcast::Receiver<Signal>,
1391    component: Box<dyn Component>,
1392    /// Signal sender for emitter (if enabled).
1393    emitter_signal_tx: Option<broadcast::Sender<Signal>>,
1394    /// Output channel for routing Output events to IO channel.
1395    output_tx: Option<OutputSender>,
1396    /// Enable child spawner.
1397    enable_child_spawner: bool,
1398    /// Lua child loader for spawning Lua children.
1399    lua_loader: Option<Arc<dyn LuaChildLoader>>,
1400    /// Component loader for spawning components as runners.
1401    component_loader: Option<Arc<dyn ComponentLoader>>,
1402    /// Session for permission checking (identity + privilege level).
1403    session: Option<Arc<Session>>,
1404    /// Permission checker policy.
1405    checker: Option<Arc<dyn PermissionChecker>>,
1406    /// Dynamic command grants.
1407    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
1408    /// Shared channel handles for event broadcasting via Emitter.
1409    shared_handles: Option<SharedChannelHandles>,
1410    /// Shared Board for auto-recording emitted events.
1411    board: Option<crate::board::SharedBoard>,
1412    /// Initial snapshot to restore before init (session resume).
1413    initial_snapshot: Option<ComponentSnapshot>,
1414    /// Enable request channel for Component-to-Component RPC.
1415    enable_request_channel: bool,
1416    /// Shared ComponentId → ChannelId mapping for RPC routing via Emitter.
1417    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
1418    /// Shared hook registry for lifecycle hook dispatch.
1419    hook_registry: Option<SharedHookRegistry>,
1420    /// Shared MCP client manager for MCP tool dispatch.
1421    mcp_manager: Option<Arc<orcs_mcp::McpClientManager>>,
1422    /// Per-component configuration from config file.
1423    component_config: serde_json::Value,
1424}
1425
1426impl ChannelRunnerBuilder {
1427    /// Creates a new builder with required parameters.
1428    #[must_use]
1429    pub fn new(
1430        id: ChannelId,
1431        world_tx: mpsc::Sender<WorldCommand>,
1432        world: Arc<RwLock<World>>,
1433        signal_rx: broadcast::Receiver<Signal>,
1434        component: Box<dyn Component>,
1435    ) -> Self {
1436        Self {
1437            id,
1438            world_tx,
1439            world,
1440            signal_rx,
1441            component,
1442            emitter_signal_tx: None,
1443            output_tx: None,
1444            enable_child_spawner: false,
1445            lua_loader: None,
1446            component_loader: None,
1447            session: None,
1448            checker: None,
1449            grants: None,
1450            shared_handles: None,
1451            board: None,
1452            initial_snapshot: None,
1453            enable_request_channel: false,
1454            component_channel_map: None,
1455            hook_registry: None,
1456            mcp_manager: None,
1457            component_config: serde_json::Value::Object(serde_json::Map::new()),
1458        }
1459    }
1460
1461    /// Enables the EventEmitter for this runner.
1462    ///
1463    /// The emitter allows the Component to emit output events via `orcs.output()`.
1464    #[must_use]
1465    pub fn with_emitter(mut self, signal_tx: broadcast::Sender<Signal>) -> Self {
1466        self.emitter_signal_tx = Some(signal_tx);
1467        self
1468    }
1469
1470    /// Sets the output channel for routing Output events.
1471    ///
1472    /// When set, the Component's `emit_output()` calls will send events
1473    /// to this channel instead of the runner's own event channel.
1474    /// This enables ChannelRunner components to display output via
1475    /// ClientRunner's IOBridge.
1476    ///
1477    /// # Arguments
1478    ///
1479    /// * `output_tx` - Sender for the IO channel's event_rx
1480    #[must_use]
1481    pub fn with_output_channel(mut self, output_tx: OutputSender) -> Self {
1482        self.output_tx = Some(output_tx);
1483        self
1484    }
1485
1486    /// Enables the ChildSpawner for this runner.
1487    ///
1488    /// This allows the Component and its Children to spawn sub-children
1489    /// via ChildContext.
1490    ///
1491    /// # Arguments
1492    ///
1493    /// * `loader` - Optional Lua child loader for spawning Lua children
1494    #[must_use]
1495    pub fn with_child_spawner(mut self, loader: Option<Arc<dyn LuaChildLoader>>) -> Self {
1496        self.enable_child_spawner = true;
1497        self.lua_loader = loader;
1498        self
1499    }
1500
1501    /// Sets the component loader for spawning components as runners.
1502    ///
1503    /// This enables the ChildContext::spawn_runner_from_script() functionality.
1504    ///
1505    /// # Arguments
1506    ///
1507    /// * `loader` - Component loader for creating components from scripts
1508    #[must_use]
1509    pub fn with_component_loader(mut self, loader: Arc<dyn ComponentLoader>) -> Self {
1510        self.component_loader = Some(loader);
1511        self
1512    }
1513
1514    /// Sets the session for permission checking.
1515    ///
1516    /// Takes ownership of a Session and wraps it in Arc.
1517    /// For sharing a Session across multiple runners, use [`Self::with_session_arc`].
1518    ///
1519    /// When set, operations like `orcs.exec()`, `spawn_child()`, and `spawn_runner()`
1520    /// will be checked against the session's privilege level.
1521    ///
1522    /// # Arguments
1523    ///
1524    /// * `session` - Session to use for permission checks
1525    #[must_use]
1526    pub fn with_session(mut self, session: Session) -> Self {
1527        self.session = Some(Arc::new(session));
1528        self
1529    }
1530
1531    /// Sets the session for permission checking (Arc version).
1532    ///
1533    /// Use this when sharing a Session across multiple runners,
1534    /// so that dynamic grants (via HIL) are shared.
1535    ///
1536    /// # Arguments
1537    ///
1538    /// * `session` - Arc-wrapped Session for shared access
1539    #[must_use]
1540    pub fn with_session_arc(mut self, session: Arc<Session>) -> Self {
1541        self.session = Some(session);
1542        self
1543    }
1544
1545    /// Sets the permission checker policy.
1546    ///
1547    /// # Arguments
1548    ///
1549    /// * `checker` - Permission checker implementation
1550    #[must_use]
1551    pub fn with_checker(mut self, checker: Arc<dyn PermissionChecker>) -> Self {
1552        self.checker = Some(checker);
1553        self
1554    }
1555
1556    /// Sets the dynamic command grant store.
1557    ///
1558    /// # Arguments
1559    ///
1560    /// * `grants` - Grant policy implementation for dynamic command permissions
1561    #[must_use]
1562    pub fn with_grants(mut self, grants: Arc<dyn orcs_auth::GrantPolicy>) -> Self {
1563        self.grants = Some(grants);
1564        self
1565    }
1566
1567    /// Sets the shared channel handles for event broadcasting.
1568    ///
1569    /// When set, the EventEmitter's `emit_event()` will broadcast
1570    /// Extension events to all registered channels.
1571    #[must_use]
1572    pub fn with_shared_handles(mut self, handles: SharedChannelHandles) -> Self {
1573        self.shared_handles = Some(handles);
1574        self
1575    }
1576
1577    /// Sets the shared component-to-channel mapping for RPC routing.
1578    ///
1579    /// When set, the EventEmitter's `request()` can resolve target
1580    /// ComponentId to ChannelId and route RPC via ChannelHandle.
1581    #[must_use]
1582    pub fn with_component_channel_map(
1583        mut self,
1584        map: crate::engine::SharedComponentChannelMap,
1585    ) -> Self {
1586        self.component_channel_map = Some(map);
1587        self
1588    }
1589
1590    /// Sets the shared Board for auto-recording emitted events.
1591    ///
1592    /// When set, the EventEmitter will automatically append entries
1593    /// to the Board on `emit_output()` and `emit_event()`.
1594    #[must_use]
1595    pub fn with_board(mut self, board: crate::board::SharedBoard) -> Self {
1596        self.board = Some(board);
1597        self
1598    }
1599
1600    /// Sets the initial snapshot to restore before `init()`.
1601    ///
1602    /// When set, the Component's `restore()` method is called with
1603    /// this snapshot before `init()` during `run()`. Used for
1604    /// session resume.
1605    ///
1606    /// # Arguments
1607    ///
1608    /// * `snapshot` - Snapshot to restore from
1609    #[must_use]
1610    pub fn with_initial_snapshot(mut self, snapshot: ComponentSnapshot) -> Self {
1611        self.initial_snapshot = Some(snapshot);
1612        self
1613    }
1614
1615    /// Enables the request channel for Component-to-Component RPC.
1616    ///
1617    /// When enabled, the runner's [`ChannelHandle`] will accept
1618    /// `RequestEnvelope`s via `send_request()`, allowing other
1619    /// Components to call this Component's `on_request()` and receive
1620    /// a response.
1621    #[must_use]
1622    pub fn with_request_channel(mut self) -> Self {
1623        self.enable_request_channel = true;
1624        self
1625    }
1626
1627    /// Sets the shared hook registry for lifecycle hook dispatch.
1628    ///
1629    /// When set, the runner will dispatch hooks at lifecycle points
1630    /// (e.g., pre/post request, signal, component init/shutdown).
1631    #[must_use]
1632    pub fn with_hook_registry(mut self, registry: SharedHookRegistry) -> Self {
1633        self.hook_registry = Some(registry);
1634        self
1635    }
1636
1637    /// Sets the shared MCP client manager.
1638    ///
1639    /// When set, the runner propagates the manager to ChildContextImpl
1640    /// so that MCP tools can be dispatched from Lua components.
1641    #[must_use]
1642    pub fn with_mcp_manager(mut self, manager: Arc<orcs_mcp::McpClientManager>) -> Self {
1643        self.mcp_manager = Some(manager);
1644        self
1645    }
1646
1647    /// Sets per-component configuration passed to `Component::init()`.
1648    ///
1649    /// The value comes from `[components.settings.<name>]` in the config file.
1650    #[must_use]
1651    pub fn with_component_config(mut self, config: serde_json::Value) -> Self {
1652        self.component_config = config;
1653        self
1654    }
1655
1656    /// Configures auth (session/checker/grants) and IO output routing on a [`ChildContextImpl`].
1657    ///
1658    /// Extracted from `build()` to eliminate duplication between the child-spawner
1659    /// branch and the auth-only branch.
1660    fn configure_context(
1661        &mut self,
1662        mut ctx: ChildContextImpl,
1663        io_output_tx: &Option<OutputSender>,
1664        component_id: &str,
1665        rpc_handles: &Option<SharedChannelHandles>,
1666        rpc_map: &Option<crate::engine::SharedComponentChannelMap>,
1667        channel_id: ChannelId,
1668    ) -> ChildContextImpl {
1669        if let Some(session) = self.session.take() {
1670            ctx = ctx.with_session_arc(session);
1671            info!("ChannelRunnerBuilder: enabled session for {}", component_id);
1672        }
1673        if let Some(checker) = self.checker.take() {
1674            ctx = ctx.with_checker(checker);
1675            info!(
1676                "ChannelRunnerBuilder: enabled permission checker for {}",
1677                component_id
1678            );
1679        }
1680        if let Some(grants) = &self.grants {
1681            ctx = ctx.with_grants(Arc::clone(grants));
1682            info!(
1683                "ChannelRunnerBuilder: enabled grant store for {}",
1684                component_id
1685            );
1686        }
1687        if let Some(io_tx) = io_output_tx.clone() {
1688            ctx = ctx.with_io_output_channel(io_tx);
1689            info!(
1690                "ChannelRunnerBuilder: enabled IO output routing for {}",
1691                component_id
1692            );
1693        }
1694        if let (Some(handles), Some(map)) = (rpc_handles.clone(), rpc_map.clone()) {
1695            ctx = ctx.with_rpc_support(handles, map, channel_id);
1696        }
1697        if let Some(reg) = &self.hook_registry {
1698            ctx = ctx.with_hook_registry(Arc::clone(reg));
1699        }
1700        if let Some(mgr) = &self.mcp_manager {
1701            ctx = ctx.with_mcp_manager(Arc::clone(mgr));
1702        }
1703        ctx
1704    }
1705
1706    /// Builds the ChannelRunner and returns it with a ChannelHandle.
1707    #[must_use]
1708    pub fn build(mut self) -> (ChannelRunner, ChannelHandle) {
1709        let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE);
1710
1711        // Cache component ID before moving component into Arc
1712        let component_id = self.component.id().clone();
1713
1714        // Clone output_tx for ChildContext IO routing (before emitter takes it)
1715        let io_output_tx = self.output_tx.as_ref().cloned();
1716
1717        // Clone RPC resources for ChildContext (before emitter takes them)
1718        let rpc_handles = self.shared_handles.clone();
1719        let rpc_map = self.component_channel_map.clone();
1720
1721        // Set up emitter if enabled
1722        if let Some(signal_tx) = &self.emitter_signal_tx {
1723            let component_id = self.component.id().clone();
1724            let mut emitter = EventEmitter::new(
1725                OutputSender::new(event_tx.clone()),
1726                signal_tx.clone(),
1727                component_id.clone(),
1728            );
1729
1730            // Route Output events to IO channel if configured
1731            if let Some(output_tx) = self.output_tx.take() {
1732                emitter = emitter.with_output_channel(output_tx);
1733                info!(
1734                    "ChannelRunnerBuilder: routing output to IO channel for {}",
1735                    component_id.fqn()
1736                );
1737            }
1738
1739            // Enable event broadcasting if shared handles are provided
1740            if let Some(handles) = self.shared_handles.take() {
1741                emitter = emitter.with_shared_handles(handles);
1742                info!(
1743                    "ChannelRunnerBuilder: enabled event broadcast for {}",
1744                    component_id.fqn()
1745                );
1746            }
1747
1748            // Enable RPC routing if component channel map is provided
1749            if let Some(map) = self.component_channel_map.take() {
1750                emitter = emitter.with_component_channel_map(map, self.id);
1751            }
1752
1753            // Attach Board for auto-recording
1754            if let Some(board) = self.board.take() {
1755                emitter = emitter.with_board(board);
1756            }
1757
1758            self.component.set_emitter(Box::new(emitter));
1759
1760            info!(
1761                "ChannelRunnerBuilder: injected emitter for {}",
1762                component_id.fqn()
1763            );
1764        }
1765
1766        // Set up child spawner if enabled
1767        let child_spawner = if self.enable_child_spawner {
1768            let component_id = self.component.id().fqn();
1769            let output_sender = OutputSender::new(event_tx.clone());
1770            let spawner = ChildSpawner::new(&component_id, output_sender.clone());
1771            let spawner_arc = Arc::new(StdMutex::new(spawner));
1772
1773            // Create ChildContext and inject into Component
1774            let mut ctx =
1775                ChildContextImpl::new(&component_id, output_sender, Arc::clone(&spawner_arc));
1776
1777            // Add Lua loader if provided
1778            if let Some(loader) = self.lua_loader.take() {
1779                ctx = ctx.with_lua_loader(loader);
1780                info!(
1781                    "ChannelRunnerBuilder: created spawner with Lua loader for {}",
1782                    component_id
1783                );
1784            } else {
1785                info!(
1786                    "ChannelRunnerBuilder: created spawner (no Lua loader) for {}",
1787                    component_id
1788                );
1789            }
1790
1791            // Enable runner spawning if signal emitter is available
1792            if let Some(signal_tx) = &self.emitter_signal_tx {
1793                ctx = ctx.with_runner_support(
1794                    self.world_tx.clone(),
1795                    Arc::clone(&self.world),
1796                    signal_tx.clone(),
1797                );
1798                info!(
1799                    "ChannelRunnerBuilder: enabled runner spawning for {}",
1800                    component_id
1801                );
1802            }
1803
1804            // Add component loader for spawn_runner_from_script
1805            if let Some(loader) = self.component_loader.take() {
1806                ctx = ctx.with_component_loader(loader);
1807                info!(
1808                    "ChannelRunnerBuilder: enabled component loader for {}",
1809                    component_id
1810                );
1811            }
1812
1813            // Add session, checker, grants, and IO output routing
1814            ctx = self.configure_context(
1815                ctx,
1816                &io_output_tx,
1817                &component_id,
1818                &rpc_handles,
1819                &rpc_map,
1820                self.id,
1821            );
1822
1823            self.component.set_child_context(Box::new(ctx));
1824
1825            Some(spawner_arc)
1826        } else if self.session.is_some() || self.checker.is_some() || self.grants.is_some() {
1827            // No child spawner, but auth context is needed for permission-checked orcs.exec()
1828            let component_id = self.component.id().fqn();
1829            let dummy_output = OutputSender::new(event_tx.clone());
1830            let dummy_spawner = ChildSpawner::new(&component_id, dummy_output.clone());
1831            let dummy_arc = Arc::new(StdMutex::new(dummy_spawner));
1832            let mut ctx =
1833                ChildContextImpl::new(&component_id, dummy_output, Arc::clone(&dummy_arc));
1834
1835            ctx = self.configure_context(
1836                ctx,
1837                &io_output_tx,
1838                &component_id,
1839                &rpc_handles,
1840                &rpc_map,
1841                self.id,
1842            );
1843
1844            self.component.set_child_context(Box::new(ctx));
1845            info!(
1846                "ChannelRunnerBuilder: auth-only context injected for {}",
1847                component_id
1848            );
1849            None
1850        } else {
1851            None
1852        };
1853
1854        // Determine if we need to keep event_tx for child context
1855        let event_tx_for_context = if self.enable_child_spawner || self.emitter_signal_tx.is_some()
1856        {
1857            Some(event_tx.clone())
1858        } else {
1859            None
1860        };
1861
1862        // Cache subscription entries from Component to avoid locking on every event.
1863        // Uses subscription_entries() which includes operation-level filtering.
1864        let subscriptions = self.component.subscription_entries();
1865
1866        // Create request channel if enabled
1867        let (request_tx, request_rx) = if self.enable_request_channel {
1868            let (tx, rx) = mpsc::channel(REQUEST_BUFFER_SIZE);
1869            (Some(tx), Some(rx))
1870        } else {
1871            (None, None)
1872        };
1873
1874        let runner = ChannelRunner {
1875            id: self.id,
1876            component_id,
1877            event_rx,
1878            signal_rx: self.signal_rx,
1879            world_tx: self.world_tx,
1880            world: self.world,
1881            component: Arc::new(Mutex::new(self.component)),
1882            subscriptions,
1883            paused_queue: PausedEventQueue::new(),
1884            child_spawner,
1885            event_tx: event_tx_for_context,
1886            request_rx,
1887            initial_snapshot: self.initial_snapshot,
1888            shared_handles: rpc_handles,
1889            component_channel_map: rpc_map,
1890            hook_registry: self.hook_registry,
1891            component_config: self.component_config,
1892            grants: self.grants.clone(),
1893            io_output_tx: io_output_tx.clone(),
1894            pending_approval: None,
1895        };
1896
1897        let mut handle = ChannelHandle::new(self.id, event_tx);
1898        handle.request_tx = request_tx;
1899
1900        (runner, handle)
1901    }
1902}
1903
1904impl ChannelRunner {
1905    /// Creates a new builder for constructing a ChannelRunner.
1906    ///
1907    /// This is the recommended way to create runners with optional features.
1908    ///
1909    /// # Example
1910    ///
1911    /// ```ignore
1912    /// let (runner, handle) = ChannelRunner::builder(id, world_tx, world, signal_rx, component)
1913    ///     .with_emitter(signal_tx)
1914    ///     .with_child_spawner(None)
1915    ///     .build();
1916    /// ```
1917    #[must_use]
1918    pub fn builder(
1919        id: ChannelId,
1920        world_tx: mpsc::Sender<WorldCommand>,
1921        world: Arc<RwLock<World>>,
1922        signal_rx: broadcast::Receiver<Signal>,
1923        component: Box<dyn Component>,
1924    ) -> ChannelRunnerBuilder {
1925        ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1926    }
1927}
1928
1929#[cfg(test)]
1930mod tests {
1931    use super::*;
1932    use crate::channel::manager::WorldManager;
1933    use crate::channel::ChannelConfig;
1934    use orcs_component::{ComponentError, Status};
1935    use orcs_event::{EventCategory, SignalResponse};
1936    use orcs_types::{ComponentId, Principal};
1937    use serde_json::Value;
1938
1939    /// Test mock component that echoes requests.
1940    struct MockComponent {
1941        id: ComponentId,
1942        status: Status,
1943    }
1944
1945    impl MockComponent {
1946        fn new(name: &str) -> Self {
1947            Self {
1948                id: ComponentId::builtin(name),
1949                status: Status::Idle,
1950            }
1951        }
1952    }
1953
1954    impl Component for MockComponent {
1955        fn id(&self) -> &ComponentId {
1956            &self.id
1957        }
1958
1959        fn status(&self) -> Status {
1960            self.status
1961        }
1962
1963        fn subscriptions(&self) -> &[EventCategory] {
1964            &[EventCategory::Echo, EventCategory::Lifecycle]
1965        }
1966
1967        fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
1968            Ok(request.payload.clone())
1969        }
1970
1971        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
1972            if signal.is_veto() {
1973                self.status = Status::Aborted;
1974                SignalResponse::Abort
1975            } else {
1976                SignalResponse::Handled
1977            }
1978        }
1979
1980        fn abort(&mut self) {
1981            self.status = Status::Aborted;
1982        }
1983    }
1984
1985    fn mock_component() -> Box<dyn Component> {
1986        Box::new(MockComponent::new("test"))
1987    }
1988
1989    async fn setup() -> (
1990        tokio::task::JoinHandle<()>,
1991        mpsc::Sender<WorldCommand>,
1992        Arc<RwLock<World>>,
1993        broadcast::Sender<Signal>,
1994        ChannelId,
1995    ) {
1996        let mut world = World::new();
1997        let io = world.create_channel(ChannelConfig::interactive());
1998
1999        let (manager, world_tx) = WorldManager::with_world(world);
2000        let world_handle = manager.world();
2001
2002        let manager_task = tokio::spawn(manager.run());
2003
2004        let (signal_tx, _) = broadcast::channel(64);
2005
2006        (manager_task, world_tx, world_handle, signal_tx, io)
2007    }
2008
2009    async fn teardown(
2010        manager_task: tokio::task::JoinHandle<()>,
2011        world_tx: mpsc::Sender<WorldCommand>,
2012    ) {
2013        let _ = world_tx.send(WorldCommand::Shutdown).await;
2014        let _ = manager_task.await;
2015    }
2016
2017    #[tokio::test]
2018    async fn runner_creation() {
2019        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2020
2021        let signal_rx = signal_tx.subscribe();
2022        let (runner, handle) = ChannelRunner::builder(
2023            primary,
2024            world_tx.clone(),
2025            world,
2026            signal_rx,
2027            mock_component(),
2028        )
2029        .build();
2030
2031        assert_eq!(runner.id(), primary);
2032        assert_eq!(handle.id, primary);
2033
2034        teardown(manager_task, world_tx).await;
2035    }
2036
2037    #[tokio::test]
2038    async fn runner_receives_events() {
2039        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2040
2041        let signal_rx = signal_tx.subscribe();
2042        let (runner, handle) = ChannelRunner::builder(
2043            primary,
2044            world_tx.clone(),
2045            world,
2046            signal_rx,
2047            mock_component(),
2048        )
2049        .build();
2050
2051        let runner_task = tokio::spawn(runner.run());
2052
2053        let event = Event {
2054            category: EventCategory::Echo,
2055            operation: "echo".to_string(),
2056            source: ComponentId::builtin("test"),
2057            payload: serde_json::json!({"test": true}),
2058        };
2059        handle
2060            .inject(event)
2061            .await
2062            .expect("inject echo event into runner");
2063
2064        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2065
2066        signal_tx
2067            .send(Signal::cancel(primary, Principal::System))
2068            .expect("send cancel signal");
2069
2070        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2071
2072        teardown(manager_task, world_tx).await;
2073    }
2074
2075    #[tokio::test]
2076    async fn runner_handles_veto() {
2077        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2078
2079        let signal_rx = signal_tx.subscribe();
2080        let (runner, _handle) = ChannelRunner::builder(
2081            primary,
2082            world_tx.clone(),
2083            world.clone(),
2084            signal_rx,
2085            mock_component(),
2086        )
2087        .build();
2088
2089        let runner_task = tokio::spawn(runner.run());
2090
2091        tokio::task::yield_now().await;
2092
2093        signal_tx
2094            .send(Signal::veto(Principal::System))
2095            .expect("send veto signal");
2096
2097        let result = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2098        assert!(result.is_ok());
2099
2100        teardown(manager_task, world_tx).await;
2101    }
2102
2103    #[tokio::test]
2104    async fn channel_handle_clone() {
2105        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2106
2107        let signal_rx = signal_tx.subscribe();
2108        let (_runner, handle) = ChannelRunner::builder(
2109            primary,
2110            world_tx.clone(),
2111            world,
2112            signal_rx,
2113            mock_component(),
2114        )
2115        .build();
2116
2117        let handle2 = handle.clone();
2118        assert_eq!(handle.id, handle2.id);
2119
2120        teardown(manager_task, world_tx).await;
2121    }
2122
2123    #[tokio::test]
2124    async fn runner_with_emitter_creation() {
2125        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2126
2127        let signal_rx = signal_tx.subscribe();
2128        let (runner, handle) = ChannelRunner::builder(
2129            primary,
2130            world_tx.clone(),
2131            world,
2132            signal_rx,
2133            mock_component(),
2134        )
2135        .with_emitter(signal_tx.clone())
2136        .build();
2137
2138        assert_eq!(runner.id(), primary);
2139        assert_eq!(handle.id, primary);
2140
2141        teardown(manager_task, world_tx).await;
2142    }
2143
2144    #[tokio::test]
2145    async fn runner_with_emitter_receives_emitted_events() {
2146        use std::sync::atomic::{AtomicUsize, Ordering};
2147        use std::sync::Arc as StdArc;
2148
2149        // Component that uses emitter to emit output
2150        struct EmittingComponent {
2151            id: ComponentId,
2152            emitter: Option<Box<dyn orcs_component::Emitter>>,
2153            call_count: StdArc<AtomicUsize>,
2154        }
2155
2156        impl EmittingComponent {
2157            fn new(call_count: StdArc<AtomicUsize>) -> Self {
2158                Self {
2159                    id: ComponentId::builtin("emitting"),
2160                    emitter: None,
2161                    call_count,
2162                }
2163            }
2164        }
2165
2166        impl Component for EmittingComponent {
2167            fn id(&self) -> &ComponentId {
2168                &self.id
2169            }
2170
2171            fn status(&self) -> Status {
2172                Status::Idle
2173            }
2174
2175            fn subscriptions(&self) -> &[EventCategory] {
2176                &[EventCategory::Echo]
2177            }
2178
2179            fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
2180                self.call_count.fetch_add(1, Ordering::SeqCst);
2181                // Emit output via emitter when receiving a request
2182                if let Some(emitter) = &self.emitter {
2183                    emitter.emit_output("Response from component");
2184                }
2185                Ok(request.payload.clone())
2186            }
2187
2188            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2189                if signal.is_veto() {
2190                    SignalResponse::Abort
2191                } else {
2192                    SignalResponse::Handled
2193                }
2194            }
2195
2196            fn abort(&mut self) {}
2197
2198            fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2199                self.emitter = Some(emitter);
2200            }
2201        }
2202
2203        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2204
2205        let call_count = StdArc::new(AtomicUsize::new(0));
2206        let component = Box::new(EmittingComponent::new(StdArc::clone(&call_count)));
2207
2208        let signal_rx = signal_tx.subscribe();
2209        let (runner, handle) =
2210            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2211                .with_emitter(signal_tx.clone())
2212                .build();
2213
2214        let runner_task = tokio::spawn(runner.run());
2215
2216        // Inject an event
2217        let event = Event {
2218            category: EventCategory::Echo,
2219            operation: "test".to_string(),
2220            source: ComponentId::builtin("test"),
2221            payload: serde_json::json!({"trigger": true}),
2222        };
2223        handle
2224            .inject(event)
2225            .await
2226            .expect("inject event into emitting runner");
2227
2228        // Wait for processing
2229        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2230
2231        // Component should have been called
2232        assert!(
2233            call_count.load(Ordering::SeqCst) >= 1,
2234            "Component should have received the event"
2235        );
2236
2237        // Stop runner
2238        signal_tx
2239            .send(Signal::cancel(primary, Principal::System))
2240            .expect("send cancel signal to emitting runner");
2241
2242        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2243
2244        teardown(manager_task, world_tx).await;
2245    }
2246
2247    // Builder pattern tests
2248
2249    #[tokio::test]
2250    async fn builder_basic() {
2251        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2252
2253        let signal_rx = signal_tx.subscribe();
2254        let (runner, handle) = ChannelRunner::builder(
2255            primary,
2256            world_tx.clone(),
2257            world,
2258            signal_rx,
2259            mock_component(),
2260        )
2261        .build();
2262
2263        assert_eq!(runner.id(), primary);
2264        assert_eq!(handle.id, primary);
2265        assert!(runner.child_spawner().is_none());
2266
2267        teardown(manager_task, world_tx).await;
2268    }
2269
2270    #[tokio::test]
2271    async fn builder_with_emitter() {
2272        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2273
2274        let signal_rx = signal_tx.subscribe();
2275        let (runner, handle) = ChannelRunner::builder(
2276            primary,
2277            world_tx.clone(),
2278            world,
2279            signal_rx,
2280            mock_component(),
2281        )
2282        .with_emitter(signal_tx.clone())
2283        .build();
2284
2285        assert_eq!(runner.id(), primary);
2286        assert_eq!(handle.id, primary);
2287
2288        teardown(manager_task, world_tx).await;
2289    }
2290
2291    #[tokio::test]
2292    async fn builder_with_child_spawner() {
2293        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2294
2295        let signal_rx = signal_tx.subscribe();
2296        let (runner, handle) = ChannelRunner::builder(
2297            primary,
2298            world_tx.clone(),
2299            world,
2300            signal_rx,
2301            mock_component(),
2302        )
2303        .with_child_spawner(None)
2304        .build();
2305
2306        assert_eq!(runner.id(), primary);
2307        assert_eq!(handle.id, primary);
2308        assert!(runner.child_spawner().is_some());
2309
2310        teardown(manager_task, world_tx).await;
2311    }
2312
2313    #[tokio::test]
2314    async fn builder_with_full_support() {
2315        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2316
2317        let signal_rx = signal_tx.subscribe();
2318        let (runner, handle) = ChannelRunner::builder(
2319            primary,
2320            world_tx.clone(),
2321            world,
2322            signal_rx,
2323            mock_component(),
2324        )
2325        .with_emitter(signal_tx.clone())
2326        .with_child_spawner(None)
2327        .build();
2328
2329        assert_eq!(runner.id(), primary);
2330        assert_eq!(handle.id, primary);
2331        assert!(runner.child_spawner().is_some());
2332
2333        teardown(manager_task, world_tx).await;
2334    }
2335
2336    #[tokio::test]
2337    async fn builder_creates_child_context() {
2338        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2339
2340        let signal_rx = signal_tx.subscribe();
2341        let (runner, _handle) = ChannelRunner::builder(
2342            primary,
2343            world_tx.clone(),
2344            world,
2345            signal_rx,
2346            mock_component(),
2347        )
2348        .with_child_spawner(None)
2349        .build();
2350
2351        // Should be able to create child context when spawner is enabled
2352        let ctx = runner.create_child_context("child-1");
2353        assert!(ctx.is_some());
2354
2355        teardown(manager_task, world_tx).await;
2356    }
2357
2358    #[tokio::test]
2359    async fn builder_no_child_context_without_spawner() {
2360        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2361
2362        let signal_rx = signal_tx.subscribe();
2363        let (runner, _handle) = ChannelRunner::builder(
2364            primary,
2365            world_tx.clone(),
2366            world,
2367            signal_rx,
2368            mock_component(),
2369        )
2370        .build();
2371
2372        // Should NOT be able to create child context when spawner is disabled
2373        let ctx = runner.create_child_context("child-1");
2374        assert!(ctx.is_none());
2375
2376        teardown(manager_task, world_tx).await;
2377    }
2378
2379    // === Request Channel (Component-to-Component RPC) Tests ===
2380
2381    #[tokio::test]
2382    async fn builder_with_request_channel_enables_accepts_requests() {
2383        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2384
2385        let signal_rx = signal_tx.subscribe();
2386        let (_runner, handle) = ChannelRunner::builder(
2387            primary,
2388            world_tx.clone(),
2389            world,
2390            signal_rx,
2391            mock_component(),
2392        )
2393        .with_request_channel()
2394        .build();
2395
2396        assert!(handle.accepts_requests());
2397
2398        teardown(manager_task, world_tx).await;
2399    }
2400
2401    #[tokio::test]
2402    async fn builder_without_request_channel_rejects_requests() {
2403        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2404
2405        let signal_rx = signal_tx.subscribe();
2406        let (_runner, handle) = ChannelRunner::builder(
2407            primary,
2408            world_tx.clone(),
2409            world,
2410            signal_rx,
2411            mock_component(),
2412        )
2413        .build();
2414
2415        assert!(!handle.accepts_requests());
2416
2417        teardown(manager_task, world_tx).await;
2418    }
2419
2420    #[tokio::test]
2421    async fn rpc_request_routed_to_runner_and_responded() {
2422        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2423
2424        let signal_rx = signal_tx.subscribe();
2425        let (runner, handle) = ChannelRunner::builder(
2426            primary,
2427            world_tx.clone(),
2428            world,
2429            signal_rx,
2430            mock_component(),
2431        )
2432        .with_request_channel()
2433        .build();
2434
2435        let runner_task = tokio::spawn(runner.run());
2436
2437        // Send RPC request via handle
2438        let source = ComponentId::builtin("caller");
2439        let target = ComponentId::builtin("test");
2440        let req = Request::new(
2441            EventCategory::Echo,
2442            "echo",
2443            source,
2444            primary,
2445            Value::String("rpc_payload".into()),
2446        )
2447        .with_target(target);
2448
2449        let (reply_tx, reply_rx) = oneshot::channel();
2450        handle
2451            .send_request(req, reply_tx)
2452            .await
2453            .expect("send RPC request to runner");
2454
2455        // Should receive the echoed payload back
2456        let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2457            .await
2458            .expect("reply should arrive within timeout")
2459            .expect("reply channel should not be dropped");
2460
2461        assert_eq!(
2462            result.expect("RPC response should be Ok"),
2463            Value::String("rpc_payload".into())
2464        );
2465
2466        // Cleanup
2467        signal_tx
2468            .send(Signal::cancel(primary, Principal::System))
2469            .expect("send cancel signal after RPC test");
2470        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2471        teardown(manager_task, world_tx).await;
2472    }
2473
2474    #[tokio::test]
2475    async fn rpc_request_component_error_returned_as_string() {
2476        /// Component that always returns error from on_request.
2477        struct FailingComponent {
2478            id: ComponentId,
2479        }
2480
2481        impl Component for FailingComponent {
2482            fn id(&self) -> &ComponentId {
2483                &self.id
2484            }
2485            fn status(&self) -> Status {
2486                Status::Idle
2487            }
2488            fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2489                Err(ComponentError::ExecutionFailed(
2490                    "deliberate test failure".into(),
2491                ))
2492            }
2493            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2494                if signal.is_veto() {
2495                    SignalResponse::Abort
2496                } else {
2497                    SignalResponse::Handled
2498                }
2499            }
2500            fn abort(&mut self) {}
2501        }
2502
2503        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2504
2505        let component: Box<dyn Component> = Box::new(FailingComponent {
2506            id: ComponentId::builtin("failing"),
2507        });
2508        let signal_rx = signal_tx.subscribe();
2509        let (runner, handle) =
2510            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2511                .with_request_channel()
2512                .build();
2513
2514        let runner_task = tokio::spawn(runner.run());
2515
2516        let source = ComponentId::builtin("caller");
2517        let target = ComponentId::builtin("failing");
2518        let req = Request::new(EventCategory::Echo, "op", source, primary, Value::Null)
2519            .with_target(target);
2520
2521        let (reply_tx, reply_rx) = oneshot::channel();
2522        handle
2523            .send_request(req, reply_tx)
2524            .await
2525            .expect("send RPC request to failing runner");
2526
2527        let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2528            .await
2529            .expect("reply should arrive")
2530            .expect("channel should not be dropped");
2531
2532        assert!(result.is_err(), "should return Err for component failure");
2533        let err_msg = result.expect_err("expected Err variant for component failure");
2534        assert!(
2535            err_msg.contains("deliberate test failure"),
2536            "error message should contain original error, got: {err_msg}"
2537        );
2538
2539        signal_tx
2540            .send(Signal::cancel(primary, Principal::System))
2541            .expect("send cancel signal after failing RPC test");
2542        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2543        teardown(manager_task, world_tx).await;
2544    }
2545
2546    // === Output Channel Routing Tests ===
2547    //
2548    // Verify that when `with_output_channel(output_tx)` is configured on the
2549    // ChannelRunnerBuilder, Output events emitted by the Component's emitter
2550    // are routed to the output channel (ClientRunner/IOPort path) rather than
2551    // the runner's own event channel.
2552
2553    #[tokio::test]
2554    async fn emitter_output_routes_to_output_channel_via_builder() {
2555        use std::sync::atomic::{AtomicUsize, Ordering};
2556        use std::sync::Arc as StdArc;
2557
2558        /// Component that emits output via its emitter during on_request.
2559        struct OutputRoutingComponent {
2560            id: ComponentId,
2561            emitter: Option<Box<dyn orcs_component::Emitter>>,
2562            call_count: StdArc<AtomicUsize>,
2563        }
2564
2565        impl OutputRoutingComponent {
2566            fn new(call_count: StdArc<AtomicUsize>) -> Self {
2567                Self {
2568                    id: ComponentId::builtin("output-routing"),
2569                    emitter: None,
2570                    call_count,
2571                }
2572            }
2573        }
2574
2575        impl Component for OutputRoutingComponent {
2576            fn id(&self) -> &ComponentId {
2577                &self.id
2578            }
2579            fn status(&self) -> Status {
2580                Status::Idle
2581            }
2582            fn subscriptions(&self) -> &[EventCategory] {
2583                &[EventCategory::Echo]
2584            }
2585            fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2586                self.call_count.fetch_add(1, Ordering::SeqCst);
2587                if let Some(emitter) = &self.emitter {
2588                    emitter.emit_output("routed output message");
2589                }
2590                Ok(serde_json::json!({"success": true}))
2591            }
2592            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2593                if signal.is_veto() {
2594                    SignalResponse::Abort
2595                } else {
2596                    SignalResponse::Handled
2597                }
2598            }
2599            fn abort(&mut self) {}
2600            fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2601                self.emitter = Some(emitter);
2602            }
2603        }
2604
2605        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2606
2607        let call_count = StdArc::new(AtomicUsize::new(0));
2608        let component = Box::new(OutputRoutingComponent::new(StdArc::clone(&call_count)));
2609
2610        // Create a separate output channel to receive routed Output events
2611        let (output_tx, mut output_rx) = OutputSender::channel(64);
2612
2613        let signal_rx = signal_tx.subscribe();
2614        let (runner, handle) =
2615            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2616                .with_emitter(signal_tx.clone())
2617                .with_output_channel(output_tx)
2618                .build();
2619
2620        let runner_task = tokio::spawn(runner.run());
2621
2622        // Inject an event to trigger on_request → emit_output
2623        let event = Event {
2624            category: EventCategory::Echo,
2625            operation: "test".to_string(),
2626            source: ComponentId::builtin("test"),
2627            payload: serde_json::json!({"trigger": true}),
2628        };
2629        handle.inject(event).await.expect("inject event");
2630
2631        // Wait for processing
2632        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2633
2634        // Verify the component was called
2635        assert!(
2636            call_count.load(Ordering::SeqCst) >= 1,
2637            "Component should have received the event"
2638        );
2639
2640        // Verify the Output event arrived on the output channel
2641        let output_event = output_rx
2642            .try_recv()
2643            .expect("Output event should arrive on output channel");
2644        assert_eq!(
2645            output_event.category,
2646            EventCategory::Output,
2647            "Event category should be Output"
2648        );
2649        assert_eq!(
2650            output_event.operation, "display",
2651            "Event operation should be 'display'"
2652        );
2653        assert_eq!(
2654            output_event.payload["message"], "routed output message",
2655            "Event payload message should match what the component emitted"
2656        );
2657        assert_eq!(
2658            output_event.payload["level"], "info",
2659            "Event payload level should be 'info'"
2660        );
2661
2662        // Cleanup
2663        signal_tx
2664            .send(Signal::cancel(primary, Principal::System))
2665            .expect("send cancel");
2666        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2667        teardown(manager_task, world_tx).await;
2668    }
2669
2670    #[tokio::test]
2671    async fn emitter_output_records_to_board_and_routes_to_output_channel() {
2672        use std::sync::atomic::{AtomicUsize, Ordering};
2673        use std::sync::Arc as StdArc;
2674
2675        struct BoardOutputComponent {
2676            id: ComponentId,
2677            emitter: Option<Box<dyn orcs_component::Emitter>>,
2678            call_count: StdArc<AtomicUsize>,
2679        }
2680
2681        impl BoardOutputComponent {
2682            fn new(call_count: StdArc<AtomicUsize>) -> Self {
2683                Self {
2684                    id: ComponentId::builtin("board-output"),
2685                    emitter: None,
2686                    call_count,
2687                }
2688            }
2689        }
2690
2691        impl Component for BoardOutputComponent {
2692            fn id(&self) -> &ComponentId {
2693                &self.id
2694            }
2695            fn status(&self) -> Status {
2696                Status::Idle
2697            }
2698            fn subscriptions(&self) -> &[EventCategory] {
2699                &[EventCategory::Echo]
2700            }
2701            fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2702                self.call_count.fetch_add(1, Ordering::SeqCst);
2703                if let Some(emitter) = &self.emitter {
2704                    emitter.emit_output("board and io message");
2705                }
2706                Ok(serde_json::json!({"success": true}))
2707            }
2708            fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2709                if signal.is_veto() {
2710                    SignalResponse::Abort
2711                } else {
2712                    SignalResponse::Handled
2713                }
2714            }
2715            fn abort(&mut self) {}
2716            fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2717                self.emitter = Some(emitter);
2718            }
2719        }
2720
2721        let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2722
2723        let call_count = StdArc::new(AtomicUsize::new(0));
2724        let component = Box::new(BoardOutputComponent::new(StdArc::clone(&call_count)));
2725
2726        let (output_tx, mut output_rx) = OutputSender::channel(64);
2727        let board = crate::board::shared_board();
2728
2729        let signal_rx = signal_tx.subscribe();
2730        let (runner, handle) =
2731            ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2732                .with_emitter(signal_tx.clone())
2733                .with_output_channel(output_tx)
2734                .with_board(Arc::clone(&board))
2735                .build();
2736
2737        let runner_task = tokio::spawn(runner.run());
2738
2739        // Inject event to trigger on_request → emit_output
2740        let event = Event {
2741            category: EventCategory::Echo,
2742            operation: "test".to_string(),
2743            source: ComponentId::builtin("test"),
2744            payload: serde_json::json!({"trigger": true}),
2745        };
2746        handle.inject(event).await.expect("inject event");
2747
2748        // Wait for processing
2749        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2750
2751        // Verify component was called
2752        assert!(call_count.load(Ordering::SeqCst) >= 1);
2753
2754        // Verify Output event on output channel
2755        let output_event = output_rx
2756            .try_recv()
2757            .expect("Output event should arrive on output channel");
2758        assert_eq!(output_event.payload["message"], "board and io message");
2759
2760        // Verify Board also recorded the entry
2761        let b = board.read();
2762        assert!(b.len() >= 1, "Board should have at least 1 entry");
2763        let entries = b.recent(1);
2764        assert_eq!(
2765            entries[0].payload["message"], "board and io message",
2766            "Board entry should match the emitted message"
2767        );
2768
2769        // Cleanup
2770        signal_tx
2771            .send(Signal::cancel(primary, Principal::System))
2772            .expect("send cancel");
2773        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2774        teardown(manager_task, world_tx).await;
2775    }
2776
2777    // === ChannelHandle::is_alive tests ===
2778
2779    #[test]
2780    fn handle_is_alive_while_receiver_exists() {
2781        let (tx, rx) = tokio::sync::mpsc::channel::<InboundEvent>(32);
2782        let handle = ChannelHandle::new(ChannelId::new(), tx);
2783
2784        assert!(
2785            handle.is_alive(),
2786            "handle should be alive while receiver exists"
2787        );
2788        drop(rx);
2789        assert!(
2790            !handle.is_alive(),
2791            "handle should not be alive after receiver is dropped"
2792        );
2793    }
2794
    // === ExitReason tests ===
2796
2797    #[test]
2798    fn exit_reason_as_str_covers_all_variants() {
2799        let cases = [
2800            (ExitReason::Signal, "signal"),
2801            (ExitReason::EventChannelClosed, "event_channel_closed"),
2802            (ExitReason::SignalChannelClosed, "signal_channel_closed"),
2803            (ExitReason::ChannelInactive, "channel_inactive"),
2804            (ExitReason::ComponentStopped, "component_stopped"),
2805            (ExitReason::IoChannelClosed, "io_channel_closed"),
2806            (ExitReason::UserQuit, "user_quit"),
2807        ];
2808        for (variant, expected) in &cases {
2809            assert_eq!(
2810                variant.as_str(),
2811                *expected,
2812                "ExitReason::{:?}.as_str() mismatch",
2813                variant
2814            );
2815        }
2816    }
2817
2818    #[test]
2819    fn exit_reason_display_matches_as_str() {
2820        let variants = [
2821            ExitReason::Signal,
2822            ExitReason::EventChannelClosed,
2823            ExitReason::SignalChannelClosed,
2824            ExitReason::ChannelInactive,
2825            ExitReason::ComponentStopped,
2826            ExitReason::IoChannelClosed,
2827            ExitReason::UserQuit,
2828        ];
2829        for variant in &variants {
2830            assert_eq!(
2831                variant.to_string(),
2832                variant.as_str(),
2833                "Display and as_str() should match for {:?}",
2834                variant
2835            );
2836        }
2837    }
2838
2839    #[test]
2840    fn exit_reason_clone_and_eq() {
2841        let original = ExitReason::ComponentStopped;
2842        let cloned = original.clone();
2843        assert_eq!(original, cloned, "Clone should produce equal value");
2844    }
2845}