// orcs_runtime/engine/engine.rs

1//! OrcsEngine - Main Runtime.
2//!
3//! The [`OrcsEngine`] is the central runtime that:
4//!
5//! - Spawns and manages ChannelRunners (parallel execution)
6//! - Dispatches Signals to all Runners via broadcast
7//! - Coordinates World (Channel hierarchy) management
8//! - Collects component snapshots during graceful shutdown
9//!
10//! # Architecture
11//!
12//! ```text
13//! OrcsEngine
14//!     │
15//!     ├── signal_tx ────► broadcast to all Runners
16//!     │
17//!     ├── world_tx ─────► WorldManager (channel operations)
18//!     │
19//!     ├── channel_handles ──► Event injection to Runners
20//!     │
21//!     └── collected_snapshots ─► Snapshots from graceful shutdown
22//! ```
23//!
24//! # Signal Handling
25//!
26//! Signals are interrupts from Human (or authorized Components):
27//!
28//! - **Veto**: Immediate engine stop (checked first)
29//! - **Cancel**: Stop specific Channel/Component
30//! - **Pause/Resume**: Suspend/continue execution
31//! - **Approve/Reject**: HIL responses
32//!
33//! Signals are broadcast to all Runners, which deliver them to their
34//! bound Components.
35
36use super::eventbus::EventBus;
37use crate::board::{self, SharedBoard};
38use crate::channel::{
39    ChannelConfig, ChannelHandle, ChannelRunner, ClientRunner, ClientRunnerConfig, LuaChildLoader,
40    OutputSender, RunnerResult, World, WorldCommand, WorldCommandSender, WorldManager,
41};
42use crate::io::IOPort;
43use crate::Principal;
44use orcs_component::{Component, ComponentLoader, ComponentSnapshot};
45use orcs_event::Signal;
46use orcs_hook::SharedHookRegistry;
47use orcs_types::{ChannelId, ComponentId};
48use std::collections::HashMap;
49use std::sync::Arc;
50use std::time::Duration;
51use tokio::sync::{broadcast, mpsc, RwLock};
52use tracing::{debug, info, warn};
53
/// Signal broadcast channel buffer size.
///
/// 256 signals provides sufficient buffering for HIL interactions.
/// Lagged receivers will miss old signals but continue receiving new ones
/// (tokio broadcast `RecvError::Lagged` semantics).
const SIGNAL_BUFFER_SIZE: usize = 256;

/// Timeout for waiting on each runner task during graceful shutdown.
///
/// NOTE(review): the consumer (`shutdown_parallel`) lies outside this view;
/// presumed to bound each per-runner await — confirm.
const RUNNER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
62
/// Notification sent by a runner wrapper when the runner exits.
///
/// Produced by the wrapper task spawned in `finalize_runner` and consumed
/// by the monitor task (`start_runner_monitor`) to detect unexpected runner
/// terminations during normal operation (before Veto/shutdown).
#[derive(Debug)]
struct RunnerExitNotice {
    /// Channel whose runner exited.
    channel_id: ChannelId,
    /// FQN of the component that was bound to the exited runner.
    component_fqn: String,
    /// Why the runner exited. Forwarded from [`RunnerResult::exit_reason`].
    exit_reason: crate::channel::ExitReason,
}
74
/// OrcsEngine - Main runtime for ORCS CLI.
///
/// Coordinates parallel execution of ChannelRunners and manages
/// the World (channel hierarchy).
///
/// # Parallel Execution Model
///
/// Engine operates in parallel mode with:
/// - [`WorldManager`](crate::channel::WorldManager) for concurrent World access
/// - [`ChannelRunner`](crate::channel::ChannelRunner) per channel
/// - Event injection via channel handles
/// - Snapshot collection via graceful runner shutdown
///
/// # Example
///
/// ```ignore
/// use orcs_runtime::{World, ChannelConfig};
///
/// // Create World with IO channel
/// let mut world = World::new();
/// let io = world.create_channel(ChannelConfig::interactive());
///
/// // Inject into engine with IO channel (required)
/// let engine = OrcsEngine::new(world, io);
///
/// // Spawn channel runners with bound components
/// engine.spawn_runner(io, Box::new(MyComponent::new()));
///
/// // Run the engine (parallel execution)
/// engine.run().await;
/// ```
///
/// # Signal Priority
///
/// Signals are broadcast to all Runners. A Veto signal
/// immediately stops the engine without processing further.
pub struct OrcsEngine {
    /// EventBus for channel event injection
    eventbus: EventBus,
    /// Running state (set by `start()`, cleared when the run loop exits)
    running: bool,
    // --- Parallel execution infrastructure ---
    /// World command sender for async modifications
    world_tx: WorldCommandSender,
    /// Read-only World access for parallel reads
    world_read: Arc<RwLock<World>>,
    /// Signal broadcaster for all channel runners
    signal_tx: broadcast::Sender<Signal>,
    /// Channel runner handles (for event injection)
    channel_handles: HashMap<ChannelId, ChannelHandle>,
    /// WorldManager task handle (spawned in `new()`)
    manager_task: Option<tokio::task::JoinHandle<()>>,
    /// Channel runner task handles.
    ///
    /// Each runner returns a [`RunnerResult`] containing the component's
    /// shutdown snapshot. The Engine awaits these on shutdown instead of
    /// aborting them.
    runner_tasks: HashMap<ChannelId, tokio::task::JoinHandle<RunnerResult>>,
    /// Snapshots collected from runners during graceful shutdown.
    ///
    /// Populated by [`shutdown_parallel()`] when runners complete.
    /// Keyed by component FQN (fully qualified name).
    collected_snapshots: HashMap<String, ComponentSnapshot>,
    // --- IO Channel (required) ---
    /// IO channel for Human input/output.
    ///
    /// This is a required field - every engine must have an IO channel
    /// for Human interaction.
    io_channel: ChannelId,
    /// Shared Board for cross-component event visibility.
    board: SharedBoard,
    /// Shared hook registry (injected into all spawned runners).
    hook_registry: Option<SharedHookRegistry>,
    /// Shared MCP client manager (injected into all spawned runners).
    mcp_manager: Option<Arc<orcs_mcp::McpClientManager>>,
    /// Sender for runner exit notifications (cloned into each runner wrapper task).
    runner_exit_tx: mpsc::UnboundedSender<RunnerExitNotice>,
    /// Receiver for runner exit notifications (taken by the monitor task on start).
    runner_exit_rx: Option<mpsc::UnboundedReceiver<RunnerExitNotice>>,
    /// Background monitor task that detects unexpected runner terminations.
    monitor_task: Option<tokio::task::JoinHandle<()>>,
}
157
158impl OrcsEngine {
159    /// Create new engine with injected World and IO channel.
160    ///
161    /// The World is immediately transferred to a [`WorldManager`] which
162    /// starts processing commands. The IO channel is required for Human
163    /// input/output.
164    ///
165    /// # Arguments
166    ///
167    /// * `world` - The World containing the IO channel
168    /// * `io_channel` - The IO channel ID (must exist in World)
169    ///
170    /// # Example
171    ///
172    /// ```ignore
173    /// let mut world = World::new();
174    /// let io = world.create_channel(ChannelConfig::interactive());
175    ///
176    /// let engine = OrcsEngine::new(world, io);
177    /// ```
178    #[must_use]
179    pub fn new(world: World, io_channel: ChannelId) -> Self {
180        // Create WorldManager immediately (takes ownership of World)
181        let (manager, world_tx) = WorldManager::with_world(world);
182        let world_read = manager.world();
183
184        // Create signal broadcaster
185        let (signal_tx, _) = broadcast::channel(SIGNAL_BUFFER_SIZE);
186
187        // Start WorldManager task
188        let manager_task = tokio::spawn(manager.run());
189
190        // Runner exit notification channel for the monitor task.
191        let (runner_exit_tx, runner_exit_rx) = mpsc::unbounded_channel();
192
193        info!(
194            "OrcsEngine created with IO channel {} (WorldManager started)",
195            io_channel
196        );
197
198        Self {
199            eventbus: EventBus::new(),
200            running: false,
201            world_tx,
202            world_read,
203            signal_tx,
204            channel_handles: HashMap::new(),
205            manager_task: Some(manager_task),
206            runner_tasks: HashMap::new(),
207            collected_snapshots: HashMap::new(),
208            io_channel,
209            board: board::shared_board(),
210            hook_registry: None,
211            mcp_manager: None,
212            runner_exit_tx,
213            runner_exit_rx: Some(runner_exit_rx),
214            monitor_task: None,
215        }
216    }
217
218    /// Sets the shared MCP client manager for all spawned runners.
219    ///
220    /// Must be called before spawning runners to ensure they receive
221    /// the manager.
222    pub fn set_mcp_manager(&mut self, manager: Arc<orcs_mcp::McpClientManager>) {
223        self.mcp_manager = Some(manager);
224    }
225
226    /// Sets the shared hook registry for all spawned runners.
227    ///
228    /// Must be called before spawning runners to ensure they receive
229    /// the registry.
230    pub fn set_hook_registry(&mut self, registry: SharedHookRegistry) {
231        self.eventbus.set_hook_registry(Arc::clone(&registry));
232        self.hook_registry = Some(registry);
233    }
234
    /// Returns a reference to the hook registry, if configured.
    ///
    /// Set via [`set_hook_registry()`](Self::set_hook_registry).
    #[must_use]
    pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
        self.hook_registry.as_ref()
    }
240
241    /// Applies the hook registry to a ChannelRunnerBuilder if configured.
242    fn apply_hook_registry(
243        &self,
244        builder: crate::channel::ChannelRunnerBuilder,
245    ) -> crate::channel::ChannelRunnerBuilder {
246        match &self.hook_registry {
247            Some(reg) => builder.with_hook_registry(Arc::clone(reg)),
248            None => builder,
249        }
250    }
251
252    /// Applies the MCP manager to a ChannelRunnerBuilder if configured.
253    fn apply_mcp_manager(
254        &self,
255        builder: crate::channel::ChannelRunnerBuilder,
256    ) -> crate::channel::ChannelRunnerBuilder {
257        match &self.mcp_manager {
258            Some(mgr) => builder.with_mcp_manager(Arc::clone(mgr)),
259            None => builder,
260        }
261    }
262
    /// Registers runner with EventBus, stores handle, and spawns the tokio task.
    ///
    /// Common finalization extracted from `spawn_runner*` methods.
    ///
    /// Registration happens *before* the task is spawned so the channel is
    /// reachable for event injection as soon as the runner starts. The task
    /// wraps `runner.run()` so a [`RunnerExitNotice`] reaches the monitor
    /// whenever the runner exits. Returns a clone of the stored handle.
    fn finalize_runner(
        &mut self,
        channel_id: ChannelId,
        component_id: &ComponentId,
        runner: ChannelRunner,
        handle: ChannelHandle,
    ) -> ChannelHandle {
        self.eventbus.register_channel(handle.clone());
        self.eventbus
            .register_component_channel(component_id, channel_id);
        self.channel_handles.insert(channel_id, handle.clone());

        // Wrap the runner task to send an exit notification to the monitor.
        let exit_tx = self.runner_exit_tx.clone();
        let fqn = component_id.fqn().to_string();
        let runner_task = tokio::spawn(async move {
            let result = runner.run().await;
            // Notify monitor — ignore send error (monitor may be stopped during shutdown).
            let _ = exit_tx.send(RunnerExitNotice {
                channel_id,
                component_fqn: fqn,
                exit_reason: result.exit_reason,
            });
            result
        });
        self.runner_tasks.insert(channel_id, runner_task);
        handle
    }
294
295    /// Spawn a channel runner for a channel with a bound Component.
296    ///
297    /// Returns the channel handle for event injection.
298    ///
299    /// # Arguments
300    ///
301    /// * `channel_id` - The channel to run
302    /// * `component` - The Component to bind (1:1 relationship)
303    pub fn spawn_runner(
304        &mut self,
305        channel_id: ChannelId,
306        component: Box<dyn Component>,
307    ) -> ChannelHandle {
308        let component_id = component.id().clone();
309        let signal_rx = self.signal_tx.subscribe();
310        let builder = ChannelRunner::builder(
311            channel_id,
312            self.world_tx.clone(),
313            Arc::clone(&self.world_read),
314            signal_rx,
315            component,
316        )
317        .with_request_channel();
318        let (runner, handle) = self
319            .apply_mcp_manager(self.apply_hook_registry(builder))
320            .build();
321
322        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
323        info!("Spawned runner for channel {}", channel_id);
324        handle
325    }
326
    /// Spawn a ClientRunner for an IO channel (no component).
    ///
    /// ClientRunner is dedicated to Human I/O bridging. It broadcasts UserInput
    /// events to all channels and displays Output events from other channels.
    ///
    /// # Arguments
    ///
    /// * `channel_id` - The channel to run
    /// * `io_port` - IO port for View communication
    /// * `principal` - Principal for signal creation from IO input
    ///
    /// # Returns
    ///
    /// A tuple of (ChannelHandle, event_tx) where event_tx can be used by
    /// other runners to send Output events to this runner for display.
    ///
    /// NOTE(review): unlike component runners (see `finalize_runner`), this
    /// task is spawned without the exit-notice wrapper, so ClientRunner
    /// terminations are never reported to the runner monitor; it is also not
    /// registered in the component/channel map — confirm both are intentional.
    pub fn spawn_client_runner(
        &mut self,
        channel_id: ChannelId,
        io_port: IOPort,
        principal: orcs_types::Principal,
    ) -> (ChannelHandle, OutputSender) {
        let config = ClientRunnerConfig {
            world_tx: self.world_tx.clone(),
            world: Arc::clone(&self.world_read),
            signal_rx: self.signal_tx.subscribe(),
            channel_handles: self.eventbus.shared_handles(),
        };
        let (runner, handle) = ClientRunner::new(channel_id, config, io_port, principal);

        // Get event_tx for other runners to send Output events (wrapped as OutputSender)
        let output_sender = OutputSender::new(runner.event_tx().clone());

        // Register handle with EventBus for event injection
        self.eventbus.register_channel(handle.clone());

        // Store handle
        self.channel_handles.insert(channel_id, handle.clone());

        // Spawn runner task
        let runner_task = tokio::spawn(runner.run());
        self.runner_tasks.insert(channel_id, runner_task);

        info!("Spawned ClientRunner for channel {}", channel_id);
        (handle, output_sender)
    }
378
379    /// Spawn a child channel with configuration and bound Component.
380    ///
381    /// Creates a new channel in World and spawns its runner.
382    ///
383    /// # Arguments
384    ///
385    /// * `parent` - Parent channel ID
386    /// * `config` - Channel configuration
387    /// * `component` - Component to bind to the new channel (1:1)
388    pub async fn spawn_channel(
389        &mut self,
390        parent: ChannelId,
391        config: ChannelConfig,
392        component: Box<dyn Component>,
393    ) -> Option<ChannelId> {
394        // Send spawn command
395        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
396        let cmd = WorldCommand::Spawn {
397            parent,
398            config,
399            reply: reply_tx,
400        };
401
402        if self.world_tx.send(cmd).await.is_err() {
403            return None;
404        }
405
406        // Wait for reply
407        let child_id = reply_rx.await.ok()??;
408
409        // Spawn runner for new channel with bound component
410        self.spawn_runner(child_id, component);
411
412        Some(child_id)
413    }
414
415    /// Spawn a child channel with authentication/authorization.
416    ///
417    /// Creates a new channel in World and spawns its runner with
418    /// Session and PermissionChecker configured.
419    ///
420    /// # Authorization Check
421    ///
422    /// Before spawning, checks if the session is allowed to spawn children
423    /// using the provided checker. Returns `None` if unauthorized.
424    ///
425    /// # Arguments
426    ///
427    /// * `parent` - Parent channel ID
428    /// * `config` - Channel configuration
429    /// * `component` - Component to bind to the new channel (1:1)
430    /// * `session` - Session for permission checking (Arc for shared grants)
431    /// * `checker` - Permission checker policy
432    ///
433    /// # Returns
434    ///
435    /// - `Some(ChannelId)` if spawn succeeded
436    /// - `None` if parent doesn't exist or permission denied
437    pub async fn spawn_channel_with_auth(
438        &mut self,
439        parent: ChannelId,
440        config: ChannelConfig,
441        component: Box<dyn Component>,
442        session: Arc<crate::Session>,
443        checker: Arc<dyn crate::auth::PermissionChecker>,
444    ) -> Option<ChannelId> {
445        // Permission check: can this session spawn children?
446        if !checker.can_spawn_child(&session) {
447            warn!(
448                principal = ?session.principal(),
449                "spawn_channel denied: permission denied"
450            );
451            return None;
452        }
453
454        // Send spawn command
455        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
456        let cmd = WorldCommand::Spawn {
457            parent,
458            config,
459            reply: reply_tx,
460        };
461
462        if self.world_tx.send(cmd).await.is_err() {
463            return None;
464        }
465
466        // Wait for reply
467        let child_id = reply_rx.await.ok()??;
468
469        // Spawn runner with auth configured
470        self.spawn_runner_with_auth(child_id, component, session, checker);
471
472        Some(child_id)
473    }
474
475    /// Spawn a runner with Session and PermissionChecker.
476    ///
477    /// The Session and Checker are passed to the ChildContext for
478    /// command permission checking.
479    ///
480    /// # Arguments
481    ///
482    /// * `channel_id` - The channel to run
483    /// * `component` - The Component to bind
484    /// * `session` - Session for permission checking
485    /// * `checker` - Permission checker policy
486    pub fn spawn_runner_with_auth(
487        &mut self,
488        channel_id: ChannelId,
489        component: Box<dyn Component>,
490        session: Arc<crate::Session>,
491        checker: Arc<dyn crate::auth::PermissionChecker>,
492    ) -> ChannelHandle {
493        let signal_rx = self.signal_tx.subscribe();
494        let component_id = component.id().clone();
495
496        // Build runner with auth
497        let builder = ChannelRunner::builder(
498            channel_id,
499            self.world_tx.clone(),
500            Arc::clone(&self.world_read),
501            signal_rx,
502            component,
503        )
504        .with_emitter(self.signal_tx.clone())
505        .with_shared_handles(self.eventbus.shared_handles())
506        .with_component_channel_map(self.eventbus.shared_component_channel_map())
507        .with_board(Arc::clone(&self.board))
508        .with_session_arc(session)
509        .with_checker(checker)
510        .with_request_channel();
511        let (runner, handle) = self
512            .apply_mcp_manager(self.apply_hook_registry(builder))
513            .build();
514
515        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
516        info!(
517            "Spawned runner (with auth) for channel {} (component={})",
518            channel_id,
519            component_id.fqn(),
520        );
521        handle
522    }
523
524    /// Spawn a ChannelRunner with all options including auth.
525    ///
526    /// This is the most complete spawn method that supports:
527    /// - Event emission (signal broadcasting)
528    /// - Output routing to IO channel
529    /// - Child spawning via LuaChildLoader
530    /// - Component loading for spawn_runner_from_script
531    /// - Session-based permission checking
532    ///
533    /// # Arguments
534    ///
535    /// * `channel_id` - The channel to run
536    /// * `component` - The Component to bind
537    /// * `output_tx` - Optional channel for Output event routing
538    /// * `lua_loader` - Optional loader for spawning Lua children
539    /// * `component_loader` - Optional loader for spawning Components as runners
540    /// * `session` - Session for permission checking
541    /// * `checker` - Permission checker policy
542    ///
543    /// # Returns
544    ///
545    /// A handle for injecting Events into the Channel.
546    #[allow(clippy::too_many_arguments)]
547    pub fn spawn_runner_full_auth(
548        &mut self,
549        channel_id: ChannelId,
550        component: Box<dyn Component>,
551        output_tx: Option<OutputSender>,
552        lua_loader: Option<Arc<dyn LuaChildLoader>>,
553        component_loader: Option<Arc<dyn ComponentLoader>>,
554        session: Arc<crate::Session>,
555        checker: Arc<dyn crate::auth::PermissionChecker>,
556        grants: Arc<dyn orcs_auth::GrantPolicy>,
557    ) -> ChannelHandle {
558        let empty_config = serde_json::Value::Object(serde_json::Map::new());
559        self.spawn_runner_full_auth_with_snapshot(
560            channel_id,
561            component,
562            output_tx,
563            lua_loader,
564            component_loader,
565            session,
566            checker,
567            grants,
568            None,
569            empty_config,
570        )
571    }
572
    /// Spawns a ChannelRunner with full auth and an optional initial snapshot.
    ///
    /// Same as [`Self::spawn_runner_full_auth`] but accepts an optional
    /// [`ComponentSnapshot`] to restore before `init()` (session resume),
    /// plus a per-component JSON config.
    ///
    /// The builder is configured with every engine-shared facility (emitter,
    /// shared handles, component/channel map, board, auth, grants) plus the
    /// per-call options, then finalized via `finalize_runner`.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_runner_full_auth_with_snapshot(
        &mut self,
        channel_id: ChannelId,
        component: Box<dyn Component>,
        output_tx: Option<OutputSender>,
        lua_loader: Option<Arc<dyn LuaChildLoader>>,
        component_loader: Option<Arc<dyn ComponentLoader>>,
        session: Arc<crate::Session>,
        checker: Arc<dyn crate::auth::PermissionChecker>,
        grants: Arc<dyn orcs_auth::GrantPolicy>,
        initial_snapshot: Option<ComponentSnapshot>,
        component_config: serde_json::Value,
    ) -> ChannelHandle {
        let signal_rx = self.signal_tx.subscribe();
        // Capture identity before the builder consumes the component.
        let component_id = component.id().clone();

        // Use builder with all options including auth
        let mut builder = ChannelRunner::builder(
            channel_id,
            self.world_tx.clone(),
            Arc::clone(&self.world_read),
            signal_rx,
            component,
        )
        .with_emitter(self.signal_tx.clone())
        .with_shared_handles(self.eventbus.shared_handles())
        .with_component_channel_map(self.eventbus.shared_component_channel_map())
        .with_board(Arc::clone(&self.board))
        .with_session_arc(session)
        .with_checker(checker)
        .with_grants(grants)
        .with_request_channel()
        .with_component_config(component_config);

        // Route Output events to IO channel if specified
        if let Some(tx) = output_tx {
            builder = builder.with_output_channel(tx);
        }

        // Enable child spawning if loader provided
        let has_child_spawner = lua_loader.is_some();
        if has_child_spawner {
            builder = builder.with_child_spawner(lua_loader);
        }

        // Enable component loader for spawn_runner_from_script
        if let Some(loader) = component_loader {
            builder = builder.with_component_loader(loader);
        }

        // Restore from initial snapshot if provided (session resume)
        if let Some(snapshot) = initial_snapshot {
            builder = builder.with_initial_snapshot(snapshot);
        }

        let (runner, handle) = self
            .apply_mcp_manager(self.apply_hook_registry(builder))
            .build();

        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
        info!(
            "Spawned runner (full+auth) for channel {} (component={}, child_spawner={})",
            channel_id,
            component_id.fqn(),
            has_child_spawner
        );
        handle
    }
646
647    /// Returns the read-only World handle for parallel access.
648    #[must_use]
649    pub fn world_read(&self) -> &Arc<RwLock<World>> {
650        &self.world_read
651    }
652
653    /// Returns the world command sender.
654    #[must_use]
655    pub fn world_tx(&self) -> &WorldCommandSender {
656        &self.world_tx
657    }
658
659    /// Injects an event to a specific channel (targeted delivery).
660    ///
661    /// Unlike signal broadcast, this sends to a single channel only.
662    /// Used for `@component message` routing.
663    ///
664    /// # Errors
665    ///
666    /// Returns `EngineError::ChannelNotFound` if channel doesn't exist,
667    /// or `EngineError::SendFailed` if the buffer is full.
668    pub fn inject_event(
669        &self,
670        channel_id: ChannelId,
671        event: crate::channel::Event,
672    ) -> Result<(), super::EngineError> {
673        self.eventbus.try_inject(channel_id, event)
674    }
675
676    /// Send signal (from external, e.g., human input)
677    ///
678    /// Broadcasts to all ChannelRunners via signal_tx.
679    pub fn signal(&self, signal: Signal) {
680        info!("Signal dispatched: {:?}", signal.kind);
681        // Broadcast to ChannelRunners
682        let _ = self.signal_tx.send(signal);
683    }
684
685    /// Check if engine is running
686    #[must_use]
687    pub fn is_running(&self) -> bool {
688        self.running
689    }
690
691    /// Start the engine (set running flag) and spawn the runner monitor.
692    ///
693    /// Use this when you need to start the engine without entering the run loop,
694    /// for example in interactive mode where you control the polling yourself.
695    pub fn start(&mut self) {
696        self.running = true;
697        self.start_runner_monitor();
698        info!("OrcsEngine started");
699    }
700
    /// Spawns the background runner monitor task.
    ///
    /// The monitor receives [`RunnerExitNotice`] messages from wrapper tasks
    /// and logs unexpected runner terminations during normal operation.
    /// For each notice it also broadcasts a `Lifecycle::runner_exited` event
    /// to surviving channels and prunes the dead channel's handle and
    /// component mapping.
    ///
    /// Idempotent: returns early if the monitor task already exists or the
    /// exit receiver was already taken.
    fn start_runner_monitor(&mut self) {
        if self.monitor_task.is_some() {
            return;
        }
        // The receiver can only be taken once.
        let Some(mut exit_rx) = self.runner_exit_rx.take() else {
            warn!("Runner monitor already started (exit_rx already taken)");
            return;
        };

        let shared_handles = self.eventbus.shared_handles();
        let shared_ccm = self.eventbus.shared_component_channel_map();
        let task = tokio::spawn(async move {
            // Loop ends when every exit sender (engine + wrappers) is dropped.
            while let Some(notice) = exit_rx.recv().await {
                let reason_str = notice.exit_reason.as_str();
                warn!(
                    channel = %notice.channel_id,
                    component = %notice.component_fqn,
                    exit_reason = reason_str,
                    "Runner exited"
                );

                // Broadcast Lifecycle event so Lua components can react.
                let event = crate::channel::Event {
                    category: orcs_event::EventCategory::Lifecycle,
                    operation: "runner_exited".into(),
                    source: orcs_types::ComponentId::builtin("engine"),
                    payload: serde_json::json!({
                        "channel_id": notice.channel_id.to_string(),
                        "component_fqn": notice.component_fqn,
                        "exit_reason": reason_str,
                    }),
                };
                {
                    // Best-effort delivery: count only successful injections.
                    let handles = shared_handles.read();
                    let mut delivered = 0usize;
                    for handle in handles.values() {
                        if handle.try_inject(event.clone()).is_ok() {
                            delivered += 1;
                        }
                    }
                    info!(
                        channel = %notice.channel_id,
                        component = %notice.component_fqn,
                        exit_reason = reason_str,
                        delivered,
                        "Lifecycle::runner_exited event broadcast"
                    );
                }

                // Clean up dead handle and component mapping.
                // The Lifecycle event is already injected into surviving channels'
                // event queues, so removing the dead handle is safe.
                {
                    let mut handles = shared_handles.write();
                    handles.remove(&notice.channel_id);
                }
                {
                    let mut ccm = shared_ccm.write();
                    ccm.retain(|_, cid| *cid != notice.channel_id);
                }
                debug!(
                    channel = %notice.channel_id,
                    component = %notice.component_fqn,
                    "Dead channel handle removed"
                );
            }
            debug!("Runner monitor stopped (all exit senders dropped)");
        });
        self.monitor_task = Some(task);
        debug!("Runner monitor task spawned");
    }
776
777    /// Stop the engine by sending a Veto signal.
778    ///
779    /// This triggers graceful shutdown via the signal broadcast channel.
780    /// Call [`shutdown()`](Self::shutdown) after this to await runner
781    /// completion and collect snapshots.
782    pub fn stop(&self) {
783        info!("Engine stop requested");
784        let _ = self.signal_tx.send(Signal::veto(Principal::System));
785    }
786
787    /// Awaits all runners, collects snapshots, and cleans up.
788    ///
789    /// Must be called after [`stop()`](Self::stop) to complete the
790    /// shutdown sequence. Snapshots are available via
791    /// [`collected_snapshots()`](Self::collected_snapshots) after this returns.
792    pub async fn shutdown(&mut self) {
793        self.shutdown_parallel().await;
794    }
795
796    /// Returns the shared Board handle.
797    ///
798    /// External code (e.g., Lua API registration) can use this to
799    /// query the Board for recent events.
800    #[must_use]
801    pub fn board(&self) -> &SharedBoard {
802        &self.board
803    }
804
805    /// Returns the IO channel ID.
806    ///
807    /// The IO channel is required and always available.
808    #[must_use]
809    pub fn io_channel(&self) -> ChannelId {
810        self.io_channel
811    }
812
    /// Main run loop with parallel execution.
    ///
    /// This method:
    /// 1. Waits for stop signal (running flag set to false)
    /// 2. Shuts down all runners on exit
    ///
    /// # Note
    ///
    /// Caller must spawn runners via `spawn_runner()` before calling `run()`.
    /// Each runner requires a bound Component (1:1 relationship).
    ///
    /// NOTE(review): the loop subscribes to `signal_tx` on entry, so a Veto
    /// broadcast *before* `run()` starts is not observed here (runners still
    /// receive it via their own subscriptions) — confirm callers only invoke
    /// `stop()` once `run()` is active.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut engine = OrcsEngine::new(world);
    /// engine.spawn_runner(io_id, Box::new(MyComponent::new()));
    /// engine.run().await;
    /// ```
    pub async fn run(&mut self) {
        self.start();
        info!("Entering parallel run loop");

        let mut signal_rx = self.signal_tx.subscribe();

        loop {
            tokio::select! {
                result = signal_rx.recv() => {
                    match result {
                        Ok(signal) if signal.is_veto() => {
                            info!("Veto signal received, stopping engine");
                            break;
                        }
                        Ok(_) => {
                            // Other signals: continue running
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            // Lagging is tolerated: old signals are dropped,
                            // the receiver keeps getting new ones.
                            warn!("Signal receiver lagged by {} messages", n);
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            // Should not happen while `self.signal_tx` is alive;
                            // break defensively if it does.
                            warn!("Signal channel closed unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        self.running = false;
        self.shutdown_parallel().await;
        info!("OrcsEngine stopped (parallel mode)");
    }
864
865    /// Shutdown parallel execution infrastructure (graceful).
866    ///
867    /// # Shutdown Order
868    ///
869    /// 0. Abort runner monitor (suppress spurious WARN during normal shutdown).
870    /// 1. Await all runner tasks with timeout (Veto already broadcast).
871    ///    Runners execute their shutdown sequence (snapshot → shutdown)
872    ///    and return [`RunnerResult`] with captured snapshots.
873    /// 2. Collect snapshots from completed runners into `collected_snapshots`.
874    /// 3. Shutdown WorldManager (runners may need `world_tx` during shutdown,
875    ///    so WorldManager must stay alive until runners complete).
876    /// 4. Unregister channel handles and clean up.
877    async fn shutdown_parallel(&mut self) {
878        // 0. Abort runner monitor before awaiting runners to suppress
879        //    spurious "exited unexpectedly" warnings during normal shutdown.
880        if let Some(task) = self.monitor_task.take() {
881            task.abort();
882            debug!("Runner monitor aborted for shutdown");
883        }
884        // 1. Await all runner tasks in parallel with per-runner timeout.
885        //    Each runner is wrapped in a tokio task that applies the timeout
886        //    and aborts on expiry. All wrappers run concurrently, so total
887        //    wall time is bounded by RUNNER_SHUTDOWN_TIMEOUT (not N × timeout).
888        let tasks: Vec<_> = self.runner_tasks.drain().collect();
889        let mut wrapper_handles = Vec::with_capacity(tasks.len());
890
891        for (id, mut task) in tasks {
892            wrapper_handles.push(tokio::spawn(async move {
893                tokio::select! {
894                    result = &mut task => {
895                        match result {
896                            Ok(runner_result) => Some((id, runner_result)),
897                            Err(e) => {
898                                warn!(channel = %id, error = %e, "runner task panicked");
899                                None
900                            }
901                        }
902                    }
903                    _ = tokio::time::sleep(RUNNER_SHUTDOWN_TIMEOUT) => {
904                        warn!(channel = %id, "runner task timed out, aborting");
905                        task.abort();
906                        None
907                    }
908                }
909            }));
910        }
911
912        // 2. Collect snapshots from completed runners.
913        for handle in wrapper_handles {
914            if let Ok(Some((id, result))) = handle.await {
915                debug!(
916                    channel = %id,
917                    component = %result.component_fqn,
918                    has_snapshot = result.snapshot.is_some(),
919                    "runner completed gracefully"
920                );
921                if let Some(snapshot) = result.snapshot {
922                    self.collected_snapshots
923                        .insert(result.component_fqn.into_owned(), snapshot);
924                }
925            }
926        }
927
928        info!(
929            "collected {} snapshots from runners",
930            self.collected_snapshots.len()
931        );
932
933        // 3. Shutdown WorldManager (after runners complete)
934        let _ = self.world_tx.send(WorldCommand::Shutdown).await;
935        if let Some(task) = self.manager_task.take() {
936            let _ = task.await;
937        }
938
939        // 4. Unregister channel handles
940        for id in self.channel_handles.keys() {
941            self.eventbus.unregister_channel(id);
942        }
943        self.channel_handles.clear();
944    }
945
946    // === Session Persistence ===
947
    /// Returns a reference to snapshots collected during graceful shutdown.
    ///
    /// Populated by `shutdown_parallel()` when runners complete their
    /// shutdown sequence. Keyed by component FQN.
    ///
    /// # Usage
    ///
    /// Call this after `run()` completes to retrieve snapshots for session
    /// persistence. Before shutdown completes the map is empty.
    #[must_use]
    pub fn collected_snapshots(&self) -> &HashMap<String, ComponentSnapshot> {
        &self.collected_snapshots
    }
961}
962
963#[cfg(test)]
964mod tests {
965    use super::*;
966    use crate::channel::{ChannelCore, WorldCommand};
967    use crate::Principal;
968    use orcs_component::ComponentError;
969    use orcs_event::{Request, SignalResponse};
970    use orcs_types::ComponentId;
971    use serde_json::Value;
972
973    /// Create a World with IO channel for testing.
974    fn test_world() -> (World, ChannelId) {
975        let mut world = World::new();
976        let io = world.create_channel(ChannelConfig::interactive());
977        (world, io)
978    }
979
980    struct EchoComponent {
981        id: ComponentId,
982        aborted: bool,
983    }
984
985    impl EchoComponent {
986        fn new() -> Self {
987            Self {
988                id: ComponentId::builtin("echo"),
989                aborted: false,
990            }
991        }
992    }
993
994    impl Component for EchoComponent {
995        fn id(&self) -> &ComponentId {
996            &self.id
997        }
998
999        fn status(&self) -> orcs_component::Status {
1000            orcs_component::Status::Idle
1001        }
1002
1003        fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
1004            Ok(request.payload.clone())
1005        }
1006
1007        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
1008            if signal.is_veto() {
1009                SignalResponse::Abort
1010            } else {
1011                SignalResponse::Handled
1012            }
1013        }
1014
1015        fn abort(&mut self) {
1016            self.aborted = true;
1017        }
1018
1019        fn status_detail(&self) -> Option<orcs_component::StatusDetail> {
1020            None
1021        }
1022
1023        fn init(&mut self, _config: &serde_json::Value) -> Result<(), ComponentError> {
1024            Ok(())
1025        }
1026
1027        fn shutdown(&mut self) {
1028            // Default: no-op
1029        }
1030    }
1031
1032    #[tokio::test]
1033    async fn engine_creation() {
1034        let (world, io) = test_world();
1035        let engine = OrcsEngine::new(world, io);
1036        assert!(!engine.is_running());
1037        assert_eq!(engine.io_channel(), io);
1038
1039        // Cleanup
1040        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1041    }
1042
1043    #[tokio::test]
1044    async fn engine_world_access() {
1045        let (world, io) = test_world();
1046        let engine = OrcsEngine::new(world, io);
1047
1048        // WorldManager starts immediately in new()
1049        let w = engine.world_read().read().await;
1050        assert!(w.get(&io).is_some());
1051        drop(w);
1052
1053        // Cleanup
1054        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1055    }
1056
1057    #[tokio::test]
1058    async fn stop_engine_sends_veto_signal() {
1059        let (world, io) = test_world();
1060        let engine = OrcsEngine::new(world, io);
1061
1062        // Subscribe before stop
1063        let mut rx = engine.signal_tx.subscribe();
1064
1065        // stop() should send Veto signal
1066        engine.stop();
1067
1068        // Verify Veto was sent
1069        let received = rx.recv().await.expect("receive signal");
1070        assert!(received.is_veto());
1071
1072        // Cleanup
1073        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1074    }
1075
1076    #[tokio::test]
1077    async fn world_access_parallel() {
1078        let (world, io) = test_world();
1079        let engine = OrcsEngine::new(world, io);
1080
1081        // Complete via command
1082        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1083        engine
1084            .world_tx()
1085            .send(WorldCommand::Complete {
1086                id: io,
1087                reply: reply_tx,
1088            })
1089            .await
1090            .expect("send complete command");
1091        assert!(reply_rx.await.expect("receive complete reply"));
1092
1093        // Verify state
1094        let w = engine.world_read().read().await;
1095        assert!(!w.get(&io).expect("get channel").is_running());
1096        drop(w);
1097
1098        // Cleanup
1099        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1100    }
1101
1102    #[tokio::test]
1103    async fn spawn_runner_creates_task() {
1104        let (world, io) = test_world();
1105        let mut engine = OrcsEngine::new(world, io);
1106
1107        let echo = Box::new(EchoComponent::new());
1108        let _handle = engine.spawn_runner(io, echo);
1109
1110        // Runner task should be stored
1111        assert_eq!(engine.runner_tasks.len(), 1);
1112        assert!(engine.runner_tasks.contains_key(&io));
1113
1114        // Cleanup
1115        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1116    }
1117
1118    #[tokio::test]
1119    async fn signal_broadcast_works() {
1120        let (world, io) = test_world();
1121        let engine = OrcsEngine::new(world, io);
1122
1123        // Subscribe before signal
1124        let mut rx = engine.signal_tx.subscribe();
1125
1126        let principal = Principal::System;
1127        let cancel = Signal::cancel(io, principal);
1128        engine.signal(cancel.clone());
1129
1130        // Should receive the signal
1131        let received = rx.recv().await.expect("receive signal");
1132        assert!(matches!(received.kind, orcs_event::SignalKind::Cancel));
1133
1134        // Cleanup
1135        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1136    }
1137
1138    // === Snapshot Tests ===
1139
1140    /// A component that supports snapshots for testing.
1141    struct SnapshottableComponent {
1142        id: ComponentId,
1143        counter: u64,
1144    }
1145
1146    impl SnapshottableComponent {
1147        fn new(name: &str, initial_value: u64) -> Self {
1148            Self {
1149                id: ComponentId::builtin(name),
1150                counter: initial_value,
1151            }
1152        }
1153    }
1154
    impl Component for SnapshottableComponent {
        fn id(&self) -> &ComponentId {
            &self.id
        }

        fn status(&self) -> orcs_component::Status {
            orcs_component::Status::Idle
        }

        // Every request bumps the counter and reports its new value,
        // giving each run observable, snapshot-worthy state.
        fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
            self.counter += 1;
            Ok(Value::Number(self.counter.into()))
        }

        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
            if signal.is_veto() {
                SignalResponse::Abort
            } else {
                SignalResponse::Handled
            }
        }

        fn abort(&mut self) {}

        // Serialize the counter under this component's FQN.
        fn snapshot(
            &self,
        ) -> Result<orcs_component::ComponentSnapshot, orcs_component::SnapshotError> {
            orcs_component::ComponentSnapshot::from_state(self.id.fqn(), &self.counter)
        }

        // Inverse of `snapshot`: rehydrate the counter from saved state.
        fn restore(
            &mut self,
            snapshot: &orcs_component::ComponentSnapshot,
        ) -> Result<(), orcs_component::SnapshotError> {
            self.counter = snapshot.to_state()?;
            Ok(())
        }
    }
1193
1194    #[tokio::test]
1195    async fn graceful_shutdown_collects_snapshots() {
1196        let (world, io) = test_world();
1197        let mut engine = OrcsEngine::new(world, io);
1198
1199        // Spawn runner with snapshottable component
1200        let comp = Box::new(SnapshottableComponent::new("snap", 42));
1201        let _handle = engine.spawn_runner(io, comp);
1202
1203        // Run engine briefly then stop via Veto
1204        let engine_signal_tx = engine.signal_tx.clone();
1205        tokio::spawn(async move {
1206            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1207            let _ = engine_signal_tx.send(Signal::veto(Principal::System));
1208        });
1209
1210        engine.run().await;
1211
1212        // Snapshots should have been collected during graceful shutdown
1213        let snapshots = engine.collected_snapshots();
1214        assert_eq!(snapshots.len(), 1);
1215        assert!(snapshots.contains_key("builtin::snap"));
1216    }
1217
1218    #[tokio::test]
1219    async fn snapshots_persist_via_store_after_graceful_shutdown() {
1220        use crate::session::{LocalFileStore, SessionAsset, SessionStore};
1221        use crate::WorkDir;
1222
1223        let wd = WorkDir::temporary().expect("should create temp WorkDir for store test");
1224        let store = LocalFileStore::new(wd.path().to_path_buf()).expect("create store");
1225
1226        let (world, io) = test_world();
1227        let mut engine = OrcsEngine::new(world, io);
1228
1229        // Spawn runner with snapshottable component
1230        let comp = Box::new(SnapshottableComponent::new("snap", 42));
1231        let _handle = engine.spawn_runner(io, comp);
1232
1233        // Run engine briefly then stop via Veto
1234        let engine_signal_tx = engine.signal_tx.clone();
1235        tokio::spawn(async move {
1236            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1237            let _ = engine_signal_tx.send(Signal::veto(Principal::System));
1238        });
1239
1240        engine.run().await;
1241
1242        // App-layer responsibility: transfer snapshots → asset → store
1243        let mut asset = SessionAsset::new();
1244        let session_id = asset.id.clone();
1245        for (fqn, snapshot) in engine.collected_snapshots() {
1246            asset
1247                .component_snapshots
1248                .insert(fqn.clone(), snapshot.clone());
1249        }
1250        asset.touch();
1251        store.save(&asset).await.expect("save session");
1252
1253        // Verify snapshot was saved
1254        assert!(asset.get_snapshot("builtin::snap").is_some());
1255
1256        // Load session directly from store (no engine involvement)
1257        let loaded = store.load(&session_id).await.expect("load session");
1258        assert_eq!(loaded.id, session_id);
1259        assert!(loaded.get_snapshot("builtin::snap").is_some());
1260    }
1261
1262    // === spawn_channel_with_auth Tests ===
1263
1264    mod spawn_channel_with_auth_tests {
1265        use super::*;
1266        use crate::auth::{DefaultPolicy, Session};
1267        use orcs_types::PrincipalId;
1268        use std::time::Duration;
1269
1270        fn standard_session() -> Arc<Session> {
1271            Arc::new(Session::new(Principal::User(PrincipalId::new())))
1272        }
1273
1274        fn elevated_session() -> Arc<Session> {
1275            Arc::new(
1276                Session::new(Principal::User(PrincipalId::new())).elevate(Duration::from_secs(60)),
1277            )
1278        }
1279
1280        fn default_checker() -> Arc<dyn crate::auth::PermissionChecker> {
1281            Arc::new(DefaultPolicy)
1282        }
1283
1284        #[tokio::test]
1285        async fn spawn_channel_with_auth_denied_for_standard_session() {
1286            let (world, io) = test_world();
1287            let mut engine = OrcsEngine::new(world, io);
1288
1289            let session = standard_session();
1290            let checker = default_checker();
1291            let config = ChannelConfig::new(100, true);
1292            let component = Box::new(EchoComponent::new());
1293
1294            // Standard session should be denied
1295            let result = engine
1296                .spawn_channel_with_auth(io, config, component, session, checker)
1297                .await;
1298
1299            assert!(result.is_none(), "standard session should be denied");
1300
1301            // Cleanup
1302            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1303        }
1304
1305        #[tokio::test]
1306        async fn spawn_channel_with_auth_allowed_for_elevated_session() {
1307            let (world, io) = test_world();
1308            let mut engine = OrcsEngine::new(world, io);
1309
1310            let session = elevated_session();
1311            let checker = default_checker();
1312            let config = ChannelConfig::new(100, true);
1313            let component = Box::new(EchoComponent::new());
1314
1315            // Elevated session should be allowed
1316            let result = engine
1317                .spawn_channel_with_auth(io, config, component, session, checker)
1318                .await;
1319
1320            assert!(result.is_some(), "elevated session should be allowed");
1321
1322            let child_id = result.expect("elevated session should produce a child channel id");
1323            // Verify child was created in world
1324            let w = engine.world_read().read().await;
1325            assert!(w.get(&child_id).is_some());
1326            drop(w);
1327
1328            // Cleanup
1329            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1330        }
1331
1332        #[tokio::test]
1333        async fn spawn_runner_with_auth_creates_runner() {
1334            let (world, io) = test_world();
1335            let mut engine = OrcsEngine::new(world, io);
1336
1337            let session = elevated_session();
1338            let checker = default_checker();
1339            let component = Box::new(EchoComponent::new());
1340
1341            let handle = engine.spawn_runner_with_auth(io, component, session, checker);
1342
1343            assert_eq!(handle.id, io);
1344            assert!(engine.runner_tasks.contains_key(&io));
1345
1346            // Cleanup
1347            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1348        }
1349    }
1350}