1use super::eventbus::EventBus;
37use crate::board::{self, SharedBoard};
38use crate::channel::{
39 ChannelConfig, ChannelHandle, ChannelRunner, ClientRunner, ClientRunnerConfig, LuaChildLoader,
40 OutputSender, RunnerResult, World, WorldCommand, WorldCommandSender, WorldManager,
41};
42use crate::io::IOPort;
43use crate::Principal;
44use orcs_component::{Component, ComponentLoader, ComponentSnapshot};
45use orcs_event::Signal;
46use orcs_hook::SharedHookRegistry;
47use orcs_types::{ChannelId, ComponentId};
48use std::collections::HashMap;
49use std::sync::Arc;
50use std::time::Duration;
51use tokio::sync::{broadcast, mpsc, RwLock};
52use tracing::{debug, info, warn};
53
/// Capacity of the broadcast channel that fans signals out to runners.
/// A subscriber that falls more than this many messages behind observes
/// `RecvError::Lagged`.
const SIGNAL_BUFFER_SIZE: usize = 256;

/// Maximum time `shutdown_parallel` waits for each runner task to finish
/// before aborting it.
const RUNNER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
62
/// Message a runner task sends to the engine's monitor task when its event
/// loop terminates, so the dead channel can be pruned from shared maps.
#[derive(Debug)]
struct RunnerExitNotice {
    // Channel whose runner exited.
    channel_id: ChannelId,
    // Fully-qualified name of the component the runner was driving.
    component_fqn: String,
    // Why the runner stopped, as reported by its `RunnerResult`.
    exit_reason: crate::channel::ExitReason,
}
74
/// Central orchestrator: owns the event bus, the `WorldManager` actor, the
/// signal broadcast channel, and every spawned channel-runner task.
pub struct OrcsEngine {
    // Routes events to channel handles and tracks component -> channel mapping.
    eventbus: EventBus,
    // Set by `start`, cleared when `run` leaves its loop.
    running: bool,
    // Command side of the `WorldManager` actor.
    world_tx: WorldCommandSender,
    // Shared read-only view of world state.
    world_read: Arc<RwLock<World>>,
    // Broadcast sender every runner subscribes to for signals.
    signal_tx: broadcast::Sender<Signal>,
    // Handles of live channels, keyed by channel id.
    channel_handles: HashMap<ChannelId, ChannelHandle>,
    // Join handle for the WorldManager task; taken during shutdown.
    manager_task: Option<tokio::task::JoinHandle<()>>,
    // Join handles for runner event loops, keyed by channel id.
    runner_tasks: HashMap<ChannelId, tokio::task::JoinHandle<RunnerResult>>,
    // Snapshots gathered from runners during graceful shutdown, keyed by FQN.
    collected_snapshots: HashMap<String, ComponentSnapshot>,
    // Channel reserved for user-facing IO (recorded, not driven, here).
    io_channel: ChannelId,
    // Shared board handed to auth-enabled runners.
    board: SharedBoard,
    // Optional hook registry propagated to newly built runners.
    hook_registry: Option<SharedHookRegistry>,
    // Optional MCP client manager propagated to newly built runners.
    mcp_manager: Option<Arc<orcs_mcp::McpClientManager>>,
    // Sender cloned into each runner task to report its exit.
    runner_exit_tx: mpsc::UnboundedSender<RunnerExitNotice>,
    // Receiver side; consumed exactly once by `start_runner_monitor`.
    runner_exit_rx: Option<mpsc::UnboundedReceiver<RunnerExitNotice>>,
    // Background task that prunes dead channels (see `start_runner_monitor`).
    monitor_task: Option<tokio::task::JoinHandle<()>>,
}
157
158impl OrcsEngine {
    /// Creates an engine around `world`, immediately spawning the
    /// `WorldManager` actor task.
    ///
    /// `io_channel` identifies the pre-created channel reserved for
    /// user-facing IO; the engine only records it (see [`Self::io_channel`]).
    #[must_use]
    pub fn new(world: World, io_channel: ChannelId) -> Self {
        let (manager, world_tx) = WorldManager::with_world(world);
        let world_read = manager.world();

        // The initial receiver is dropped; runners subscribe on demand via
        // `signal_tx.subscribe()`.
        let (signal_tx, _) = broadcast::channel(SIGNAL_BUFFER_SIZE);

        let manager_task = tokio::spawn(manager.run());

        let (runner_exit_tx, runner_exit_rx) = mpsc::unbounded_channel();

        info!(
            "OrcsEngine created with IO channel {} (WorldManager started)",
            io_channel
        );

        Self {
            eventbus: EventBus::new(),
            running: false,
            world_tx,
            world_read,
            signal_tx,
            channel_handles: HashMap::new(),
            manager_task: Some(manager_task),
            runner_tasks: HashMap::new(),
            collected_snapshots: HashMap::new(),
            io_channel,
            board: board::shared_board(),
            hook_registry: None,
            mcp_manager: None,
            runner_exit_tx,
            runner_exit_rx: Some(runner_exit_rx),
            monitor_task: None,
        }
    }
217
218 pub fn set_mcp_manager(&mut self, manager: Arc<orcs_mcp::McpClientManager>) {
223 self.mcp_manager = Some(manager);
224 }
225
226 pub fn set_hook_registry(&mut self, registry: SharedHookRegistry) {
231 self.eventbus.set_hook_registry(Arc::clone(®istry));
232 self.hook_registry = Some(registry);
233 }
234
    /// Returns the installed hook registry, if any.
    #[must_use]
    pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
        self.hook_registry.as_ref()
    }
240
241 fn apply_hook_registry(
243 &self,
244 builder: crate::channel::ChannelRunnerBuilder,
245 ) -> crate::channel::ChannelRunnerBuilder {
246 match &self.hook_registry {
247 Some(reg) => builder.with_hook_registry(Arc::clone(reg)),
248 None => builder,
249 }
250 }
251
252 fn apply_mcp_manager(
254 &self,
255 builder: crate::channel::ChannelRunnerBuilder,
256 ) -> crate::channel::ChannelRunnerBuilder {
257 match &self.mcp_manager {
258 Some(mgr) => builder.with_mcp_manager(Arc::clone(mgr)),
259 None => builder,
260 }
261 }
262
    /// Registers a freshly built runner with the event bus, records its
    /// handle, and launches the runner's event loop as a Tokio task.
    ///
    /// The spawned task forwards a `RunnerExitNotice` to the monitor when
    /// the loop returns, then yields the `RunnerResult` so
    /// `shutdown_parallel` can still collect the component snapshot.
    fn finalize_runner(
        &mut self,
        channel_id: ChannelId,
        component_id: &ComponentId,
        runner: ChannelRunner,
        handle: ChannelHandle,
    ) -> ChannelHandle {
        // Register before the task runs so the channel is immediately
        // reachable for event injection.
        self.eventbus.register_channel(handle.clone());
        self.eventbus
            .register_component_channel(component_id, channel_id);
        self.channel_handles.insert(channel_id, handle.clone());

        let exit_tx = self.runner_exit_tx.clone();
        let fqn = component_id.fqn().to_string();
        let runner_task = tokio::spawn(async move {
            let result = runner.run().await;
            // Best-effort: a send failure only means the monitor is gone
            // (e.g. during shutdown).
            let _ = exit_tx.send(RunnerExitNotice {
                channel_id,
                component_fqn: fqn,
                exit_reason: result.exit_reason,
            });
            result
        });
        self.runner_tasks.insert(channel_id, runner_task);
        handle
    }
294
    /// Builds and launches a plain (no auth, no board) runner for
    /// `component` on `channel_id`, returning its handle.
    pub fn spawn_runner(
        &mut self,
        channel_id: ChannelId,
        component: Box<dyn Component>,
    ) -> ChannelHandle {
        // Capture the id before `component` is moved into the builder.
        let component_id = component.id().clone();
        let signal_rx = self.signal_tx.subscribe();
        let builder = ChannelRunner::builder(
            channel_id,
            self.world_tx.clone(),
            Arc::clone(&self.world_read),
            signal_rx,
            component,
        )
        .with_request_channel();
        let (runner, handle) = self
            .apply_mcp_manager(self.apply_hook_registry(builder))
            .build();

        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
        info!("Spawned runner for channel {}", channel_id);
        handle
    }
326
    /// Launches a `ClientRunner` (user-facing IO runner) on `channel_id`,
    /// returning its handle plus an `OutputSender` that pushes output
    /// events into the runner.
    ///
    /// Unlike component runners, client runners are not added to the
    /// component->channel map and are spawned without the exit-notice
    /// wrapper, so the monitor never sees them exit.
    pub fn spawn_client_runner(
        &mut self,
        channel_id: ChannelId,
        io_port: IOPort,
        principal: orcs_types::Principal,
    ) -> (ChannelHandle, OutputSender) {
        let config = ClientRunnerConfig {
            world_tx: self.world_tx.clone(),
            world: Arc::clone(&self.world_read),
            signal_rx: self.signal_tx.subscribe(),
            channel_handles: self.eventbus.shared_handles(),
        };
        let (runner, handle) = ClientRunner::new(channel_id, config, io_port, principal);

        let output_sender = OutputSender::new(runner.event_tx().clone());

        self.eventbus.register_channel(handle.clone());

        self.channel_handles.insert(channel_id, handle.clone());

        let runner_task = tokio::spawn(runner.run());
        self.runner_tasks.insert(channel_id, runner_task);

        info!("Spawned ClientRunner for channel {}", channel_id);
        (handle, output_sender)
    }
378
    /// Asks the `WorldManager` to create a child channel of `parent` and,
    /// on success, spawns a plain runner for `component` on it.
    ///
    /// Returns `None` if the manager is gone, the reply channel is dropped,
    /// or the manager rejects the spawn.
    pub async fn spawn_channel(
        &mut self,
        parent: ChannelId,
        config: ChannelConfig,
        component: Box<dyn Component>,
    ) -> Option<ChannelId> {
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let cmd = WorldCommand::Spawn {
            parent,
            config,
            reply: reply_tx,
        };

        if self.world_tx.send(cmd).await.is_err() {
            return None;
        }

        // Double `?`: outer = reply channel dropped, inner = spawn refused.
        let child_id = reply_rx.await.ok()??;

        self.spawn_runner(child_id, component);

        Some(child_id)
    }
414
    /// Permission-checked variant of [`Self::spawn_channel`].
    ///
    /// Denies the spawn (returning `None`) when `checker` does not allow
    /// `session` to create child channels; otherwise creates the child
    /// channel and spawns an auth-wired runner on it.
    pub async fn spawn_channel_with_auth(
        &mut self,
        parent: ChannelId,
        config: ChannelConfig,
        component: Box<dyn Component>,
        session: Arc<crate::Session>,
        checker: Arc<dyn crate::auth::PermissionChecker>,
    ) -> Option<ChannelId> {
        // Check permission before touching the world at all.
        if !checker.can_spawn_child(&session) {
            warn!(
                principal = ?session.principal(),
                "spawn_channel denied: permission denied"
            );
            return None;
        }

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let cmd = WorldCommand::Spawn {
            parent,
            config,
            reply: reply_tx,
        };

        if self.world_tx.send(cmd).await.is_err() {
            return None;
        }

        // Double `?`: outer = reply channel dropped, inner = spawn refused.
        let child_id = reply_rx.await.ok()??;

        self.spawn_runner_with_auth(child_id, component, session, checker);

        Some(child_id)
    }
474
    /// Builds and launches a runner wired with session/permission checking,
    /// signal emission, shared handle maps, and the shared board.
    pub fn spawn_runner_with_auth(
        &mut self,
        channel_id: ChannelId,
        component: Box<dyn Component>,
        session: Arc<crate::Session>,
        checker: Arc<dyn crate::auth::PermissionChecker>,
    ) -> ChannelHandle {
        let signal_rx = self.signal_tx.subscribe();
        // Capture the id before `component` is moved into the builder.
        let component_id = component.id().clone();

        let builder = ChannelRunner::builder(
            channel_id,
            self.world_tx.clone(),
            Arc::clone(&self.world_read),
            signal_rx,
            component,
        )
        .with_emitter(self.signal_tx.clone())
        .with_shared_handles(self.eventbus.shared_handles())
        .with_component_channel_map(self.eventbus.shared_component_channel_map())
        .with_board(Arc::clone(&self.board))
        .with_session_arc(session)
        .with_checker(checker)
        .with_request_channel();
        let (runner, handle) = self
            .apply_mcp_manager(self.apply_hook_registry(builder))
            .build();

        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
        info!(
            "Spawned runner (with auth) for channel {} (component={})",
            channel_id,
            component_id.fqn(),
        );
        handle
    }
523
524 #[allow(clippy::too_many_arguments)]
547 pub fn spawn_runner_full_auth(
548 &mut self,
549 channel_id: ChannelId,
550 component: Box<dyn Component>,
551 output_tx: Option<OutputSender>,
552 lua_loader: Option<Arc<dyn LuaChildLoader>>,
553 component_loader: Option<Arc<dyn ComponentLoader>>,
554 session: Arc<crate::Session>,
555 checker: Arc<dyn crate::auth::PermissionChecker>,
556 grants: Arc<dyn orcs_auth::GrantPolicy>,
557 ) -> ChannelHandle {
558 let empty_config = serde_json::Value::Object(serde_json::Map::new());
559 self.spawn_runner_full_auth_with_snapshot(
560 channel_id,
561 component,
562 output_tx,
563 lua_loader,
564 component_loader,
565 session,
566 checker,
567 grants,
568 None,
569 empty_config,
570 )
571 }
572
    /// Most general runner spawn: auth, board, optional output channel,
    /// optional Lua child spawner, optional component loader, optional
    /// state snapshot to restore, and an explicit component configuration.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_runner_full_auth_with_snapshot(
        &mut self,
        channel_id: ChannelId,
        component: Box<dyn Component>,
        output_tx: Option<OutputSender>,
        lua_loader: Option<Arc<dyn LuaChildLoader>>,
        component_loader: Option<Arc<dyn ComponentLoader>>,
        session: Arc<crate::Session>,
        checker: Arc<dyn crate::auth::PermissionChecker>,
        grants: Arc<dyn orcs_auth::GrantPolicy>,
        initial_snapshot: Option<ComponentSnapshot>,
        component_config: serde_json::Value,
    ) -> ChannelHandle {
        let signal_rx = self.signal_tx.subscribe();
        // Capture the id before `component` is moved into the builder.
        let component_id = component.id().clone();

        let mut builder = ChannelRunner::builder(
            channel_id,
            self.world_tx.clone(),
            Arc::clone(&self.world_read),
            signal_rx,
            component,
        )
        .with_emitter(self.signal_tx.clone())
        .with_shared_handles(self.eventbus.shared_handles())
        .with_component_channel_map(self.eventbus.shared_component_channel_map())
        .with_board(Arc::clone(&self.board))
        .with_session_arc(session)
        .with_checker(checker)
        .with_grants(grants)
        .with_request_channel()
        .with_component_config(component_config);

        if let Some(tx) = output_tx {
            builder = builder.with_output_channel(tx);
        }

        // Remember whether a child spawner was attached; used in the log line.
        let has_child_spawner = lua_loader.is_some();
        if has_child_spawner {
            builder = builder.with_child_spawner(lua_loader);
        }

        if let Some(loader) = component_loader {
            builder = builder.with_component_loader(loader);
        }

        if let Some(snapshot) = initial_snapshot {
            builder = builder.with_initial_snapshot(snapshot);
        }

        let (runner, handle) = self
            .apply_mcp_manager(self.apply_hook_registry(builder))
            .build();

        let handle = self.finalize_runner(channel_id, &component_id, runner, handle);
        info!(
            "Spawned runner (full+auth) for channel {} (component={}, child_spawner={})",
            channel_id,
            component_id.fqn(),
            has_child_spawner
        );
        handle
    }
646
    /// Shared read view of the world state.
    #[must_use]
    pub fn world_read(&self) -> &Arc<RwLock<World>> {
        &self.world_read
    }
652
    /// Command sender for the `WorldManager` actor.
    #[must_use]
    pub fn world_tx(&self) -> &WorldCommandSender {
        &self.world_tx
    }
658
    /// Injects `event` directly into `channel_id`'s event queue without
    /// blocking; errors when the event bus cannot accept it (see
    /// `EventBus::try_inject` for the failure cases).
    pub fn inject_event(
        &self,
        channel_id: ChannelId,
        event: crate::channel::Event,
    ) -> Result<(), super::EngineError> {
        self.eventbus.try_inject(channel_id, event)
    }
675
    /// Broadcasts `signal` to every subscribed runner.
    ///
    /// A send error only means there are no active subscribers, so it is
    /// deliberately ignored.
    pub fn signal(&self, signal: Signal) {
        info!("Signal dispatched: {:?}", signal.kind);
        let _ = self.signal_tx.send(signal);
    }
684
    /// Whether `start` has been called and `run` has not yet finished.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.running
    }
690
    /// Marks the engine running and launches the runner-exit monitor task.
    pub fn start(&mut self) {
        self.running = true;
        self.start_runner_monitor();
        info!("OrcsEngine started");
    }
700
701 fn start_runner_monitor(&mut self) {
706 if self.monitor_task.is_some() {
707 return;
708 }
709 let Some(mut exit_rx) = self.runner_exit_rx.take() else {
710 warn!("Runner monitor already started (exit_rx already taken)");
711 return;
712 };
713
714 let shared_handles = self.eventbus.shared_handles();
715 let shared_ccm = self.eventbus.shared_component_channel_map();
716 let task = tokio::spawn(async move {
717 while let Some(notice) = exit_rx.recv().await {
718 let reason_str = notice.exit_reason.as_str();
719 warn!(
720 channel = %notice.channel_id,
721 component = %notice.component_fqn,
722 exit_reason = reason_str,
723 "Runner exited"
724 );
725
726 let event = crate::channel::Event {
728 category: orcs_event::EventCategory::Lifecycle,
729 operation: "runner_exited".into(),
730 source: orcs_types::ComponentId::builtin("engine"),
731 payload: serde_json::json!({
732 "channel_id": notice.channel_id.to_string(),
733 "component_fqn": notice.component_fqn,
734 "exit_reason": reason_str,
735 }),
736 };
737 {
738 let handles = shared_handles.read();
739 let mut delivered = 0usize;
740 for handle in handles.values() {
741 if handle.try_inject(event.clone()).is_ok() {
742 delivered += 1;
743 }
744 }
745 info!(
746 channel = %notice.channel_id,
747 component = %notice.component_fqn,
748 exit_reason = reason_str,
749 delivered,
750 "Lifecycle::runner_exited event broadcast"
751 );
752 }
753
754 {
758 let mut handles = shared_handles.write();
759 handles.remove(¬ice.channel_id);
760 }
761 {
762 let mut ccm = shared_ccm.write();
763 ccm.retain(|_, cid| *cid != notice.channel_id);
764 }
765 debug!(
766 channel = %notice.channel_id,
767 component = %notice.component_fqn,
768 "Dead channel handle removed"
769 );
770 }
771 debug!("Runner monitor stopped (all exit senders dropped)");
772 });
773 self.monitor_task = Some(task);
774 debug!("Runner monitor task spawned");
775 }
776
    /// Requests an engine-wide stop by broadcasting a system veto signal;
    /// `run` reacts by leaving its loop and shutting down.
    pub fn stop(&self) {
        info!("Engine stop requested");
        let _ = self.signal_tx.send(Signal::veto(Principal::System));
    }
786
    /// Gracefully shuts the engine down: waits (bounded) for all runners,
    /// collects snapshots, and stops the `WorldManager`.
    pub async fn shutdown(&mut self) {
        self.shutdown_parallel().await;
    }
795
    /// Shared board handed to auth-enabled runners.
    #[must_use]
    pub fn board(&self) -> &SharedBoard {
        &self.board
    }
804
    /// Id of the channel reserved for user-facing IO.
    #[must_use]
    pub fn io_channel(&self) -> ChannelId {
        self.io_channel
    }
812
    /// Runs the engine until a veto signal is observed, then performs a
    /// parallel shutdown.
    ///
    /// Loop behavior per received signal:
    /// * veto            -> break and shut down
    /// * any other kind  -> ignored here (runners consume them)
    /// * `Lagged(n)`     -> warn and keep listening
    /// * `Closed`        -> warn and shut down
    pub async fn run(&mut self) {
        self.start();
        info!("Entering parallel run loop");

        let mut signal_rx = self.signal_tx.subscribe();

        loop {
            tokio::select! {
                result = signal_rx.recv() => {
                    match result {
                        Ok(signal) if signal.is_veto() => {
                            info!("Veto signal received, stopping engine");
                            break;
                        }
                        Ok(_) => {
                            // Non-veto signals are handled by the runners.
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            warn!("Signal receiver lagged by {} messages", n);
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            warn!("Signal channel closed unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        self.running = false;
        self.shutdown_parallel().await;
        info!("OrcsEngine stopped (parallel mode)");
    }
864
    /// Full shutdown: aborts the monitor, waits for all runner tasks with a
    /// per-runner timeout, collects their snapshots, stops the
    /// `WorldManager`, and unregisters every channel.
    ///
    /// Runner waits happen concurrently — each runner task is wrapped in its
    /// own timeout task — so total shutdown time is bounded by
    /// `RUNNER_SHUTDOWN_TIMEOUT`, not by the number of runners.
    async fn shutdown_parallel(&mut self) {
        // Stop the monitor first so it does not race with the teardown below.
        if let Some(task) = self.monitor_task.take() {
            task.abort();
            debug!("Runner monitor aborted for shutdown");
        }
        let tasks: Vec<_> = self.runner_tasks.drain().collect();
        let mut wrapper_handles = Vec::with_capacity(tasks.len());

        for (id, mut task) in tasks {
            wrapper_handles.push(tokio::spawn(async move {
                tokio::select! {
                    result = &mut task => {
                        match result {
                            Ok(runner_result) => Some((id, runner_result)),
                            Err(e) => {
                                warn!(channel = %id, error = %e, "runner task panicked");
                                None
                            }
                        }
                    }
                    _ = tokio::time::sleep(RUNNER_SHUTDOWN_TIMEOUT) => {
                        warn!(channel = %id, "runner task timed out, aborting");
                        task.abort();
                        None
                    }
                }
            }));
        }

        for handle in wrapper_handles {
            // `Err` here would mean the wrapper itself panicked/was aborted;
            // either way there is no snapshot to collect.
            if let Ok(Some((id, result))) = handle.await {
                debug!(
                    channel = %id,
                    component = %result.component_fqn,
                    has_snapshot = result.snapshot.is_some(),
                    "runner completed gracefully"
                );
                if let Some(snapshot) = result.snapshot {
                    self.collected_snapshots
                        .insert(result.component_fqn.into_owned(), snapshot);
                }
            }
        }

        info!(
            "collected {} snapshots from runners",
            self.collected_snapshots.len()
        );

        // Stop the WorldManager actor and wait for it to drain.
        let _ = self.world_tx.send(WorldCommand::Shutdown).await;
        if let Some(task) = self.manager_task.take() {
            let _ = task.await;
        }

        for id in self.channel_handles.keys() {
            self.eventbus.unregister_channel(id);
        }
        self.channel_handles.clear();
    }
945
    /// Snapshots collected from runners during graceful shutdown, keyed by
    /// component FQN. Empty until `shutdown`/`run` has completed.
    #[must_use]
    pub fn collected_snapshots(&self) -> &HashMap<String, ComponentSnapshot> {
        &self.collected_snapshots
    }
961}
962
963#[cfg(test)]
964mod tests {
965 use super::*;
966 use crate::channel::{ChannelCore, WorldCommand};
967 use crate::Principal;
968 use orcs_component::ComponentError;
969 use orcs_event::{Request, SignalResponse};
970 use orcs_types::ComponentId;
971 use serde_json::Value;
972
    // Builds a fresh world with one interactive channel to act as IO.
    fn test_world() -> (World, ChannelId) {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());
        (world, io)
    }
979
    // Minimal component that echoes request payloads back; records aborts.
    struct EchoComponent {
        id: ComponentId,
        aborted: bool,
    }
984
    impl EchoComponent {
        // Fresh echo component with the builtin id "echo".
        fn new() -> Self {
            Self {
                id: ComponentId::builtin("echo"),
                aborted: false,
            }
        }
    }
993
    impl Component for EchoComponent {
        fn id(&self) -> &ComponentId {
            &self.id
        }

        fn status(&self) -> orcs_component::Status {
            orcs_component::Status::Idle
        }

        // Echo: return the request payload unchanged.
        fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
            Ok(request.payload.clone())
        }

        // Abort on veto, acknowledge everything else.
        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
            if signal.is_veto() {
                SignalResponse::Abort
            } else {
                SignalResponse::Handled
            }
        }

        fn abort(&mut self) {
            self.aborted = true;
        }

        fn status_detail(&self) -> Option<orcs_component::StatusDetail> {
            None
        }

        fn init(&mut self, _config: &serde_json::Value) -> Result<(), ComponentError> {
            Ok(())
        }

        fn shutdown(&mut self) {
        }
    }
1031
    // A new engine is not running and records the given IO channel.
    #[tokio::test]
    async fn engine_creation() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);
        assert!(!engine.is_running());
        assert_eq!(engine.io_channel(), io);

        // Stop the background WorldManager so the test exits cleanly.
        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }
1042
    // The shared read view exposes channels created before engine start.
    #[tokio::test]
    async fn engine_world_access() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        let w = engine.world_read().read().await;
        assert!(w.get(&io).is_some());
        drop(w);

        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }
1056
    // `stop` broadcasts a system veto on the signal bus.
    #[tokio::test]
    async fn stop_engine_sends_veto_signal() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        // Subscribe before stopping so the veto is not missed.
        let mut rx = engine.signal_tx.subscribe();

        engine.stop();

        let received = rx.recv().await.expect("receive signal");
        assert!(received.is_veto());

        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }
1075
    // Mutations go through WorldCommand and are visible via the read view.
    #[tokio::test]
    async fn world_access_parallel() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        engine
            .world_tx()
            .send(WorldCommand::Complete {
                id: io,
                reply: reply_tx,
            })
            .await
            .expect("send complete command");
        assert!(reply_rx.await.expect("receive complete reply"));

        // The completed channel is no longer running in the shared view.
        let w = engine.world_read().read().await;
        assert!(!w.get(&io).expect("get channel").is_running());
        drop(w);

        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }
1101
    // `spawn_runner` registers exactly one runner task under the channel id.
    #[tokio::test]
    async fn spawn_runner_creates_task() {
        let (world, io) = test_world();
        let mut engine = OrcsEngine::new(world, io);

        let echo = Box::new(EchoComponent::new());
        let _handle = engine.spawn_runner(io, echo);

        assert_eq!(engine.runner_tasks.len(), 1);
        assert!(engine.runner_tasks.contains_key(&io));

        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }
1117
    // `signal` fans out to broadcast subscribers with the same kind.
    #[tokio::test]
    async fn signal_broadcast_works() {
        let (world, io) = test_world();
        let engine = OrcsEngine::new(world, io);

        // Subscribe before sending so the broadcast is not missed.
        let mut rx = engine.signal_tx.subscribe();

        let principal = Principal::System;
        let cancel = Signal::cancel(io, principal);
        engine.signal(cancel.clone());

        let received = rx.recv().await.expect("receive signal");
        assert!(matches!(received.kind, orcs_event::SignalKind::Cancel));

        let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
    }
1137
    // Component with snapshot/restore support: state is a single counter.
    struct SnapshottableComponent {
        id: ComponentId,
        counter: u64,
    }
1145
    impl SnapshottableComponent {
        // Builtin component `name` starting at `initial_value`.
        fn new(name: &str, initial_value: u64) -> Self {
            Self {
                id: ComponentId::builtin(name),
                counter: initial_value,
            }
        }
    }
1154
    impl Component for SnapshottableComponent {
        fn id(&self) -> &ComponentId {
            &self.id
        }

        fn status(&self) -> orcs_component::Status {
            orcs_component::Status::Idle
        }

        // Each request increments the counter and returns its new value.
        fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
            self.counter += 1;
            Ok(Value::Number(self.counter.into()))
        }

        fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
            if signal.is_veto() {
                SignalResponse::Abort
            } else {
                SignalResponse::Handled
            }
        }

        fn abort(&mut self) {}

        // Serialize the counter under this component's FQN.
        fn snapshot(
            &self,
        ) -> Result<orcs_component::ComponentSnapshot, orcs_component::SnapshotError> {
            orcs_component::ComponentSnapshot::from_state(self.id.fqn(), &self.counter)
        }

        // Restore the counter from a previously taken snapshot.
        fn restore(
            &mut self,
            snapshot: &orcs_component::ComponentSnapshot,
        ) -> Result<(), orcs_component::SnapshotError> {
            self.counter = snapshot.to_state()?;
            Ok(())
        }
    }
1193
    // After a veto-triggered run loop exit, the engine holds the runner's
    // snapshot keyed by the component FQN.
    #[tokio::test]
    async fn graceful_shutdown_collects_snapshots() {
        let (world, io) = test_world();
        let mut engine = OrcsEngine::new(world, io);

        let comp = Box::new(SnapshottableComponent::new("snap", 42));
        let _handle = engine.spawn_runner(io, comp);

        // Fire a veto shortly after startup so `run` terminates.
        let engine_signal_tx = engine.signal_tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let _ = engine_signal_tx.send(Signal::veto(Principal::System));
        });

        engine.run().await;

        let snapshots = engine.collected_snapshots();
        assert_eq!(snapshots.len(), 1);
        assert!(snapshots.contains_key("builtin::snap"));
    }
1217
    // End-to-end: snapshots collected at shutdown survive a save/load
    // round-trip through the session store.
    #[tokio::test]
    async fn snapshots_persist_via_store_after_graceful_shutdown() {
        use crate::session::{LocalFileStore, SessionAsset, SessionStore};
        use crate::WorkDir;

        let wd = WorkDir::temporary().expect("should create temp WorkDir for store test");
        let store = LocalFileStore::new(wd.path().to_path_buf()).expect("create store");

        let (world, io) = test_world();
        let mut engine = OrcsEngine::new(world, io);

        let comp = Box::new(SnapshottableComponent::new("snap", 42));
        let _handle = engine.spawn_runner(io, comp);

        // Fire a veto shortly after startup so `run` terminates.
        let engine_signal_tx = engine.signal_tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let _ = engine_signal_tx.send(Signal::veto(Principal::System));
        });

        engine.run().await;

        // Copy collected snapshots into a session asset and persist it.
        let mut asset = SessionAsset::new();
        let session_id = asset.id.clone();
        for (fqn, snapshot) in engine.collected_snapshots() {
            asset
                .component_snapshots
                .insert(fqn.clone(), snapshot.clone());
        }
        asset.touch();
        store.save(&asset).await.expect("save session");

        assert!(asset.get_snapshot("builtin::snap").is_some());

        // Reload from disk and verify the snapshot survived.
        let loaded = store.load(&session_id).await.expect("load session");
        assert_eq!(loaded.id, session_id);
        assert!(loaded.get_snapshot("builtin::snap").is_some());
    }
1261
    // Permission-gated spawning: `DefaultPolicy` only allows elevated
    // sessions to create child channels.
    mod spawn_channel_with_auth_tests {
        use super::*;
        use crate::auth::{DefaultPolicy, Session};
        use orcs_types::PrincipalId;
        use std::time::Duration;

        // Fresh, non-elevated user session.
        fn standard_session() -> Arc<Session> {
            Arc::new(Session::new(Principal::User(PrincipalId::new())))
        }

        // User session elevated for 60 seconds.
        fn elevated_session() -> Arc<Session> {
            Arc::new(
                Session::new(Principal::User(PrincipalId::new())).elevate(Duration::from_secs(60)),
            )
        }

        fn default_checker() -> Arc<dyn crate::auth::PermissionChecker> {
            Arc::new(DefaultPolicy)
        }

        #[tokio::test]
        async fn spawn_channel_with_auth_denied_for_standard_session() {
            let (world, io) = test_world();
            let mut engine = OrcsEngine::new(world, io);

            let session = standard_session();
            let checker = default_checker();
            let config = ChannelConfig::new(100, true);
            let component = Box::new(EchoComponent::new());

            let result = engine
                .spawn_channel_with_auth(io, config, component, session, checker)
                .await;

            assert!(result.is_none(), "standard session should be denied");

            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
        }

        #[tokio::test]
        async fn spawn_channel_with_auth_allowed_for_elevated_session() {
            let (world, io) = test_world();
            let mut engine = OrcsEngine::new(world, io);

            let session = elevated_session();
            let checker = default_checker();
            let config = ChannelConfig::new(100, true);
            let component = Box::new(EchoComponent::new());

            let result = engine
                .spawn_channel_with_auth(io, config, component, session, checker)
                .await;

            assert!(result.is_some(), "elevated session should be allowed");

            // The child channel must exist in the shared world view.
            let child_id = result.expect("elevated session should produce a child channel id");
            let w = engine.world_read().read().await;
            assert!(w.get(&child_id).is_some());
            drop(w);

            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
        }

        #[tokio::test]
        async fn spawn_runner_with_auth_creates_runner() {
            let (world, io) = test_world();
            let mut engine = OrcsEngine::new(world, io);

            let session = elevated_session();
            let checker = default_checker();
            let component = Box::new(EchoComponent::new());

            let handle = engine.spawn_runner_with_auth(io, component, session, checker);

            assert_eq!(handle.id, io);
            assert!(engine.runner_tasks.contains_key(&io));

            let _ = engine.world_tx().send(WorldCommand::Shutdown).await;
        }
    }
1350}