1use super::eventbus::EventBus;
37use crate::board::{self, SharedBoard};
38use crate::channel::{
39 ChannelConfig, ChannelHandle, ChannelRunner, ClientRunner, ClientRunnerConfig, LuaChildLoader,
40 OutputSender, RunnerResult, World, WorldCommand, WorldCommandSender, WorldManager,
41};
42use crate::io::IOPort;
43use crate::Principal;
44use orcs_component::{Component, ComponentLoader, ComponentSnapshot};
45use orcs_event::Signal;
46use orcs_hook::SharedHookRegistry;
47use orcs_types::{ChannelId, ComponentId};
48use std::collections::HashMap;
49use std::sync::Arc;
50use std::time::Duration;
51use tokio::sync::{broadcast, RwLock};
52use tracing::{debug, info, warn};
53
/// Capacity handed to `broadcast::channel` when the engine creates its
/// signal channel in `OrcsEngine::new`.
const SIGNAL_BUFFER_SIZE: usize = 256;
59
/// How long `shutdown_parallel` waits for each individual runner task to
/// finish before it aborts that task.
const RUNNER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
62
/// Runtime engine that owns the `WorldManager` task, the signal broadcast
/// channel, and every runner task spawned for a channel.
pub struct OrcsEngine {
    /// Event bus on which channel handles and component-to-channel mappings
    /// are registered and through which events are injected.
    eventbus: EventBus,
    /// Set by `start()`; cleared when `run()` leaves its loop.
    running: bool,
    /// Sender for `WorldCommand`s addressed to the `WorldManager` task.
    world_tx: WorldCommandSender,
    /// Shared `World` handle obtained from the `WorldManager`; cloned into
    /// every runner.
    world_read: Arc<RwLock<World>>,
    /// Broadcast sender for signals; every runner gets its own subscription.
    signal_tx: broadcast::Sender<Signal>,
    /// Handle of every spawned runner, keyed by channel.
    channel_handles: HashMap<ChannelId, ChannelHandle>,
    /// Join handle of the spawned `WorldManager::run` task; taken at shutdown.
    manager_task: Option<tokio::task::JoinHandle<()>>,
    /// Join handles of the spawned runner tasks, keyed by channel; drained
    /// at shutdown.
    runner_tasks: HashMap<ChannelId, tokio::task::JoinHandle<RunnerResult>>,
    /// Snapshots returned by runners at shutdown, keyed by component FQN.
    collected_snapshots: HashMap<String, ComponentSnapshot>,
    /// IO channel id given to `new()`; the engine only stores and returns it.
    io_channel: ChannelId,
    /// Board handed to runners that are built with an emitter.
    board: SharedBoard,
    /// Hook registry shared with the event bus and with runners built after
    /// it was set; `None` until `set_hook_registry` is called.
    hook_registry: Option<SharedHookRegistry>,
}
137
138impl OrcsEngine {
139 #[must_use]
159 pub fn new(world: World, io_channel: ChannelId) -> Self {
160 let (manager, world_tx) = WorldManager::with_world(world);
162 let world_read = manager.world();
163
164 let (signal_tx, _) = broadcast::channel(SIGNAL_BUFFER_SIZE);
166
167 let manager_task = tokio::spawn(manager.run());
169
170 info!(
171 "OrcsEngine created with IO channel {} (WorldManager started)",
172 io_channel
173 );
174
175 Self {
176 eventbus: EventBus::new(),
177 running: false,
178 world_tx,
179 world_read,
180 signal_tx,
181 channel_handles: HashMap::new(),
182 manager_task: Some(manager_task),
183 runner_tasks: HashMap::new(),
184 collected_snapshots: HashMap::new(),
185 io_channel,
186 board: board::shared_board(),
187 hook_registry: None,
188 }
189 }
190
191 pub fn set_hook_registry(&mut self, registry: SharedHookRegistry) {
196 self.eventbus.set_hook_registry(Arc::clone(®istry));
197 self.hook_registry = Some(registry);
198 }
199
/// Returns the hook registry installed via `set_hook_registry`, or `None`
/// if none has been set.
#[must_use]
pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
    self.hook_registry.as_ref()
}
205
206 fn apply_hook_registry(
208 &self,
209 builder: crate::channel::ChannelRunnerBuilder,
210 ) -> crate::channel::ChannelRunnerBuilder {
211 match &self.hook_registry {
212 Some(reg) => builder.with_hook_registry(Arc::clone(reg)),
213 None => builder,
214 }
215 }
216
217 fn finalize_runner(
221 &mut self,
222 channel_id: ChannelId,
223 component_id: &ComponentId,
224 runner: ChannelRunner,
225 handle: ChannelHandle,
226 ) -> ChannelHandle {
227 self.eventbus.register_channel(handle.clone());
228 self.eventbus
229 .register_component_channel(component_id, channel_id);
230 self.channel_handles.insert(channel_id, handle.clone());
231 let runner_task = tokio::spawn(runner.run());
232 self.runner_tasks.insert(channel_id, runner_task);
233 handle
234 }
235
236 pub fn spawn_runner(
245 &mut self,
246 channel_id: ChannelId,
247 component: Box<dyn Component>,
248 ) -> ChannelHandle {
249 let component_id = component.id().clone();
250 let signal_rx = self.signal_tx.subscribe();
251 let builder = ChannelRunner::builder(
252 channel_id,
253 self.world_tx.clone(),
254 Arc::clone(&self.world_read),
255 signal_rx,
256 component,
257 )
258 .with_request_channel();
259 let (runner, handle) = self.apply_hook_registry(builder).build();
260
261 let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
262 info!("Spawned runner for channel {}", channel_id);
263 handle
264 }
265
266 pub fn spawn_runner_with_emitter(
294 &mut self,
295 channel_id: ChannelId,
296 component: Box<dyn Component>,
297 output_tx: Option<OutputSender>,
298 ) -> ChannelHandle {
299 let signal_rx = self.signal_tx.subscribe();
300 let component_id = component.id().clone();
301
302 let mut builder = ChannelRunner::builder(
304 channel_id,
305 self.world_tx.clone(),
306 Arc::clone(&self.world_read),
307 signal_rx,
308 component,
309 )
310 .with_emitter(self.signal_tx.clone())
311 .with_shared_handles(self.eventbus.shared_handles())
312 .with_component_channel_map(self.eventbus.shared_component_channel_map())
313 .with_board(Arc::clone(&self.board))
314 .with_request_channel();
315
316 if let Some(tx) = output_tx {
318 builder = builder.with_output_channel(tx);
319 }
320
321 let (runner, handle) = self.apply_hook_registry(builder).build();
322
323 let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
324 info!(
325 "Spawned runner with emitter for channel {} (component={})",
326 channel_id,
327 component_id.fqn()
328 );
329 handle
330 }
331
332 pub fn spawn_runner_full(
352 &mut self,
353 channel_id: ChannelId,
354 component: Box<dyn Component>,
355 output_tx: Option<OutputSender>,
356 lua_loader: Option<Arc<dyn LuaChildLoader>>,
357 component_loader: Option<Arc<dyn ComponentLoader>>,
358 ) -> ChannelHandle {
359 let signal_rx = self.signal_tx.subscribe();
360 let component_id = component.id().clone();
361
362 let mut builder = ChannelRunner::builder(
364 channel_id,
365 self.world_tx.clone(),
366 Arc::clone(&self.world_read),
367 signal_rx,
368 component,
369 )
370 .with_emitter(self.signal_tx.clone())
371 .with_shared_handles(self.eventbus.shared_handles())
372 .with_component_channel_map(self.eventbus.shared_component_channel_map())
373 .with_board(Arc::clone(&self.board))
374 .with_request_channel();
375
376 if let Some(tx) = output_tx {
378 builder = builder.with_output_channel(tx);
379 }
380
381 let has_child_spawner = lua_loader.is_some();
383 if has_child_spawner {
384 builder = builder.with_child_spawner(lua_loader);
385 }
386
387 if let Some(loader) = component_loader {
389 builder = builder.with_component_loader(loader);
390 }
391
392 let (runner, handle) = self.apply_hook_registry(builder).build();
393
394 let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
395 info!(
396 "Spawned runner (full) for channel {} (component={}, child_spawner={})",
397 channel_id,
398 component_id.fqn(),
399 has_child_spawner
400 );
401 handle
402 }
403
404 pub fn spawn_client_runner(
420 &mut self,
421 channel_id: ChannelId,
422 io_port: IOPort,
423 principal: orcs_types::Principal,
424 ) -> (ChannelHandle, OutputSender) {
425 let config = ClientRunnerConfig {
426 world_tx: self.world_tx.clone(),
427 world: Arc::clone(&self.world_read),
428 signal_rx: self.signal_tx.subscribe(),
429 channel_handles: self.eventbus.shared_handles(),
430 };
431 let (runner, handle) = ClientRunner::new(channel_id, config, io_port, principal);
432
433 let output_sender = OutputSender::new(runner.event_tx().clone());
435
436 self.eventbus.register_channel(handle.clone());
438
439 self.channel_handles.insert(channel_id, handle.clone());
441
442 let runner_task = tokio::spawn(runner.run());
444 self.runner_tasks.insert(channel_id, runner_task);
445
446 info!("Spawned ClientRunner for channel {}", channel_id);
447 (handle, output_sender)
448 }
449
450 pub async fn spawn_channel(
460 &mut self,
461 parent: ChannelId,
462 config: ChannelConfig,
463 component: Box<dyn Component>,
464 ) -> Option<ChannelId> {
465 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
467 let cmd = WorldCommand::Spawn {
468 parent,
469 config,
470 reply: reply_tx,
471 };
472
473 if self.world_tx.send(cmd).await.is_err() {
474 return None;
475 }
476
477 let child_id = reply_rx.await.ok()??;
479
480 self.spawn_runner(child_id, component);
482
483 Some(child_id)
484 }
485
486 pub async fn spawn_channel_with_auth(
509 &mut self,
510 parent: ChannelId,
511 config: ChannelConfig,
512 component: Box<dyn Component>,
513 session: Arc<crate::Session>,
514 checker: Arc<dyn crate::auth::PermissionChecker>,
515 ) -> Option<ChannelId> {
516 if !checker.can_spawn_child(&session) {
518 warn!(
519 principal = ?session.principal(),
520 "spawn_channel denied: permission denied"
521 );
522 return None;
523 }
524
525 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
527 let cmd = WorldCommand::Spawn {
528 parent,
529 config,
530 reply: reply_tx,
531 };
532
533 if self.world_tx.send(cmd).await.is_err() {
534 return None;
535 }
536
537 let child_id = reply_rx.await.ok()??;
539
540 self.spawn_runner_with_auth(child_id, component, session, checker);
542
543 Some(child_id)
544 }
545
546 pub fn spawn_runner_with_auth(
558 &mut self,
559 channel_id: ChannelId,
560 component: Box<dyn Component>,
561 session: Arc<crate::Session>,
562 checker: Arc<dyn crate::auth::PermissionChecker>,
563 ) -> ChannelHandle {
564 let signal_rx = self.signal_tx.subscribe();
565 let component_id = component.id().clone();
566
567 let builder = ChannelRunner::builder(
569 channel_id,
570 self.world_tx.clone(),
571 Arc::clone(&self.world_read),
572 signal_rx,
573 component,
574 )
575 .with_emitter(self.signal_tx.clone())
576 .with_shared_handles(self.eventbus.shared_handles())
577 .with_component_channel_map(self.eventbus.shared_component_channel_map())
578 .with_board(Arc::clone(&self.board))
579 .with_session_arc(session)
580 .with_checker(checker)
581 .with_request_channel();
582 let (runner, handle) = self.apply_hook_registry(builder).build();
583
584 let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
585 info!(
586 "Spawned runner (with auth) for channel {} (component={})",
587 channel_id,
588 component_id.fqn(),
589 );
590 handle
591 }
592
593 #[allow(clippy::too_many_arguments)]
616 pub fn spawn_runner_full_auth(
617 &mut self,
618 channel_id: ChannelId,
619 component: Box<dyn Component>,
620 output_tx: Option<OutputSender>,
621 lua_loader: Option<Arc<dyn LuaChildLoader>>,
622 component_loader: Option<Arc<dyn ComponentLoader>>,
623 session: Arc<crate::Session>,
624 checker: Arc<dyn crate::auth::PermissionChecker>,
625 grants: Arc<dyn orcs_auth::GrantPolicy>,
626 ) -> ChannelHandle {
627 let empty_config = serde_json::Value::Object(serde_json::Map::new());
628 self.spawn_runner_full_auth_with_snapshot(
629 channel_id,
630 component,
631 output_tx,
632 lua_loader,
633 component_loader,
634 session,
635 checker,
636 grants,
637 None,
638 empty_config,
639 )
640 }
641
642 #[allow(clippy::too_many_arguments)]
647 pub fn spawn_runner_full_auth_with_snapshot(
648 &mut self,
649 channel_id: ChannelId,
650 component: Box<dyn Component>,
651 output_tx: Option<OutputSender>,
652 lua_loader: Option<Arc<dyn LuaChildLoader>>,
653 component_loader: Option<Arc<dyn ComponentLoader>>,
654 session: Arc<crate::Session>,
655 checker: Arc<dyn crate::auth::PermissionChecker>,
656 grants: Arc<dyn orcs_auth::GrantPolicy>,
657 initial_snapshot: Option<ComponentSnapshot>,
658 component_config: serde_json::Value,
659 ) -> ChannelHandle {
660 let signal_rx = self.signal_tx.subscribe();
661 let component_id = component.id().clone();
662
663 let mut builder = ChannelRunner::builder(
665 channel_id,
666 self.world_tx.clone(),
667 Arc::clone(&self.world_read),
668 signal_rx,
669 component,
670 )
671 .with_emitter(self.signal_tx.clone())
672 .with_shared_handles(self.eventbus.shared_handles())
673 .with_component_channel_map(self.eventbus.shared_component_channel_map())
674 .with_board(Arc::clone(&self.board))
675 .with_session_arc(session)
676 .with_checker(checker)
677 .with_grants(grants)
678 .with_request_channel()
679 .with_component_config(component_config);
680
681 if let Some(tx) = output_tx {
683 builder = builder.with_output_channel(tx);
684 }
685
686 let has_child_spawner = lua_loader.is_some();
688 if has_child_spawner {
689 builder = builder.with_child_spawner(lua_loader);
690 }
691
692 if let Some(loader) = component_loader {
694 builder = builder.with_component_loader(loader);
695 }
696
697 if let Some(snapshot) = initial_snapshot {
699 builder = builder.with_initial_snapshot(snapshot);
700 }
701
702 let (runner, handle) = self.apply_hook_registry(builder).build();
703
704 let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
705 info!(
706 "Spawned runner (full+auth) for channel {} (component={}, child_spawner={})",
707 channel_id,
708 component_id.fqn(),
709 has_child_spawner
710 );
711 handle
712 }
713
/// Returns the shared, lock-protected `World` handle that the engine also
/// hands to its runners.
#[must_use]
pub fn world_read(&self) -> &Arc<RwLock<World>> {
    &self.world_read
}
719
/// Returns the sender used to submit `WorldCommand`s to the `WorldManager`.
#[must_use]
pub fn world_tx(&self) -> &WorldCommandSender {
    &self.world_tx
}
725
/// Injects `event` into the channel identified by `channel_id`.
///
/// Delegates to `EventBus::try_inject`.
///
/// # Errors
///
/// Returns whatever `EngineError` `EventBus::try_inject` reports.
pub fn inject_event(
    &self,
    channel_id: ChannelId,
    event: crate::channel::Event,
) -> Result<(), super::EngineError> {
    self.eventbus.try_inject(channel_id, event)
}
742
/// Broadcasts `signal` to every current subscriber of the signal channel.
///
/// The signal kind is logged at info level first.
pub fn signal(&self, signal: Signal) {
    info!("Signal dispatched: {:?}", signal.kind);
    // The send result is deliberately ignored (best effort).
    let _ = self.signal_tx.send(signal);
}
751
/// Returns the current value of the engine's running flag.
#[must_use]
pub fn is_running(&self) -> bool {
    self.running
}
757
/// Marks the engine as running and logs the transition.
///
/// Only flips the flag; it spawns nothing and does not block.
pub fn start(&mut self) {
    self.running = true;
    info!("OrcsEngine started");
}
766
/// Requests a stop by broadcasting a veto signal issued by
/// `Principal::System`.
///
/// Only sends the signal; it neither waits for runners nor changes the
/// running flag. `run()` leaves its loop when it receives a veto.
pub fn stop(&self) {
    info!("Engine stop requested");
    // The send result is deliberately ignored (best effort).
    let _ = self.signal_tx.send(Signal::veto(Principal::System));
}
776
777 pub async fn shutdown(&mut self) {
783 self.shutdown_parallel().await;
784 }
785
/// Returns the shared board that the engine passes to runners built with
/// an emitter.
#[must_use]
pub fn board(&self) -> &SharedBoard {
    &self.board
}
794
/// Returns the IO channel id that was passed to `new()`.
#[must_use]
pub fn io_channel(&self) -> ChannelId {
    self.io_channel
}
802
803 pub async fn run(&mut self) {
822 self.start();
823 info!("Entering parallel run loop");
824
825 let mut signal_rx = self.signal_tx.subscribe();
826
827 loop {
828 tokio::select! {
829 result = signal_rx.recv() => {
830 match result {
831 Ok(signal) if signal.is_veto() => {
832 info!("Veto signal received, stopping engine");
833 break;
834 }
835 Ok(_) => {
836 }
838 Err(broadcast::error::RecvError::Lagged(n)) => {
839 warn!("Signal receiver lagged by {} messages", n);
840 }
841 Err(broadcast::error::RecvError::Closed) => {
842 warn!("Signal channel closed unexpectedly");
843 break;
844 }
845 }
846 }
847 }
848 }
849
850 self.running = false;
851 self.shutdown_parallel().await;
852 info!("OrcsEngine stopped (parallel mode)");
853 }
854
855 async fn shutdown_parallel(&mut self) {
867 let tasks: Vec<_> = self.runner_tasks.drain().collect();
872 let mut wrapper_handles = Vec::with_capacity(tasks.len());
873
874 for (id, mut task) in tasks {
875 wrapper_handles.push(tokio::spawn(async move {
876 tokio::select! {
877 result = &mut task => {
878 match result {
879 Ok(runner_result) => Some((id, runner_result)),
880 Err(e) => {
881 warn!(channel = %id, error = %e, "runner task panicked");
882 None
883 }
884 }
885 }
886 _ = tokio::time::sleep(RUNNER_SHUTDOWN_TIMEOUT) => {
887 warn!(channel = %id, "runner task timed out, aborting");
888 task.abort();
889 None
890 }
891 }
892 }));
893 }
894
895 for handle in wrapper_handles {
897 if let Ok(Some((id, result))) = handle.await {
898 debug!(
899 channel = %id,
900 component = %result.component_fqn,
901 has_snapshot = result.snapshot.is_some(),
902 "runner completed gracefully"
903 );
904 if let Some(snapshot) = result.snapshot {
905 self.collected_snapshots
906 .insert(result.component_fqn.into_owned(), snapshot);
907 }
908 }
909 }
910
911 info!(
912 "collected {} snapshots from runners",
913 self.collected_snapshots.len()
914 );
915
916 let _ = self.world_tx.send(WorldCommand::Shutdown).await;
918 if let Some(task) = self.manager_task.take() {
919 let _ = task.await;
920 }
921
922 for id in self.channel_handles.keys() {
924 self.eventbus.unregister_channel(id);
925 }
926 self.channel_handles.clear();
927 }
928
/// Returns the component snapshots gathered during shutdown, keyed by
/// component FQN.
///
/// The map is only filled by `shutdown_parallel`, so it is empty until the
/// engine has been shut down.
#[must_use]
pub fn collected_snapshots(&self) -> &HashMap<String, ComponentSnapshot> {
    &self.collected_snapshots
}
944}
945
946#[cfg(test)]
947mod tests {
948 use super::*;
949 use crate::channel::{ChannelCore, WorldCommand};
950 use crate::Principal;
951 use orcs_component::ComponentError;
952 use orcs_event::{Request, SignalResponse};
953 use orcs_types::ComponentId;
954 use serde_json::Value;
955
956 fn test_world() -> (World, ChannelId) {
958 let mut world = World::new();
959 let io = world.create_channel(ChannelConfig::interactive());
960 (world, io)
961 }
962
/// Test component that answers every request with the request's payload.
struct EchoComponent {
    /// Component identity (`builtin("echo")` when built via `new`).
    id: ComponentId,
    /// Set to `true` once `abort()` has been called.
    aborted: bool,
}
967
968 impl EchoComponent {
969 fn new() -> Self {
970 Self {
971 id: ComponentId::builtin("echo"),
972 aborted: false,
973 }
974 }
975 }
976
977 impl Component for EchoComponent {
978 fn id(&self) -> &ComponentId {
979 &self.id
980 }
981
982 fn status(&self) -> orcs_component::Status {
983 orcs_component::Status::Idle
984 }
985
986 fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
987 Ok(request.payload.clone())
988 }
989
990 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
991 if signal.is_veto() {
992 SignalResponse::Abort
993 } else {
994 SignalResponse::Handled
995 }
996 }
997
998 fn abort(&mut self) {
999 self.aborted = true;
1000 }
1001
1002 fn status_detail(&self) -> Option<orcs_component::StatusDetail> {
1003 None
1004 }
1005
1006 fn init(&mut self, _config: &serde_json::Value) -> Result<(), ComponentError> {
1007 Ok(())
1008 }
1009
1010 fn shutdown(&mut self) {
1011 }
1013 }
1014
1015 #[tokio::test]
1016 async fn engine_creation() {
1017 let (world, io) = test_world();
1018 let engine = OrcsEngine::new(world, io);
1019 assert!(!engine.is_running());
1020 assert_eq!(engine.io_channel(), io);
1021
1022 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1024 }
1025
1026 #[tokio::test]
1027 async fn engine_world_access() {
1028 let (world, io) = test_world();
1029 let engine = OrcsEngine::new(world, io);
1030
1031 let w = engine.world_read().read().await;
1033 assert!(w.get(&io).is_some());
1034 drop(w);
1035
1036 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1038 }
1039
1040 #[tokio::test]
1041 async fn stop_engine_sends_veto_signal() {
1042 let (world, io) = test_world();
1043 let engine = OrcsEngine::new(world, io);
1044
1045 let mut rx = engine.signal_tx.subscribe();
1047
1048 engine.stop();
1050
1051 let received = rx.recv().await.expect("receive signal");
1053 assert!(received.is_veto());
1054
1055 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1057 }
1058
1059 #[tokio::test]
1060 async fn world_access_parallel() {
1061 let (world, io) = test_world();
1062 let engine = OrcsEngine::new(world, io);
1063
1064 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1066 engine
1067 .world_tx()
1068 .send(WorldCommand::Complete {
1069 id: io,
1070 reply: reply_tx,
1071 })
1072 .await
1073 .expect("send complete command");
1074 assert!(reply_rx.await.expect("receive complete reply"));
1075
1076 let w = engine.world_read().read().await;
1078 assert!(!w.get(&io).expect("get channel").is_running());
1079 drop(w);
1080
1081 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1083 }
1084
1085 #[tokio::test]
1086 async fn spawn_runner_creates_task() {
1087 let (world, io) = test_world();
1088 let mut engine = OrcsEngine::new(world, io);
1089
1090 let echo = Box::new(EchoComponent::new());
1091 let _handle = engine.spawn_runner(io, echo);
1092
1093 assert_eq!(engine.runner_tasks.len(), 1);
1095 assert!(engine.runner_tasks.contains_key(&io));
1096
1097 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1099 }
1100
1101 #[tokio::test]
1102 async fn signal_broadcast_works() {
1103 let (world, io) = test_world();
1104 let engine = OrcsEngine::new(world, io);
1105
1106 let mut rx = engine.signal_tx.subscribe();
1108
1109 let principal = Principal::System;
1110 let cancel = Signal::cancel(io, principal);
1111 engine.signal(cancel.clone());
1112
1113 let received = rx.recv().await.expect("receive signal");
1115 assert!(matches!(received.kind, orcs_event::SignalKind::Cancel));
1116
1117 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1119 }
1120
/// Test component whose whole state is a counter that can be snapshotted
/// and restored.
struct SnapshottableComponent {
    /// Component identity (`builtin(name)` when built via `new`).
    id: ComponentId,
    /// Incremented on every request; this is the snapshotted state.
    counter: u64,
}
1128
1129 impl SnapshottableComponent {
1130 fn new(name: &str, initial_value: u64) -> Self {
1131 Self {
1132 id: ComponentId::builtin(name),
1133 counter: initial_value,
1134 }
1135 }
1136 }
1137
1138 impl Component for SnapshottableComponent {
1139 fn id(&self) -> &ComponentId {
1140 &self.id
1141 }
1142
1143 fn status(&self) -> orcs_component::Status {
1144 orcs_component::Status::Idle
1145 }
1146
1147 fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
1148 self.counter += 1;
1149 Ok(Value::Number(self.counter.into()))
1150 }
1151
1152 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
1153 if signal.is_veto() {
1154 SignalResponse::Abort
1155 } else {
1156 SignalResponse::Handled
1157 }
1158 }
1159
1160 fn abort(&mut self) {}
1161
1162 fn snapshot(
1163 &self,
1164 ) -> Result<orcs_component::ComponentSnapshot, orcs_component::SnapshotError> {
1165 orcs_component::ComponentSnapshot::from_state(self.id.fqn(), &self.counter)
1166 }
1167
1168 fn restore(
1169 &mut self,
1170 snapshot: &orcs_component::ComponentSnapshot,
1171 ) -> Result<(), orcs_component::SnapshotError> {
1172 self.counter = snapshot.to_state()?;
1173 Ok(())
1174 }
1175 }
1176
1177 #[tokio::test]
1178 async fn graceful_shutdown_collects_snapshots() {
1179 let (world, io) = test_world();
1180 let mut engine = OrcsEngine::new(world, io);
1181
1182 let comp = Box::new(SnapshottableComponent::new("snap", 42));
1184 let _handle = engine.spawn_runner(io, comp);
1185
1186 let engine_signal_tx = engine.signal_tx.clone();
1188 tokio::spawn(async move {
1189 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1190 let _ = engine_signal_tx.send(Signal::veto(Principal::System));
1191 });
1192
1193 engine.run().await;
1194
1195 let snapshots = engine.collected_snapshots();
1197 assert_eq!(snapshots.len(), 1);
1198 assert!(snapshots.contains_key("builtin::snap"));
1199 }
1200
1201 #[tokio::test]
1202 async fn snapshots_persist_via_store_after_graceful_shutdown() {
1203 use crate::session::{LocalFileStore, SessionAsset, SessionStore};
1204 use tempfile::TempDir;
1205
1206 let temp_dir = TempDir::new().expect("create temp dir");
1207 let store = LocalFileStore::new(temp_dir.path().to_path_buf()).expect("create store");
1208
1209 let (world, io) = test_world();
1210 let mut engine = OrcsEngine::new(world, io);
1211
1212 let comp = Box::new(SnapshottableComponent::new("snap", 42));
1214 let _handle = engine.spawn_runner(io, comp);
1215
1216 let engine_signal_tx = engine.signal_tx.clone();
1218 tokio::spawn(async move {
1219 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1220 let _ = engine_signal_tx.send(Signal::veto(Principal::System));
1221 });
1222
1223 engine.run().await;
1224
1225 let mut asset = SessionAsset::new();
1227 let session_id = asset.id.clone();
1228 for (fqn, snapshot) in engine.collected_snapshots() {
1229 asset
1230 .component_snapshots
1231 .insert(fqn.clone(), snapshot.clone());
1232 }
1233 asset.touch();
1234 store.save(&asset).await.expect("save session");
1235
1236 assert!(asset.get_snapshot("builtin::snap").is_some());
1238
1239 let loaded = store.load(&session_id).await.expect("load session");
1241 assert_eq!(loaded.id, session_id);
1242 assert!(loaded.get_snapshot("builtin::snap").is_some());
1243 }
1244
1245 mod spawn_channel_with_auth_tests {
1248 use super::*;
1249 use crate::auth::{DefaultPolicy, Session};
1250 use orcs_types::PrincipalId;
1251 use std::time::Duration;
1252
1253 fn standard_session() -> Arc<Session> {
1254 Arc::new(Session::new(Principal::User(PrincipalId::new())))
1255 }
1256
1257 fn elevated_session() -> Arc<Session> {
1258 Arc::new(
1259 Session::new(Principal::User(PrincipalId::new())).elevate(Duration::from_secs(60)),
1260 )
1261 }
1262
1263 fn default_checker() -> Arc<dyn crate::auth::PermissionChecker> {
1264 Arc::new(DefaultPolicy)
1265 }
1266
1267 #[tokio::test]
1268 async fn spawn_channel_with_auth_denied_for_standard_session() {
1269 let (world, io) = test_world();
1270 let mut engine = OrcsEngine::new(world, io);
1271
1272 let session = standard_session();
1273 let checker = default_checker();
1274 let config = ChannelConfig::new(100, true);
1275 let component = Box::new(EchoComponent::new());
1276
1277 let result = engine
1279 .spawn_channel_with_auth(io, config, component, session, checker)
1280 .await;
1281
1282 assert!(result.is_none(), "standard session should be denied");
1283
1284 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1286 }
1287
1288 #[tokio::test]
1289 async fn spawn_channel_with_auth_allowed_for_elevated_session() {
1290 let (world, io) = test_world();
1291 let mut engine = OrcsEngine::new(world, io);
1292
1293 let session = elevated_session();
1294 let checker = default_checker();
1295 let config = ChannelConfig::new(100, true);
1296 let component = Box::new(EchoComponent::new());
1297
1298 let result = engine
1300 .spawn_channel_with_auth(io, config, component, session, checker)
1301 .await;
1302
1303 assert!(result.is_some(), "elevated session should be allowed");
1304
1305 let child_id = result.expect("elevated session should produce a child channel id");
1306 let w = engine.world_read().read().await;
1308 assert!(w.get(&child_id).is_some());
1309 drop(w);
1310
1311 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1313 }
1314
1315 #[tokio::test]
1316 async fn spawn_runner_with_auth_creates_runner() {
1317 let (world, io) = test_world();
1318 let mut engine = OrcsEngine::new(world, io);
1319
1320 let session = elevated_session();
1321 let checker = default_checker();
1322 let component = Box::new(EchoComponent::new());
1323
1324 let handle = engine.spawn_runner_with_auth(io, component, session, checker);
1325
1326 assert_eq!(handle.id, io);
1327 assert!(engine.runner_tasks.contains_key(&io));
1328
1329 let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
1331 }
1332 }
1333}