Skip to main content

orcs_runtime/engine/
engine.rs

1//! OrcsEngine - Main Runtime.
2//!
3//! The [`OrcsEngine`] is the central runtime that:
4//!
5//! - Spawns and manages ChannelRunners (parallel execution)
6//! - Dispatches Signals to all Runners via broadcast
7//! - Coordinates World (Channel hierarchy) management
8//! - Collects component snapshots during graceful shutdown
9//!
10//! # Architecture
11//!
12//! ```text
13//! OrcsEngine
14//!     │
15//!     ├── signal_tx ────► broadcast to all Runners
16//!     │
17//!     ├── world_tx ─────► WorldManager (channel operations)
18//!     │
19//!     ├── channel_handles ──► Event injection to Runners
20//!     │
21//!     └── collected_snapshots ─► Snapshots from graceful shutdown
22//! ```
23//!
24//! # Signal Handling
25//!
26//! Signals are interrupts from Human (or authorized Components):
27//!
28//! - **Veto**: Immediate engine stop (checked first)
29//! - **Cancel**: Stop specific Channel/Component
30//! - **Pause/Resume**: Suspend/continue execution
31//! - **Approve/Reject**: HIL responses
32//!
33//! Signals are broadcast to all Runners, which deliver them to their
34//! bound Components.
35
36use super::eventbus::EventBus;
37use crate::board::{self, SharedBoard};
38use crate::channel::{
39    ChannelConfig, ChannelHandle, ChannelRunner, ClientRunner, ClientRunnerConfig, LuaChildLoader,
40    OutputSender, RunnerResult, World, WorldCommand, WorldCommandSender, WorldManager,
41};
42use crate::io::IOPort;
43use crate::Principal;
44use orcs_component::{Component, ComponentLoader, ComponentSnapshot};
45use orcs_event::Signal;
46use orcs_hook::SharedHookRegistry;
47use orcs_types::{ChannelId, ComponentId};
48use std::collections::HashMap;
49use std::sync::Arc;
50use std::time::Duration;
51use tokio::sync::{broadcast, RwLock};
52use tracing::{debug, info, warn};
53
/// Signal broadcast channel buffer size.
///
/// 256 signals provides sufficient buffering for HIL interactions.
/// Lagged receivers will miss old signals but continue receiving new ones.
const SIGNAL_BUFFER_SIZE: usize = 256;

/// Timeout for waiting on each runner task during graceful shutdown.
///
/// Applied per runner; the shutdown wrappers run concurrently, so total
/// shutdown wall time is bounded by this value rather than N × timeout.
const RUNNER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
62
/// OrcsEngine - Main runtime for ORCS CLI.
///
/// Coordinates parallel execution of ChannelRunners and manages
/// the World (channel hierarchy).
///
/// # Parallel Execution Model
///
/// Engine operates in parallel mode with:
/// - [`WorldManager`](crate::channel::WorldManager) for concurrent World access
/// - [`ChannelRunner`](crate::channel::ChannelRunner) per channel
/// - Event injection via channel handles
/// - Snapshot collection via graceful runner shutdown
///
/// # Example
///
/// ```ignore
/// use orcs_runtime::{World, ChannelConfig};
///
/// // Create World with IO channel
/// let mut world = World::new();
/// let io = world.create_channel(ChannelConfig::interactive());
///
/// // Inject into engine with IO channel (required)
/// let engine = OrcsEngine::new(world, io);
///
/// // Spawn channel runners with bound components
/// engine.spawn_runner(io, Box::new(MyComponent::new()));
///
/// // Run the engine (parallel execution)
/// engine.run().await;
/// ```
///
/// # Signal Priority
///
/// Signals are broadcast to all Runners. A Veto signal
/// immediately stops the engine without processing further.
pub struct OrcsEngine {
    /// EventBus for channel event injection
    eventbus: EventBus,
    /// Running state (set by `start()`, cleared when the run loop exits)
    running: bool,
    // --- Parallel execution infrastructure ---
    /// World command sender for async modifications
    world_tx: WorldCommandSender,
    /// Read-only World access for parallel reads (shared with WorldManager)
    world_read: Arc<RwLock<World>>,
    /// Signal broadcaster for all channel runners
    /// (buffered to [`SIGNAL_BUFFER_SIZE`])
    signal_tx: broadcast::Sender<Signal>,
    /// Channel runner handles (for event injection)
    channel_handles: HashMap<ChannelId, ChannelHandle>,
    /// WorldManager task handle
    manager_task: Option<tokio::task::JoinHandle<()>>,
    /// Channel runner task handles.
    ///
    /// Each runner returns a [`RunnerResult`] containing the component's
    /// shutdown snapshot. The Engine awaits these on shutdown instead of
    /// aborting them.
    runner_tasks: HashMap<ChannelId, tokio::task::JoinHandle<RunnerResult>>,
    /// Snapshots collected from runners during graceful shutdown.
    ///
    /// Populated by [`shutdown_parallel()`] when runners complete.
    /// Keyed by component FQN (fully qualified name).
    collected_snapshots: HashMap<String, ComponentSnapshot>,
    // --- IO Channel (required) ---
    /// IO channel for Human input/output.
    ///
    /// This is a required field - every engine must have an IO channel
    /// for Human interaction.
    io_channel: ChannelId,
    /// Shared Board for cross-component event visibility.
    board: SharedBoard,
    /// Shared hook registry (injected into all spawned runners).
    hook_registry: Option<SharedHookRegistry>,
}
137
138impl OrcsEngine {
139    /// Create new engine with injected World and IO channel.
140    ///
141    /// The World is immediately transferred to a [`WorldManager`] which
142    /// starts processing commands. The IO channel is required for Human
143    /// input/output.
144    ///
145    /// # Arguments
146    ///
147    /// * `world` - The World containing the IO channel
148    /// * `io_channel` - The IO channel ID (must exist in World)
149    ///
150    /// # Example
151    ///
152    /// ```ignore
153    /// let mut world = World::new();
154    /// let io = world.create_channel(ChannelConfig::interactive());
155    ///
156    /// let engine = OrcsEngine::new(world, io);
157    /// ```
158    #[must_use]
159    pub fn new(world: World, io_channel: ChannelId) -> Self {
160        // Create WorldManager immediately (takes ownership of World)
161        let (manager, world_tx) = WorldManager::with_world(world);
162        let world_read = manager.world();
163
164        // Create signal broadcaster
165        let (signal_tx, _) = broadcast::channel(SIGNAL_BUFFER_SIZE);
166
167        // Start WorldManager task
168        let manager_task = tokio::spawn(manager.run());
169
170        info!(
171            "OrcsEngine created with IO channel {} (WorldManager started)",
172            io_channel
173        );
174
175        Self {
176            eventbus: EventBus::new(),
177            running: false,
178            world_tx,
179            world_read,
180            signal_tx,
181            channel_handles: HashMap::new(),
182            manager_task: Some(manager_task),
183            runner_tasks: HashMap::new(),
184            collected_snapshots: HashMap::new(),
185            io_channel,
186            board: board::shared_board(),
187            hook_registry: None,
188        }
189    }
190
191    /// Sets the shared hook registry for all spawned runners.
192    ///
193    /// Must be called before spawning runners to ensure they receive
194    /// the registry.
195    pub fn set_hook_registry(&mut self, registry: SharedHookRegistry) {
196        self.eventbus.set_hook_registry(Arc::clone(&registry));
197        self.hook_registry = Some(registry);
198    }
199
    /// Returns a reference to the hook registry, if configured.
    ///
    /// `None` until [`set_hook_registry`](Self::set_hook_registry) is called.
    #[must_use]
    pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
        self.hook_registry.as_ref()
    }
205
206    /// Applies the hook registry to a ChannelRunnerBuilder if configured.
207    fn apply_hook_registry(
208        &self,
209        builder: crate::channel::ChannelRunnerBuilder,
210    ) -> crate::channel::ChannelRunnerBuilder {
211        match &self.hook_registry {
212            Some(reg) => builder.with_hook_registry(Arc::clone(reg)),
213            None => builder,
214        }
215    }
216
217    /// Registers runner with EventBus, stores handle, and spawns the tokio task.
218    ///
219    /// Common finalization extracted from `spawn_runner*` methods.
220    fn finalize_runner(
221        &mut self,
222        channel_id: ChannelId,
223        component_id: &ComponentId,
224        runner: ChannelRunner,
225        handle: ChannelHandle,
226    ) -> ChannelHandle {
227        self.eventbus.register_channel(handle.clone());
228        self.eventbus
229            .register_component_channel(component_id, channel_id);
230        self.channel_handles.insert(channel_id, handle.clone());
231        let runner_task = tokio::spawn(runner.run());
232        self.runner_tasks.insert(channel_id, runner_task);
233        handle
234    }
235
236    /// Spawn a channel runner for a channel with a bound Component.
237    ///
238    /// Returns the channel handle for event injection.
239    ///
240    /// # Arguments
241    ///
242    /// * `channel_id` - The channel to run
243    /// * `component` - The Component to bind (1:1 relationship)
244    pub fn spawn_runner(
245        &mut self,
246        channel_id: ChannelId,
247        component: Box<dyn Component>,
248    ) -> ChannelHandle {
249        let component_id = component.id().clone();
250        let signal_rx = self.signal_tx.subscribe();
251        let builder = ChannelRunner::builder(
252            channel_id,
253            self.world_tx.clone(),
254            Arc::clone(&self.world_read),
255            signal_rx,
256            component,
257        )
258        .with_request_channel();
259        let (runner, handle) = self.apply_hook_registry(builder).build();
260
261        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
262        info!("Spawned runner for channel {}", channel_id);
263        handle
264    }
265
266    /// Spawn a channel runner with an EventEmitter injected into the Component.
267    ///
268    /// This enables IO-less (event-only) execution for Components that support
269    /// the Emitter trait. The Component can emit output via `emit_output()`.
270    ///
271    /// Use this for ChannelRunner-based execution instead of ClientRunner.
272    /// The Component receives events via `on_request()` and emits output via Emitter.
273    ///
274    /// # Arguments
275    ///
276    /// * `channel_id` - The channel to run
277    /// * `component` - The Component to bind (must implement `set_emitter`)
278    /// * `output_tx` - Optional sender for routing Output events to IO channel
279    ///
280    /// # Example
281    ///
282    /// ```ignore
283    /// // Lua scripts can use orcs.output() after emitter is set
284    /// let lua_component = LuaComponent::from_script("...")?;
285    ///
286    /// // Without output routing (Output stays in this channel)
287    /// let handle = engine.spawn_runner_with_emitter(channel_id, component, None);
288    ///
289    /// // With output routing to IO channel
290    /// let (_, io_event_tx) = engine.spawn_client_runner(io_channel, io_port, principal);
291    /// let handle = engine.spawn_runner_with_emitter(channel_id, component, Some(io_event_tx));
292    /// ```
293    pub fn spawn_runner_with_emitter(
294        &mut self,
295        channel_id: ChannelId,
296        component: Box<dyn Component>,
297        output_tx: Option<OutputSender>,
298    ) -> ChannelHandle {
299        let signal_rx = self.signal_tx.subscribe();
300        let component_id = component.id().clone();
301
302        // Use builder with emitter to ensure event_tx/event_rx consistency
303        let mut builder = ChannelRunner::builder(
304            channel_id,
305            self.world_tx.clone(),
306            Arc::clone(&self.world_read),
307            signal_rx,
308            component,
309        )
310        .with_emitter(self.signal_tx.clone())
311        .with_shared_handles(self.eventbus.shared_handles())
312        .with_component_channel_map(self.eventbus.shared_component_channel_map())
313        .with_board(Arc::clone(&self.board))
314        .with_request_channel();
315
316        // Route Output events to IO channel if specified
317        if let Some(tx) = output_tx {
318            builder = builder.with_output_channel(tx);
319        }
320
321        let (runner, handle) = self.apply_hook_registry(builder).build();
322
323        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
324        info!(
325            "Spawned runner with emitter for channel {} (component={})",
326            channel_id,
327            component_id.fqn()
328        );
329        handle
330    }
331
332    /// Spawn a ChannelRunner with all options (emitter, output routing, child spawner).
333    ///
334    /// This is the most flexible spawn method that supports:
335    /// - Event emission (signal broadcasting)
336    /// - Output routing to IO channel
337    /// - Child spawning via LuaChildLoader
338    /// - Component loading for spawn_runner_from_script
339    ///
340    /// # Arguments
341    ///
342    /// * `channel_id` - The channel to run
343    /// * `component` - The Component to bind
344    /// * `output_tx` - Optional channel for Output event routing
345    /// * `lua_loader` - Optional loader for spawning Lua children
346    /// * `component_loader` - Optional loader for spawning Components as runners
347    ///
348    /// # Returns
349    ///
350    /// A handle for injecting Events into the Channel.
351    pub fn spawn_runner_full(
352        &mut self,
353        channel_id: ChannelId,
354        component: Box<dyn Component>,
355        output_tx: Option<OutputSender>,
356        lua_loader: Option<Arc<dyn LuaChildLoader>>,
357        component_loader: Option<Arc<dyn ComponentLoader>>,
358    ) -> ChannelHandle {
359        let signal_rx = self.signal_tx.subscribe();
360        let component_id = component.id().clone();
361
362        // Use builder with all options
363        let mut builder = ChannelRunner::builder(
364            channel_id,
365            self.world_tx.clone(),
366            Arc::clone(&self.world_read),
367            signal_rx,
368            component,
369        )
370        .with_emitter(self.signal_tx.clone())
371        .with_shared_handles(self.eventbus.shared_handles())
372        .with_component_channel_map(self.eventbus.shared_component_channel_map())
373        .with_board(Arc::clone(&self.board))
374        .with_request_channel();
375
376        // Route Output events to IO channel if specified
377        if let Some(tx) = output_tx {
378            builder = builder.with_output_channel(tx);
379        }
380
381        // Enable child spawning if loader provided
382        let has_child_spawner = lua_loader.is_some();
383        if has_child_spawner {
384            builder = builder.with_child_spawner(lua_loader);
385        }
386
387        // Enable component loader for spawn_runner_from_script
388        if let Some(loader) = component_loader {
389            builder = builder.with_component_loader(loader);
390        }
391
392        let (runner, handle) = self.apply_hook_registry(builder).build();
393
394        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
395        info!(
396            "Spawned runner (full) for channel {} (component={}, child_spawner={})",
397            channel_id,
398            component_id.fqn(),
399            has_child_spawner
400        );
401        handle
402    }
403
404    /// Spawn a ClientRunner for an IO channel (no component).
405    ///
406    /// ClientRunner is dedicated to Human I/O bridging. It broadcasts UserInput
407    /// events to all channels and displays Output events from other channels.
408    ///
409    /// # Arguments
410    ///
411    /// * `channel_id` - The channel to run
412    /// * `io_port` - IO port for View communication
413    /// * `principal` - Principal for signal creation from IO input
414    ///
415    /// # Returns
416    ///
417    /// A tuple of (ChannelHandle, event_tx) where event_tx can be used by
418    /// other runners to send Output events to this runner for display.
419    pub fn spawn_client_runner(
420        &mut self,
421        channel_id: ChannelId,
422        io_port: IOPort,
423        principal: orcs_types::Principal,
424    ) -> (ChannelHandle, OutputSender) {
425        let config = ClientRunnerConfig {
426            world_tx: self.world_tx.clone(),
427            world: Arc::clone(&self.world_read),
428            signal_rx: self.signal_tx.subscribe(),
429            channel_handles: self.eventbus.shared_handles(),
430        };
431        let (runner, handle) = ClientRunner::new(channel_id, config, io_port, principal);
432
433        // Get event_tx for other runners to send Output events (wrapped as OutputSender)
434        let output_sender = OutputSender::new(runner.event_tx().clone());
435
436        // Register handle with EventBus for event injection
437        self.eventbus.register_channel(handle.clone());
438
439        // Store handle
440        self.channel_handles.insert(channel_id, handle.clone());
441
442        // Spawn runner task
443        let runner_task = tokio::spawn(runner.run());
444        self.runner_tasks.insert(channel_id, runner_task);
445
446        info!("Spawned ClientRunner for channel {}", channel_id);
447        (handle, output_sender)
448    }
449
450    /// Spawn a child channel with configuration and bound Component.
451    ///
452    /// Creates a new channel in World and spawns its runner.
453    ///
454    /// # Arguments
455    ///
456    /// * `parent` - Parent channel ID
457    /// * `config` - Channel configuration
458    /// * `component` - Component to bind to the new channel (1:1)
459    pub async fn spawn_channel(
460        &mut self,
461        parent: ChannelId,
462        config: ChannelConfig,
463        component: Box<dyn Component>,
464    ) -> Option<ChannelId> {
465        // Send spawn command
466        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
467        let cmd = WorldCommand::Spawn {
468            parent,
469            config,
470            reply: reply_tx,
471        };
472
473        if self.world_tx.send(cmd).await.is_err() {
474            return None;
475        }
476
477        // Wait for reply
478        let child_id = reply_rx.await.ok()??;
479
480        // Spawn runner for new channel with bound component
481        self.spawn_runner(child_id, component);
482
483        Some(child_id)
484    }
485
486    /// Spawn a child channel with authentication/authorization.
487    ///
488    /// Creates a new channel in World and spawns its runner with
489    /// Session and PermissionChecker configured.
490    ///
491    /// # Authorization Check
492    ///
493    /// Before spawning, checks if the session is allowed to spawn children
494    /// using the provided checker. Returns `None` if unauthorized.
495    ///
496    /// # Arguments
497    ///
498    /// * `parent` - Parent channel ID
499    /// * `config` - Channel configuration
500    /// * `component` - Component to bind to the new channel (1:1)
501    /// * `session` - Session for permission checking (Arc for shared grants)
502    /// * `checker` - Permission checker policy
503    ///
504    /// # Returns
505    ///
506    /// - `Some(ChannelId)` if spawn succeeded
507    /// - `None` if parent doesn't exist or permission denied
508    pub async fn spawn_channel_with_auth(
509        &mut self,
510        parent: ChannelId,
511        config: ChannelConfig,
512        component: Box<dyn Component>,
513        session: Arc<crate::Session>,
514        checker: Arc<dyn crate::auth::PermissionChecker>,
515    ) -> Option<ChannelId> {
516        // Permission check: can this session spawn children?
517        if !checker.can_spawn_child(&session) {
518            warn!(
519                principal = ?session.principal(),
520                "spawn_channel denied: permission denied"
521            );
522            return None;
523        }
524
525        // Send spawn command
526        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
527        let cmd = WorldCommand::Spawn {
528            parent,
529            config,
530            reply: reply_tx,
531        };
532
533        if self.world_tx.send(cmd).await.is_err() {
534            return None;
535        }
536
537        // Wait for reply
538        let child_id = reply_rx.await.ok()??;
539
540        // Spawn runner with auth configured
541        self.spawn_runner_with_auth(child_id, component, session, checker);
542
543        Some(child_id)
544    }
545
546    /// Spawn a runner with Session and PermissionChecker.
547    ///
548    /// The Session and Checker are passed to the ChildContext for
549    /// command permission checking.
550    ///
551    /// # Arguments
552    ///
553    /// * `channel_id` - The channel to run
554    /// * `component` - The Component to bind
555    /// * `session` - Session for permission checking
556    /// * `checker` - Permission checker policy
557    pub fn spawn_runner_with_auth(
558        &mut self,
559        channel_id: ChannelId,
560        component: Box<dyn Component>,
561        session: Arc<crate::Session>,
562        checker: Arc<dyn crate::auth::PermissionChecker>,
563    ) -> ChannelHandle {
564        let signal_rx = self.signal_tx.subscribe();
565        let component_id = component.id().clone();
566
567        // Build runner with auth
568        let builder = ChannelRunner::builder(
569            channel_id,
570            self.world_tx.clone(),
571            Arc::clone(&self.world_read),
572            signal_rx,
573            component,
574        )
575        .with_emitter(self.signal_tx.clone())
576        .with_shared_handles(self.eventbus.shared_handles())
577        .with_component_channel_map(self.eventbus.shared_component_channel_map())
578        .with_board(Arc::clone(&self.board))
579        .with_session_arc(session)
580        .with_checker(checker)
581        .with_request_channel();
582        let (runner, handle) = self.apply_hook_registry(builder).build();
583
584        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
585        info!(
586            "Spawned runner (with auth) for channel {} (component={})",
587            channel_id,
588            component_id.fqn(),
589        );
590        handle
591    }
592
593    /// Spawn a ChannelRunner with all options including auth.
594    ///
595    /// This is the most complete spawn method that supports:
596    /// - Event emission (signal broadcasting)
597    /// - Output routing to IO channel
598    /// - Child spawning via LuaChildLoader
599    /// - Component loading for spawn_runner_from_script
600    /// - Session-based permission checking
601    ///
602    /// # Arguments
603    ///
604    /// * `channel_id` - The channel to run
605    /// * `component` - The Component to bind
606    /// * `output_tx` - Optional channel for Output event routing
607    /// * `lua_loader` - Optional loader for spawning Lua children
608    /// * `component_loader` - Optional loader for spawning Components as runners
609    /// * `session` - Session for permission checking
610    /// * `checker` - Permission checker policy
611    ///
612    /// # Returns
613    ///
614    /// A handle for injecting Events into the Channel.
615    #[allow(clippy::too_many_arguments)]
616    pub fn spawn_runner_full_auth(
617        &mut self,
618        channel_id: ChannelId,
619        component: Box<dyn Component>,
620        output_tx: Option<OutputSender>,
621        lua_loader: Option<Arc<dyn LuaChildLoader>>,
622        component_loader: Option<Arc<dyn ComponentLoader>>,
623        session: Arc<crate::Session>,
624        checker: Arc<dyn crate::auth::PermissionChecker>,
625        grants: Arc<dyn orcs_auth::GrantPolicy>,
626    ) -> ChannelHandle {
627        let empty_config = serde_json::Value::Object(serde_json::Map::new());
628        self.spawn_runner_full_auth_with_snapshot(
629            channel_id,
630            component,
631            output_tx,
632            lua_loader,
633            component_loader,
634            session,
635            checker,
636            grants,
637            None,
638            empty_config,
639        )
640    }
641
642    /// Spawns a ChannelRunner with full auth and an optional initial snapshot.
643    ///
644    /// Same as [`Self::spawn_runner_full_auth`] but accepts an optional
645    /// [`ComponentSnapshot`] to restore before `init()` (session resume).
646    #[allow(clippy::too_many_arguments)]
647    pub fn spawn_runner_full_auth_with_snapshot(
648        &mut self,
649        channel_id: ChannelId,
650        component: Box<dyn Component>,
651        output_tx: Option<OutputSender>,
652        lua_loader: Option<Arc<dyn LuaChildLoader>>,
653        component_loader: Option<Arc<dyn ComponentLoader>>,
654        session: Arc<crate::Session>,
655        checker: Arc<dyn crate::auth::PermissionChecker>,
656        grants: Arc<dyn orcs_auth::GrantPolicy>,
657        initial_snapshot: Option<ComponentSnapshot>,
658        component_config: serde_json::Value,
659    ) -> ChannelHandle {
660        let signal_rx = self.signal_tx.subscribe();
661        let component_id = component.id().clone();
662
663        // Use builder with all options including auth
664        let mut builder = ChannelRunner::builder(
665            channel_id,
666            self.world_tx.clone(),
667            Arc::clone(&self.world_read),
668            signal_rx,
669            component,
670        )
671        .with_emitter(self.signal_tx.clone())
672        .with_shared_handles(self.eventbus.shared_handles())
673        .with_component_channel_map(self.eventbus.shared_component_channel_map())
674        .with_board(Arc::clone(&self.board))
675        .with_session_arc(session)
676        .with_checker(checker)
677        .with_grants(grants)
678        .with_request_channel()
679        .with_component_config(component_config);
680
681        // Route Output events to IO channel if specified
682        if let Some(tx) = output_tx {
683            builder = builder.with_output_channel(tx);
684        }
685
686        // Enable child spawning if loader provided
687        let has_child_spawner = lua_loader.is_some();
688        if has_child_spawner {
689            builder = builder.with_child_spawner(lua_loader);
690        }
691
692        // Enable component loader for spawn_runner_from_script
693        if let Some(loader) = component_loader {
694            builder = builder.with_component_loader(loader);
695        }
696
697        // Restore from initial snapshot if provided (session resume)
698        if let Some(snapshot) = initial_snapshot {
699            builder = builder.with_initial_snapshot(snapshot);
700        }
701
702        let (runner, handle) = self.apply_hook_registry(builder).build();
703
704        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
705        info!(
706            "Spawned runner (full+auth) for channel {} (component={}, child_spawner={})",
707            channel_id,
708            component_id.fqn(),
709            has_child_spawner
710        );
711        handle
712    }
713
    /// Returns the read-only World handle for parallel access.
    ///
    /// This is the same `RwLock` held by the WorldManager created in
    /// [`new`](Self::new).
    #[must_use]
    pub fn world_read(&self) -> &Arc<RwLock<World>> {
        &self.world_read
    }
719
    /// Returns the world command sender.
    ///
    /// Commands sent here are processed by the WorldManager task
    /// started in [`new`](Self::new).
    #[must_use]
    pub fn world_tx(&self) -> &WorldCommandSender {
        &self.world_tx
    }
725
    /// Injects an event to a specific channel (targeted delivery).
    ///
    /// Unlike signal broadcast, this sends to a single channel only.
    /// Used for `@component message` routing. Delegates to the EventBus,
    /// which owns the per-channel handles.
    ///
    /// # Errors
    ///
    /// Returns `EngineError::ChannelNotFound` if channel doesn't exist,
    /// or `EngineError::SendFailed` if the buffer is full.
    pub fn inject_event(
        &self,
        channel_id: ChannelId,
        event: crate::channel::Event,
    ) -> Result<(), super::EngineError> {
        self.eventbus.try_inject(channel_id, event)
    }
742
    /// Send signal (from external, e.g., human input)
    ///
    /// Broadcasts to all ChannelRunners via signal_tx.
    pub fn signal(&self, signal: Signal) {
        info!("Signal dispatched: {:?}", signal.kind);
        // Broadcast to ChannelRunners. A broadcast send only errs when there
        // are no active receivers, which is harmless to ignore here.
        let _ = self.signal_tx.send(signal);
    }
751
    /// Check if engine is running
    ///
    /// `true` between [`start()`](Self::start) and the end of the run loop.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.running
    }
757
    /// Start the engine (set running flag).
    ///
    /// Use this when you need to start the engine without entering the run loop,
    /// for example in interactive mode where you control the polling yourself.
    /// Only flips the `running` flag; runners must be spawned separately.
    pub fn start(&mut self) {
        self.running = true;
        info!("OrcsEngine started");
    }
766
    /// Stop the engine by sending a Veto signal.
    ///
    /// This triggers graceful shutdown via the signal broadcast channel.
    /// Call [`shutdown()`](Self::shutdown) after this to await runner
    /// completion and collect snapshots.
    pub fn stop(&self) {
        info!("Engine stop requested");
        // Veto is issued as System; a send error only means no receivers
        // are subscribed, which is safe to ignore.
        let _ = self.signal_tx.send(Signal::veto(Principal::System));
    }
776
    /// Awaits all runners, collects snapshots, and cleans up.
    ///
    /// Must be called after [`stop()`](Self::stop) to complete the
    /// shutdown sequence. Snapshots are available via
    /// [`collected_snapshots()`](Self::collected_snapshots) after this returns.
    pub async fn shutdown(&mut self) {
        // Thin public wrapper over the parallel shutdown path.
        self.shutdown_parallel().await;
    }
785
    /// Returns the shared Board handle.
    ///
    /// External code (e.g., Lua API registration) can use this to
    /// query the Board for recent events. The same Board is injected
    /// into runners spawned via the `with_board` builder option.
    #[must_use]
    pub fn board(&self) -> &SharedBoard {
        &self.board
    }
794
    /// Returns the IO channel ID.
    ///
    /// The IO channel is required and always available; it is fixed at
    /// construction time in [`new`](Self::new).
    #[must_use]
    pub fn io_channel(&self) -> ChannelId {
        self.io_channel
    }
802
803    /// Main run loop with parallel execution.
804    ///
805    /// This method:
806    /// 1. Waits for stop signal (running flag set to false)
807    /// 2. Shuts down all runners on exit
808    ///
809    /// # Note
810    ///
811    /// Caller must spawn runners via `spawn_runner()` before calling `run()`.
812    /// Each runner requires a bound Component (1:1 relationship).
813    ///
814    /// # Example
815    ///
816    /// ```ignore
817    /// let mut engine = OrcsEngine::new(world);
818    /// engine.spawn_runner(io_id, Box::new(MyComponent::new()));
819    /// engine.run().await;
820    /// ```
821    pub async fn run(&mut self) {
822        self.start();
823        info!("Entering parallel run loop");
824
825        let mut signal_rx = self.signal_tx.subscribe();
826
827        loop {
828            tokio::select! {
829                result = signal_rx.recv() => {
830                    match result {
831                        Ok(signal) if signal.is_veto() => {
832                            info!("Veto signal received, stopping engine");
833                            break;
834                        }
835                        Ok(_) => {
836                            // Other signals: continue running
837                        }
838                        Err(broadcast::error::RecvError::Lagged(n)) => {
839                            warn!("Signal receiver lagged by {} messages", n);
840                        }
841                        Err(broadcast::error::RecvError::Closed) => {
842                            warn!("Signal channel closed unexpectedly");
843                            break;
844                        }
845                    }
846                }
847            }
848        }
849
850        self.running = false;
851        self.shutdown_parallel().await;
852        info!("OrcsEngine stopped (parallel mode)");
853    }
854
855    /// Shutdown parallel execution infrastructure (graceful).
856    ///
857    /// # Shutdown Order
858    ///
859    /// 1. Await all runner tasks with timeout (Veto already broadcast).
860    ///    Runners execute their shutdown sequence (snapshot → shutdown)
861    ///    and return [`RunnerResult`] with captured snapshots.
862    /// 2. Collect snapshots from completed runners into `collected_snapshots`.
863    /// 3. Shutdown WorldManager (runners may need `world_tx` during shutdown,
864    ///    so WorldManager must stay alive until runners complete).
865    /// 4. Unregister channel handles and clean up.
866    async fn shutdown_parallel(&mut self) {
867        // 1. Await all runner tasks in parallel with per-runner timeout.
868        //    Each runner is wrapped in a tokio task that applies the timeout
869        //    and aborts on expiry. All wrappers run concurrently, so total
870        //    wall time is bounded by RUNNER_SHUTDOWN_TIMEOUT (not N × timeout).
871        let tasks: Vec<_> = self.runner_tasks.drain().collect();
872        let mut wrapper_handles = Vec::with_capacity(tasks.len());
873
874        for (id, mut task) in tasks {
875            wrapper_handles.push(tokio::spawn(async move {
876                tokio::select! {
877                    result = &mut task => {
878                        match result {
879                            Ok(runner_result) => Some((id, runner_result)),
880                            Err(e) => {
881                                warn!(channel = %id, error = %e, "runner task panicked");
882                                None
883                            }
884                        }
885                    }
886                    _ = tokio::time::sleep(RUNNER_SHUTDOWN_TIMEOUT) => {
887                        warn!(channel = %id, "runner task timed out, aborting");
888                        task.abort();
889                        None
890                    }
891                }
892            }));
893        }
894
895        // 2. Collect snapshots from completed runners.
896        for handle in wrapper_handles {
897            if let Ok(Some((id, result))) = handle.await {
898                debug!(
899                    channel = %id,
900                    component = %result.component_fqn,
901                    has_snapshot = result.snapshot.is_some(),
902                    "runner completed gracefully"
903                );
904                if let Some(snapshot) = result.snapshot {
905                    self.collected_snapshots
906                        .insert(result.component_fqn.into_owned(), snapshot);
907                }
908            }
909        }
910
911        info!(
912            "collected {} snapshots from runners",
913            self.collected_snapshots.len()
914        );
915
916        // 3. Shutdown WorldManager (after runners complete)
917        let _ = self.world_tx.send(WorldCommand::Shutdown).await;
918        if let Some(task) = self.manager_task.take() {
919            let _ = task.await;
920        }
921
922        // 4. Unregister channel handles
923        for id in self.channel_handles.keys() {
924            self.eventbus.unregister_channel(id);
925        }
926        self.channel_handles.clear();
927    }
928
    // === Session Persistence ===

    /// Returns a reference to snapshots collected during graceful shutdown.
    ///
    /// Populated by `shutdown_parallel()` when runners complete their
    /// shutdown sequence. Keyed by component FQN (e.g. `"builtin::snap"`).
    /// Before shutdown has run, the map contains no runner snapshots.
    ///
    /// # Usage
    ///
    /// Call this after `run()` completes to retrieve snapshots for session
    /// persistence.
    #[must_use]
    pub fn collected_snapshots(&self) -> &HashMap<String, ComponentSnapshot> {
        &self.collected_snapshots
    }
944}
945
#[cfg(test)]
mod tests {
    use super::*;
    use crate::channel::{ChannelCore, WorldCommand};
    use crate::Principal;
    use orcs_component::ComponentError;
    use orcs_event::{Request, SignalResponse};
    use orcs_types::ComponentId;
    use serde_json::Value;

    /// Create a World with IO channel for testing.
    fn test_world() -> (World, ChannelId) {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());
        (world, io)
    }

    /// Minimal component that echoes request payloads back.
    ///
    /// `aborted` records whether `abort()` was invoked; it is not asserted
    /// in these tests but aids debugging of shutdown paths.
    struct EchoComponent {
        id: ComponentId,
        aborted: bool,
    }

    impl EchoComponent {
        fn new() -> Self {
            Self {
                id: ComponentId::builtin("echo"),
                aborted: false,
            }
        }
    }

    impl Component for EchoComponent {
        fn id(&self) -> &ComponentId {
            &self.id
        }

        fn status(&self) -> orcs_component::Status {
            orcs_component::Status::Idle
        }

        // Echo: return the request payload unchanged.
        fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
            Ok(request.payload.clone())
        }

        // Abort on Veto; acknowledge everything else.
        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
            if signal.is_veto() {
                SignalResponse::Abort
            } else {
                SignalResponse::Handled
            }
        }

        fn abort(&mut self) {
            self.aborted = true;
        }

        fn status_detail(&self) -> Option<orcs_component::StatusDetail> {
            None
        }

        fn init(&mut self, _config: &serde_json::Value) -> Result<(), ComponentError> {
            Ok(())
        }

        fn shutdown(&mut self) {
            // Default: no-op
        }
    }

    #[tokio::test]
    async fn engine_creation() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);
        assert!(!engine.is_running());
        assert_eq!(engine.io_channel(), io);

        // Cleanup
        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }

    #[tokio::test]
    async fn engine_world_access() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        // WorldManager starts immediately in new()
        let w = engine.world_read().read().await;
        assert!(w.get(&io).is_some());
        drop(w);

        // Cleanup
        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }

    #[tokio::test]
    async fn stop_engine_sends_veto_signal() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        // Subscribe before stop
        let mut rx = engine.signal_tx.subscribe();

        // stop() should send Veto signal
        engine.stop();

        // Verify Veto was sent
        let received = rx.recv().await.expect("receive signal");
        assert!(received.is_veto());

        // Cleanup
        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }

    #[tokio::test]
    async fn world_access_parallel() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        // Complete via command
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        engine
            .world_tx()
            .send(WorldCommand::Complete {
                id: io,
                reply: reply_tx,
            })
            .await
            .expect("send complete command");
        assert!(reply_rx.await.expect("receive complete reply"));

        // Verify state
        let w = engine.world_read().read().await;
        assert!(!w.get(&io).expect("get channel").is_running());
        drop(w);

        // Cleanup
        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }

    #[tokio::test]
    async fn spawn_runner_creates_task() {
        let (world, io) = test_world();
        let mut engine = OrcsEngine::new(world, io);

        let echo = Box::new(EchoComponent::new());
        let _handle = engine.spawn_runner(io, echo);

        // Runner task should be stored
        assert_eq!(engine.runner_tasks.len(), 1);
        assert!(engine.runner_tasks.contains_key(&io));

        // Cleanup
        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }

    #[tokio::test]
    async fn signal_broadcast_works() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        // Subscribe before signal
        let mut rx = engine.signal_tx.subscribe();

        let principal = Principal::System;
        let cancel = Signal::cancel(io, principal);
        engine.signal(cancel.clone());

        // Should receive the signal
        let received = rx.recv().await.expect("receive signal");
        assert!(matches!(received.kind, orcs_event::SignalKind::Cancel));

        // Cleanup
        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }

    // === Snapshot Tests ===

    /// A component that supports snapshots for testing.
    struct SnapshottableComponent {
        id: ComponentId,
        // Monotonic state captured by snapshot()/restored by restore().
        counter: u64,
    }

    impl SnapshottableComponent {
        fn new(name: &str, initial_value: u64) -> Self {
            Self {
                id: ComponentId::builtin(name),
                counter: initial_value,
            }
        }
    }

    impl Component for SnapshottableComponent {
        fn id(&self) -> &ComponentId {
            &self.id
        }

        fn status(&self) -> orcs_component::Status {
            orcs_component::Status::Idle
        }

        // Each request increments the counter and returns its new value.
        fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
            self.counter += 1;
            Ok(Value::Number(self.counter.into()))
        }

        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
            if signal.is_veto() {
                SignalResponse::Abort
            } else {
                SignalResponse::Handled
            }
        }

        fn abort(&mut self) {}

        fn snapshot(
            &self,
        ) -> Result<orcs_component::ComponentSnapshot, orcs_component::SnapshotError> {
            orcs_component::ComponentSnapshot::from_state(self.id.fqn(), &self.counter)
        }

        fn restore(
            &mut self,
            snapshot: &orcs_component::ComponentSnapshot,
        ) -> Result<(), orcs_component::SnapshotError> {
            self.counter = snapshot.to_state()?;
            Ok(())
        }
    }

    #[tokio::test]
    async fn graceful_shutdown_collects_snapshots() {
        let (world, io) = test_world();
        let mut engine = OrcsEngine::new(world, io);

        // Spawn runner with snapshottable component
        let comp = Box::new(SnapshottableComponent::new("snap", 42));
        let _handle = engine.spawn_runner(io, comp);

        // Run engine briefly then stop via Veto
        // (50 ms gives the runner time to start before shutdown begins)
        let engine_signal_tx = engine.signal_tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let _ = engine_signal_tx.send(Signal::veto(Principal::System));
        });

        engine.run().await;

        // Snapshots should have been collected during graceful shutdown
        let snapshots = engine.collected_snapshots();
        assert_eq!(snapshots.len(), 1);
        assert!(snapshots.contains_key("builtin::snap"));
    }

    #[tokio::test]
    async fn snapshots_persist_via_store_after_graceful_shutdown() {
        use crate::session::{LocalFileStore, SessionAsset, SessionStore};
        use tempfile::TempDir;

        let temp_dir = TempDir::new().expect("create temp dir");
        let store = LocalFileStore::new(temp_dir.path().to_path_buf()).expect("create store");

        let (world, io) = test_world();
        let mut engine = OrcsEngine::new(world, io);

        // Spawn runner with snapshottable component
        let comp = Box::new(SnapshottableComponent::new("snap", 42));
        let _handle = engine.spawn_runner(io, comp);

        // Run engine briefly then stop via Veto
        let engine_signal_tx = engine.signal_tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let _ = engine_signal_tx.send(Signal::veto(Principal::System));
        });

        engine.run().await;

        // App-layer responsibility: transfer snapshots → asset → store
        let mut asset = SessionAsset::new();
        let session_id = asset.id.clone();
        for (fqn, snapshot) in engine.collected_snapshots() {
            asset
                .component_snapshots
                .insert(fqn.clone(), snapshot.clone());
        }
        asset.touch();
        store.save(&asset).await.expect("save session");

        // Verify snapshot was saved
        assert!(asset.get_snapshot("builtin::snap").is_some());

        // Load session directly from store (no engine involvement)
        let loaded = store.load(&session_id).await.expect("load session");
        assert_eq!(loaded.id, session_id);
        assert!(loaded.get_snapshot("builtin::snap").is_some());
    }

    // === spawn_channel_with_auth Tests ===

    mod spawn_channel_with_auth_tests {
        use super::*;
        use crate::auth::{DefaultPolicy, Session};
        use orcs_types::PrincipalId;
        use std::time::Duration;

        // Session without elevation — expected to be denied by DefaultPolicy.
        fn standard_session() -> Arc<Session> {
            Arc::new(Session::new(Principal::User(PrincipalId::new())))
        }

        // Session elevated for 60 s — expected to pass permission checks.
        fn elevated_session() -> Arc<Session> {
            Arc::new(
                Session::new(Principal::User(PrincipalId::new())).elevate(Duration::from_secs(60)),
            )
        }

        fn default_checker() -> Arc<dyn crate::auth::PermissionChecker> {
            Arc::new(DefaultPolicy)
        }

        #[tokio::test]
        async fn spawn_channel_with_auth_denied_for_standard_session() {
            let (world, io) = test_world();
            let mut engine = OrcsEngine::new(world, io);

            let session = standard_session();
            let checker = default_checker();
            let config = ChannelConfig::new(100, true);
            let component = Box::new(EchoComponent::new());

            // Standard session should be denied
            let result = engine
                .spawn_channel_with_auth(io, config, component, session, checker)
                .await;

            assert!(result.is_none(), "standard session should be denied");

            // Cleanup
            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
        }

        #[tokio::test]
        async fn spawn_channel_with_auth_allowed_for_elevated_session() {
            let (world, io) = test_world();
            let mut engine = OrcsEngine::new(world, io);

            let session = elevated_session();
            let checker = default_checker();
            let config = ChannelConfig::new(100, true);
            let component = Box::new(EchoComponent::new());

            // Elevated session should be allowed
            let result = engine
                .spawn_channel_with_auth(io, config, component, session, checker)
                .await;

            assert!(result.is_some(), "elevated session should be allowed");

            let child_id = result.expect("elevated session should produce a child channel id");
            // Verify child was created in world
            let w = engine.world_read().read().await;
            assert!(w.get(&child_id).is_some());
            drop(w);

            // Cleanup
            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
        }

        #[tokio::test]
        async fn spawn_runner_with_auth_creates_runner() {
            let (world, io) = test_world();
            let mut engine = OrcsEngine::new(world, io);

            let session = elevated_session();
            let checker = default_checker();
            let component = Box::new(EchoComponent::new());

            let handle = engine.spawn_runner_with_auth(io, component, session, checker);

            assert_eq!(handle.id, io);
            assert!(engine.runner_tasks.contains_key(&io));

            // Cleanup
            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
        }
    }
}