1use super::child_context::{ChildContextImpl, LuaChildLoader};
35use super::child_spawner::ChildSpawner;
36use super::common::{
37 determine_channel_action, dispatch_signal_to_component, is_channel_active, is_channel_paused,
38 send_abort, send_transition, SignalAction,
39};
40use super::paused_queue::PausedEventQueue;
41use super::EventEmitter;
42use crate::auth::PermissionChecker;
43use crate::auth::Session;
44use crate::channel::command::{StateTransition, WorldCommand};
45use crate::channel::config::ChannelConfig;
46use crate::channel::World;
47use crate::engine::SharedChannelHandles;
48use orcs_component::{
49 AsyncChildContext, ChildContext, Component, ComponentError, ComponentLoader, ComponentSnapshot,
50 SnapshotError,
51};
52use orcs_event::{EventCategory, Request, Signal, SubscriptionEntry};
53use orcs_hook::SharedHookRegistry;
54use orcs_types::ChannelId;
55use serde::{Deserialize, Serialize};
56use serde_json::Value;
57use std::borrow::Cow;
58use std::sync::{Arc, Mutex as StdMutex};
59use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
60use tracing::{debug, info, warn};
61
/// An event delivered to a channel runner.
///
/// The `category`/`operation` pair is what subscriptions are matched against
/// for broadcast delivery; the remaining fields are forwarded to the
/// component as part of the `Request` built in `ChannelRunner::process_event`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Category of the event (e.g. `EventCategory::Output`, `EventCategory::UserInput`).
    pub category: EventCategory,
    /// Operation name within the category.
    pub operation: String,
    /// Component that produced the event.
    pub source: orcs_types::ComponentId,
    /// Arbitrary JSON payload carried by the event.
    pub payload: serde_json::Value,
}
78
/// An [`Event`] tagged with how it reached the runner.
///
/// `ChannelRunner::handle_event` filters broadcast events against the
/// runner's subscriptions; direct events skip that filter.
#[derive(Debug, Clone)]
pub(crate) enum InboundEvent {
    /// Event injected via broadcast; only processed when a subscription matches.
    Broadcast(Event),
    /// Event addressed directly to this channel; bypasses the subscription filter.
    Direct(Event),
}
97
98impl InboundEvent {
99 pub(crate) fn into_event(self) -> Event {
101 match self {
102 Self::Broadcast(e) | Self::Direct(e) => e,
103 }
104 }
105
106 pub(crate) fn is_direct(&self) -> bool {
108 matches!(self, Self::Direct(_))
109 }
110}
111
/// Cloneable sending half of an event channel.
///
/// Wraps a tokio `mpsc::Sender<InboundEvent>`; every event sent through the
/// methods on this type is tagged as [`InboundEvent::Direct`].
#[derive(Clone, Debug)]
pub struct OutputSender {
    /// Underlying bounded mpsc sender.
    inner: mpsc::Sender<InboundEvent>,
}
124
125impl OutputSender {
126 #[must_use]
132 pub fn channel(buffer: usize) -> (Self, OutputReceiver) {
133 let (tx, rx) = mpsc::channel(buffer);
134 (Self { inner: tx }, OutputReceiver { inner: rx })
135 }
136
137 pub(crate) fn new(tx: mpsc::Sender<InboundEvent>) -> Self {
139 Self { inner: tx }
140 }
141
142 #[allow(dead_code)]
144 pub(crate) fn into_inner(self) -> mpsc::Sender<InboundEvent> {
145 self.inner
146 }
147
148 #[allow(clippy::result_large_err)]
150 pub(crate) fn try_send_direct(
151 &self,
152 event: Event,
153 ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
154 self.inner.try_send(InboundEvent::Direct(event))
155 }
156
157 #[allow(dead_code)]
159 pub(crate) async fn send_direct(
160 &self,
161 event: Event,
162 ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
163 self.inner.send(InboundEvent::Direct(event)).await
164 }
165}
166
/// Receiving half paired with [`OutputSender`] by `OutputSender::channel`.
///
/// Yields plain [`Event`]s; the broadcast/direct tag is stripped on receipt.
pub struct OutputReceiver {
    /// Underlying bounded mpsc receiver.
    inner: mpsc::Receiver<InboundEvent>,
}
174
175impl OutputReceiver {
176 pub async fn recv(&mut self) -> Option<Event> {
180 self.inner.recv().await.map(InboundEvent::into_event)
181 }
182
183 pub fn try_recv(&mut self) -> Result<Event, mpsc::error::TryRecvError> {
185 self.inner.try_recv().map(InboundEvent::into_event)
186 }
187}
188
/// Value returned by `ChannelRunner::run` once the runner has stopped.
#[derive(Debug)]
pub struct RunnerResult {
    /// Channel the runner was serving.
    pub channel_id: ChannelId,
    /// Fully qualified name of the component that was bound to the channel.
    pub component_fqn: Cow<'static, str>,
    /// Snapshot taken during shutdown; `None` when the component does not
    /// support snapshots or the snapshot attempt failed.
    pub snapshot: Option<ComponentSnapshot>,
}
206
/// An RPC request bundled with the one-shot channel used to answer it.
pub(crate) struct RequestEnvelope {
    /// The request to hand to the component.
    pub request: Request,
    /// Where the runner sends the outcome: `Ok(value)` on success or
    /// `Err(message)` with the stringified error.
    pub reply_tx: oneshot::Sender<Result<Value, String>>,
}
218
// NOTE(review): presumably the capacity of the RPC request channel; its use
// is not visible in this part of the file — confirm against `build`.
const REQUEST_BUFFER_SIZE: usize = 32;

/// Capacity of the bounded event channel created in `ChannelRunnerBuilder::build`.
const EVENT_BUFFER_SIZE: usize = 64;
224
/// Cloneable handle used to push events (and optionally RPC requests) into a
/// running channel.
#[derive(Clone, Debug)]
pub struct ChannelHandle {
    /// Channel this handle targets.
    pub id: ChannelId,
    /// Sender for inbound events (broadcast or direct).
    event_tx: mpsc::Sender<InboundEvent>,
    /// Sender for RPC requests; `None` when the channel does not accept requests.
    request_tx: Option<mpsc::Sender<RequestEnvelope>>,
}
235
236impl ChannelHandle {
237 #[must_use]
239 pub(crate) fn new(id: ChannelId, event_tx: mpsc::Sender<InboundEvent>) -> Self {
240 Self {
241 id,
242 event_tx,
243 request_tx: None,
244 }
245 }
246
247 #[must_use]
249 pub fn accepts_requests(&self) -> bool {
250 self.request_tx.is_some()
251 }
252
253 pub(crate) async fn send_request(
263 &self,
264 request: Request,
265 reply_tx: oneshot::Sender<Result<Value, String>>,
266 ) -> Result<(), mpsc::error::SendError<RequestEnvelope>> {
267 match &self.request_tx {
268 Some(tx) => tx.send(RequestEnvelope { request, reply_tx }).await,
269 None => Err(mpsc::error::SendError(RequestEnvelope {
270 request,
271 reply_tx,
272 })),
273 }
274 }
275
276 pub(crate) async fn inject(
280 &self,
281 event: Event,
282 ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
283 self.event_tx.send(InboundEvent::Broadcast(event)).await
284 }
285
286 #[allow(clippy::result_large_err)]
290 pub(crate) fn try_inject(
291 &self,
292 event: Event,
293 ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
294 self.event_tx.try_send(InboundEvent::Broadcast(event))
295 }
296
297 pub(crate) async fn inject_direct(
301 &self,
302 event: Event,
303 ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
304 self.event_tx.send(InboundEvent::Direct(event)).await
305 }
306
307 #[allow(clippy::result_large_err)]
311 pub(crate) fn try_inject_direct(
312 &self,
313 event: Event,
314 ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
315 self.event_tx.try_send(InboundEvent::Direct(event))
316 }
317}
318
/// State remembered while a component request is suspended awaiting approval.
///
/// Stored by `process_event` when the component returns
/// `ComponentError::Suspended` and consumed by `handle_approval_resolved`.
struct PendingApproval {
    /// Identifier of the approval; compared against reject signals.
    approval_id: String,
    /// Pattern granted through the `GrantPolicy` once the approval is resolved.
    grant_pattern: String,
    /// The request that was suspended; re-dispatched to the component after approval.
    original_request: Request,
}
331
/// Drives a single component bound to a channel.
///
/// `run` initialises the component, then loops over signals, RPC requests
/// and events until the channel stops, and finally shuts the component down.
pub struct ChannelRunner {
    /// Channel served by this runner.
    id: ChannelId,
    /// Id of the bound component; used as hook context and as the source of
    /// output events emitted by the runner itself.
    component_id: orcs_types::ComponentId,
    /// Inbound events (broadcast and direct).
    event_rx: mpsc::Receiver<InboundEvent>,
    /// Broadcast receiver for signals.
    signal_rx: broadcast::Receiver<Signal>,
    /// Command channel to the world (aborts, transitions, spawns).
    world_tx: mpsc::Sender<WorldCommand>,
    /// Shared world state, consulted for active/paused checks.
    world: Arc<RwLock<World>>,
    /// The component, behind an async mutex.
    component: Arc<Mutex<Box<dyn Component>>>,
    /// Subscriptions that broadcast events must match to be processed.
    subscriptions: Vec<SubscriptionEntry>,
    /// Events received while the channel is paused; drained on resume.
    paused_queue: PausedEventQueue,
    /// Child spawner, when child spawning is enabled.
    child_spawner: Option<Arc<StdMutex<ChildSpawner>>>,
    /// Sender into this runner's own event channel; cloned into child contexts.
    event_tx: Option<mpsc::Sender<InboundEvent>>,
    /// RPC request receiver, when the request channel is enabled.
    request_rx: Option<mpsc::Receiver<RequestEnvelope>>,
    /// Snapshot restored into the component before `init` (taken once in `run`).
    initial_snapshot: Option<ComponentSnapshot>,
    /// Shared channel handles; with `component_channel_map`, enables RPC
    /// support on child contexts.
    shared_handles: Option<SharedChannelHandles>,
    /// Shared component-to-channel map; see `shared_handles`.
    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
    /// Hook registry; when absent, hook dispatch always continues.
    hook_registry: Option<SharedHookRegistry>,
    /// Configuration passed to the component's `init`.
    component_config: serde_json::Value,
    /// Grant policy used to record a grant when an approval is resolved.
    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
    /// Direct output channel used for runner-generated notifications
    /// (processing, approval requests, approve/reject messages).
    io_output_tx: Option<OutputSender>,
    /// Approval currently awaited, if any.
    pending_approval: Option<PendingApproval>,
}
394
395async fn recv_request(rx: &mut Option<mpsc::Receiver<RequestEnvelope>>) -> Option<RequestEnvelope> {
400 match rx {
401 Some(rx) => rx.recv().await,
402 None => std::future::pending().await,
403 }
404}
405
406impl ChannelRunner {
407 fn dispatch_hook(
412 &self,
413 point: orcs_hook::HookPoint,
414 payload: serde_json::Value,
415 ) -> orcs_hook::HookAction {
416 let Some(registry) = &self.hook_registry else {
417 let ctx = orcs_hook::HookContext::new(
418 point,
419 self.component_id.clone(),
420 self.id,
421 orcs_types::Principal::System,
422 0,
423 payload,
424 );
425 return orcs_hook::HookAction::Continue(Box::new(ctx));
426 };
427
428 let ctx = orcs_hook::HookContext::new(
429 point,
430 self.component_id.clone(),
431 self.id,
432 orcs_types::Principal::System,
433 0,
434 payload,
435 );
436
437 let guard = registry.read().unwrap_or_else(|poisoned| {
438 warn!("hook registry lock poisoned, using inner value");
439 poisoned.into_inner()
440 });
441 guard.dispatch(point, &self.component_id, None, ctx)
442 }
443
    /// Returns the hook registry attached to this runner, if any.
    #[must_use]
    pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
        self.hook_registry.as_ref()
    }
449
450 #[must_use]
457 pub fn create_child_context(&self, child_id: &str) -> Option<Box<dyn ChildContext>> {
458 let spawner = self.child_spawner.as_ref()?;
459 let event_tx = self.event_tx.as_ref()?;
460
461 let mut ctx = ChildContextImpl::new(
462 child_id,
463 OutputSender::new(event_tx.clone()),
464 Arc::clone(spawner),
465 );
466 ctx = self.inject_rpc(ctx);
467
468 Some(Box::new(ctx))
469 }
470
471 #[must_use]
478 pub fn create_child_context_with_loader(
479 &self,
480 child_id: &str,
481 loader: Arc<dyn LuaChildLoader>,
482 ) -> Option<Box<dyn ChildContext>> {
483 let spawner = self.child_spawner.as_ref()?;
484 let event_tx = self.event_tx.as_ref()?;
485
486 let mut ctx = ChildContextImpl::new(
487 child_id,
488 OutputSender::new(event_tx.clone()),
489 Arc::clone(spawner),
490 )
491 .with_lua_loader(loader);
492 ctx = self.inject_rpc(ctx);
493
494 Some(Box::new(ctx))
495 }
496
497 #[must_use]
503 pub fn create_async_child_context(&self, child_id: &str) -> Option<Box<dyn AsyncChildContext>> {
504 let spawner = self.child_spawner.as_ref()?;
505 let event_tx = self.event_tx.as_ref()?;
506
507 let mut ctx = ChildContextImpl::new(
508 child_id,
509 OutputSender::new(event_tx.clone()),
510 Arc::clone(spawner),
511 );
512 ctx = self.inject_rpc(ctx);
513
514 Some(Box::new(ctx))
515 }
516
517 #[must_use]
524 pub fn create_async_child_context_with_loader(
525 &self,
526 child_id: &str,
527 loader: Arc<dyn LuaChildLoader>,
528 ) -> Option<Box<dyn AsyncChildContext>> {
529 let spawner = self.child_spawner.as_ref()?;
530 let event_tx = self.event_tx.as_ref()?;
531
532 let mut ctx = ChildContextImpl::new(
533 child_id,
534 OutputSender::new(event_tx.clone()),
535 Arc::clone(spawner),
536 )
537 .with_lua_loader(loader);
538 ctx = self.inject_rpc(ctx);
539
540 Some(Box::new(ctx))
541 }
542
543 fn inject_rpc(&self, ctx: ChildContextImpl) -> ChildContextImpl {
546 if let (Some(handles), Some(map)) = (&self.shared_handles, &self.component_channel_map) {
547 ctx.with_rpc_support(handles.clone(), map.clone(), self.id)
548 } else {
549 ctx
550 }
551 }
552
    /// Returns the child spawner, if child spawning is enabled for this runner.
    #[must_use]
    pub fn child_spawner(&self) -> Option<&Arc<StdMutex<ChildSpawner>>> {
        self.child_spawner.as_ref()
    }
558
    /// Returns the id of the channel this runner serves.
    #[must_use]
    pub fn id(&self) -> ChannelId {
        self.id
    }
564
    /// Returns the sender used to issue commands to the world.
    #[must_use]
    pub fn world_tx(&self) -> &mpsc::Sender<WorldCommand> {
        &self.world_tx
    }
570
    /// Returns the shared world state.
    #[must_use]
    pub fn world(&self) -> &Arc<RwLock<World>> {
        &self.world
    }
576
    /// Runs the channel until it stops and returns the shutdown result.
    ///
    /// Phases:
    /// 1. Restore the initial snapshot (if any), then `init` the component,
    ///    surrounded by the `ComponentPreInit` / `ComponentPostInit` hooks.
    ///    Restore and init failures are logged, not fatal.
    /// 2. Loop over signals, RPC requests and events (polled in that priority
    ///    order) until a handler asks to stop, a channel closes, or the world
    ///    no longer reports this channel as active.
    /// 3. Take a shutdown snapshot, call `shutdown` on the component
    ///    surrounded by the `ComponentPreShutdown` / `ComponentPostShutdown`
    ///    hooks, and return the result.
    #[tracing::instrument(skip_all, level = "info", fields(channel_id = %self.id))]
    pub async fn run(mut self) -> RunnerResult {
        info!("ChannelRunner started");

        // --- Phase 1: restore + init (component lock held for the whole block) ---
        {
            let mut comp = self.component.lock().await;

            // `take()` so the snapshot is consumed exactly once.
            if let Some(snapshot) = self.initial_snapshot.take() {
                match comp.restore(&snapshot) {
                    Ok(()) => info!("restored component from initial snapshot"),
                    Err(SnapshotError::NotSupported(_)) => {
                        debug!("component does not support snapshot restore");
                    }
                    Err(e) => {
                        warn!(error = %e, "failed to restore initial snapshot");
                    }
                }
            }

            // The returned hook action is ignored: init hooks cannot veto init here.
            self.dispatch_hook(
                orcs_hook::HookPoint::ComponentPreInit,
                serde_json::json!({ "component": comp.id().fqn() }),
            );

            // An init failure is logged only; the runner still enters its loop.
            if let Err(e) = comp.init(&self.component_config) {
                warn!(error = %e, "component init failed");
            }

            self.dispatch_hook(
                orcs_hook::HookPoint::ComponentPostInit,
                serde_json::json!({ "component": comp.id().fqn() }),
            );
        }

        // --- Phase 2: main loop ---
        loop {
            tokio::select! {
                // `biased` polls the branches top to bottom: signals first,
                // then RPC requests, then events.
                biased;

                signal = self.signal_rx.recv() => {
                    match signal {
                        Ok(sig) => {
                            if !self.handle_signal(sig).await {
                                break;
                            }
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            info!("signal channel closed");
                            break;
                        }
                        // Lagging is logged and the loop keeps going.
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            warn!(lagged = n, "signal receiver lagged");
                        }
                    }
                }

                // `recv_request` never resolves when no request channel is
                // configured, so this branch is inert in that case.
                Some(envelope) = recv_request(&mut self.request_rx) => {
                    self.handle_rpc_request(envelope).await;
                }

                event = self.event_rx.recv() => {
                    match event {
                        Some(evt) => {
                            if !self.handle_event(evt).await {
                                break;
                            }
                        }
                        None => {
                            info!("event channel closed");
                            break;
                        }
                    }
                }
            }

            // After each handled item, stop if the world no longer reports
            // this channel as active.
            if !is_channel_active(&self.world, self.id).await {
                debug!("channel no longer active");
                break;
            }
        }

        // --- Phase 3: snapshot + shutdown ---
        let (component_fqn, snapshot) = {
            let mut comp = self.component.lock().await;
            let fqn = comp.id().fqn();

            // Snapshot is taken before `shutdown` is called on the component.
            let snapshot = match comp.snapshot() {
                Ok(s) => {
                    debug!(component = %fqn, "captured shutdown snapshot");
                    Some(s)
                }
                Err(SnapshotError::NotSupported(_)) => None,
                Err(e) => {
                    warn!(component = %fqn, error = %e, "snapshot failed during shutdown");
                    None
                }
            };

            self.dispatch_hook(
                orcs_hook::HookPoint::ComponentPreShutdown,
                serde_json::json!({ "component": &fqn }),
            );

            comp.shutdown();

            self.dispatch_hook(
                orcs_hook::HookPoint::ComponentPostShutdown,
                serde_json::json!({ "component": &fqn }),
            );
            debug!(component = %fqn, "component shutdown complete");

            (fqn, snapshot)
        };

        info!("ChannelRunner stopped");

        RunnerResult {
            channel_id: self.id,
            component_fqn: Cow::Owned(component_fqn),
            snapshot,
        }
    }
724
725 async fn handle_signal(&mut self, signal: Signal) -> bool {
729 debug!(signal_kind = ?signal.kind, "received signal");
730
731 if !signal.affects_channel(self.id) {
733 return true;
734 }
735
736 let pre_payload = serde_json::json!({
738 "signal_kind": format!("{:?}", signal.kind),
739 "signal_scope": format!("{:?}", signal.scope),
740 });
741 let pre_action = self.dispatch_hook(orcs_hook::HookPoint::SignalPreDispatch, pre_payload);
742 match pre_action {
743 orcs_hook::HookAction::Skip(_) => {
744 debug!("signal skipped by pre-dispatch hook");
745 return true;
746 }
747 orcs_hook::HookAction::Abort { reason } => {
748 warn!(reason = %reason, "signal aborted by pre-dispatch hook");
749 return true;
750 }
751 orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
752 }
753
754 if let Some(spawner) = &self.child_spawner {
756 if let Ok(mut s) = spawner.lock() {
757 s.propagate_signal(&signal);
758 }
759 }
760
761 let component_action = dispatch_signal_to_component(&signal, &self.component).await;
763 if let SignalAction::Stop { reason } = component_action {
764 info!(reason = %reason, "component requested stop");
765 self.abort_all_children();
767 send_abort(&self.world_tx, self.id, &reason).await;
768 return false;
769 }
770
771 let action = determine_channel_action(&signal.kind);
773 match action {
774 SignalAction::Stop { reason } => {
775 info!(reason = %reason, "stopping channel");
776 self.abort_all_children();
778 send_abort(&self.world_tx, self.id, &reason).await;
779 return false;
780 }
781 SignalAction::Transition(transition) => {
782 let is_resolve = matches!(transition, StateTransition::ResolveApproval { .. });
783 let accepted = send_transition(&self.world_tx, self.id, transition.clone()).await;
784
785 if matches!(transition, StateTransition::Resume) {
787 self.drain_paused_queue().await;
788 }
789
790 if is_resolve && accepted {
792 self.handle_approval_resolved().await;
793 }
794 }
795 SignalAction::Continue => {
796 if let orcs_event::SignalKind::Reject {
798 approval_id,
799 reason,
800 } = &signal.kind
801 {
802 if let Some(pending) = &self.pending_approval {
803 if pending.approval_id == *approval_id {
804 info!(
805 approval_id = %approval_id,
806 "approval rejected, clearing pending"
807 );
808 send_abort(
810 &self.world_tx,
811 self.id,
812 reason.as_deref().unwrap_or("rejected by user"),
813 )
814 .await;
815 self.pending_approval = None;
816
817 if let Some(io_tx) = &self.io_output_tx {
819 let event = Event {
820 category: EventCategory::Output,
821 operation: "output".to_string(),
822 source: self.component_id.clone(),
823 payload: serde_json::json!({
824 "message": format!(
825 "Rejected: {}",
826 reason.as_deref().unwrap_or("no reason")
827 ),
828 "level": "warn",
829 }),
830 };
831 let _ = io_tx.try_send_direct(event);
832 }
833 }
834 }
835 }
836 }
837 }
838
839 let post_payload = serde_json::json!({
841 "signal_kind": format!("{:?}", signal.kind),
842 "handled": true,
843 });
844 let _post_action =
845 self.dispatch_hook(orcs_hook::HookPoint::SignalPostDispatch, post_payload);
846
847 true
848 }
849
850 async fn handle_approval_resolved(&mut self) {
858 let pending = match self.pending_approval.take() {
859 Some(p) => p,
860 None => {
861 debug!("ResolveApproval accepted but no pending approval stored");
862 return;
863 }
864 };
865
866 info!(
867 approval_id = %pending.approval_id,
868 grant_pattern = %pending.grant_pattern,
869 "approval resolved, granting pattern and re-dispatching"
870 );
871
872 if let Some(grants) = &self.grants {
874 if let Err(e) =
875 grants.grant(orcs_auth::CommandGrant::persistent(&pending.grant_pattern))
876 {
877 warn!(
878 error = %e,
879 pattern = %pending.grant_pattern,
880 "failed to grant pattern after approval"
881 );
882 }
883 } else {
884 warn!("no GrantPolicy configured, cannot grant pattern");
885 }
886
887 let result = {
889 let mut comp = self.component.lock().await;
890 comp.on_request(&pending.original_request)
891 };
892
893 match result {
894 Ok(response) => {
895 debug!(response = ?response, "re-dispatched request succeeded after approval");
896 }
897 Err(e) => {
898 warn!(error = %e, "re-dispatched request failed after approval");
899 }
900 }
901
902 if let Some(io_tx) = &self.io_output_tx {
904 let event = Event {
905 category: EventCategory::Output,
906 operation: "output".to_string(),
907 source: self.component_id.clone(),
908 payload: serde_json::json!({
909 "message": format!("Approved: {}", pending.approval_id),
910 "level": "info",
911 }),
912 };
913 let _ = io_tx.try_send_direct(event);
914 }
915 }
916
917 fn abort_all_children(&self) {
919 if let Some(spawner) = &self.child_spawner {
920 if let Ok(mut s) = spawner.lock() {
921 s.abort_all();
922 debug!("aborted all children");
923 }
924 }
925 }
926
927 async fn handle_event(&mut self, inbound: InboundEvent) -> bool {
933 let is_direct = inbound.is_direct();
934 let event = inbound.into_event();
935
936 debug!(
937 category = ?event.category,
938 operation = %event.operation,
939 direct = is_direct,
940 "received event"
941 );
942
943 if !is_direct
947 && !self
948 .subscriptions
949 .iter()
950 .any(|s| s.matches(&event.category, &event.operation))
951 {
952 debug!(category = ?event.category, operation = %event.operation, "skipping event (not subscribed)");
953 return true;
954 }
955
956 if is_channel_paused(&self.world, self.id).await {
958 self.paused_queue
959 .try_enqueue(event, "ChannelRunner", self.id);
960 return true;
961 }
962
963 self.process_event(event, is_direct).await;
964 true
965 }
966
    /// Turns an event into a `Request`, hands it to the component and reacts
    /// to the outcome.
    ///
    /// Flow:
    /// 1. `RequestPreDispatch` hook — may rewrite the payload (`Continue`),
    ///    or drop the event (`Skip` / `Abort`).
    /// 2. For user input, send a best-effort "processing" notification to IO.
    /// 3. Call the component's `on_request`.
    /// 4. `RequestPostDispatch` hook (its result is ignored).
    /// 5. On `ComponentError::Suspended`, remember the pending approval, ask
    ///    the world to transition to `AwaitApproval`, and notify IO with an
    ///    approval request.
    ///
    /// `_is_direct` is currently unused.
    async fn process_event(&mut self, event: Event, _is_direct: bool) {
        let pre_payload = serde_json::json!({
            "category": format!("{:?}", event.category),
            "operation": &event.operation,
            "source": event.source.fqn(),
            "payload": &event.payload,
        });
        let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
        let request_payload = match pre_action {
            orcs_hook::HookAction::Continue(ctx) => {
                // Use the hook's (possibly modified) "payload" entry; fall
                // back to the original payload when the key is missing.
                ctx.payload.get("payload").cloned().unwrap_or(event.payload)
            }
            orcs_hook::HookAction::Skip(value) => {
                debug!(value = ?value, "request skipped by pre-dispatch hook");
                return;
            }
            orcs_hook::HookAction::Abort { reason } => {
                warn!(reason = %reason, "request aborted by pre-dispatch hook");
                return;
            }
            orcs_hook::HookAction::Replace(_) => {
                // The replacement value is not used at the pre-dispatch stage;
                // the original payload goes through unchanged.
                event.payload
            }
        };

        // Captured before `event`'s remaining fields are moved into the request.
        let is_user_input = event.category == EventCategory::UserInput;
        let event_operation = event.operation.clone();

        let request = Request::new(
            event.category,
            &event_operation,
            event.source,
            self.id,
            request_payload,
        );

        if is_user_input {
            if let Some(io_tx) = &self.io_output_tx {
                let notify = Event {
                    category: EventCategory::Output,
                    operation: "processing".to_string(),
                    source: self.component_id.clone(),
                    payload: serde_json::json!({
                        "type": "processing",
                        "component": &self.component_id.name,
                        "operation": &event_operation,
                    }),
                };
                // Best-effort notification; send failures are ignored.
                let _ = io_tx.try_send_direct(notify);
            }
        }

        // Scope the component lock to the `on_request` call only.
        let result = {
            let mut comp = self.component.lock().await;
            comp.on_request(&request)
        };

        let post_payload = match &result {
            Ok(response) => serde_json::json!({
                "operation": &event_operation,
                "response": response,
                "success": true,
            }),
            Err(e) => serde_json::json!({
                "operation": &event_operation,
                "error": e.to_string(),
                "success": false,
            }),
        };
        let _post_action =
            self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);

        match result {
            Ok(response) => {
                debug!(response = ?response, "component returned success");
            }
            Err(ComponentError::Suspended {
                approval_id,
                grant_pattern,
                pending_request,
            }) => {
                info!(
                    approval_id = %approval_id,
                    grant_pattern = %grant_pattern,
                    "component suspended pending approval"
                );

                // Stored before the transition is requested; any previously
                // stored pending approval is replaced.
                self.pending_approval = Some(PendingApproval {
                    approval_id: approval_id.clone(),
                    grant_pattern,
                    original_request: request,
                });

                send_transition(
                    &self.world_tx,
                    self.id,
                    StateTransition::AwaitApproval {
                        request_id: approval_id.clone(),
                    },
                )
                .await;

                // Both fields are optional in the pending request; defaults
                // are used when they are missing or not strings.
                let description = pending_request
                    .get("description")
                    .and_then(|v| v.as_str())
                    .unwrap_or("command execution")
                    .to_string();
                let command = pending_request
                    .get("command")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();

                let output_event = Event {
                    category: EventCategory::Output,
                    operation: "approval_request".to_string(),
                    source: self.component_id.clone(),
                    payload: serde_json::json!({
                        "type": "approval_request",
                        "approval_id": approval_id,
                        "operation": "exec",
                        "description": format!("{}: {}", description, command),
                        "source": self.component_id.fqn(),
                    }),
                };
                if let Some(io_tx) = &self.io_output_tx {
                    // Best-effort notification; send failures are ignored.
                    let _ = io_tx.try_send_direct(output_event);
                }
            }
            Err(e) => {
                warn!(error = %e, "component returned error");
            }
        }
    }
1115
1116 async fn drain_paused_queue(&mut self) {
1121 let events: Vec<_> = self.paused_queue.drain("ChannelRunner", self.id).collect();
1123
1124 for event in events {
1125 self.process_event(event, true).await;
1126 }
1127 }
1128
1129 async fn handle_rpc_request(&self, envelope: RequestEnvelope) {
1134 debug!(
1135 request_id = %envelope.request.id,
1136 operation = %envelope.request.operation,
1137 source = %envelope.request.source,
1138 "handling RPC request"
1139 );
1140
1141 let pre_payload = serde_json::json!({
1143 "request_id": envelope.request.id.to_string(),
1144 "operation": &envelope.request.operation,
1145 "source": envelope.request.source.fqn(),
1146 "payload": &envelope.request.payload,
1147 });
1148 let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
1149 match &pre_action {
1150 orcs_hook::HookAction::Skip(value) => {
1151 debug!(value = ?value, "RPC request skipped by pre-dispatch hook");
1152 let response = Ok(value.clone());
1153 if envelope.reply_tx.send(response).is_err() {
1154 debug!("RPC reply dropped (caller cancelled)");
1155 }
1156 return;
1157 }
1158 orcs_hook::HookAction::Abort { reason } => {
1159 warn!(reason = %reason, "RPC request aborted by pre-dispatch hook");
1160 let response = Err(reason.clone());
1161 if envelope.reply_tx.send(response).is_err() {
1162 debug!("RPC reply dropped (caller cancelled)");
1163 }
1164 return;
1165 }
1166 orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
1167 }
1168
1169 let result = {
1170 let mut comp = self.component.lock().await;
1171 comp.on_request(&envelope.request)
1172 };
1173
1174 let post_payload = match &result {
1176 Ok(response) => serde_json::json!({
1177 "operation": &envelope.request.operation,
1178 "response": response,
1179 "success": true,
1180 }),
1181 Err(e) => serde_json::json!({
1182 "operation": &envelope.request.operation,
1183 "error": e.to_string(),
1184 "success": false,
1185 }),
1186 };
1187 let post_action =
1188 self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);
1189
1190 let final_result = match post_action {
1192 orcs_hook::HookAction::Replace(value) => Ok(value),
1193 _ => result.map_err(|e| e.to_string()),
1194 };
1195
1196 if envelope.reply_tx.send(final_result).is_err() {
1197 debug!("RPC reply dropped (caller cancelled)");
1198 }
1199 }
1200
1201 pub async fn spawn_child(
1205 &self,
1206 config: ChannelConfig,
1207 signal_rx: broadcast::Receiver<Signal>,
1208 component: Box<dyn Component>,
1209 ) -> Option<(ChannelRunner, ChannelHandle)> {
1210 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1211 let cmd = WorldCommand::Spawn {
1212 parent: self.id,
1213 config,
1214 reply: reply_tx,
1215 };
1216
1217 if self.world_tx.send(cmd).await.is_err() {
1218 return None;
1219 }
1220
1221 let child_id = reply_rx.await.ok()??;
1222 let (runner, handle) = ChannelRunner::builder(
1223 child_id,
1224 self.world_tx.clone(),
1225 Arc::clone(&self.world),
1226 signal_rx,
1227 component,
1228 )
1229 .build();
1230
1231 Some((runner, handle))
1232 }
1233}
1234
/// Builder for [`ChannelRunner`].
///
/// The five fields passed to `new` are mandatory; everything else is
/// optional and configured through the `with_*` methods before `build`.
pub struct ChannelRunnerBuilder {
    /// Channel the runner will serve.
    id: ChannelId,
    /// Command channel to the world.
    world_tx: mpsc::Sender<WorldCommand>,
    /// Shared world state.
    world: Arc<RwLock<World>>,
    /// Broadcast receiver for signals.
    signal_rx: broadcast::Receiver<Signal>,
    /// Component to bind to the channel.
    component: Box<dyn Component>,
    /// When set, `build` injects an `EventEmitter` into the component and
    /// enables runner spawning on the child context.
    emitter_signal_tx: Option<broadcast::Sender<Signal>>,
    /// Output channel routed to IO.
    output_tx: Option<OutputSender>,
    /// Whether `build` creates a child spawner and child context.
    enable_child_spawner: bool,
    /// Lua loader attached to the child context, if any.
    lua_loader: Option<Arc<dyn LuaChildLoader>>,
    /// Component loader attached to the child context, if any.
    component_loader: Option<Arc<dyn ComponentLoader>>,
    /// Session attached to the child context, if any.
    session: Option<Arc<Session>>,
    /// Permission checker attached to the child context, if any.
    checker: Option<Arc<dyn PermissionChecker>>,
    /// Grant policy attached to the child context, if any.
    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
    /// Shared channel handles (emitter broadcast and RPC support).
    shared_handles: Option<SharedChannelHandles>,
    /// Shared board handed to the emitter, if any.
    board: Option<crate::board::SharedBoard>,
    /// Snapshot to restore into the component at startup, if any.
    initial_snapshot: Option<ComponentSnapshot>,
    /// Whether an RPC request channel should be created.
    enable_request_channel: bool,
    /// Shared component-to-channel map (emitter routing and RPC support).
    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
    /// Hook registry, if any.
    hook_registry: Option<SharedHookRegistry>,
    /// Component configuration; defaults to an empty JSON object.
    component_config: serde_json::Value,
}
1297
1298impl ChannelRunnerBuilder {
1299 #[must_use]
1301 pub fn new(
1302 id: ChannelId,
1303 world_tx: mpsc::Sender<WorldCommand>,
1304 world: Arc<RwLock<World>>,
1305 signal_rx: broadcast::Receiver<Signal>,
1306 component: Box<dyn Component>,
1307 ) -> Self {
1308 Self {
1309 id,
1310 world_tx,
1311 world,
1312 signal_rx,
1313 component,
1314 emitter_signal_tx: None,
1315 output_tx: None,
1316 enable_child_spawner: false,
1317 lua_loader: None,
1318 component_loader: None,
1319 session: None,
1320 checker: None,
1321 grants: None,
1322 shared_handles: None,
1323 board: None,
1324 initial_snapshot: None,
1325 enable_request_channel: false,
1326 component_channel_map: None,
1327 hook_registry: None,
1328 component_config: serde_json::Value::Object(serde_json::Map::new()),
1329 }
1330 }
1331
1332 #[must_use]
1336 pub fn with_emitter(mut self, signal_tx: broadcast::Sender<Signal>) -> Self {
1337 self.emitter_signal_tx = Some(signal_tx);
1338 self
1339 }
1340
1341 #[must_use]
1352 pub fn with_output_channel(mut self, output_tx: OutputSender) -> Self {
1353 self.output_tx = Some(output_tx);
1354 self
1355 }
1356
1357 #[must_use]
1366 pub fn with_child_spawner(mut self, loader: Option<Arc<dyn LuaChildLoader>>) -> Self {
1367 self.enable_child_spawner = true;
1368 self.lua_loader = loader;
1369 self
1370 }
1371
1372 #[must_use]
1380 pub fn with_component_loader(mut self, loader: Arc<dyn ComponentLoader>) -> Self {
1381 self.component_loader = Some(loader);
1382 self
1383 }
1384
1385 #[must_use]
1397 pub fn with_session(mut self, session: Session) -> Self {
1398 self.session = Some(Arc::new(session));
1399 self
1400 }
1401
1402 #[must_use]
1411 pub fn with_session_arc(mut self, session: Arc<Session>) -> Self {
1412 self.session = Some(session);
1413 self
1414 }
1415
1416 #[must_use]
1422 pub fn with_checker(mut self, checker: Arc<dyn PermissionChecker>) -> Self {
1423 self.checker = Some(checker);
1424 self
1425 }
1426
1427 #[must_use]
1433 pub fn with_grants(mut self, grants: Arc<dyn orcs_auth::GrantPolicy>) -> Self {
1434 self.grants = Some(grants);
1435 self
1436 }
1437
1438 #[must_use]
1443 pub fn with_shared_handles(mut self, handles: SharedChannelHandles) -> Self {
1444 self.shared_handles = Some(handles);
1445 self
1446 }
1447
1448 #[must_use]
1453 pub fn with_component_channel_map(
1454 mut self,
1455 map: crate::engine::SharedComponentChannelMap,
1456 ) -> Self {
1457 self.component_channel_map = Some(map);
1458 self
1459 }
1460
1461 #[must_use]
1466 pub fn with_board(mut self, board: crate::board::SharedBoard) -> Self {
1467 self.board = Some(board);
1468 self
1469 }
1470
1471 #[must_use]
1481 pub fn with_initial_snapshot(mut self, snapshot: ComponentSnapshot) -> Self {
1482 self.initial_snapshot = Some(snapshot);
1483 self
1484 }
1485
1486 #[must_use]
1493 pub fn with_request_channel(mut self) -> Self {
1494 self.enable_request_channel = true;
1495 self
1496 }
1497
1498 #[must_use]
1503 pub fn with_hook_registry(mut self, registry: SharedHookRegistry) -> Self {
1504 self.hook_registry = Some(registry);
1505 self
1506 }
1507
1508 #[must_use]
1512 pub fn with_component_config(mut self, config: serde_json::Value) -> Self {
1513 self.component_config = config;
1514 self
1515 }
1516
1517 fn configure_context(
1522 &mut self,
1523 mut ctx: ChildContextImpl,
1524 io_output_tx: &Option<OutputSender>,
1525 component_id: &str,
1526 rpc_handles: &Option<SharedChannelHandles>,
1527 rpc_map: &Option<crate::engine::SharedComponentChannelMap>,
1528 channel_id: ChannelId,
1529 ) -> ChildContextImpl {
1530 if let Some(session) = self.session.take() {
1531 ctx = ctx.with_session_arc(session);
1532 info!("ChannelRunnerBuilder: enabled session for {}", component_id);
1533 }
1534 if let Some(checker) = self.checker.take() {
1535 ctx = ctx.with_checker(checker);
1536 info!(
1537 "ChannelRunnerBuilder: enabled permission checker for {}",
1538 component_id
1539 );
1540 }
1541 if let Some(grants) = self.grants.take() {
1542 ctx = ctx.with_grants(grants);
1543 info!(
1544 "ChannelRunnerBuilder: enabled grant store for {}",
1545 component_id
1546 );
1547 }
1548 if let Some(io_tx) = io_output_tx.clone() {
1549 ctx = ctx.with_io_output_channel(io_tx);
1550 info!(
1551 "ChannelRunnerBuilder: enabled IO output routing for {}",
1552 component_id
1553 );
1554 }
1555 if let (Some(handles), Some(map)) = (rpc_handles.clone(), rpc_map.clone()) {
1556 ctx = ctx.with_rpc_support(handles, map, channel_id);
1557 }
1558 if let Some(reg) = &self.hook_registry {
1559 ctx = ctx.with_hook_registry(Arc::clone(reg));
1560 }
1561 ctx
1562 }
1563
1564 #[must_use]
1566 pub fn build(mut self) -> (ChannelRunner, ChannelHandle) {
1567 let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE);
1568
1569 let component_id = self.component.id().clone();
1571
1572 let io_output_tx = self.output_tx.as_ref().cloned();
1574
1575 let rpc_handles = self.shared_handles.clone();
1577 let rpc_map = self.component_channel_map.clone();
1578
1579 if let Some(signal_tx) = &self.emitter_signal_tx {
1581 let component_id = self.component.id().clone();
1582 let mut emitter = EventEmitter::new(
1583 OutputSender::new(event_tx.clone()),
1584 signal_tx.clone(),
1585 component_id.clone(),
1586 );
1587
1588 if let Some(output_tx) = self.output_tx.take() {
1590 emitter = emitter.with_output_channel(output_tx);
1591 info!(
1592 "ChannelRunnerBuilder: routing output to IO channel for {}",
1593 component_id.fqn()
1594 );
1595 }
1596
1597 if let Some(handles) = self.shared_handles.take() {
1599 emitter = emitter.with_shared_handles(handles);
1600 info!(
1601 "ChannelRunnerBuilder: enabled event broadcast for {}",
1602 component_id.fqn()
1603 );
1604 }
1605
1606 if let Some(map) = self.component_channel_map.take() {
1608 emitter = emitter.with_component_channel_map(map, self.id);
1609 }
1610
1611 if let Some(board) = self.board.take() {
1613 emitter = emitter.with_board(board);
1614 }
1615
1616 self.component.set_emitter(Box::new(emitter));
1617
1618 info!(
1619 "ChannelRunnerBuilder: injected emitter for {}",
1620 component_id.fqn()
1621 );
1622 }
1623
1624 let child_spawner = if self.enable_child_spawner {
1626 let component_id = self.component.id().fqn();
1627 let output_sender = OutputSender::new(event_tx.clone());
1628 let spawner = ChildSpawner::new(&component_id, output_sender.clone());
1629 let spawner_arc = Arc::new(StdMutex::new(spawner));
1630
1631 let mut ctx =
1633 ChildContextImpl::new(&component_id, output_sender, Arc::clone(&spawner_arc));
1634
1635 if let Some(loader) = self.lua_loader.take() {
1637 ctx = ctx.with_lua_loader(loader);
1638 info!(
1639 "ChannelRunnerBuilder: created spawner with Lua loader for {}",
1640 component_id
1641 );
1642 } else {
1643 info!(
1644 "ChannelRunnerBuilder: created spawner (no Lua loader) for {}",
1645 component_id
1646 );
1647 }
1648
1649 if let Some(signal_tx) = &self.emitter_signal_tx {
1651 ctx = ctx.with_runner_support(
1652 self.world_tx.clone(),
1653 Arc::clone(&self.world),
1654 signal_tx.clone(),
1655 );
1656 info!(
1657 "ChannelRunnerBuilder: enabled runner spawning for {}",
1658 component_id
1659 );
1660 }
1661
1662 if let Some(loader) = self.component_loader.take() {
1664 ctx = ctx.with_component_loader(loader);
1665 info!(
1666 "ChannelRunnerBuilder: enabled component loader for {}",
1667 component_id
1668 );
1669 }
1670
1671 ctx = self.configure_context(
1673 ctx,
1674 &io_output_tx,
1675 &component_id,
1676 &rpc_handles,
1677 &rpc_map,
1678 self.id,
1679 );
1680
1681 self.component.set_child_context(Box::new(ctx));
1682
1683 Some(spawner_arc)
1684 } else if self.session.is_some() || self.checker.is_some() || self.grants.is_some() {
1685 let component_id = self.component.id().fqn();
1687 let dummy_output = OutputSender::new(event_tx.clone());
1688 let dummy_spawner = ChildSpawner::new(&component_id, dummy_output.clone());
1689 let dummy_arc = Arc::new(StdMutex::new(dummy_spawner));
1690 let mut ctx =
1691 ChildContextImpl::new(&component_id, dummy_output, Arc::clone(&dummy_arc));
1692
1693 ctx = self.configure_context(
1694 ctx,
1695 &io_output_tx,
1696 &component_id,
1697 &rpc_handles,
1698 &rpc_map,
1699 self.id,
1700 );
1701
1702 self.component.set_child_context(Box::new(ctx));
1703 info!(
1704 "ChannelRunnerBuilder: auth-only context injected for {}",
1705 component_id
1706 );
1707 None
1708 } else {
1709 None
1710 };
1711
1712 let event_tx_for_context = if self.enable_child_spawner || self.emitter_signal_tx.is_some()
1714 {
1715 Some(event_tx.clone())
1716 } else {
1717 None
1718 };
1719
1720 let subscriptions = self.component.subscription_entries();
1723
1724 let (request_tx, request_rx) = if self.enable_request_channel {
1726 let (tx, rx) = mpsc::channel(REQUEST_BUFFER_SIZE);
1727 (Some(tx), Some(rx))
1728 } else {
1729 (None, None)
1730 };
1731
1732 let runner = ChannelRunner {
1733 id: self.id,
1734 component_id,
1735 event_rx,
1736 signal_rx: self.signal_rx,
1737 world_tx: self.world_tx,
1738 world: self.world,
1739 component: Arc::new(Mutex::new(self.component)),
1740 subscriptions,
1741 paused_queue: PausedEventQueue::new(),
1742 child_spawner,
1743 event_tx: event_tx_for_context,
1744 request_rx,
1745 initial_snapshot: self.initial_snapshot,
1746 shared_handles: rpc_handles,
1747 component_channel_map: rpc_map,
1748 hook_registry: self.hook_registry,
1749 component_config: self.component_config,
1750 grants: self.grants.clone(),
1751 io_output_tx: io_output_tx.clone(),
1752 pending_approval: None,
1753 };
1754
1755 let mut handle = ChannelHandle::new(self.id, event_tx);
1756 handle.request_tx = request_tx;
1757
1758 (runner, handle)
1759 }
1760}
1761
1762impl ChannelRunner {
1763 #[must_use]
1776 pub fn builder(
1777 id: ChannelId,
1778 world_tx: mpsc::Sender<WorldCommand>,
1779 world: Arc<RwLock<World>>,
1780 signal_rx: broadcast::Receiver<Signal>,
1781 component: Box<dyn Component>,
1782 ) -> ChannelRunnerBuilder {
1783 ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1784 }
1785}
1786
1787#[cfg(test)]
1788mod tests {
1789 use super::*;
1790 use crate::channel::manager::WorldManager;
1791 use crate::channel::ChannelConfig;
1792 use orcs_component::{ComponentError, Status};
1793 use orcs_event::{EventCategory, SignalResponse};
1794 use orcs_types::{ComponentId, Principal};
1795 use serde_json::Value;
1796
1797 struct MockComponent {
1799 id: ComponentId,
1800 status: Status,
1801 }
1802
1803 impl MockComponent {
1804 fn new(name: &str) -> Self {
1805 Self {
1806 id: ComponentId::builtin(name),
1807 status: Status::Idle,
1808 }
1809 }
1810 }
1811
1812 impl Component for MockComponent {
1813 fn id(&self) -> &ComponentId {
1814 &self.id
1815 }
1816
1817 fn status(&self) -> Status {
1818 self.status
1819 }
1820
1821 fn subscriptions(&self) -> &[EventCategory] {
1822 &[EventCategory::Echo, EventCategory::Lifecycle]
1823 }
1824
1825 fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
1826 Ok(request.payload.clone())
1827 }
1828
1829 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
1830 if signal.is_veto() {
1831 self.status = Status::Aborted;
1832 SignalResponse::Abort
1833 } else {
1834 SignalResponse::Handled
1835 }
1836 }
1837
1838 fn abort(&mut self) {
1839 self.status = Status::Aborted;
1840 }
1841 }
1842
1843 fn mock_component() -> Box<dyn Component> {
1844 Box::new(MockComponent::new("test"))
1845 }
1846
1847 async fn setup() -> (
1848 tokio::task::JoinHandle<()>,
1849 mpsc::Sender<WorldCommand>,
1850 Arc<RwLock<World>>,
1851 broadcast::Sender<Signal>,
1852 ChannelId,
1853 ) {
1854 let mut world = World::new();
1855 let io = world.create_channel(ChannelConfig::interactive());
1856
1857 let (manager, world_tx) = WorldManager::with_world(world);
1858 let world_handle = manager.world();
1859
1860 let manager_task = tokio::spawn(manager.run());
1861
1862 let (signal_tx, _) = broadcast::channel(64);
1863
1864 (manager_task, world_tx, world_handle, signal_tx, io)
1865 }
1866
1867 async fn teardown(
1868 manager_task: tokio::task::JoinHandle<()>,
1869 world_tx: mpsc::Sender<WorldCommand>,
1870 ) {
1871 let _ = world_tx.send(WorldCommand::Shutdown).await;
1872 let _ = manager_task.await;
1873 }
1874
1875 #[tokio::test]
1876 async fn runner_creation() {
1877 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1878
1879 let signal_rx = signal_tx.subscribe();
1880 let (runner, handle) = ChannelRunner::builder(
1881 primary,
1882 world_tx.clone(),
1883 world,
1884 signal_rx,
1885 mock_component(),
1886 )
1887 .build();
1888
1889 assert_eq!(runner.id(), primary);
1890 assert_eq!(handle.id, primary);
1891
1892 teardown(manager_task, world_tx).await;
1893 }
1894
1895 #[tokio::test]
1896 async fn runner_receives_events() {
1897 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1898
1899 let signal_rx = signal_tx.subscribe();
1900 let (runner, handle) = ChannelRunner::builder(
1901 primary,
1902 world_tx.clone(),
1903 world,
1904 signal_rx,
1905 mock_component(),
1906 )
1907 .build();
1908
1909 let runner_task = tokio::spawn(runner.run());
1910
1911 let event = Event {
1912 category: EventCategory::Echo,
1913 operation: "echo".to_string(),
1914 source: ComponentId::builtin("test"),
1915 payload: serde_json::json!({"test": true}),
1916 };
1917 handle
1918 .inject(event)
1919 .await
1920 .expect("inject echo event into runner");
1921
1922 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1923
1924 signal_tx
1925 .send(Signal::cancel(primary, Principal::System))
1926 .expect("send cancel signal");
1927
1928 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
1929
1930 teardown(manager_task, world_tx).await;
1931 }
1932
1933 #[tokio::test]
1934 async fn runner_handles_veto() {
1935 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1936
1937 let signal_rx = signal_tx.subscribe();
1938 let (runner, _handle) = ChannelRunner::builder(
1939 primary,
1940 world_tx.clone(),
1941 world.clone(),
1942 signal_rx,
1943 mock_component(),
1944 )
1945 .build();
1946
1947 let runner_task = tokio::spawn(runner.run());
1948
1949 tokio::task::yield_now().await;
1950
1951 signal_tx
1952 .send(Signal::veto(Principal::System))
1953 .expect("send veto signal");
1954
1955 let result = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
1956 assert!(result.is_ok());
1957
1958 teardown(manager_task, world_tx).await;
1959 }
1960
1961 #[tokio::test]
1962 async fn channel_handle_clone() {
1963 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1964
1965 let signal_rx = signal_tx.subscribe();
1966 let (_runner, handle) = ChannelRunner::builder(
1967 primary,
1968 world_tx.clone(),
1969 world,
1970 signal_rx,
1971 mock_component(),
1972 )
1973 .build();
1974
1975 let handle2 = handle.clone();
1976 assert_eq!(handle.id, handle2.id);
1977
1978 teardown(manager_task, world_tx).await;
1979 }
1980
1981 #[tokio::test]
1982 async fn runner_with_emitter_creation() {
1983 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
1984
1985 let signal_rx = signal_tx.subscribe();
1986 let (runner, handle) = ChannelRunner::builder(
1987 primary,
1988 world_tx.clone(),
1989 world,
1990 signal_rx,
1991 mock_component(),
1992 )
1993 .with_emitter(signal_tx.clone())
1994 .build();
1995
1996 assert_eq!(runner.id(), primary);
1997 assert_eq!(handle.id, primary);
1998
1999 teardown(manager_task, world_tx).await;
2000 }
2001
2002 #[tokio::test]
2003 async fn runner_with_emitter_receives_emitted_events() {
2004 use std::sync::atomic::{AtomicUsize, Ordering};
2005 use std::sync::Arc as StdArc;
2006
2007 struct EmittingComponent {
2009 id: ComponentId,
2010 emitter: Option<Box<dyn orcs_component::Emitter>>,
2011 call_count: StdArc<AtomicUsize>,
2012 }
2013
2014 impl EmittingComponent {
2015 fn new(call_count: StdArc<AtomicUsize>) -> Self {
2016 Self {
2017 id: ComponentId::builtin("emitting"),
2018 emitter: None,
2019 call_count,
2020 }
2021 }
2022 }
2023
2024 impl Component for EmittingComponent {
2025 fn id(&self) -> &ComponentId {
2026 &self.id
2027 }
2028
2029 fn status(&self) -> Status {
2030 Status::Idle
2031 }
2032
2033 fn subscriptions(&self) -> &[EventCategory] {
2034 &[EventCategory::Echo]
2035 }
2036
2037 fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
2038 self.call_count.fetch_add(1, Ordering::SeqCst);
2039 if let Some(emitter) = &self.emitter {
2041 emitter.emit_output("Response from component");
2042 }
2043 Ok(request.payload.clone())
2044 }
2045
2046 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2047 if signal.is_veto() {
2048 SignalResponse::Abort
2049 } else {
2050 SignalResponse::Handled
2051 }
2052 }
2053
2054 fn abort(&mut self) {}
2055
2056 fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2057 self.emitter = Some(emitter);
2058 }
2059 }
2060
2061 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2062
2063 let call_count = StdArc::new(AtomicUsize::new(0));
2064 let component = Box::new(EmittingComponent::new(StdArc::clone(&call_count)));
2065
2066 let signal_rx = signal_tx.subscribe();
2067 let (runner, handle) =
2068 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2069 .with_emitter(signal_tx.clone())
2070 .build();
2071
2072 let runner_task = tokio::spawn(runner.run());
2073
2074 let event = Event {
2076 category: EventCategory::Echo,
2077 operation: "test".to_string(),
2078 source: ComponentId::builtin("test"),
2079 payload: serde_json::json!({"trigger": true}),
2080 };
2081 handle
2082 .inject(event)
2083 .await
2084 .expect("inject event into emitting runner");
2085
2086 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2088
2089 assert!(
2091 call_count.load(Ordering::SeqCst) >= 1,
2092 "Component should have received the event"
2093 );
2094
2095 signal_tx
2097 .send(Signal::cancel(primary, Principal::System))
2098 .expect("send cancel signal to emitting runner");
2099
2100 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2101
2102 teardown(manager_task, world_tx).await;
2103 }
2104
2105 #[tokio::test]
2108 async fn builder_basic() {
2109 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2110
2111 let signal_rx = signal_tx.subscribe();
2112 let (runner, handle) = ChannelRunner::builder(
2113 primary,
2114 world_tx.clone(),
2115 world,
2116 signal_rx,
2117 mock_component(),
2118 )
2119 .build();
2120
2121 assert_eq!(runner.id(), primary);
2122 assert_eq!(handle.id, primary);
2123 assert!(runner.child_spawner().is_none());
2124
2125 teardown(manager_task, world_tx).await;
2126 }
2127
2128 #[tokio::test]
2129 async fn builder_with_emitter() {
2130 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2131
2132 let signal_rx = signal_tx.subscribe();
2133 let (runner, handle) = ChannelRunner::builder(
2134 primary,
2135 world_tx.clone(),
2136 world,
2137 signal_rx,
2138 mock_component(),
2139 )
2140 .with_emitter(signal_tx.clone())
2141 .build();
2142
2143 assert_eq!(runner.id(), primary);
2144 assert_eq!(handle.id, primary);
2145
2146 teardown(manager_task, world_tx).await;
2147 }
2148
2149 #[tokio::test]
2150 async fn builder_with_child_spawner() {
2151 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2152
2153 let signal_rx = signal_tx.subscribe();
2154 let (runner, handle) = ChannelRunner::builder(
2155 primary,
2156 world_tx.clone(),
2157 world,
2158 signal_rx,
2159 mock_component(),
2160 )
2161 .with_child_spawner(None)
2162 .build();
2163
2164 assert_eq!(runner.id(), primary);
2165 assert_eq!(handle.id, primary);
2166 assert!(runner.child_spawner().is_some());
2167
2168 teardown(manager_task, world_tx).await;
2169 }
2170
2171 #[tokio::test]
2172 async fn builder_with_full_support() {
2173 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2174
2175 let signal_rx = signal_tx.subscribe();
2176 let (runner, handle) = ChannelRunner::builder(
2177 primary,
2178 world_tx.clone(),
2179 world,
2180 signal_rx,
2181 mock_component(),
2182 )
2183 .with_emitter(signal_tx.clone())
2184 .with_child_spawner(None)
2185 .build();
2186
2187 assert_eq!(runner.id(), primary);
2188 assert_eq!(handle.id, primary);
2189 assert!(runner.child_spawner().is_some());
2190
2191 teardown(manager_task, world_tx).await;
2192 }
2193
2194 #[tokio::test]
2195 async fn builder_creates_child_context() {
2196 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2197
2198 let signal_rx = signal_tx.subscribe();
2199 let (runner, _handle) = ChannelRunner::builder(
2200 primary,
2201 world_tx.clone(),
2202 world,
2203 signal_rx,
2204 mock_component(),
2205 )
2206 .with_child_spawner(None)
2207 .build();
2208
2209 let ctx = runner.create_child_context("child-1");
2211 assert!(ctx.is_some());
2212
2213 teardown(manager_task, world_tx).await;
2214 }
2215
2216 #[tokio::test]
2217 async fn builder_no_child_context_without_spawner() {
2218 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2219
2220 let signal_rx = signal_tx.subscribe();
2221 let (runner, _handle) = ChannelRunner::builder(
2222 primary,
2223 world_tx.clone(),
2224 world,
2225 signal_rx,
2226 mock_component(),
2227 )
2228 .build();
2229
2230 let ctx = runner.create_child_context("child-1");
2232 assert!(ctx.is_none());
2233
2234 teardown(manager_task, world_tx).await;
2235 }
2236
2237 #[tokio::test]
2240 async fn builder_with_request_channel_enables_accepts_requests() {
2241 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2242
2243 let signal_rx = signal_tx.subscribe();
2244 let (_runner, handle) = ChannelRunner::builder(
2245 primary,
2246 world_tx.clone(),
2247 world,
2248 signal_rx,
2249 mock_component(),
2250 )
2251 .with_request_channel()
2252 .build();
2253
2254 assert!(handle.accepts_requests());
2255
2256 teardown(manager_task, world_tx).await;
2257 }
2258
2259 #[tokio::test]
2260 async fn builder_without_request_channel_rejects_requests() {
2261 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2262
2263 let signal_rx = signal_tx.subscribe();
2264 let (_runner, handle) = ChannelRunner::builder(
2265 primary,
2266 world_tx.clone(),
2267 world,
2268 signal_rx,
2269 mock_component(),
2270 )
2271 .build();
2272
2273 assert!(!handle.accepts_requests());
2274
2275 teardown(manager_task, world_tx).await;
2276 }
2277
2278 #[tokio::test]
2279 async fn rpc_request_routed_to_runner_and_responded() {
2280 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2281
2282 let signal_rx = signal_tx.subscribe();
2283 let (runner, handle) = ChannelRunner::builder(
2284 primary,
2285 world_tx.clone(),
2286 world,
2287 signal_rx,
2288 mock_component(),
2289 )
2290 .with_request_channel()
2291 .build();
2292
2293 let runner_task = tokio::spawn(runner.run());
2294
2295 let source = ComponentId::builtin("caller");
2297 let target = ComponentId::builtin("test");
2298 let req = Request::new(
2299 EventCategory::Echo,
2300 "echo",
2301 source,
2302 primary,
2303 Value::String("rpc_payload".into()),
2304 )
2305 .with_target(target);
2306
2307 let (reply_tx, reply_rx) = oneshot::channel();
2308 handle
2309 .send_request(req, reply_tx)
2310 .await
2311 .expect("send RPC request to runner");
2312
2313 let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2315 .await
2316 .expect("reply should arrive within timeout")
2317 .expect("reply channel should not be dropped");
2318
2319 assert_eq!(
2320 result.expect("RPC response should be Ok"),
2321 Value::String("rpc_payload".into())
2322 );
2323
2324 signal_tx
2326 .send(Signal::cancel(primary, Principal::System))
2327 .expect("send cancel signal after RPC test");
2328 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2329 teardown(manager_task, world_tx).await;
2330 }
2331
2332 #[tokio::test]
2333 async fn rpc_request_component_error_returned_as_string() {
2334 struct FailingComponent {
2336 id: ComponentId,
2337 }
2338
2339 impl Component for FailingComponent {
2340 fn id(&self) -> &ComponentId {
2341 &self.id
2342 }
2343 fn status(&self) -> Status {
2344 Status::Idle
2345 }
2346 fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2347 Err(ComponentError::ExecutionFailed(
2348 "deliberate test failure".into(),
2349 ))
2350 }
2351 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2352 if signal.is_veto() {
2353 SignalResponse::Abort
2354 } else {
2355 SignalResponse::Handled
2356 }
2357 }
2358 fn abort(&mut self) {}
2359 }
2360
2361 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2362
2363 let component: Box<dyn Component> = Box::new(FailingComponent {
2364 id: ComponentId::builtin("failing"),
2365 });
2366 let signal_rx = signal_tx.subscribe();
2367 let (runner, handle) =
2368 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2369 .with_request_channel()
2370 .build();
2371
2372 let runner_task = tokio::spawn(runner.run());
2373
2374 let source = ComponentId::builtin("caller");
2375 let target = ComponentId::builtin("failing");
2376 let req = Request::new(EventCategory::Echo, "op", source, primary, Value::Null)
2377 .with_target(target);
2378
2379 let (reply_tx, reply_rx) = oneshot::channel();
2380 handle
2381 .send_request(req, reply_tx)
2382 .await
2383 .expect("send RPC request to failing runner");
2384
2385 let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2386 .await
2387 .expect("reply should arrive")
2388 .expect("channel should not be dropped");
2389
2390 assert!(result.is_err(), "should return Err for component failure");
2391 let err_msg = result.expect_err("expected Err variant for component failure");
2392 assert!(
2393 err_msg.contains("deliberate test failure"),
2394 "error message should contain original error, got: {err_msg}"
2395 );
2396
2397 signal_tx
2398 .send(Signal::cancel(primary, Principal::System))
2399 .expect("send cancel signal after failing RPC test");
2400 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2401 teardown(manager_task, world_tx).await;
2402 }
2403
2404 #[tokio::test]
2412 async fn emitter_output_routes_to_output_channel_via_builder() {
2413 use std::sync::atomic::{AtomicUsize, Ordering};
2414 use std::sync::Arc as StdArc;
2415
2416 struct OutputRoutingComponent {
2418 id: ComponentId,
2419 emitter: Option<Box<dyn orcs_component::Emitter>>,
2420 call_count: StdArc<AtomicUsize>,
2421 }
2422
2423 impl OutputRoutingComponent {
2424 fn new(call_count: StdArc<AtomicUsize>) -> Self {
2425 Self {
2426 id: ComponentId::builtin("output-routing"),
2427 emitter: None,
2428 call_count,
2429 }
2430 }
2431 }
2432
2433 impl Component for OutputRoutingComponent {
2434 fn id(&self) -> &ComponentId {
2435 &self.id
2436 }
2437 fn status(&self) -> Status {
2438 Status::Idle
2439 }
2440 fn subscriptions(&self) -> &[EventCategory] {
2441 &[EventCategory::Echo]
2442 }
2443 fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2444 self.call_count.fetch_add(1, Ordering::SeqCst);
2445 if let Some(emitter) = &self.emitter {
2446 emitter.emit_output("routed output message");
2447 }
2448 Ok(serde_json::json!({"success": true}))
2449 }
2450 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2451 if signal.is_veto() {
2452 SignalResponse::Abort
2453 } else {
2454 SignalResponse::Handled
2455 }
2456 }
2457 fn abort(&mut self) {}
2458 fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2459 self.emitter = Some(emitter);
2460 }
2461 }
2462
2463 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2464
2465 let call_count = StdArc::new(AtomicUsize::new(0));
2466 let component = Box::new(OutputRoutingComponent::new(StdArc::clone(&call_count)));
2467
2468 let (output_tx, mut output_rx) = OutputSender::channel(64);
2470
2471 let signal_rx = signal_tx.subscribe();
2472 let (runner, handle) =
2473 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2474 .with_emitter(signal_tx.clone())
2475 .with_output_channel(output_tx)
2476 .build();
2477
2478 let runner_task = tokio::spawn(runner.run());
2479
2480 let event = Event {
2482 category: EventCategory::Echo,
2483 operation: "test".to_string(),
2484 source: ComponentId::builtin("test"),
2485 payload: serde_json::json!({"trigger": true}),
2486 };
2487 handle.inject(event).await.expect("inject event");
2488
2489 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2491
2492 assert!(
2494 call_count.load(Ordering::SeqCst) >= 1,
2495 "Component should have received the event"
2496 );
2497
2498 let output_event = output_rx
2500 .try_recv()
2501 .expect("Output event should arrive on output channel");
2502 assert_eq!(
2503 output_event.category,
2504 EventCategory::Output,
2505 "Event category should be Output"
2506 );
2507 assert_eq!(
2508 output_event.operation, "display",
2509 "Event operation should be 'display'"
2510 );
2511 assert_eq!(
2512 output_event.payload["message"], "routed output message",
2513 "Event payload message should match what the component emitted"
2514 );
2515 assert_eq!(
2516 output_event.payload["level"], "info",
2517 "Event payload level should be 'info'"
2518 );
2519
2520 signal_tx
2522 .send(Signal::cancel(primary, Principal::System))
2523 .expect("send cancel");
2524 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2525 teardown(manager_task, world_tx).await;
2526 }
2527
2528 #[tokio::test]
2529 async fn emitter_output_records_to_board_and_routes_to_output_channel() {
2530 use std::sync::atomic::{AtomicUsize, Ordering};
2531 use std::sync::Arc as StdArc;
2532
2533 struct BoardOutputComponent {
2534 id: ComponentId,
2535 emitter: Option<Box<dyn orcs_component::Emitter>>,
2536 call_count: StdArc<AtomicUsize>,
2537 }
2538
2539 impl BoardOutputComponent {
2540 fn new(call_count: StdArc<AtomicUsize>) -> Self {
2541 Self {
2542 id: ComponentId::builtin("board-output"),
2543 emitter: None,
2544 call_count,
2545 }
2546 }
2547 }
2548
2549 impl Component for BoardOutputComponent {
2550 fn id(&self) -> &ComponentId {
2551 &self.id
2552 }
2553 fn status(&self) -> Status {
2554 Status::Idle
2555 }
2556 fn subscriptions(&self) -> &[EventCategory] {
2557 &[EventCategory::Echo]
2558 }
2559 fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2560 self.call_count.fetch_add(1, Ordering::SeqCst);
2561 if let Some(emitter) = &self.emitter {
2562 emitter.emit_output("board and io message");
2563 }
2564 Ok(serde_json::json!({"success": true}))
2565 }
2566 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2567 if signal.is_veto() {
2568 SignalResponse::Abort
2569 } else {
2570 SignalResponse::Handled
2571 }
2572 }
2573 fn abort(&mut self) {}
2574 fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2575 self.emitter = Some(emitter);
2576 }
2577 }
2578
2579 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2580
2581 let call_count = StdArc::new(AtomicUsize::new(0));
2582 let component = Box::new(BoardOutputComponent::new(StdArc::clone(&call_count)));
2583
2584 let (output_tx, mut output_rx) = OutputSender::channel(64);
2585 let board = crate::board::shared_board();
2586
2587 let signal_rx = signal_tx.subscribe();
2588 let (runner, handle) =
2589 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2590 .with_emitter(signal_tx.clone())
2591 .with_output_channel(output_tx)
2592 .with_board(Arc::clone(&board))
2593 .build();
2594
2595 let runner_task = tokio::spawn(runner.run());
2596
2597 let event = Event {
2599 category: EventCategory::Echo,
2600 operation: "test".to_string(),
2601 source: ComponentId::builtin("test"),
2602 payload: serde_json::json!({"trigger": true}),
2603 };
2604 handle.inject(event).await.expect("inject event");
2605
2606 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2608
2609 assert!(call_count.load(Ordering::SeqCst) >= 1);
2611
2612 let output_event = output_rx
2614 .try_recv()
2615 .expect("Output event should arrive on output channel");
2616 assert_eq!(output_event.payload["message"], "board and io message");
2617
2618 let b = board.read();
2620 assert!(b.len() >= 1, "Board should have at least 1 entry");
2621 let entries = b.recent(1);
2622 assert_eq!(
2623 entries[0].payload["message"], "board and io message",
2624 "Board entry should match the emitted message"
2625 );
2626
2627 signal_tx
2629 .send(Signal::cancel(primary, Principal::System))
2630 .expect("send cancel");
2631 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2632 teardown(manager_task, world_tx).await;
2633 }
2634}