1use super::base::OutputSender;
7use super::child_spawner::ChildSpawner;
8use super::{ChannelRunner, Event};
9use crate::auth::{CommandCheckResult, PermissionChecker, Session};
10use crate::channel::{World, WorldCommand};
11use crate::engine::{SharedChannelHandles, SharedComponentChannelMap};
12use orcs_auth::Capability;
13use orcs_auth::{CommandGrant, GrantPolicy};
14use orcs_component::{
15 async_trait, AsyncChildContext, AsyncChildHandle, ChildConfig, ChildContext, ChildHandle,
16 ChildResult, Component, ComponentLoader, RunError, SpawnError,
17};
18use orcs_event::{EventCategory, Signal};
19use orcs_hook::SharedHookRegistry;
20use orcs_types::{ChannelId, ComponentId};
21use std::fmt::Debug;
22use std::sync::{Arc, Mutex};
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25#[derive(Clone)]
33pub struct ChildContextImpl {
34 parent_id: String,
36 output_tx: OutputSender,
38 io_output_tx: Option<OutputSender>,
41 spawner: Arc<Mutex<ChildSpawner>>,
43 lua_loader: Option<Arc<dyn LuaChildLoader>>,
45 component_loader: Option<Arc<dyn ComponentLoader>>,
47
48 session: Option<Arc<Session>>,
51 checker: Option<Arc<dyn PermissionChecker>>,
53 grants: Option<Arc<dyn GrantPolicy>>,
55
56 world_tx: Option<mpsc::Sender<WorldCommand>>,
59 world: Option<Arc<RwLock<World>>>,
61 signal_tx: Option<broadcast::Sender<Signal>>,
63
64 shared_handles: Option<SharedChannelHandles>,
67 component_channel_map: Option<SharedComponentChannelMap>,
69 channel_id: Option<ChannelId>,
71
72 hook_registry: Option<SharedHookRegistry>,
75
76 mcp_manager: Option<Arc<orcs_mcp::McpClientManager>>,
79
80 capabilities: Capability,
84}
85
86pub trait LuaChildLoader: Send + Sync {
91 fn load(
93 &self,
94 config: &ChildConfig,
95 ) -> Result<Box<dyn orcs_component::RunnableChild>, SpawnError>;
96}
97
98impl ChildContextImpl {
99 #[must_use]
107 pub fn new(
108 parent_id: impl Into<String>,
109 output_tx: OutputSender,
110 spawner: Arc<Mutex<ChildSpawner>>,
111 ) -> Self {
112 Self {
113 parent_id: parent_id.into(),
114 output_tx,
115 io_output_tx: None,
116 spawner,
117 lua_loader: None,
118 component_loader: None,
119 session: None,
120 checker: None,
121 grants: None,
122 world_tx: None,
123 world: None,
124 signal_tx: None,
125 shared_handles: None,
126 component_channel_map: None,
127 channel_id: None,
128 hook_registry: None,
129 mcp_manager: None,
130 capabilities: Capability::ALL,
131 }
132 }
133
134 #[must_use]
139 pub fn with_io_output_channel(mut self, tx: OutputSender) -> Self {
140 self.io_output_tx = Some(tx);
141 self
142 }
143
144 fn send_to_output(&self, event: Event) {
146 if let Some(io_tx) = &self.io_output_tx {
147 let _ = io_tx.try_send_direct(event);
148 } else {
149 let _ = self.output_tx.try_send_direct(event);
150 }
151 }
152
153 #[must_use]
157 pub fn with_session(mut self, session: Session) -> Self {
158 self.session = Some(Arc::new(session));
159 self
160 }
161
162 #[must_use]
166 pub fn with_session_arc(mut self, session: Arc<Session>) -> Self {
167 self.session = Some(session);
168 self
169 }
170
171 #[must_use]
173 pub fn with_checker(mut self, checker: Arc<dyn PermissionChecker>) -> Self {
174 self.checker = Some(checker);
175 self
176 }
177
178 #[must_use]
180 pub fn with_grants(mut self, grants: Arc<dyn GrantPolicy>) -> Self {
181 self.grants = Some(grants);
182 self
183 }
184
185 #[must_use]
187 pub fn session(&self) -> Option<&Arc<Session>> {
188 self.session.as_ref()
189 }
190
191 #[must_use]
193 pub fn checker(&self) -> Option<&Arc<dyn PermissionChecker>> {
194 self.checker.as_ref()
195 }
196
197 #[must_use]
203 pub fn can_execute_command(&self, cmd: &str) -> bool {
204 match (&self.session, &self.checker) {
205 (Some(session), Some(checker)) => checker.can_execute_command(session, cmd),
206 _ => true, }
208 }
209
210 #[must_use]
243 pub fn check_command(&self, cmd: &str) -> CommandCheckResult {
244 let pre_payload = serde_json::json!({ "command": cmd });
246 let pre_action = self.dispatch_hook(orcs_hook::HookPoint::AuthPreCheck, pre_payload);
247
248 match &pre_action {
249 orcs_hook::HookAction::Abort { reason } => {
250 return CommandCheckResult::Denied(reason.clone());
251 }
252 orcs_hook::HookAction::Skip(value) => {
253 let allowed = value
255 .as_object()
256 .and_then(|o| o.get("allowed"))
257 .and_then(|v| v.as_bool())
258 .unwrap_or(value.as_bool().unwrap_or(true));
259 if allowed {
260 return CommandCheckResult::Allowed;
261 }
262 let reason = value
263 .as_object()
264 .and_then(|o| o.get("reason"))
265 .and_then(|v| v.as_str())
266 .unwrap_or("denied by auth pre-check hook")
267 .to_string();
268 return CommandCheckResult::Denied(reason);
269 }
270 _ => {} }
272
273 let result = match (&self.session, &self.checker) {
275 (Some(session), Some(checker)) => {
276 let empty;
278 let grants: &dyn GrantPolicy = match &self.grants {
279 Some(g) => g.as_ref(),
280 None => {
281 empty = crate::auth::DefaultGrantStore::new();
282 &empty
283 }
284 };
285 checker.check_command(session, grants, cmd)
286 }
287 _ => CommandCheckResult::Allowed, };
289
290 let post_payload = serde_json::json!({
292 "command": cmd,
293 "result": match &result {
294 CommandCheckResult::Allowed => "allowed",
295 CommandCheckResult::Denied(_) => "denied",
296 CommandCheckResult::RequiresApproval { .. } => "requires_approval",
297 },
298 });
299 let _ = self.dispatch_hook(orcs_hook::HookPoint::AuthPostCheck, post_payload);
300
301 result
302 }
303
304 pub fn grant_command(&self, pattern: &str) {
311 self.grant_command_inner(pattern);
312 }
313
314 #[must_use]
316 pub fn can_spawn_child_auth(&self) -> bool {
317 match (&self.session, &self.checker) {
318 (Some(session), Some(checker)) => checker.can_spawn_child(session),
319 _ => true, }
321 }
322
323 #[must_use]
325 pub fn can_spawn_runner_auth(&self) -> bool {
326 match (&self.session, &self.checker) {
327 (Some(session), Some(checker)) => checker.can_spawn_runner(session),
328 _ => true, }
330 }
331
332 #[must_use]
334 pub fn with_lua_loader(mut self, loader: Arc<dyn LuaChildLoader>) -> Self {
335 self.lua_loader = Some(loader);
336 self
337 }
338
339 #[must_use]
341 pub fn with_component_loader(mut self, loader: Arc<dyn ComponentLoader>) -> Self {
342 self.component_loader = Some(loader);
343 self
344 }
345
346 #[must_use]
348 pub fn with_runner_support(
349 mut self,
350 world_tx: mpsc::Sender<WorldCommand>,
351 world: Arc<RwLock<World>>,
352 signal_tx: broadcast::Sender<Signal>,
353 ) -> Self {
354 self.world_tx = Some(world_tx);
355 self.world = Some(world);
356 self.signal_tx = Some(signal_tx);
357 self
358 }
359
360 #[must_use]
365 pub fn with_rpc_support(
366 mut self,
367 shared_handles: SharedChannelHandles,
368 component_channel_map: SharedComponentChannelMap,
369 channel_id: ChannelId,
370 ) -> Self {
371 self.shared_handles = Some(shared_handles);
372 self.component_channel_map = Some(component_channel_map);
373 self.channel_id = Some(channel_id);
374 self
375 }
376
377 #[must_use]
379 pub fn with_hook_registry(mut self, registry: SharedHookRegistry) -> Self {
380 self.hook_registry = Some(registry);
381 self
382 }
383
384 #[must_use]
386 pub fn with_mcp_manager(mut self, manager: Arc<orcs_mcp::McpClientManager>) -> Self {
387 self.mcp_manager = Some(manager);
388 self
389 }
390
391 #[must_use]
396 pub fn with_capabilities(mut self, caps: Capability) -> Self {
397 self.capabilities = caps;
398 self
399 }
400
401 #[must_use]
403 pub fn can_spawn_runner(&self) -> bool {
404 self.world_tx.is_some() && self.world.is_some() && self.signal_tx.is_some()
405 }
406
407 pub fn spawn_runner(
440 &self,
441 component: Box<dyn Component>,
442 ) -> Result<(ChannelId, tokio::sync::oneshot::Receiver<()>), SpawnError> {
443 if !self.can_spawn_runner_auth() {
445 tracing::warn!(
446 parent_id = %self.parent_id,
447 "spawn_runner denied: requires elevated privilege"
448 );
449 return Err(SpawnError::Internal(
450 "spawn_runner requires elevated privilege".into(),
451 ));
452 }
453
454 let world_tx = self.world_tx.as_ref().ok_or_else(|| {
455 SpawnError::Internal("runner spawning not enabled (no world_tx)".into())
456 })?;
457 let world = self
458 .world
459 .as_ref()
460 .ok_or_else(|| SpawnError::Internal("runner spawning not enabled (no world)".into()))?;
461 let signal_tx = self.signal_tx.as_ref().ok_or_else(|| {
462 SpawnError::Internal("runner spawning not enabled (no signal_tx)".into())
463 })?;
464
465 let channel_id = ChannelId::new();
467 let component_id = component.id().clone();
468 let component_fqn = component_id.fqn();
469
470 let parent_channel_id = self.channel_id.ok_or_else(|| {
472 SpawnError::Internal("spawn_runner requires a parent channel_id".into())
473 })?;
474
475 let world_tx_clone = world_tx.clone();
477 let world_clone = Arc::clone(world);
478 let signal_rx = signal_tx.subscribe();
479 let signal_tx_clone = signal_tx.clone();
480 let effective_output_tx = self
486 .io_output_tx
487 .clone()
488 .unwrap_or_else(|| self.output_tx.clone());
489
490 let session_clone = self.session.clone();
492 let checker_clone = self.checker.clone();
493 let grants_clone = self.grants.clone();
494 let hook_registry_clone = self.hook_registry.clone();
495 let mcp_manager_clone = self.mcp_manager.clone();
496
497 let shared_handles_clone = self.shared_handles.clone();
500 let component_channel_map_clone = self.component_channel_map.clone();
501
502 let lua_loader_clone = self.lua_loader.clone();
505 let component_loader_clone = self.component_loader.clone();
506
507 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
510
511 tokio::spawn(async move {
513 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
515 if let Err(e) = world_tx_clone
516 .send(WorldCommand::SpawnWithId {
517 parent: parent_channel_id,
518 id: channel_id,
519 config: crate::channel::ChannelConfig::default(),
520 reply: reply_tx,
521 })
522 .await
523 {
524 tracing::error!(
525 "spawn_runner: failed to send SpawnWithId for {}: {}",
526 component_fqn,
527 e
528 );
529 return;
530 }
531 match reply_rx.await {
532 Ok(true) => {}
533 Ok(false) => {
534 tracing::error!(
535 "spawn_runner: SpawnWithId failed (parent {} not found) for {}",
536 parent_channel_id,
537 component_fqn
538 );
539 return;
540 }
541 Err(e) => {
542 tracing::error!(
543 "spawn_runner: SpawnWithId reply dropped for {}: {}",
544 component_fqn,
545 e
546 );
547 return;
548 }
549 }
550
551 let mut builder = ChannelRunner::builder(
553 channel_id,
554 world_tx_clone.clone(),
555 world_clone,
556 signal_rx,
557 component,
558 )
559 .with_emitter(signal_tx_clone)
560 .with_output_channel(effective_output_tx);
561
562 builder = builder.with_request_channel();
565
566 if let Some(ref handles) = shared_handles_clone {
569 builder = builder.with_shared_handles(Arc::clone(handles));
570 }
571 if let Some(ref map) = component_channel_map_clone {
572 builder = builder.with_component_channel_map(Arc::clone(map));
573 }
574
575 builder = builder.with_child_spawner(lua_loader_clone);
578 if let Some(loader) = component_loader_clone {
579 builder = builder.with_component_loader(loader);
580 }
581
582 if let Some(session) = session_clone {
584 builder = builder.with_session_arc(session);
585 }
586 if let Some(checker) = checker_clone {
587 builder = builder.with_checker(checker);
588 }
589 if let Some(grants) = grants_clone {
590 builder = builder.with_grants(grants);
591 }
592 if let Some(registry) = hook_registry_clone {
594 builder = builder.with_hook_registry(registry);
595 }
596 if let Some(manager) = mcp_manager_clone {
598 builder = builder.with_mcp_manager(manager);
599 }
600
601 let (runner, handle) = builder.build();
602
603 if let Some(ref handles) = shared_handles_clone {
607 handles.write().insert(channel_id, handle);
608 }
609
610 if let Some(ref map) = component_channel_map_clone {
613 map.write().insert(component_fqn.clone(), channel_id);
614 }
615
616 tracing::info!(
617 "Spawned child runner: channel={}, component={} (World registered, RPC enabled)",
618 channel_id,
619 component_fqn
620 );
621
622 let _ = ready_tx.send(());
625
626 runner.run().await;
627
628 if let Some(ref handles) = shared_handles_clone {
630 handles.write().remove(&channel_id);
631 }
632 if let Some(ref map) = component_channel_map_clone {
633 map.write().remove(&component_fqn);
634 }
635 tracing::info!(
636 "Child runner exited: channel={}, component={} (cleanup done)",
637 channel_id,
638 component_fqn
639 );
640 });
641
642 Ok((channel_id, ready_rx))
643 }
644
645 fn create_child_context(
652 &self,
653 child_id: &str,
654 requested_caps: Option<Capability>,
655 ) -> Box<dyn ChildContext> {
656 let effective_caps =
657 Capability::inherit(self.capabilities, requested_caps.unwrap_or(Capability::ALL));
658
659 let mut ctx =
660 ChildContextImpl::new(child_id, self.output_tx.clone(), Arc::clone(&self.spawner))
661 .with_capabilities(effective_caps);
662 if let Some(loader) = &self.lua_loader {
663 ctx = ctx.with_lua_loader(Arc::clone(loader));
664 }
665 if let Some(s) = &self.session {
666 ctx = ctx.with_session_arc(Arc::clone(s));
667 }
668 if let Some(c) = &self.checker {
669 ctx = ctx.with_checker(Arc::clone(c));
670 }
671 if let Some(g) = &self.grants {
672 ctx = ctx.with_grants(Arc::clone(g));
673 }
674 if let (Some(h), Some(m), Some(ch)) = (
675 &self.shared_handles,
676 &self.component_channel_map,
677 self.channel_id,
678 ) {
679 ctx = ctx.with_rpc_support(Arc::clone(h), Arc::clone(m), ch);
680 }
681 if let Some(reg) = &self.hook_registry {
682 ctx = ctx.with_hook_registry(Arc::clone(reg));
683 }
684 Box::new(ctx)
685 }
686
687 fn dispatch_hook(
692 &self,
693 point: orcs_hook::HookPoint,
694 payload: serde_json::Value,
695 ) -> orcs_hook::HookAction {
696 let component_id = ComponentId::child(&self.parent_id);
697 let channel_id = self.channel_id.unwrap_or_else(ChannelId::new);
698
699 let ctx = orcs_hook::HookContext::new(
700 point,
701 component_id.clone(),
702 channel_id,
703 orcs_types::Principal::System,
704 0,
705 payload,
706 );
707
708 let Some(registry) = &self.hook_registry else {
709 return orcs_hook::HookAction::Continue(Box::new(ctx));
710 };
711
712 let guard = registry.read().unwrap_or_else(|poisoned| {
713 tracing::warn!("hook registry lock poisoned, using inner value");
714 poisoned.into_inner()
715 });
716 guard.dispatch(point, &component_id, None, ctx)
717 }
718
719 fn grant_command_inner(&self, pattern: &str) {
723 if let Some(grants) = &self.grants {
724 if let Err(e) = grants.grant(CommandGrant::persistent(pattern)) {
725 tracing::error!("grant_command failed: {e}");
726 }
727 }
728
729 let payload = serde_json::json!({
731 "pattern": pattern,
732 "granted_by": self.parent_id,
733 });
734 let _ = self.dispatch_hook(orcs_hook::HookPoint::AuthOnGrant, payload);
735 }
736
737 fn create_output_event(&self, message: &str, level: &str) -> Event {
739 Event {
740 category: EventCategory::Output,
741 operation: "display".to_string(),
742 source: ComponentId::child(&self.parent_id),
743 payload: serde_json::json!({
744 "message": message,
745 "level": level,
746 "source": self.parent_id,
747 }),
748 }
749 }
750}
751
752impl Debug for ChildContextImpl {
753 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
754 f.debug_struct("ChildContextImpl")
755 .field("parent_id", &self.parent_id)
756 .field("capabilities", &self.capabilities)
757 .field("has_lua_loader", &self.lua_loader.is_some())
758 .field("has_session", &self.session.is_some())
759 .field("has_checker", &self.checker.is_some())
760 .field("has_grants", &self.grants.is_some())
761 .field("can_spawn_runner", &self.can_spawn_runner())
762 .finish()
763 }
764}
765
766impl ChildContext for ChildContextImpl {
767 fn parent_id(&self) -> &str {
768 &self.parent_id
769 }
770
771 fn emit_output(&self, message: &str) {
772 let event = self.create_output_event(message, "info");
773 self.send_to_output(event);
774 }
775
776 fn emit_output_with_level(&self, message: &str, level: &str) {
777 let event = self.create_output_event(message, level);
778 self.send_to_output(event);
779 }
780
781 fn emit_approval_request(&self, operation: &str, description: &str) -> String {
782 let approval_id = uuid::Uuid::new_v4().to_string();
783 let event = Event {
784 category: EventCategory::Output,
785 operation: "approval_request".to_string(),
786 source: ComponentId::child(&self.parent_id),
787 payload: serde_json::json!({
788 "type": "approval_request",
789 "approval_id": approval_id,
790 "operation": operation,
791 "description": description,
792 "source": self.parent_id,
793 }),
794 };
795 self.send_to_output(event);
796 tracing::info!(
797 approval_id = %approval_id,
798 operation = %operation,
799 "Emitted approval request: {}",
800 description
801 );
802 approval_id
803 }
804
805 fn spawn_child(&self, config: ChildConfig) -> Result<Box<dyn ChildHandle>, SpawnError> {
806 let loader = self
808 .lua_loader
809 .as_ref()
810 .ok_or_else(|| SpawnError::Internal("no lua loader configured".into()))?;
811
812 let mut child = loader.load(&config)?;
814
815 let child_ctx = self.create_child_context(&config.id, config.capabilities);
817 child.set_context(child_ctx);
818
819 let mut spawner = self
821 .spawner
822 .lock()
823 .map_err(|e| SpawnError::Internal(format!("spawner lock failed: {}", e)))?;
824
825 spawner.spawn(config, child)
826 }
827
828 fn child_count(&self) -> usize {
829 self.spawner.lock().map(|s| s.child_count()).unwrap_or(0)
830 }
831
832 fn max_children(&self) -> usize {
833 self.spawner.lock().map(|s| s.max_children()).unwrap_or(0)
834 }
835
836 fn send_to_child(
837 &self,
838 child_id: &str,
839 input: serde_json::Value,
840 ) -> Result<ChildResult, RunError> {
841 let spawner = self
842 .spawner
843 .lock()
844 .map_err(|e| RunError::ExecutionFailed(format!("spawner lock failed: {}", e)))?;
845
846 spawner.run_child(child_id, input)
847 }
848
849 fn send_to_child_async(
850 &self,
851 child_id: &str,
852 input: serde_json::Value,
853 ) -> Result<(), RunError> {
854 let child_arc = {
856 let spawner = self
857 .spawner
858 .lock()
859 .map_err(|e| RunError::ExecutionFailed(format!("spawner lock failed: {}", e)))?;
860 spawner
861 .get_child_arc(child_id)
862 .ok_or_else(|| RunError::NotFound(child_id.to_string()))?
863 };
864 let child_id_owned = child_id.to_string();
867 std::thread::spawn(move || match child_arc.lock() {
868 Ok(mut child) => {
869 if let orcs_component::ChildResult::Err(e) = child.run(input) {
870 tracing::warn!(
871 child_id = %child_id_owned,
872 "send_to_child_async: child returned error: {}", e
873 );
874 }
875 }
876 Err(e) => {
877 tracing::error!(
878 child_id = %child_id_owned,
879 "send_to_child_async: child lock failed: {}", e
880 );
881 }
882 });
883
884 Ok(())
885 }
886
887 fn send_to_children_batch(
888 &self,
889 requests: Vec<(String, serde_json::Value)>,
890 ) -> Vec<(String, Result<ChildResult, RunError>)> {
891 if requests.is_empty() {
892 return Vec::new();
893 }
894
895 type ChildArc = Arc<Mutex<Box<dyn orcs_component::RunnableChild>>>;
897 let child_arcs: Vec<(String, Option<ChildArc>)> = {
898 let spawner = match self.spawner.lock() {
899 Ok(s) => s,
900 Err(e) => {
901 let msg = format!("spawner lock failed: {}", e);
902 return requests
903 .into_iter()
904 .map(|(id, _)| (id, Err(RunError::ExecutionFailed(msg.clone()))))
905 .collect();
906 }
907 };
908 requests
909 .iter()
910 .map(|(id, _)| (id.clone(), spawner.get_child_arc(id)))
911 .collect()
912 };
913 std::thread::scope(|s| {
917 let handles: Vec<_> = child_arcs
918 .into_iter()
919 .zip(requests)
920 .map(|((id, child_opt), (_, input))| {
921 s.spawn(move || match child_opt {
922 None => (
923 id.clone(),
924 Err(RunError::ExecutionFailed(format!(
925 "child not found: {}",
926 id
927 ))),
928 ),
929 Some(child_arc) => {
930 let mut guard = match child_arc.lock() {
931 Ok(g) => g,
932 Err(e) => {
933 return (
934 id,
935 Err(RunError::ExecutionFailed(format!(
936 "child lock failed: {}",
937 e
938 ))),
939 );
940 }
941 };
942 let result = guard.run(input);
943 drop(guard);
944 (id, Ok(result))
945 }
946 })
947 })
948 .collect();
949
950 handles
951 .into_iter()
952 .map(|h| {
953 h.join().unwrap_or_else(|_| {
954 (
955 String::from("<panic>"),
956 Err(RunError::ExecutionFailed("child thread panicked".into())),
957 )
958 })
959 })
960 .collect()
961 })
962 }
963
964 fn spawn_runner_from_script(
965 &self,
966 script: &str,
967 id: Option<&str>,
968 globals: Option<&serde_json::Map<String, serde_json::Value>>,
969 ) -> Result<(ChannelId, String), SpawnError> {
970 let loader = self
972 .component_loader
973 .as_ref()
974 .ok_or_else(|| SpawnError::Internal("no component loader configured".into()))?;
975
976 let component = loader.load_from_script(script, id, globals)?;
978 let fqn = component.id().fqn();
979
980 let (channel_id, ready_rx) = self.spawn_runner(component)?;
985
986 tokio::task::block_in_place(|| {
987 tokio::runtime::Handle::current().block_on(async {
988 match tokio::time::timeout(std::time::Duration::from_secs(5), ready_rx).await {
989 Ok(Ok(())) => {
990 tracing::debug!(
991 "spawn_runner_from_script: {} ready (channel={})",
992 fqn,
993 channel_id
994 );
995 }
996 Ok(Err(_)) => {
997 tracing::warn!(
998 "spawn_runner_from_script: ready channel dropped for {}",
999 fqn
1000 );
1001 }
1002 Err(_) => {
1003 tracing::warn!(
1004 "spawn_runner_from_script: registration timeout for {}",
1005 fqn
1006 );
1007 }
1008 }
1009 })
1010 });
1011
1012 Ok((channel_id, fqn))
1013 }
1014
1015 fn spawn_runner_from_builtin(
1016 &self,
1017 name: &str,
1018 id: Option<&str>,
1019 globals: Option<&serde_json::Map<String, serde_json::Value>>,
1020 ) -> Result<(ChannelId, String), SpawnError> {
1021 let loader = self
1022 .component_loader
1023 .as_ref()
1024 .ok_or_else(|| SpawnError::Internal("no component loader configured".into()))?;
1025
1026 let script = loader
1027 .resolve_builtin(name)
1028 .ok_or_else(|| SpawnError::Internal(format!("builtin not found: {name}")))?;
1029
1030 self.spawn_runner_from_script(&script, id, globals)
1031 }
1032
1033 fn can_execute_command(&self, cmd: &str) -> bool {
1034 match (&self.session, &self.checker) {
1035 (Some(session), Some(checker)) => checker.can_execute_command(session, cmd),
1036 _ => true, }
1038 }
1039
1040 fn check_command_permission(&self, cmd: &str) -> orcs_component::CommandPermission {
1041 use orcs_component::CommandPermission;
1042 let result = self.check_command(cmd);
1044 match result {
1045 CommandCheckResult::Allowed => CommandPermission::Allowed,
1046 CommandCheckResult::Denied(reason) => CommandPermission::Denied(reason),
1047 CommandCheckResult::RequiresApproval {
1048 request,
1049 grant_pattern,
1050 } => CommandPermission::RequiresApproval {
1051 grant_pattern,
1052 description: request.description.clone(),
1053 },
1054 }
1055 }
1056
1057 fn capabilities(&self) -> Capability {
1058 self.capabilities
1059 }
1060
1061 fn is_command_granted(&self, cmd: &str) -> bool {
1062 match &self.grants {
1063 Some(grants) => grants.is_granted(cmd).unwrap_or(false),
1064 None => false,
1065 }
1066 }
1067
1068 fn grant_command(&self, pattern: &str) {
1069 self.grant_command_inner(pattern);
1070 }
1071
1072 fn can_spawn_child_auth(&self) -> bool {
1073 match (&self.session, &self.checker) {
1074 (Some(session), Some(checker)) => checker.can_spawn_child(session),
1075 _ => true, }
1077 }
1078
1079 fn can_spawn_runner_auth(&self) -> bool {
1080 match (&self.session, &self.checker) {
1081 (Some(session), Some(checker)) => checker.can_spawn_runner(session),
1082 _ => true, }
1084 }
1085
1086 fn request(
1087 &self,
1088 target_fqn: &str,
1089 operation: &str,
1090 payload: serde_json::Value,
1091 timeout_ms: Option<u64>,
1092 ) -> Result<serde_json::Value, String> {
1093 let map = self
1094 .component_channel_map
1095 .as_ref()
1096 .ok_or("component_channel_map not configured for RPC")?;
1097 let handles = self
1098 .shared_handles
1099 .as_ref()
1100 .ok_or("shared_handles not configured for RPC")?;
1101
1102 let timeout = timeout_ms.unwrap_or(orcs_event::DEFAULT_TIMEOUT_MS);
1103 let source_id = ComponentId::child(&self.parent_id);
1104 let source_channel = self.channel_id.unwrap_or_else(ChannelId::new);
1105
1106 tokio::task::block_in_place(|| {
1107 tokio::runtime::Handle::current().block_on(super::rpc::resolve_and_send_rpc(
1108 super::rpc::RpcParams {
1109 component_channel_map: map,
1110 shared_handles: handles,
1111 target_fqn,
1112 operation,
1113 source_id,
1114 source_channel,
1115 payload,
1116 timeout_ms: timeout,
1117 },
1118 ))
1119 })
1120 }
1121
1122 fn request_batch(
1123 &self,
1124 requests: Vec<(String, String, serde_json::Value, Option<u64>)>,
1125 ) -> Vec<Result<serde_json::Value, String>> {
1126 if requests.is_empty() {
1127 return Vec::new();
1128 }
1129
1130 let map = match self.component_channel_map.as_ref() {
1131 Some(m) => m,
1132 None => {
1133 return requests
1134 .iter()
1135 .map(|_| Err("component_channel_map not configured for RPC".into()))
1136 .collect();
1137 }
1138 };
1139 let handles = match self.shared_handles.as_ref() {
1140 Some(h) => h,
1141 None => {
1142 return requests
1143 .iter()
1144 .map(|_| Err("shared_handles not configured for RPC".into()))
1145 .collect();
1146 }
1147 };
1148
1149 let source_id = ComponentId::child(&self.parent_id);
1150 let source_channel = self.channel_id.unwrap_or_else(ChannelId::new);
1151
1152 tokio::task::block_in_place(|| {
1154 tokio::runtime::Handle::current().block_on(async {
1155 let join_handles: Vec<_> = requests
1156 .into_iter()
1157 .map(|(target, op, payload, timeout_ms)| {
1158 let timeout = timeout_ms.unwrap_or(orcs_event::DEFAULT_TIMEOUT_MS);
1159 let src_id = source_id.clone();
1160 let map = Arc::clone(map);
1161 let handles = Arc::clone(handles);
1162 tokio::spawn(async move {
1163 super::rpc::resolve_and_send_rpc(super::rpc::RpcParams {
1164 component_channel_map: &map,
1165 shared_handles: &handles,
1166 target_fqn: &target,
1167 operation: &op,
1168 source_id: src_id,
1169 source_channel,
1170 payload,
1171 timeout_ms: timeout,
1172 })
1173 .await
1174 })
1175 })
1176 .collect();
1177
1178 let mut results = Vec::with_capacity(join_handles.len());
1179 for jh in join_handles {
1180 results.push(
1181 jh.await
1182 .unwrap_or_else(|e| Err(format!("rpc task failed: {e}"))),
1183 );
1184 }
1185 results
1186 })
1187 })
1188 }
1189
1190 fn request_stop(&self) -> Result<(), String> {
1191 let world_tx = self
1192 .world_tx
1193 .as_ref()
1194 .ok_or("request_stop: no world_tx (runner spawning not configured)")?;
1195 let channel_id = self
1196 .channel_id
1197 .ok_or("request_stop: no channel_id (not running in a ChannelRunner)")?;
1198
1199 let reason = format!("component {} requested self-stop", self.parent_id);
1200 let tx = world_tx.clone();
1201
1202 tokio::task::block_in_place(|| {
1203 tokio::runtime::Handle::current().block_on(async {
1204 super::common::send_abort(&tx, channel_id, &reason).await;
1205 });
1206 });
1207
1208 tracing::info!(
1209 component = %self.parent_id,
1210 channel = %channel_id,
1211 "request_stop: abort sent"
1212 );
1213 Ok(())
1214 }
1215
1216 fn extension(&self, key: &str) -> Option<Box<dyn std::any::Any + Send + Sync>> {
1217 match key {
1218 "hook_registry" => self
1219 .hook_registry
1220 .as_ref()
1221 .map(|r| Box::new(Arc::clone(r)) as Box<dyn std::any::Any + Send + Sync>),
1222 "mcp_manager" => self
1223 .mcp_manager
1224 .as_ref()
1225 .map(|m| Box::new(Arc::clone(m)) as Box<dyn std::any::Any + Send + Sync>),
1226 _ => None,
1227 }
1228 }
1229
1230 fn clone_box(&self) -> Box<dyn ChildContext> {
1231 Box::new(self.clone())
1232 }
1233}
1234
1235#[async_trait]
1236impl AsyncChildContext for ChildContextImpl {
1237 fn parent_id(&self) -> &str {
1238 &self.parent_id
1239 }
1240
1241 fn emit_output(&self, message: &str) {
1242 let event = self.create_output_event(message, "info");
1243 self.send_to_output(event);
1244 }
1245
1246 fn emit_output_with_level(&self, message: &str, level: &str) {
1247 let event = self.create_output_event(message, level);
1248 self.send_to_output(event);
1249 }
1250
1251 async fn spawn_child(
1252 &self,
1253 config: ChildConfig,
1254 ) -> Result<Box<dyn AsyncChildHandle>, SpawnError> {
1255 let loader = self
1257 .lua_loader
1258 .as_ref()
1259 .ok_or_else(|| SpawnError::Internal("no lua loader configured".into()))?;
1260
1261 let mut child = loader.load(&config)?;
1263
1264 let child_ctx = self.create_child_context(&config.id, config.capabilities);
1266 child.set_context(child_ctx);
1267
1268 let mut spawner = self
1270 .spawner
1271 .lock()
1272 .map_err(|e| SpawnError::Internal(format!("spawner lock failed: {}", e)))?;
1273
1274 spawner.spawn_async(config, child)
1275 }
1276
1277 fn child_count(&self) -> usize {
1278 self.spawner.lock().map(|s| s.child_count()).unwrap_or(0)
1279 }
1280
1281 fn max_children(&self) -> usize {
1282 self.spawner.lock().map(|s| s.max_children()).unwrap_or(0)
1283 }
1284
1285 fn send_to_child(
1286 &self,
1287 child_id: &str,
1288 input: serde_json::Value,
1289 ) -> Result<ChildResult, RunError> {
1290 let spawner = self
1291 .spawner
1292 .lock()
1293 .map_err(|e| RunError::ExecutionFailed(format!("spawner lock failed: {}", e)))?;
1294
1295 spawner.run_child(child_id, input)
1296 }
1297
1298 fn send_to_child_async(
1299 &self,
1300 child_id: &str,
1301 input: serde_json::Value,
1302 ) -> Result<(), RunError> {
1303 let child_arc = {
1304 let spawner = self
1305 .spawner
1306 .lock()
1307 .map_err(|e| RunError::ExecutionFailed(format!("spawner lock failed: {}", e)))?;
1308 spawner
1309 .get_child_arc(child_id)
1310 .ok_or_else(|| RunError::NotFound(child_id.to_string()))?
1311 };
1312
1313 let child_id_owned = child_id.to_string();
1314 std::thread::spawn(move || match child_arc.lock() {
1315 Ok(mut child) => {
1316 if let orcs_component::ChildResult::Err(e) = child.run(input) {
1317 tracing::warn!(
1318 child_id = %child_id_owned,
1319 "send_to_child_async: child returned error: {}", e
1320 );
1321 }
1322 }
1323 Err(e) => {
1324 tracing::error!(
1325 child_id = %child_id_owned,
1326 "send_to_child_async: child lock failed: {}", e
1327 );
1328 }
1329 });
1330
1331 Ok(())
1332 }
1333
1334 fn clone_box(&self) -> Box<dyn AsyncChildContext> {
1335 Box::new(self.clone())
1336 }
1337}
1338
1339#[cfg(test)]
1340mod tests {
1341 use super::*;
1342 use orcs_component::{
1343 Child, ChildResult, Identifiable, RunnableChild, SignalReceiver, Status, Statusable,
1344 };
1345 use orcs_event::{Signal, SignalResponse};
1346
1347 struct TestWorker {
1349 id: String,
1350 status: Status,
1351 }
1352
1353 impl TestWorker {
1354 fn new(id: &str) -> Self {
1355 Self {
1356 id: id.into(),
1357 status: Status::Idle,
1358 }
1359 }
1360 }
1361
1362 impl Identifiable for TestWorker {
1363 fn id(&self) -> &str {
1364 &self.id
1365 }
1366 }
1367
1368 impl SignalReceiver for TestWorker {
1369 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
1370 if signal.is_veto() {
1371 self.status = Status::Aborted;
1372 SignalResponse::Abort
1373 } else {
1374 SignalResponse::Handled
1375 }
1376 }
1377
1378 fn abort(&mut self) {
1379 self.status = Status::Aborted;
1380 }
1381 }
1382
1383 impl Statusable for TestWorker {
1384 fn status(&self) -> Status {
1385 self.status
1386 }
1387 }
1388
1389 impl Child for TestWorker {}
1390
1391 impl RunnableChild for TestWorker {
1392 fn run(&mut self, input: serde_json::Value) -> ChildResult {
1393 ChildResult::Ok(input)
1394 }
1395 }
1396
1397 struct TestLoader;
1399
1400 impl LuaChildLoader for TestLoader {
1401 fn load(&self, config: &ChildConfig) -> Result<Box<dyn RunnableChild>, SpawnError> {
1402 Ok(Box::new(TestWorker::new(&config.id)))
1403 }
1404 }
1405
1406 fn setup() -> (ChildContextImpl, super::super::base::OutputReceiver) {
1407 let (output_tx, output_rx) = OutputSender::channel(64);
1408
1409 let spawner = ChildSpawner::new("test-parent", output_tx.clone());
1410 let spawner_arc = Arc::new(Mutex::new(spawner));
1411
1412 let ctx = ChildContextImpl::new("test-parent", output_tx, spawner_arc)
1413 .with_lua_loader(Arc::new(TestLoader));
1414
1415 (ctx, output_rx)
1416 }
1417
1418 #[test]
1419 fn parent_id() {
1420 let (ctx, _) = setup();
1421 assert_eq!(ChildContext::parent_id(&ctx), "test-parent");
1422 }
1423
1424 #[test]
1425 fn emit_output() {
1426 let (ctx, mut output_rx) = setup();
1427
1428 ChildContext::emit_output(&ctx, "Hello, World!");
1429
1430 let event = output_rx
1431 .try_recv()
1432 .expect("should receive emit_output event");
1433 assert_eq!(event.category, EventCategory::Output);
1434 assert_eq!(event.payload["message"], "Hello, World!");
1435 assert_eq!(event.payload["level"], "info");
1436 }
1437
1438 #[test]
1439 fn emit_output_with_level() {
1440 let (ctx, mut output_rx) = setup();
1441
1442 ChildContext::emit_output_with_level(&ctx, "Warning message", "warn");
1443
1444 let event = output_rx
1445 .try_recv()
1446 .expect("should receive emit_output_with_level event");
1447 assert_eq!(event.payload["message"], "Warning message");
1448 assert_eq!(event.payload["level"], "warn");
1449 }
1450
1451 #[test]
1452 fn spawn_child() {
1453 let (ctx, _) = setup();
1454
1455 let config = ChildConfig::new("worker-1");
1456 let result = ChildContext::spawn_child(&ctx, config);
1457
1458 assert!(result.is_ok());
1459 assert_eq!(ChildContext::child_count(&ctx), 1);
1460 }
1461
1462 #[test]
1463 fn max_children() {
1464 let (ctx, _) = setup();
1465 assert!(ChildContext::max_children(&ctx) > 0);
1466 }
1467
1468 #[test]
1469 fn clone_box() {
1470 let (ctx, _) = setup();
1471 let cloned: Box<dyn ChildContext> = ChildContext::clone_box(&ctx);
1472
1473 assert_eq!(cloned.parent_id(), "test-parent");
1474 }
1475
1476 #[tokio::test]
1479 async fn async_spawn_child() {
1480 let (ctx, _) = setup();
1481
1482 let config = ChildConfig::new("async-worker-1");
1483 let result = AsyncChildContext::spawn_child(&ctx, config).await;
1484
1485 assert!(result.is_ok());
1486 assert_eq!(AsyncChildContext::child_count(&ctx), 1);
1487 }
1488
1489 #[tokio::test]
1490 async fn async_spawn_child_and_run() {
1491 let (ctx, _) = setup();
1492
1493 let config = ChildConfig::new("async-worker-2");
1494 let mut handle = AsyncChildContext::spawn_child(&ctx, config)
1495 .await
1496 .expect("async spawn child should succeed");
1497
1498 let result = handle.run(serde_json::json!({"test": true})).await;
1499
1500 assert!(result.is_ok());
1501 if let Ok(ChildResult::Ok(data)) = result {
1502 assert_eq!(data["test"], true);
1503 }
1504 }
1505
1506 #[tokio::test]
1507 async fn async_emit_output() {
1508 let (ctx, mut output_rx) = setup();
1509
1510 AsyncChildContext::emit_output(&ctx, "Async Hello!");
1511
1512 let event = output_rx
1513 .try_recv()
1514 .expect("should receive async emit_output event");
1515 assert_eq!(event.payload["message"], "Async Hello!");
1516 }
1517
1518 #[tokio::test]
1519 async fn async_clone_box() {
1520 let (ctx, _) = setup();
1521 let cloned: Box<dyn AsyncChildContext> = AsyncChildContext::clone_box(&ctx);
1522
1523 assert_eq!(AsyncChildContext::parent_id(cloned.as_ref()), "test-parent");
1524 }
1525
1526 #[test]
1529 fn send_to_child_async_returns_immediately() {
1530 let (ctx, _) = setup();
1531
1532 let config = ChildConfig::new("async-worker");
1533 ChildContext::spawn_child(&ctx, config).expect("spawn async-worker");
1534
1535 let result = ChildContext::send_to_child_async(
1536 &ctx,
1537 "async-worker",
1538 serde_json::json!({"fire": "forget"}),
1539 );
1540
1541 assert!(result.is_ok(), "should return Ok immediately");
1542 }
1543
1544 #[test]
1545 fn send_to_child_async_missing_child_returns_error() {
1546 let (ctx, _) = setup();
1547
1548 let result = ChildContext::send_to_child_async(&ctx, "nonexistent", serde_json::json!({}));
1549
1550 assert!(result.is_err(), "should return Err for missing child");
1551 let err = result.expect_err("expected NotFound error");
1552 assert!(
1553 err.to_string().contains("not found"),
1554 "error should mention 'not found', got: {}",
1555 err
1556 );
1557 }
1558
1559 #[test]
1560 fn send_to_child_async_child_actually_runs() {
1561 use std::sync::atomic::{AtomicBool, Ordering};
1562 use std::sync::Arc;
1563
1564 struct FlagWorker {
1566 id: String,
1567 status: Status,
1568 ran: Arc<AtomicBool>,
1569 }
1570
1571 impl Identifiable for FlagWorker {
1572 fn id(&self) -> &str {
1573 &self.id
1574 }
1575 }
1576
1577 impl SignalReceiver for FlagWorker {
1578 fn on_signal(&mut self, _: &Signal) -> SignalResponse {
1579 SignalResponse::Handled
1580 }
1581 fn abort(&mut self) {
1582 self.status = Status::Aborted;
1583 }
1584 }
1585
1586 impl Statusable for FlagWorker {
1587 fn status(&self) -> Status {
1588 self.status
1589 }
1590 }
1591
1592 impl Child for FlagWorker {}
1593
1594 impl RunnableChild for FlagWorker {
1595 fn run(&mut self, input: serde_json::Value) -> ChildResult {
1596 self.ran.store(true, Ordering::SeqCst);
1597 ChildResult::Ok(input)
1598 }
1599 }
1600
1601 let ran_flag = Arc::new(AtomicBool::new(false));
1602
1603 let (output_tx, _output_rx) = OutputSender::channel(64);
1604 let spawner = ChildSpawner::new("test-parent", output_tx.clone());
1605 let spawner_arc = Arc::new(Mutex::new(spawner));
1606
1607 let ctx = ChildContextImpl::new("test-parent", output_tx, Arc::clone(&spawner_arc));
1608
1609 {
1611 let worker = Box::new(FlagWorker {
1612 id: "flag-worker".into(),
1613 status: Status::Idle,
1614 ran: Arc::clone(&ran_flag),
1615 });
1616 let mut spawner_guard = spawner_arc.lock().expect("lock spawner");
1617 spawner_guard
1618 .spawn(ChildConfig::new("flag-worker"), worker)
1619 .expect("spawn flag-worker");
1620 }
1621
1622 let result =
1623 ChildContext::send_to_child_async(&ctx, "flag-worker", serde_json::json!({"go": true}));
1624 assert!(result.is_ok(), "async send should succeed");
1625
1626 for _ in 0..100 {
1628 if ran_flag.load(Ordering::SeqCst) {
1629 break;
1630 }
1631 std::thread::sleep(std::time::Duration::from_millis(10));
1632 }
1633
1634 assert!(
1635 ran_flag.load(Ordering::SeqCst),
1636 "child should have been executed by background thread"
1637 );
1638 }
1639
1640 #[test]
1641 fn send_to_child_async_multiple_children_concurrent() {
1642 use std::sync::atomic::{AtomicUsize, Ordering};
1643 use std::sync::Arc;
1644
1645 struct CounterWorker {
1646 id: String,
1647 status: Status,
1648 counter: Arc<AtomicUsize>,
1649 }
1650
1651 impl Identifiable for CounterWorker {
1652 fn id(&self) -> &str {
1653 &self.id
1654 }
1655 }
1656
1657 impl SignalReceiver for CounterWorker {
1658 fn on_signal(&mut self, _: &Signal) -> SignalResponse {
1659 SignalResponse::Handled
1660 }
1661 fn abort(&mut self) {
1662 self.status = Status::Aborted;
1663 }
1664 }
1665
1666 impl Statusable for CounterWorker {
1667 fn status(&self) -> Status {
1668 self.status
1669 }
1670 }
1671
1672 impl Child for CounterWorker {}
1673
1674 impl RunnableChild for CounterWorker {
1675 fn run(&mut self, input: serde_json::Value) -> ChildResult {
1676 self.counter.fetch_add(1, Ordering::SeqCst);
1677 ChildResult::Ok(input)
1678 }
1679 }
1680
1681 let counter = Arc::new(AtomicUsize::new(0));
1682
1683 let (output_tx, _output_rx) = OutputSender::channel(64);
1684 let spawner = ChildSpawner::new("test-parent", output_tx.clone());
1685 let spawner_arc = Arc::new(Mutex::new(spawner));
1686
1687 let ctx = ChildContextImpl::new("test-parent", output_tx, Arc::clone(&spawner_arc));
1688
1689 for i in 0..3 {
1691 let worker = Box::new(CounterWorker {
1692 id: format!("counter-{}", i),
1693 status: Status::Idle,
1694 counter: Arc::clone(&counter),
1695 });
1696 let mut spawner_guard = spawner_arc.lock().expect("lock spawner");
1697 spawner_guard
1698 .spawn(ChildConfig::new(format!("counter-{}", i)), worker)
1699 .expect("spawn counter worker");
1700 }
1701
1702 for i in 0..3 {
1704 let result = ChildContext::send_to_child_async(
1705 &ctx,
1706 &format!("counter-{}", i),
1707 serde_json::json!({"i": i}),
1708 );
1709 assert!(result.is_ok(), "async send to counter-{} should succeed", i);
1710 }
1711
1712 for _ in 0..200 {
1714 if counter.load(Ordering::SeqCst) >= 3 {
1715 break;
1716 }
1717 std::thread::sleep(std::time::Duration::from_millis(10));
1718 }
1719
1720 assert_eq!(
1721 counter.load(Ordering::SeqCst),
1722 3,
1723 "all 3 children should have run"
1724 );
1725 }
1726
1727 #[test]
1730 fn batch_send_empty_returns_empty() {
1731 let (ctx, _) = setup();
1732 let results = ChildContext::send_to_children_batch(&ctx, vec![]);
1733 assert!(results.is_empty());
1734 }
1735
1736 #[test]
1737 fn batch_send_single_child() {
1738 let (ctx, _) = setup();
1739
1740 let config = ChildConfig::new("batch-worker-1");
1742 ChildContext::spawn_child(&ctx, config).expect("spawn batch-worker-1");
1743
1744 let requests = vec![("batch-worker-1".to_string(), serde_json::json!({"x": 1}))];
1745 let results = ChildContext::send_to_children_batch(&ctx, requests);
1746
1747 assert_eq!(results.len(), 1);
1748 let (id, result) = &results[0];
1749 assert_eq!(id, "batch-worker-1");
1750 let child_result = result.as_ref().expect("should succeed");
1751 assert!(child_result.is_ok(), "child should return Ok");
1752 }
1753
1754 #[test]
1755 fn batch_send_multiple_children_parallel() {
1756 let (ctx, _) = setup();
1757
1758 for i in 0..5 {
1760 let config = ChildConfig::new(format!("par-worker-{}", i));
1761 ChildContext::spawn_child(&ctx, config)
1762 .unwrap_or_else(|e| panic!("spawn par-worker-{}: {}", i, e));
1763 }
1764
1765 let requests: Vec<_> = (0..5)
1766 .map(|i| (format!("par-worker-{}", i), serde_json::json!({"index": i})))
1767 .collect();
1768
1769 let results = ChildContext::send_to_children_batch(&ctx, requests);
1770
1771 assert_eq!(results.len(), 5);
1772 for (id, result) in &results {
1773 let child_result = result
1774 .as_ref()
1775 .unwrap_or_else(|e| panic!("{} should succeed: {}", id, e));
1776 assert!(child_result.is_ok(), "{} should return Ok", id);
1777 if let ChildResult::Ok(data) = child_result {
1778 assert!(data.get("index").is_some(), "{} should echo input", id);
1779 }
1780 }
1781 }
1782
1783 #[test]
1784 fn batch_send_with_missing_child_returns_error() {
1785 let (ctx, _) = setup();
1786
1787 let config = ChildConfig::new("exists");
1789 ChildContext::spawn_child(&ctx, config).expect("spawn exists");
1790
1791 let requests = vec![
1792 ("exists".to_string(), serde_json::json!({"a": 1})),
1793 ("missing".to_string(), serde_json::json!({"b": 2})),
1794 ];
1795 let results = ChildContext::send_to_children_batch(&ctx, requests);
1796
1797 assert_eq!(results.len(), 2);
1798
1799 assert!(results[0].1.is_ok(), "existing child should succeed");
1801
1802 assert!(results[1].1.is_err(), "missing child should return error");
1804 let err = results[1]
1805 .1
1806 .as_ref()
1807 .expect_err("expected Err for missing child");
1808 assert!(
1809 err.to_string().contains("not found"),
1810 "error should mention 'not found', got: {}",
1811 err
1812 );
1813 }
1814
1815 #[test]
1818 fn request_batch_empty_returns_empty() {
1819 let (ctx, _) = setup();
1820 let results = ChildContext::request_batch(&ctx, vec![]);
1821 assert!(results.is_empty());
1822 }
1823
1824 #[test]
1825 fn request_batch_without_rpc_returns_errors() {
1826 let (ctx, _) = setup();
1827 let requests = vec![
1829 (
1830 "comp-a".to_string(),
1831 "ping".to_string(),
1832 serde_json::json!({}),
1833 None,
1834 ),
1835 (
1836 "comp-b".to_string(),
1837 "ping".to_string(),
1838 serde_json::json!({}),
1839 None,
1840 ),
1841 ];
1842 let results = ChildContext::request_batch(&ctx, requests);
1843
1844 assert_eq!(results.len(), 2);
1845 for result in &results {
1846 assert!(result.is_err(), "should fail without RPC configured");
1847 let err = result.as_ref().expect_err("expected error");
1848 assert!(
1849 err.contains("not configured"),
1850 "error should mention not configured, got: {}",
1851 err
1852 );
1853 }
1854 }
1855
1856 mod check_command_tests {
1859 use super::*;
1860 use crate::auth::{DefaultGrantStore, DefaultPolicy, Session};
1861 use orcs_types::{Principal, PrincipalId};
1862 use std::time::Duration;
1863
1864 fn setup_with_auth(
1865 elevated: bool,
1866 ) -> (ChildContextImpl, super::super::super::base::OutputReceiver) {
1867 let (output_tx, output_rx) = OutputSender::channel(64);
1868
1869 let spawner = ChildSpawner::new("test-parent", output_tx.clone());
1870 let spawner_arc = Arc::new(Mutex::new(spawner));
1871
1872 let session = if elevated {
1873 Session::new(Principal::User(PrincipalId::new())).elevate(Duration::from_secs(60))
1874 } else {
1875 Session::new(Principal::User(PrincipalId::new()))
1876 };
1877
1878 let grants: Arc<dyn GrantPolicy> = Arc::new(DefaultGrantStore::new());
1879
1880 let ctx = ChildContextImpl::new("test-parent", output_tx, spawner_arc)
1881 .with_lua_loader(Arc::new(TestLoader))
1882 .with_session(session)
1883 .with_checker(Arc::new(DefaultPolicy))
1884 .with_grants(grants);
1885
1886 (ctx, output_rx)
1887 }
1888
1889 #[test]
1890 fn check_command_permissive_without_auth() {
1891 let (ctx, _) = setup(); let result = ctx.check_command("rm -rf /");
1893 assert!(result.is_allowed()); }
1895
1896 #[test]
1897 fn check_command_elevated_allows_any() {
1898 let (ctx, _) = setup_with_auth(true); let result = ctx.check_command("rm -rf /");
1901 assert!(result.is_allowed());
1902 }
1903
1904 #[test]
1905 fn check_command_safe_requires_approval_not_elevated() {
1906 let (ctx, _) = setup_with_auth(false); let result = ctx.check_command("ls -la");
1908 assert!(result.requires_approval()); }
1910
1911 #[test]
1912 fn check_command_dangerous_requires_approval() {
1913 let (ctx, _) = setup_with_auth(false); let result = ctx.check_command("rm -rf ./temp");
1915 assert!(result.requires_approval());
1916 assert!(result.approval_request().is_some());
1917 }
1918
1919 #[test]
1920 fn check_command_elevated_allows_dangerous() {
1921 let (ctx, _) = setup_with_auth(true); let result = ctx.check_command("rm -rf ./temp");
1923 assert!(result.is_allowed());
1924 }
1925
1926 #[test]
1927 fn grant_command_allows_future_execution() {
1928 let (ctx, _) = setup_with_auth(false); let result = ctx.check_command("rm -rf ./temp");
1932 assert!(result.requires_approval());
1933
1934 ctx.grant_command("rm -rf");
1936
1937 let result = ctx.check_command("rm -rf ./temp");
1939 assert!(result.is_allowed());
1940 }
1941
1942 #[test]
1943 fn shared_grants_across_contexts() {
1944 let (output_tx, _) = OutputSender::channel(64);
1945 let spawner = ChildSpawner::new("test", output_tx.clone());
1946 let spawner_arc = Arc::new(Mutex::new(spawner));
1947
1948 let session = Arc::new(Session::new(Principal::User(PrincipalId::new())));
1949 let grants: Arc<dyn GrantPolicy> = Arc::new(DefaultGrantStore::new());
1950
1951 let ctx1 = ChildContextImpl::new("ctx1", output_tx.clone(), Arc::clone(&spawner_arc))
1952 .with_session_arc(Arc::clone(&session))
1953 .with_checker(Arc::new(DefaultPolicy))
1954 .with_grants(Arc::clone(&grants));
1955
1956 let ctx2 = ChildContextImpl::new("ctx2", output_tx, spawner_arc)
1957 .with_session_arc(Arc::clone(&session))
1958 .with_checker(Arc::new(DefaultPolicy))
1959 .with_grants(Arc::clone(&grants));
1960
1961 ctx1.grant_command("rm -rf");
1963
1964 let result = ctx2.check_command("rm -rf ./temp");
1966 assert!(result.is_allowed());
1967 }
1968
1969 #[test]
1972 fn trait_check_command_permission_requires_approval_not_elevated() {
1973 let (ctx, _) = setup_with_auth(false);
1974 let ctx_dyn: &dyn ChildContext = &ctx;
1975 let perm = ctx_dyn.check_command_permission("ls -la");
1976 assert!(perm.requires_approval());
1977 assert_eq!(perm.status_str(), "requires_approval");
1978 }
1979
1980 #[test]
1981 fn trait_check_command_permission_elevated_allows_any() {
1982 let (ctx, _) = setup_with_auth(true); let ctx_dyn: &dyn ChildContext = &ctx;
1984 let perm = ctx_dyn.check_command_permission("rm -rf /");
1986 assert!(perm.is_allowed());
1987 assert_eq!(perm.status_str(), "allowed");
1988 }
1989
1990 #[test]
1991 fn trait_check_command_permission_requires_approval() {
1992 let (ctx, _) = setup_with_auth(false); let ctx_dyn: &dyn ChildContext = &ctx;
1994 let perm = ctx_dyn.check_command_permission("rm -rf ./temp");
1995 assert!(perm.requires_approval());
1996 assert_eq!(perm.status_str(), "requires_approval");
1997 if let orcs_component::CommandPermission::RequiresApproval {
1998 grant_pattern,
1999 description,
2000 } = &perm
2001 {
2002 assert!(!grant_pattern.is_empty());
2003 assert!(!description.is_empty());
2004 } else {
2005 panic!("expected RequiresApproval");
2006 }
2007 }
2008
2009 #[test]
2010 fn trait_grant_command_then_allowed() {
2011 let (ctx, _) = setup_with_auth(false);
2012 let ctx_dyn: &dyn ChildContext = &ctx;
2013
2014 let perm = ctx_dyn.check_command_permission("rm -rf ./temp");
2016 assert!(perm.requires_approval());
2017
2018 ctx_dyn.grant_command("rm -rf");
2020
2021 let perm = ctx_dyn.check_command_permission("rm -rf ./temp");
2023 assert!(perm.is_allowed());
2024 }
2025
2026 #[test]
2027 fn trait_permissive_without_auth() {
2028 let (ctx, _) = setup(); let ctx_dyn: &dyn ChildContext = &ctx;
2030 let perm = ctx_dyn.check_command_permission("rm -rf /");
2031 assert!(perm.is_allowed()); }
2033 }
2034
2035 mod auth_hook_tests {
2038 use super::*;
2039 use crate::auth::{DefaultGrantStore, DefaultPolicy, Session};
2040 use orcs_hook::testing::MockHook;
2041 use orcs_hook::HookPoint;
2042 use orcs_types::{Principal, PrincipalId};
2043 use serde_json::json;
2044 use std::time::Duration;
2045
2046 fn setup_with_hooks(
2047 elevated: bool,
2048 ) -> (
2049 ChildContextImpl,
2050 orcs_hook::SharedHookRegistry,
2051 super::super::super::base::OutputReceiver,
2052 ) {
2053 let (output_tx, output_rx) = OutputSender::channel(64);
2054 let spawner = ChildSpawner::new("test-parent", output_tx.clone());
2055 let spawner_arc = Arc::new(Mutex::new(spawner));
2056
2057 let session = if elevated {
2058 Session::new(Principal::User(PrincipalId::new())).elevate(Duration::from_secs(60))
2059 } else {
2060 Session::new(Principal::User(PrincipalId::new()))
2061 };
2062
2063 let grants: Arc<dyn GrantPolicy> = Arc::new(DefaultGrantStore::new());
2064 let registry = orcs_hook::shared_hook_registry();
2065
2066 let ctx = ChildContextImpl::new("test-parent", output_tx, spawner_arc)
2067 .with_lua_loader(Arc::new(TestLoader))
2068 .with_session(session)
2069 .with_checker(Arc::new(DefaultPolicy))
2070 .with_grants(grants)
2071 .with_hook_registry(Arc::clone(®istry));
2072
2073 (ctx, registry, output_rx)
2074 }
2075
2076 #[test]
2077 fn auth_pre_check_abort_denies_command() {
2078 let (ctx, registry, _) = setup_with_hooks(true); let hook = MockHook::aborter(
2082 "deny-rm",
2083 "*::*",
2084 HookPoint::AuthPreCheck,
2085 "blocked by policy",
2086 );
2087 registry
2088 .write()
2089 .expect("acquire hook registry write lock")
2090 .register(Box::new(hook));
2091
2092 let result = ctx.check_command("rm -rf /");
2093 assert!(result.is_denied());
2094 assert_eq!(result.denial_reason(), Some("blocked by policy"));
2095 }
2096
2097 #[test]
2098 fn auth_pre_check_skip_allows_command() {
2099 let (ctx, registry, _) = setup_with_hooks(false); let hook = MockHook::skipper(
2103 "allow-all",
2104 "*::*",
2105 HookPoint::AuthPreCheck,
2106 json!({"allowed": true}),
2107 );
2108 registry
2109 .write()
2110 .expect("acquire hook registry write lock")
2111 .register(Box::new(hook));
2112
2113 let result = ctx.check_command("rm -rf /");
2114 assert!(result.is_allowed());
2115 }
2116
2117 #[test]
2118 fn auth_pre_check_skip_denies_with_reason() {
2119 let (ctx, registry, _) = setup_with_hooks(true); let hook = MockHook::skipper(
2122 "deny-custom",
2123 "*::*",
2124 HookPoint::AuthPreCheck,
2125 json!({"allowed": false, "reason": "custom deny"}),
2126 );
2127 registry
2128 .write()
2129 .expect("acquire hook registry write lock")
2130 .register(Box::new(hook));
2131
2132 let result = ctx.check_command("echo hello");
2133 assert!(result.is_denied());
2134 assert_eq!(result.denial_reason(), Some("custom deny"));
2135 }
2136
2137 #[test]
2138 fn auth_pre_check_continue_preserves_normal_flow() {
2139 let (ctx, registry, _) = setup_with_hooks(false); let hook = MockHook::pass_through("observer", "*::*", HookPoint::AuthPreCheck);
2143 let counter = hook.call_count.clone();
2144 registry
2145 .write()
2146 .expect("acquire hook registry write lock")
2147 .register(Box::new(hook));
2148
2149 let result = ctx.check_command("ls -la");
2150 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2152 assert!(result.requires_approval());
2154 }
2155
2156 #[test]
2157 fn auth_post_check_fires_after_check() {
2158 let (ctx, registry, _) = setup_with_hooks(true); let hook = MockHook::pass_through("post-observer", "*::*", HookPoint::AuthPostCheck);
2161 let counter = hook.call_count.clone();
2162 registry
2163 .write()
2164 .expect("acquire hook registry write lock")
2165 .register(Box::new(hook));
2166
2167 let result = ctx.check_command("echo hello");
2168 assert!(result.is_allowed());
2169 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2171 }
2172
2173 #[test]
2174 fn auth_post_check_receives_result_in_payload() {
2175 let (ctx, registry, _) = setup_with_hooks(true); let hook =
2178 MockHook::modifier("post-checker", "*::*", HookPoint::AuthPostCheck, |ctx| {
2179 assert_eq!(ctx.payload["result"], "allowed");
2181 assert_eq!(ctx.payload["command"], "echo hello");
2182 });
2183 let counter = hook.call_count.clone();
2184 registry
2185 .write()
2186 .expect("acquire hook registry write lock")
2187 .register(Box::new(hook));
2188
2189 let _ = ctx.check_command("echo hello");
2190 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2191 }
2192
2193 #[test]
2194 fn auth_on_grant_fires_on_grant_command() {
2195 let (ctx, registry, _) = setup_with_hooks(false);
2196
2197 let hook = MockHook::pass_through("grant-observer", "*::*", HookPoint::AuthOnGrant);
2198 let counter = hook.call_count.clone();
2199 registry
2200 .write()
2201 .expect("acquire hook registry write lock")
2202 .register(Box::new(hook));
2203
2204 ctx.grant_command("rm -rf");
2205
2206 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2207 }
2208
2209 #[test]
2210 fn auth_on_grant_payload_contains_pattern() {
2211 let (ctx, registry, _) = setup_with_hooks(false);
2212
2213 let hook = MockHook::modifier("grant-checker", "*::*", HookPoint::AuthOnGrant, |ctx| {
2214 assert_eq!(ctx.payload["pattern"], "rm -rf");
2215 assert_eq!(ctx.payload["granted_by"], "test-parent");
2216 });
2217 let counter = hook.call_count.clone();
2218 registry
2219 .write()
2220 .expect("acquire hook registry write lock")
2221 .register(Box::new(hook));
2222
2223 ctx.grant_command("rm -rf");
2224 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2225 }
2226
2227 #[test]
2228 fn auth_hooks_no_registry_passthrough() {
2229 let (output_tx, _) = OutputSender::channel(64);
2231 let spawner = ChildSpawner::new("test", output_tx.clone());
2232 let spawner_arc = Arc::new(Mutex::new(spawner));
2233
2234 let session =
2235 Session::new(Principal::User(PrincipalId::new())).elevate(Duration::from_secs(60));
2236
2237 let ctx = ChildContextImpl::new("test", output_tx, spawner_arc)
2238 .with_session(session)
2239 .with_checker(Arc::new(DefaultPolicy));
2240 let result = ctx.check_command("echo hello");
2243 assert!(result.is_allowed());
2244 }
2245
2246 #[test]
2247 fn trait_grant_command_dispatches_hook() {
2248 let (ctx, registry, _) = setup_with_hooks(false);
2249 let ctx_dyn: &dyn ChildContext = &ctx;
2250
2251 let hook = MockHook::pass_through("trait-grant", "*::*", HookPoint::AuthOnGrant);
2252 let counter = hook.call_count.clone();
2253 registry
2254 .write()
2255 .expect("acquire hook registry write lock")
2256 .register(Box::new(hook));
2257
2258 ctx_dyn.grant_command("ls");
2259 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2260 }
2261
2262 #[test]
2263 fn trait_check_command_permission_dispatches_hooks() {
2264 let (ctx, registry, _) = setup_with_hooks(true);
2265 let ctx_dyn: &dyn ChildContext = &ctx;
2266
2267 let pre_hook = MockHook::pass_through("pre", "*::*", HookPoint::AuthPreCheck);
2268 let pre_counter = pre_hook.call_count.clone();
2269 let post_hook = MockHook::pass_through("post", "*::*", HookPoint::AuthPostCheck);
2270 let post_counter = post_hook.call_count.clone();
2271
2272 {
2273 let mut guard = registry
2274 .write()
2275 .expect("acquire hook registry write lock for multi-hook test");
2276 guard.register(Box::new(pre_hook));
2277 guard.register(Box::new(post_hook));
2278 }
2279
2280 let perm = ctx_dyn.check_command_permission("echo hello");
2281 assert!(perm.is_allowed());
2282 assert_eq!(pre_counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2283 assert_eq!(post_counter.load(std::sync::atomic::Ordering::SeqCst), 1);
2284 }
2285 }
2286
2287 mod capability_tests {
2290 use super::*;
2291 use orcs_auth::Capability;
2292
2293 #[test]
2294 fn default_capabilities_is_all() {
2295 let (ctx, _) = setup();
2296 assert_eq!(
2297 ChildContext::capabilities(&ctx),
2298 Capability::ALL,
2299 "new context should default to ALL"
2300 );
2301 }
2302
2303 #[test]
2304 fn with_capabilities_restricts() {
2305 let (output_tx, _) = OutputSender::channel(64);
2306 let spawner = ChildSpawner::new("test", output_tx.clone());
2307 let spawner_arc = Arc::new(Mutex::new(spawner));
2308
2309 let caps = Capability::READ | Capability::WRITE;
2310 let ctx = ChildContextImpl::new("test", output_tx, spawner_arc).with_capabilities(caps);
2311
2312 assert_eq!(ChildContext::capabilities(&ctx), caps);
2313 assert!(ChildContext::has_capability(&ctx, Capability::READ));
2314 assert!(ChildContext::has_capability(&ctx, Capability::WRITE));
2315 assert!(
2316 !ChildContext::has_capability(&ctx, Capability::EXECUTE),
2317 "EXECUTE should not be present"
2318 );
2319 assert!(
2320 !ChildContext::has_capability(&ctx, Capability::SPAWN),
2321 "SPAWN should not be present"
2322 );
2323 }
2324
2325 #[test]
2326 fn create_child_context_inherits_parent_caps() {
2327 let (output_tx, _) = OutputSender::channel(64);
2328 let spawner = ChildSpawner::new("parent", output_tx.clone());
2329 let spawner_arc = Arc::new(Mutex::new(spawner));
2330
2331 let parent_caps = Capability::READ | Capability::WRITE | Capability::EXECUTE;
2332 let ctx = ChildContextImpl::new("parent", output_tx, spawner_arc)
2333 .with_capabilities(parent_caps);
2334
2335 let child_ctx = ctx.create_child_context("child-1", None);
2337 assert_eq!(
2338 child_ctx.capabilities(),
2339 parent_caps,
2340 "child should inherit parent caps when no restriction requested"
2341 );
2342 }
2343
2344 #[test]
2345 fn create_child_context_narrows_with_request() {
2346 let (output_tx, _) = OutputSender::channel(64);
2347 let spawner = ChildSpawner::new("parent", output_tx.clone());
2348 let spawner_arc = Arc::new(Mutex::new(spawner));
2349
2350 let parent_caps = Capability::READ | Capability::WRITE | Capability::EXECUTE;
2351 let ctx = ChildContextImpl::new("parent", output_tx, spawner_arc)
2352 .with_capabilities(parent_caps);
2353
2354 let requested = Capability::READ | Capability::WRITE;
2356 let child_ctx = ctx.create_child_context("child-1", Some(requested));
2357
2358 assert_eq!(
2359 child_ctx.capabilities(),
2360 Capability::READ | Capability::WRITE,
2361 "effective = parent & requested"
2362 );
2363 assert!(
2364 !child_ctx.has_capability(Capability::EXECUTE),
2365 "EXECUTE dropped by intersection"
2366 );
2367 }
2368
2369 #[test]
2370 fn create_child_context_cannot_exceed_parent() {
2371 let (output_tx, _) = OutputSender::channel(64);
2372 let spawner = ChildSpawner::new("parent", output_tx.clone());
2373 let spawner_arc = Arc::new(Mutex::new(spawner));
2374
2375 let parent_caps = Capability::READ;
2376 let ctx = ChildContextImpl::new("parent", output_tx, spawner_arc)
2377 .with_capabilities(parent_caps);
2378
2379 let child_ctx = ctx.create_child_context("child-1", Some(Capability::ALL));
2381
2382 assert_eq!(
2383 child_ctx.capabilities(),
2384 Capability::READ,
2385 "child cannot exceed parent's capabilities"
2386 );
2387 }
2388
2389 #[test]
2390 fn grandchild_inherits_narrowed_caps() {
2391 let (output_tx, _) = OutputSender::channel(64);
2392 let spawner = ChildSpawner::new("root", output_tx.clone());
2393 let spawner_arc = Arc::new(Mutex::new(spawner));
2394
2395 let root = ChildContextImpl::new("root", output_tx, spawner_arc)
2397 .with_capabilities(Capability::ALL);
2398
2399 let child_ctx =
2401 root.create_child_context("child", Some(Capability::READ | Capability::WRITE));
2402 assert_eq!(
2403 child_ctx.capabilities(),
2404 Capability::READ | Capability::WRITE,
2405 );
2406
2407 assert!(!child_ctx.has_capability(Capability::EXECUTE));
2411 assert!(!child_ctx.has_capability(Capability::SPAWN));
2412 assert!(!child_ctx.has_capability(Capability::LLM));
2413 }
2414
2415 #[test]
2416 fn empty_intersection_yields_no_caps() {
2417 let (output_tx, _) = OutputSender::channel(64);
2418 let spawner = ChildSpawner::new("parent", output_tx.clone());
2419 let spawner_arc = Arc::new(Mutex::new(spawner));
2420
2421 let parent_caps = Capability::READ | Capability::WRITE;
2422 let ctx = ChildContextImpl::new("parent", output_tx, spawner_arc)
2423 .with_capabilities(parent_caps);
2424
2425 let child_ctx =
2427 ctx.create_child_context("child-1", Some(Capability::EXECUTE | Capability::SPAWN));
2428
2429 assert_eq!(
2430 child_ctx.capabilities(),
2431 Capability::empty(),
2432 "no overlap should produce empty capabilities"
2433 );
2434 assert!(!child_ctx.has_capability(Capability::READ));
2435 assert!(!child_ctx.has_capability(Capability::EXECUTE));
2436 }
2437
2438 #[test]
2439 fn spawn_child_applies_config_capabilities() {
2440 let (ctx, _) = setup(); let config = ChildConfig::new("restricted-worker").with_capabilities(Capability::READ);
2444 let handle = ChildContext::spawn_child(&ctx, config).expect("spawn restricted-worker");
2445
2446 assert_eq!(handle.id(), "restricted-worker");
2447 }
2452
2453 #[test]
2454 fn capabilities_preserved_in_clone_box() {
2455 let (output_tx, _) = OutputSender::channel(64);
2456 let spawner = ChildSpawner::new("test", output_tx.clone());
2457 let spawner_arc = Arc::new(Mutex::new(spawner));
2458
2459 let caps = Capability::READ | Capability::EXECUTE;
2460 let ctx = ChildContextImpl::new("test", output_tx, spawner_arc).with_capabilities(caps);
2461
2462 let cloned: Box<dyn ChildContext> = ChildContext::clone_box(&ctx);
2463 assert_eq!(
2464 cloned.capabilities(),
2465 caps,
2466 "clone_box should preserve capabilities"
2467 );
2468 }
2469 }
2470}