1use super::child_context::{ChildContextImpl, LuaChildLoader};
35use super::child_spawner::ChildSpawner;
36use super::common::{
37 determine_channel_action, dispatch_signal_to_component, is_channel_active, is_channel_paused,
38 send_abort, send_transition, SignalAction,
39};
40use super::paused_queue::PausedEventQueue;
41use super::EventEmitter;
42use crate::auth::PermissionChecker;
43use crate::auth::Session;
44use crate::channel::command::{StateTransition, WorldCommand};
45use crate::channel::config::ChannelConfig;
46use crate::channel::World;
47use crate::engine::SharedChannelHandles;
48use orcs_component::{
49 AsyncChildContext, ChildContext, Component, ComponentError, ComponentLoader, ComponentSnapshot,
50 SnapshotError,
51};
52use orcs_event::{EventCategory, Request, Signal, SubscriptionEntry};
53use orcs_hook::SharedHookRegistry;
54use orcs_types::ChannelId;
55use serde::{Deserialize, Serialize};
56use serde_json::Value;
57use std::borrow::Cow;
58use std::sync::{Arc, Mutex as StdMutex};
59use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
60use tracing::{debug, error, info, warn};
61
/// An event delivered to a channel runner.
///
/// The runner turns each accepted event into a `Request` (category, operation,
/// source and payload are forwarded) and hands it to the component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Event category; matched against subscriptions together with `operation`.
    pub category: EventCategory,
    /// Operation name; matched against subscriptions and reused as the request operation.
    pub operation: String,
    /// Identifier of the component the event originates from.
    pub source: orcs_types::ComponentId,
    /// Free-form JSON payload carried by the event.
    pub payload: serde_json::Value,
}
78
79#[derive(Debug, Clone)]
91pub(crate) enum InboundEvent {
92 Broadcast(Event),
94 Direct(Event),
96}
97
98impl InboundEvent {
99 pub(crate) fn into_event(self) -> Event {
101 match self {
102 Self::Broadcast(e) | Self::Direct(e) => e,
103 }
104 }
105
106 pub(crate) fn is_direct(&self) -> bool {
108 matches!(self, Self::Direct(_))
109 }
110}
111
112#[derive(Clone, Debug)]
121pub struct OutputSender {
122 inner: mpsc::Sender<InboundEvent>,
123}
124
125impl OutputSender {
126 #[must_use]
132 pub fn channel(buffer: usize) -> (Self, OutputReceiver) {
133 let (tx, rx) = mpsc::channel(buffer);
134 (Self { inner: tx }, OutputReceiver { inner: rx })
135 }
136
137 pub(crate) fn new(tx: mpsc::Sender<InboundEvent>) -> Self {
139 Self { inner: tx }
140 }
141
142 #[allow(dead_code)]
144 pub(crate) fn into_inner(self) -> mpsc::Sender<InboundEvent> {
145 self.inner
146 }
147
148 #[allow(clippy::result_large_err)]
150 pub(crate) fn try_send_direct(
151 &self,
152 event: Event,
153 ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
154 self.inner.try_send(InboundEvent::Direct(event))
155 }
156
157 #[allow(dead_code)]
159 pub(crate) async fn send_direct(
160 &self,
161 event: Event,
162 ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
163 self.inner.send(InboundEvent::Direct(event)).await
164 }
165}
166
167pub struct OutputReceiver {
172 inner: mpsc::Receiver<InboundEvent>,
173}
174
175impl OutputReceiver {
176 pub async fn recv(&mut self) -> Option<Event> {
180 self.inner.recv().await.map(InboundEvent::into_event)
181 }
182
183 pub fn try_recv(&mut self) -> Result<Event, mpsc::error::TryRecvError> {
185 self.inner.try_recv().map(InboundEvent::into_event)
186 }
187}
188
/// Why a runner stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExitReason {
    /// A signal handler asked the runner to stop.
    Signal,
    /// The event channel was closed.
    EventChannelClosed,
    /// The signal broadcast channel was closed.
    SignalChannelClosed,
    /// The channel was no longer active in the world state.
    ChannelInactive,
    /// Event handling reported that the component stopped.
    ComponentStopped,
    /// An IO channel was closed (not produced by the code visible here — TODO confirm producer).
    IoChannelClosed,
    /// The user quit (not produced by the code visible here — TODO confirm producer).
    UserQuit,
}

impl ExitReason {
    /// Returns the fixed snake_case label for this reason.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ExitReason::Signal => "signal",
            ExitReason::EventChannelClosed => "event_channel_closed",
            ExitReason::SignalChannelClosed => "signal_channel_closed",
            ExitReason::ChannelInactive => "channel_inactive",
            ExitReason::ComponentStopped => "component_stopped",
            ExitReason::IoChannelClosed => "io_channel_closed",
            ExitReason::UserQuit => "user_quit",
        }
    }
}

impl std::fmt::Display for ExitReason {
    /// Writes the same label as [`ExitReason::as_str`], ignoring width/fill flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
234
/// Summary returned by `ChannelRunner::run` once the runner has stopped.
#[derive(Debug)]
pub struct RunnerResult {
    /// Channel the runner was serving.
    pub channel_id: ChannelId,
    /// Fully qualified name of the component, read from the component at shutdown.
    pub component_fqn: Cow<'static, str>,
    /// Snapshot captured at shutdown; `None` when the component does not
    /// support snapshots or capturing failed.
    pub snapshot: Option<ComponentSnapshot>,
    /// Why the runner stopped.
    pub exit_reason: ExitReason,
}
252
/// An RPC request paired with the one-shot channel used to answer it.
pub(crate) struct RequestEnvelope {
    /// The request to hand to the component.
    pub request: Request,
    /// Reply channel: `Ok(value)` on success, `Err(message)` with the
    /// stringified error otherwise.
    pub reply_tx: oneshot::Sender<Result<Value, String>>,
}
264
/// Capacity for the RPC request channel (its creation site is not visible in this
/// part of the file — presumably used by the builder; TODO confirm).
const REQUEST_BUFFER_SIZE: usize = 32;

/// Capacity of the inbound event channel created by `ChannelRunnerBuilder::build`.
const EVENT_BUFFER_SIZE: usize = 64;
270
271#[derive(Clone, Debug)]
273pub struct ChannelHandle {
274 pub id: ChannelId,
276 event_tx: mpsc::Sender<InboundEvent>,
278 request_tx: Option<mpsc::Sender<RequestEnvelope>>,
280}
281
282impl ChannelHandle {
283 #[must_use]
285 pub(crate) fn new(id: ChannelId, event_tx: mpsc::Sender<InboundEvent>) -> Self {
286 Self {
287 id,
288 event_tx,
289 request_tx: None,
290 }
291 }
292
293 #[must_use]
295 pub fn accepts_requests(&self) -> bool {
296 self.request_tx.is_some()
297 }
298
299 #[must_use]
305 pub fn is_alive(&self) -> bool {
306 !self.event_tx.is_closed()
307 }
308
309 pub(crate) async fn send_request(
319 &self,
320 request: Request,
321 reply_tx: oneshot::Sender<Result<Value, String>>,
322 ) -> Result<(), mpsc::error::SendError<RequestEnvelope>> {
323 match &self.request_tx {
324 Some(tx) => tx.send(RequestEnvelope { request, reply_tx }).await,
325 None => Err(mpsc::error::SendError(RequestEnvelope {
326 request,
327 reply_tx,
328 })),
329 }
330 }
331
332 pub(crate) async fn inject(
336 &self,
337 event: Event,
338 ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
339 self.event_tx.send(InboundEvent::Broadcast(event)).await
340 }
341
342 #[allow(clippy::result_large_err)]
346 pub(crate) fn try_inject(
347 &self,
348 event: Event,
349 ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
350 self.event_tx.try_send(InboundEvent::Broadcast(event))
351 }
352
353 pub(crate) async fn inject_direct(
357 &self,
358 event: Event,
359 ) -> Result<(), mpsc::error::SendError<InboundEvent>> {
360 self.event_tx.send(InboundEvent::Direct(event)).await
361 }
362
363 #[allow(clippy::result_large_err)]
367 pub(crate) fn try_inject_direct(
368 &self,
369 event: Event,
370 ) -> Result<(), mpsc::error::TrySendError<InboundEvent>> {
371 self.event_tx.try_send(InboundEvent::Direct(event))
372 }
373}
374
/// State remembered while a suspended request waits for an approval decision.
struct PendingApproval {
    /// Identifier of the approval being awaited; compared against reject signals.
    approval_id: String,
    /// Pattern granted (persistently) once the approval is resolved.
    grant_pattern: String,
    /// Request that is re-dispatched to the component after approval.
    original_request: Request,
}
387
/// Drives a single component on a single channel.
///
/// `run` multiplexes signals, RPC requests and events, forwards them to the
/// component, and reports a [`RunnerResult`] on exit.
pub struct ChannelRunner {
    /// Channel served by this runner.
    id: ChannelId,
    /// Identifier of the wrapped component; used for hook contexts and as the
    /// source of output events emitted by the runner.
    component_id: orcs_types::ComponentId,
    /// Inbound events (broadcast and direct).
    event_rx: mpsc::Receiver<InboundEvent>,
    /// Broadcast receiver for signals.
    signal_rx: broadcast::Receiver<Signal>,
    /// Sender for world commands (spawn requests; also passed to the abort and
    /// transition helpers).
    world_tx: mpsc::Sender<WorldCommand>,
    /// Shared world state, consulted for the channel's active/paused status.
    world: Arc<RwLock<World>>,
    /// The component, locked around every call into it.
    component: Arc<Mutex<Box<dyn Component>>>,
    /// Subscription entries used to filter broadcast events.
    subscriptions: Vec<SubscriptionEntry>,
    /// Events queued while the channel is paused; drained on a resume transition.
    paused_queue: PausedEventQueue,
    /// Optional child spawner; receives propagated signals and is asked to
    /// abort all children when the runner stops.
    child_spawner: Option<Arc<StdMutex<ChildSpawner>>>,
    /// Sender for this runner's own event channel; cloned into child contexts
    /// as their output sender.
    event_tx: Option<mpsc::Sender<InboundEvent>>,
    /// RPC request receiver; when `None` the request branch of the main loop
    /// never becomes ready.
    request_rx: Option<mpsc::Receiver<RequestEnvelope>>,
    /// Snapshot restored into the component once, at the start of `run`.
    initial_snapshot: Option<ComponentSnapshot>,
    /// Shared channel handles; RPC support is attached to child contexts only
    /// when this and `component_channel_map` are both set.
    shared_handles: Option<SharedChannelHandles>,
    /// Component-to-channel map; see `shared_handles`.
    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
    /// Hook registry; when `None`, hook dispatch always yields `Continue`.
    hook_registry: Option<SharedHookRegistry>,
    /// Configuration value passed to the component's `init`.
    component_config: serde_json::Value,
    /// Grant policy that receives the granted pattern after an approval.
    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
    /// Output sender used for user-facing notifications (processing, approval
    /// requests, approval results).
    io_output_tx: Option<OutputSender>,
    /// The approval currently being awaited, if any.
    pending_approval: Option<PendingApproval>,
}
450
451async fn recv_request(rx: &mut Option<mpsc::Receiver<RequestEnvelope>>) -> Option<RequestEnvelope> {
456 match rx {
457 Some(rx) => rx.recv().await,
458 None => std::future::pending().await,
459 }
460}
461
462impl ChannelRunner {
463 fn dispatch_hook(
468 &self,
469 point: orcs_hook::HookPoint,
470 payload: serde_json::Value,
471 ) -> orcs_hook::HookAction {
472 let Some(registry) = &self.hook_registry else {
473 let ctx = orcs_hook::HookContext::new(
474 point,
475 self.component_id.clone(),
476 self.id,
477 orcs_types::Principal::System,
478 0,
479 payload,
480 );
481 return orcs_hook::HookAction::Continue(Box::new(ctx));
482 };
483
484 let ctx = orcs_hook::HookContext::new(
485 point,
486 self.component_id.clone(),
487 self.id,
488 orcs_types::Principal::System,
489 0,
490 payload,
491 );
492
493 let guard = registry.read().unwrap_or_else(|poisoned| {
494 warn!("hook registry lock poisoned, using inner value");
495 poisoned.into_inner()
496 });
497 guard.dispatch(point, &self.component_id, None, ctx)
498 }
499
    /// Returns the hook registry attached to this runner, if one was configured.
    #[must_use]
    pub fn hook_registry(&self) -> Option<&SharedHookRegistry> {
        self.hook_registry.as_ref()
    }
505
506 #[must_use]
513 pub fn create_child_context(&self, child_id: &str) -> Option<Box<dyn ChildContext>> {
514 let spawner = self.child_spawner.as_ref()?;
515 let event_tx = self.event_tx.as_ref()?;
516
517 let mut ctx = ChildContextImpl::new(
518 child_id,
519 OutputSender::new(event_tx.clone()),
520 Arc::clone(spawner),
521 );
522 ctx = self.inject_rpc(ctx);
523
524 Some(Box::new(ctx))
525 }
526
527 #[must_use]
534 pub fn create_child_context_with_loader(
535 &self,
536 child_id: &str,
537 loader: Arc<dyn LuaChildLoader>,
538 ) -> Option<Box<dyn ChildContext>> {
539 let spawner = self.child_spawner.as_ref()?;
540 let event_tx = self.event_tx.as_ref()?;
541
542 let mut ctx = ChildContextImpl::new(
543 child_id,
544 OutputSender::new(event_tx.clone()),
545 Arc::clone(spawner),
546 )
547 .with_lua_loader(loader);
548 ctx = self.inject_rpc(ctx);
549
550 Some(Box::new(ctx))
551 }
552
553 #[must_use]
559 pub fn create_async_child_context(&self, child_id: &str) -> Option<Box<dyn AsyncChildContext>> {
560 let spawner = self.child_spawner.as_ref()?;
561 let event_tx = self.event_tx.as_ref()?;
562
563 let mut ctx = ChildContextImpl::new(
564 child_id,
565 OutputSender::new(event_tx.clone()),
566 Arc::clone(spawner),
567 );
568 ctx = self.inject_rpc(ctx);
569
570 Some(Box::new(ctx))
571 }
572
573 #[must_use]
580 pub fn create_async_child_context_with_loader(
581 &self,
582 child_id: &str,
583 loader: Arc<dyn LuaChildLoader>,
584 ) -> Option<Box<dyn AsyncChildContext>> {
585 let spawner = self.child_spawner.as_ref()?;
586 let event_tx = self.event_tx.as_ref()?;
587
588 let mut ctx = ChildContextImpl::new(
589 child_id,
590 OutputSender::new(event_tx.clone()),
591 Arc::clone(spawner),
592 )
593 .with_lua_loader(loader);
594 ctx = self.inject_rpc(ctx);
595
596 Some(Box::new(ctx))
597 }
598
599 fn inject_rpc(&self, ctx: ChildContextImpl) -> ChildContextImpl {
602 if let (Some(handles), Some(map)) = (&self.shared_handles, &self.component_channel_map) {
603 ctx.with_rpc_support(handles.clone(), map.clone(), self.id)
604 } else {
605 ctx
606 }
607 }
608
    /// Returns the shared child spawner, if child spawning is enabled for this runner.
    #[must_use]
    pub fn child_spawner(&self) -> Option<&Arc<StdMutex<ChildSpawner>>> {
        self.child_spawner.as_ref()
    }
614
    /// Returns the id of the channel this runner serves.
    #[must_use]
    pub fn id(&self) -> ChannelId {
        self.id
    }
620
    /// Returns the sender used to submit commands to the world.
    #[must_use]
    pub fn world_tx(&self) -> &mpsc::Sender<WorldCommand> {
        &self.world_tx
    }
626
    /// Returns the shared, lock-protected world state.
    #[must_use]
    pub fn world(&self) -> &Arc<RwLock<World>> {
        &self.world
    }
632
633 #[tracing::instrument(skip_all, level = "info", fields(channel_id = %self.id))]
649 pub async fn run(mut self) -> RunnerResult {
650 info!("ChannelRunner started");
651
652 {
654 let mut comp = self.component.lock().await;
655
656 if let Some(snapshot) = self.initial_snapshot.take() {
658 match comp.restore(&snapshot) {
659 Ok(()) => info!("restored component from initial snapshot"),
660 Err(SnapshotError::NotSupported(_)) => {
661 debug!("component does not support snapshot restore");
662 }
663 Err(e) => {
664 warn!(error = %e, "failed to restore initial snapshot");
665 }
666 }
667 }
668
669 self.dispatch_hook(
671 orcs_hook::HookPoint::ComponentPreInit,
672 serde_json::json!({ "component": comp.id().fqn() }),
673 );
674
675 if let Err(e) = comp.init(&self.component_config) {
676 warn!(error = %e, "component init failed");
677 }
678
679 self.dispatch_hook(
681 orcs_hook::HookPoint::ComponentPostInit,
682 serde_json::json!({ "component": comp.id().fqn() }),
683 );
684 }
685
686 #[allow(unused_assignments)]
689 let mut exit_reason = ExitReason::Signal;
690 loop {
691 tokio::select! {
692 biased;
694
695 signal = self.signal_rx.recv() => {
696 match signal {
697 Ok(sig) => {
698 if !self.handle_signal(sig).await {
699 exit_reason = ExitReason::Signal;
700 break;
701 }
702 }
703 Err(broadcast::error::RecvError::Closed) => {
704 info!("signal channel closed");
705 exit_reason = ExitReason::SignalChannelClosed;
706 break;
707 }
708 Err(broadcast::error::RecvError::Lagged(n)) => {
709 warn!(lagged = n, "signal receiver lagged");
710 }
711 }
712 }
713
714 Some(envelope) = recv_request(&mut self.request_rx) => {
718 self.handle_rpc_request(envelope).await;
719 }
720
721 event = self.event_rx.recv() => {
722 match event {
723 Some(evt) => {
724 if !self.handle_event(evt).await {
725 exit_reason = ExitReason::ComponentStopped;
726 break;
727 }
728 }
729 None => {
730 info!("event channel closed");
731 exit_reason = ExitReason::EventChannelClosed;
732 break;
733 }
734 }
735 }
736 }
737
738 if !is_channel_active(&self.world, self.id).await {
740 debug!("channel no longer active");
741 exit_reason = ExitReason::ChannelInactive;
742 break;
743 }
744 }
745
746 let (component_fqn, snapshot) = {
748 let mut comp = self.component.lock().await;
749 let fqn = comp.id().fqn();
750
751 let snapshot = match comp.snapshot() {
753 Ok(s) => {
754 debug!(component = %fqn, "captured shutdown snapshot");
755 Some(s)
756 }
757 Err(SnapshotError::NotSupported(_)) => None,
758 Err(e) => {
759 warn!(component = %fqn, error = %e, "snapshot failed during shutdown");
760 None
761 }
762 };
763
764 self.dispatch_hook(
766 orcs_hook::HookPoint::ComponentPreShutdown,
767 serde_json::json!({ "component": &fqn }),
768 );
769
770 comp.shutdown();
771
772 self.dispatch_hook(
773 orcs_hook::HookPoint::ComponentPostShutdown,
774 serde_json::json!({ "component": &fqn }),
775 );
776 debug!(component = %fqn, "component shutdown complete");
777
778 (fqn, snapshot)
779 };
780
781 info!(exit_reason = %exit_reason, "ChannelRunner stopped");
782
783 RunnerResult {
784 channel_id: self.id,
785 component_fqn: Cow::Owned(component_fqn),
786 snapshot,
787 exit_reason,
788 }
789 }
790
791 async fn handle_signal(&mut self, signal: Signal) -> bool {
795 debug!(signal_kind = ?signal.kind, "received signal");
796
797 if !signal.affects_channel(self.id) {
799 return true;
800 }
801
802 let pre_payload = serde_json::json!({
804 "signal_kind": format!("{:?}", signal.kind),
805 "signal_scope": format!("{:?}", signal.scope),
806 });
807 let pre_action = self.dispatch_hook(orcs_hook::HookPoint::SignalPreDispatch, pre_payload);
808 match pre_action {
809 orcs_hook::HookAction::Skip(_) => {
810 debug!("signal skipped by pre-dispatch hook");
811 return true;
812 }
813 orcs_hook::HookAction::Abort { reason } => {
814 warn!(reason = %reason, "signal aborted by pre-dispatch hook");
815 return true;
816 }
817 orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
818 }
819
820 if let Some(spawner) = &self.child_spawner {
822 if let Ok(mut s) = spawner.lock() {
823 s.propagate_signal(&signal);
824 }
825 }
826
827 let component_action = dispatch_signal_to_component(&signal, &self.component).await;
829 if let SignalAction::Stop { reason } = component_action {
830 info!(reason = %reason, "component requested stop");
831 self.abort_all_children();
833 send_abort(&self.world_tx, self.id, &reason).await;
834 return false;
835 }
836
837 let action = determine_channel_action(&signal.kind);
839 match action {
840 SignalAction::Stop { reason } => {
841 info!(reason = %reason, "stopping channel");
842 self.abort_all_children();
844 send_abort(&self.world_tx, self.id, &reason).await;
845 return false;
846 }
847 SignalAction::Transition(transition) => {
848 let is_resolve = matches!(transition, StateTransition::ResolveApproval { .. });
849 let accepted = send_transition(&self.world_tx, self.id, transition.clone()).await;
850
851 if matches!(transition, StateTransition::Resume) {
853 self.drain_paused_queue().await;
854 }
855
856 if is_resolve && accepted {
858 self.handle_approval_resolved().await;
859 }
860 }
861 SignalAction::Continue => {
862 if let orcs_event::SignalKind::Reject {
864 approval_id,
865 reason,
866 } = &signal.kind
867 {
868 if let Some(pending) = &self.pending_approval {
869 if pending.approval_id == *approval_id {
870 info!(
871 approval_id = %approval_id,
872 "approval rejected, clearing pending"
873 );
874 send_abort(
876 &self.world_tx,
877 self.id,
878 reason.as_deref().unwrap_or("rejected by user"),
879 )
880 .await;
881 self.pending_approval = None;
882
883 if let Some(io_tx) = &self.io_output_tx {
885 let event = Event {
886 category: EventCategory::Output,
887 operation: "output".to_string(),
888 source: self.component_id.clone(),
889 payload: serde_json::json!({
890 "message": format!(
891 "Rejected: {}",
892 reason.as_deref().unwrap_or("no reason")
893 ),
894 "level": "warn",
895 }),
896 };
897 let _ = io_tx.try_send_direct(event);
898 }
899 }
900 }
901 }
902 }
903 }
904
905 let post_payload = serde_json::json!({
907 "signal_kind": format!("{:?}", signal.kind),
908 "handled": true,
909 });
910 let _post_action =
911 self.dispatch_hook(orcs_hook::HookPoint::SignalPostDispatch, post_payload);
912
913 true
914 }
915
916 async fn handle_approval_resolved(&mut self) {
924 let pending = match self.pending_approval.take() {
925 Some(p) => p,
926 None => {
927 debug!("ResolveApproval accepted but no pending approval stored");
928 return;
929 }
930 };
931
932 info!(
933 approval_id = %pending.approval_id,
934 grant_pattern = %pending.grant_pattern,
935 "approval resolved, granting pattern and re-dispatching"
936 );
937
938 if let Some(grants) = &self.grants {
940 if let Err(e) =
941 grants.grant(orcs_auth::CommandGrant::persistent(&pending.grant_pattern))
942 {
943 warn!(
944 error = %e,
945 pattern = %pending.grant_pattern,
946 "failed to grant pattern after approval"
947 );
948 }
949 } else {
950 warn!("no GrantPolicy configured, cannot grant pattern");
951 }
952
953 if let Some(io_tx) = &self.io_output_tx {
956 let event = Event {
957 category: EventCategory::Output,
958 operation: "output".to_string(),
959 source: self.component_id.clone(),
960 payload: serde_json::json!({
961 "message": format!("Approved: {}", pending.approval_id),
962 "level": "info",
963 }),
964 };
965 let _ = io_tx.try_send_direct(event);
966 }
967
968 let result = {
970 let mut comp = self.component.lock().await;
971 comp.on_request(&pending.original_request)
972 };
973
974 match result {
975 Ok(response) => {
976 debug!(response = ?response, "re-dispatched request succeeded after approval");
977 }
978 Err(ComponentError::Suspended {
979 approval_id: new_approval_id,
980 grant_pattern: new_grant_pattern,
981 pending_request: new_pending_request,
982 }) => {
983 info!(
987 approval_id = %new_approval_id,
988 grant_pattern = %new_grant_pattern,
989 "re-dispatch triggered cascading approval"
990 );
991
992 self.pending_approval = Some(PendingApproval {
993 approval_id: new_approval_id.clone(),
994 grant_pattern: new_grant_pattern,
995 original_request: pending.original_request,
996 });
997
998 send_transition(
999 &self.world_tx,
1000 self.id,
1001 StateTransition::AwaitApproval {
1002 request_id: new_approval_id.clone(),
1003 },
1004 )
1005 .await;
1006
1007 let description = new_pending_request
1008 .get("description")
1009 .and_then(|v| v.as_str())
1010 .unwrap_or("command execution")
1011 .to_string();
1012 let command = new_pending_request
1013 .get("command")
1014 .and_then(|v| v.as_str())
1015 .unwrap_or("")
1016 .to_string();
1017
1018 if let Some(io_tx) = &self.io_output_tx {
1019 let output_event = Event {
1020 category: EventCategory::Output,
1021 operation: "approval_request".to_string(),
1022 source: self.component_id.clone(),
1023 payload: serde_json::json!({
1024 "type": "approval_request",
1025 "approval_id": new_approval_id,
1026 "operation": "exec",
1027 "description": format!("{}: {}", description, command),
1028 "source": self.component_id.fqn(),
1029 }),
1030 };
1031 let _ = io_tx.try_send_direct(output_event);
1032 }
1033 }
1034 Err(e) => {
1035 warn!(error = %e, "re-dispatched request failed after approval");
1036 }
1037 }
1038 }
1039
1040 fn abort_all_children(&self) {
1042 if let Some(spawner) = &self.child_spawner {
1043 if let Ok(mut s) = spawner.lock() {
1044 s.abort_all();
1045 debug!("aborted all children");
1046 }
1047 }
1048 }
1049
1050 async fn handle_event(&mut self, inbound: InboundEvent) -> bool {
1056 let is_direct = inbound.is_direct();
1057 let event = inbound.into_event();
1058
1059 debug!(
1060 category = ?event.category,
1061 operation = %event.operation,
1062 direct = is_direct,
1063 "received event"
1064 );
1065
1066 if !is_direct
1070 && !self
1071 .subscriptions
1072 .iter()
1073 .any(|s| s.matches(&event.category, &event.operation))
1074 {
1075 debug!(category = ?event.category, operation = %event.operation, "skipping event (not subscribed)");
1076 return true;
1077 }
1078
1079 if is_channel_paused(&self.world, self.id).await {
1081 self.paused_queue
1082 .try_enqueue(event, "ChannelRunner", self.id);
1083 return true;
1084 }
1085
1086 self.process_event(event, is_direct).await;
1087 true
1088 }
1089
1090 async fn process_event(&mut self, event: Event, _is_direct: bool) {
1096 let pre_payload = serde_json::json!({
1098 "category": format!("{:?}", event.category),
1099 "operation": &event.operation,
1100 "source": event.source.fqn(),
1101 "payload": &event.payload,
1102 });
1103 let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
1104 let request_payload = match pre_action {
1105 orcs_hook::HookAction::Continue(ctx) => {
1106 ctx.payload.get("payload").cloned().unwrap_or(event.payload)
1108 }
1109 orcs_hook::HookAction::Skip(value) => {
1110 debug!(value = ?value, "request skipped by pre-dispatch hook");
1111 return;
1112 }
1113 orcs_hook::HookAction::Abort { reason } => {
1114 warn!(reason = %reason, "request aborted by pre-dispatch hook");
1115 return;
1116 }
1117 orcs_hook::HookAction::Replace(_) => {
1118 event.payload
1120 }
1121 };
1122
1123 let is_user_input = event.category == EventCategory::UserInput;
1125 let event_operation = event.operation.clone();
1126
1127 let request = Request::new(
1128 event.category,
1129 &event_operation,
1130 event.source,
1131 self.id,
1132 request_payload,
1133 );
1134
1135 if is_user_input {
1137 if let Some(io_tx) = &self.io_output_tx {
1138 let notify = Event {
1139 category: EventCategory::Output,
1140 operation: "processing".to_string(),
1141 source: self.component_id.clone(),
1142 payload: serde_json::json!({
1143 "type": "processing",
1144 "component": &self.component_id.name,
1145 "operation": &event_operation,
1146 }),
1147 };
1148 let _ = io_tx.try_send_direct(notify);
1149 }
1150 }
1151
1152 let result = {
1153 let mut comp = self.component.lock().await;
1154 comp.on_request(&request)
1155 };
1156
1157 let post_payload = match &result {
1159 Ok(response) => serde_json::json!({
1160 "operation": &event_operation,
1161 "response": response,
1162 "success": true,
1163 }),
1164 Err(e) => serde_json::json!({
1165 "operation": &event_operation,
1166 "error": e.to_string(),
1167 "success": false,
1168 }),
1169 };
1170 let _post_action =
1171 self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);
1172
1173 match result {
1174 Ok(response) => {
1175 debug!(response = ?response, "component returned success");
1176 }
1177 Err(ComponentError::Suspended {
1178 approval_id,
1179 grant_pattern,
1180 pending_request,
1181 }) => {
1182 info!(
1183 approval_id = %approval_id,
1184 grant_pattern = %grant_pattern,
1185 "component suspended pending approval"
1186 );
1187
1188 self.pending_approval = Some(PendingApproval {
1190 approval_id: approval_id.clone(),
1191 grant_pattern,
1192 original_request: request,
1193 });
1194
1195 send_transition(
1197 &self.world_tx,
1198 self.id,
1199 StateTransition::AwaitApproval {
1200 request_id: approval_id.clone(),
1201 },
1202 )
1203 .await;
1204
1205 let description = pending_request
1207 .get("description")
1208 .and_then(|v| v.as_str())
1209 .unwrap_or("command execution")
1210 .to_string();
1211 let command = pending_request
1212 .get("command")
1213 .and_then(|v| v.as_str())
1214 .unwrap_or("")
1215 .to_string();
1216
1217 let output_event = Event {
1218 category: EventCategory::Output,
1219 operation: "approval_request".to_string(),
1220 source: self.component_id.clone(),
1221 payload: serde_json::json!({
1222 "type": "approval_request",
1223 "approval_id": approval_id,
1224 "operation": "exec",
1225 "description": format!("{}: {}", description, command),
1226 "source": self.component_id.fqn(),
1227 }),
1228 };
1229 if let Some(io_tx) = &self.io_output_tx {
1230 let _ = io_tx.try_send_direct(output_event);
1231 }
1232 }
1233 Err(ComponentError::Aborted) => {
1234 info!("component aborted");
1235 }
1236 Err(e) => {
1237 error!(error = %e, "component returned error");
1238 }
1239 }
1240 }
1241
1242 async fn drain_paused_queue(&mut self) {
1247 let events: Vec<_> = self.paused_queue.drain("ChannelRunner", self.id).collect();
1249
1250 for event in events {
1251 self.process_event(event, true).await;
1252 }
1253 }
1254
1255 async fn handle_rpc_request(&self, envelope: RequestEnvelope) {
1260 debug!(
1261 request_id = %envelope.request.id,
1262 operation = %envelope.request.operation,
1263 source = %envelope.request.source,
1264 "handling RPC request"
1265 );
1266
1267 let pre_payload = serde_json::json!({
1269 "request_id": envelope.request.id.to_string(),
1270 "operation": &envelope.request.operation,
1271 "source": envelope.request.source.fqn(),
1272 "payload": &envelope.request.payload,
1273 });
1274 let pre_action = self.dispatch_hook(orcs_hook::HookPoint::RequestPreDispatch, pre_payload);
1275 match &pre_action {
1276 orcs_hook::HookAction::Skip(value) => {
1277 debug!(value = ?value, "RPC request skipped by pre-dispatch hook");
1278 let response = Ok(value.clone());
1279 if envelope.reply_tx.send(response).is_err() {
1280 debug!("RPC reply dropped (caller cancelled)");
1281 }
1282 return;
1283 }
1284 orcs_hook::HookAction::Abort { reason } => {
1285 warn!(reason = %reason, "RPC request aborted by pre-dispatch hook");
1286 let response = Err(reason.clone());
1287 if envelope.reply_tx.send(response).is_err() {
1288 debug!("RPC reply dropped (caller cancelled)");
1289 }
1290 return;
1291 }
1292 orcs_hook::HookAction::Continue(_) | orcs_hook::HookAction::Replace(_) => {}
1293 }
1294
1295 let result = {
1296 let mut comp = self.component.lock().await;
1297 comp.on_request(&envelope.request)
1298 };
1299
1300 let post_payload = match &result {
1302 Ok(response) => serde_json::json!({
1303 "operation": &envelope.request.operation,
1304 "response": response,
1305 "success": true,
1306 }),
1307 Err(e) => serde_json::json!({
1308 "operation": &envelope.request.operation,
1309 "error": e.to_string(),
1310 "success": false,
1311 }),
1312 };
1313 let post_action =
1314 self.dispatch_hook(orcs_hook::HookPoint::RequestPostDispatch, post_payload);
1315
1316 let final_result = match post_action {
1318 orcs_hook::HookAction::Replace(value) => Ok(value),
1319 _ => result.map_err(|e| e.to_string()),
1320 };
1321
1322 if envelope.reply_tx.send(final_result).is_err() {
1323 debug!("RPC reply dropped (caller cancelled)");
1324 }
1325 }
1326
1327 pub async fn spawn_child(
1331 &self,
1332 config: ChannelConfig,
1333 signal_rx: broadcast::Receiver<Signal>,
1334 component: Box<dyn Component>,
1335 ) -> Option<(ChannelRunner, ChannelHandle)> {
1336 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1337 let cmd = WorldCommand::Spawn {
1338 parent: self.id,
1339 config,
1340 reply: reply_tx,
1341 };
1342
1343 if self.world_tx.send(cmd).await.is_err() {
1344 return None;
1345 }
1346
1347 let child_id = reply_rx.await.ok()??;
1348 let (runner, handle) = ChannelRunner::builder(
1349 child_id,
1350 self.world_tx.clone(),
1351 Arc::clone(&self.world),
1352 signal_rx,
1353 component,
1354 )
1355 .build();
1356
1357 Some((runner, handle))
1358 }
1359}
1360
/// Builder for a [`ChannelRunner`] and its [`ChannelHandle`].
///
/// The five required values are given to `new`; every other field starts
/// disabled/empty and is set through the `with_*` methods.
pub struct ChannelRunnerBuilder {
    /// Channel the runner will serve.
    id: ChannelId,
    /// Sender for world commands.
    world_tx: mpsc::Sender<WorldCommand>,
    /// Shared world state.
    world: Arc<RwLock<World>>,
    /// Broadcast receiver for signals.
    signal_rx: broadcast::Receiver<Signal>,
    /// Component the runner will drive.
    component: Box<dyn Component>,
    /// Signal sender set by `with_emitter` (consumer not visible here — TODO confirm use in `build`).
    emitter_signal_tx: Option<broadcast::Sender<Signal>>,
    /// Output sender set by `with_output_channel`.
    output_tx: Option<OutputSender>,
    /// Set to `true` by `with_child_spawner`.
    enable_child_spawner: bool,
    /// Optional Lua child loader supplied together with the child spawner.
    lua_loader: Option<Arc<dyn LuaChildLoader>>,
    /// Optional component loader.
    component_loader: Option<Arc<dyn ComponentLoader>>,
    /// Session moved into the child context by `configure_context`.
    session: Option<Arc<Session>>,
    /// Permission checker moved into the child context by `configure_context`.
    checker: Option<Arc<dyn PermissionChecker>>,
    /// Grant policy shared with the child context by `configure_context`.
    grants: Option<Arc<dyn orcs_auth::GrantPolicy>>,
    /// Shared channel handles.
    shared_handles: Option<SharedChannelHandles>,
    /// Shared board (consumer not visible here — TODO confirm use in `build`).
    board: Option<crate::board::SharedBoard>,
    /// Snapshot to restore into the component when the runner starts.
    initial_snapshot: Option<ComponentSnapshot>,
    /// Set to `true` by `with_request_channel`.
    enable_request_channel: bool,
    /// Component-to-channel map.
    component_channel_map: Option<crate::engine::SharedComponentChannelMap>,
    /// Hook registry shared with the child context by `configure_context`.
    hook_registry: Option<SharedHookRegistry>,
    /// MCP client manager shared with the child context by `configure_context`.
    mcp_manager: Option<Arc<orcs_mcp::McpClientManager>>,
    /// Component configuration; defaults to an empty JSON object.
    component_config: serde_json::Value,
}
1425
1426impl ChannelRunnerBuilder {
1427 #[must_use]
1429 pub fn new(
1430 id: ChannelId,
1431 world_tx: mpsc::Sender<WorldCommand>,
1432 world: Arc<RwLock<World>>,
1433 signal_rx: broadcast::Receiver<Signal>,
1434 component: Box<dyn Component>,
1435 ) -> Self {
1436 Self {
1437 id,
1438 world_tx,
1439 world,
1440 signal_rx,
1441 component,
1442 emitter_signal_tx: None,
1443 output_tx: None,
1444 enable_child_spawner: false,
1445 lua_loader: None,
1446 component_loader: None,
1447 session: None,
1448 checker: None,
1449 grants: None,
1450 shared_handles: None,
1451 board: None,
1452 initial_snapshot: None,
1453 enable_request_channel: false,
1454 component_channel_map: None,
1455 hook_registry: None,
1456 mcp_manager: None,
1457 component_config: serde_json::Value::Object(serde_json::Map::new()),
1458 }
1459 }
1460
1461 #[must_use]
1465 pub fn with_emitter(mut self, signal_tx: broadcast::Sender<Signal>) -> Self {
1466 self.emitter_signal_tx = Some(signal_tx);
1467 self
1468 }
1469
1470 #[must_use]
1481 pub fn with_output_channel(mut self, output_tx: OutputSender) -> Self {
1482 self.output_tx = Some(output_tx);
1483 self
1484 }
1485
1486 #[must_use]
1495 pub fn with_child_spawner(mut self, loader: Option<Arc<dyn LuaChildLoader>>) -> Self {
1496 self.enable_child_spawner = true;
1497 self.lua_loader = loader;
1498 self
1499 }
1500
1501 #[must_use]
1509 pub fn with_component_loader(mut self, loader: Arc<dyn ComponentLoader>) -> Self {
1510 self.component_loader = Some(loader);
1511 self
1512 }
1513
1514 #[must_use]
1526 pub fn with_session(mut self, session: Session) -> Self {
1527 self.session = Some(Arc::new(session));
1528 self
1529 }
1530
1531 #[must_use]
1540 pub fn with_session_arc(mut self, session: Arc<Session>) -> Self {
1541 self.session = Some(session);
1542 self
1543 }
1544
1545 #[must_use]
1551 pub fn with_checker(mut self, checker: Arc<dyn PermissionChecker>) -> Self {
1552 self.checker = Some(checker);
1553 self
1554 }
1555
1556 #[must_use]
1562 pub fn with_grants(mut self, grants: Arc<dyn orcs_auth::GrantPolicy>) -> Self {
1563 self.grants = Some(grants);
1564 self
1565 }
1566
1567 #[must_use]
1572 pub fn with_shared_handles(mut self, handles: SharedChannelHandles) -> Self {
1573 self.shared_handles = Some(handles);
1574 self
1575 }
1576
1577 #[must_use]
1582 pub fn with_component_channel_map(
1583 mut self,
1584 map: crate::engine::SharedComponentChannelMap,
1585 ) -> Self {
1586 self.component_channel_map = Some(map);
1587 self
1588 }
1589
1590 #[must_use]
1595 pub fn with_board(mut self, board: crate::board::SharedBoard) -> Self {
1596 self.board = Some(board);
1597 self
1598 }
1599
1600 #[must_use]
1610 pub fn with_initial_snapshot(mut self, snapshot: ComponentSnapshot) -> Self {
1611 self.initial_snapshot = Some(snapshot);
1612 self
1613 }
1614
1615 #[must_use]
1622 pub fn with_request_channel(mut self) -> Self {
1623 self.enable_request_channel = true;
1624 self
1625 }
1626
1627 #[must_use]
1632 pub fn with_hook_registry(mut self, registry: SharedHookRegistry) -> Self {
1633 self.hook_registry = Some(registry);
1634 self
1635 }
1636
1637 #[must_use]
1642 pub fn with_mcp_manager(mut self, manager: Arc<orcs_mcp::McpClientManager>) -> Self {
1643 self.mcp_manager = Some(manager);
1644 self
1645 }
1646
1647 #[must_use]
1651 pub fn with_component_config(mut self, config: serde_json::Value) -> Self {
1652 self.component_config = config;
1653 self
1654 }
1655
1656 fn configure_context(
1661 &mut self,
1662 mut ctx: ChildContextImpl,
1663 io_output_tx: &Option<OutputSender>,
1664 component_id: &str,
1665 rpc_handles: &Option<SharedChannelHandles>,
1666 rpc_map: &Option<crate::engine::SharedComponentChannelMap>,
1667 channel_id: ChannelId,
1668 ) -> ChildContextImpl {
1669 if let Some(session) = self.session.take() {
1670 ctx = ctx.with_session_arc(session);
1671 info!("ChannelRunnerBuilder: enabled session for {}", component_id);
1672 }
1673 if let Some(checker) = self.checker.take() {
1674 ctx = ctx.with_checker(checker);
1675 info!(
1676 "ChannelRunnerBuilder: enabled permission checker for {}",
1677 component_id
1678 );
1679 }
1680 if let Some(grants) = &self.grants {
1681 ctx = ctx.with_grants(Arc::clone(grants));
1682 info!(
1683 "ChannelRunnerBuilder: enabled grant store for {}",
1684 component_id
1685 );
1686 }
1687 if let Some(io_tx) = io_output_tx.clone() {
1688 ctx = ctx.with_io_output_channel(io_tx);
1689 info!(
1690 "ChannelRunnerBuilder: enabled IO output routing for {}",
1691 component_id
1692 );
1693 }
1694 if let (Some(handles), Some(map)) = (rpc_handles.clone(), rpc_map.clone()) {
1695 ctx = ctx.with_rpc_support(handles, map, channel_id);
1696 }
1697 if let Some(reg) = &self.hook_registry {
1698 ctx = ctx.with_hook_registry(Arc::clone(reg));
1699 }
1700 if let Some(mgr) = &self.mcp_manager {
1701 ctx = ctx.with_mcp_manager(Arc::clone(mgr));
1702 }
1703 ctx
1704 }
1705
1706 #[must_use]
1708 pub fn build(mut self) -> (ChannelRunner, ChannelHandle) {
1709 let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE);
1710
1711 let component_id = self.component.id().clone();
1713
1714 let io_output_tx = self.output_tx.as_ref().cloned();
1716
1717 let rpc_handles = self.shared_handles.clone();
1719 let rpc_map = self.component_channel_map.clone();
1720
1721 if let Some(signal_tx) = &self.emitter_signal_tx {
1723 let component_id = self.component.id().clone();
1724 let mut emitter = EventEmitter::new(
1725 OutputSender::new(event_tx.clone()),
1726 signal_tx.clone(),
1727 component_id.clone(),
1728 );
1729
1730 if let Some(output_tx) = self.output_tx.take() {
1732 emitter = emitter.with_output_channel(output_tx);
1733 info!(
1734 "ChannelRunnerBuilder: routing output to IO channel for {}",
1735 component_id.fqn()
1736 );
1737 }
1738
1739 if let Some(handles) = self.shared_handles.take() {
1741 emitter = emitter.with_shared_handles(handles);
1742 info!(
1743 "ChannelRunnerBuilder: enabled event broadcast for {}",
1744 component_id.fqn()
1745 );
1746 }
1747
1748 if let Some(map) = self.component_channel_map.take() {
1750 emitter = emitter.with_component_channel_map(map, self.id);
1751 }
1752
1753 if let Some(board) = self.board.take() {
1755 emitter = emitter.with_board(board);
1756 }
1757
1758 self.component.set_emitter(Box::new(emitter));
1759
1760 info!(
1761 "ChannelRunnerBuilder: injected emitter for {}",
1762 component_id.fqn()
1763 );
1764 }
1765
1766 let child_spawner = if self.enable_child_spawner {
1768 let component_id = self.component.id().fqn();
1769 let output_sender = OutputSender::new(event_tx.clone());
1770 let spawner = ChildSpawner::new(&component_id, output_sender.clone());
1771 let spawner_arc = Arc::new(StdMutex::new(spawner));
1772
1773 let mut ctx =
1775 ChildContextImpl::new(&component_id, output_sender, Arc::clone(&spawner_arc));
1776
1777 if let Some(loader) = self.lua_loader.take() {
1779 ctx = ctx.with_lua_loader(loader);
1780 info!(
1781 "ChannelRunnerBuilder: created spawner with Lua loader for {}",
1782 component_id
1783 );
1784 } else {
1785 info!(
1786 "ChannelRunnerBuilder: created spawner (no Lua loader) for {}",
1787 component_id
1788 );
1789 }
1790
1791 if let Some(signal_tx) = &self.emitter_signal_tx {
1793 ctx = ctx.with_runner_support(
1794 self.world_tx.clone(),
1795 Arc::clone(&self.world),
1796 signal_tx.clone(),
1797 );
1798 info!(
1799 "ChannelRunnerBuilder: enabled runner spawning for {}",
1800 component_id
1801 );
1802 }
1803
1804 if let Some(loader) = self.component_loader.take() {
1806 ctx = ctx.with_component_loader(loader);
1807 info!(
1808 "ChannelRunnerBuilder: enabled component loader for {}",
1809 component_id
1810 );
1811 }
1812
1813 ctx = self.configure_context(
1815 ctx,
1816 &io_output_tx,
1817 &component_id,
1818 &rpc_handles,
1819 &rpc_map,
1820 self.id,
1821 );
1822
1823 self.component.set_child_context(Box::new(ctx));
1824
1825 Some(spawner_arc)
1826 } else if self.session.is_some() || self.checker.is_some() || self.grants.is_some() {
1827 let component_id = self.component.id().fqn();
1829 let dummy_output = OutputSender::new(event_tx.clone());
1830 let dummy_spawner = ChildSpawner::new(&component_id, dummy_output.clone());
1831 let dummy_arc = Arc::new(StdMutex::new(dummy_spawner));
1832 let mut ctx =
1833 ChildContextImpl::new(&component_id, dummy_output, Arc::clone(&dummy_arc));
1834
1835 ctx = self.configure_context(
1836 ctx,
1837 &io_output_tx,
1838 &component_id,
1839 &rpc_handles,
1840 &rpc_map,
1841 self.id,
1842 );
1843
1844 self.component.set_child_context(Box::new(ctx));
1845 info!(
1846 "ChannelRunnerBuilder: auth-only context injected for {}",
1847 component_id
1848 );
1849 None
1850 } else {
1851 None
1852 };
1853
1854 let event_tx_for_context = if self.enable_child_spawner || self.emitter_signal_tx.is_some()
1856 {
1857 Some(event_tx.clone())
1858 } else {
1859 None
1860 };
1861
1862 let subscriptions = self.component.subscription_entries();
1865
1866 let (request_tx, request_rx) = if self.enable_request_channel {
1868 let (tx, rx) = mpsc::channel(REQUEST_BUFFER_SIZE);
1869 (Some(tx), Some(rx))
1870 } else {
1871 (None, None)
1872 };
1873
1874 let runner = ChannelRunner {
1875 id: self.id,
1876 component_id,
1877 event_rx,
1878 signal_rx: self.signal_rx,
1879 world_tx: self.world_tx,
1880 world: self.world,
1881 component: Arc::new(Mutex::new(self.component)),
1882 subscriptions,
1883 paused_queue: PausedEventQueue::new(),
1884 child_spawner,
1885 event_tx: event_tx_for_context,
1886 request_rx,
1887 initial_snapshot: self.initial_snapshot,
1888 shared_handles: rpc_handles,
1889 component_channel_map: rpc_map,
1890 hook_registry: self.hook_registry,
1891 component_config: self.component_config,
1892 grants: self.grants.clone(),
1893 io_output_tx: io_output_tx.clone(),
1894 pending_approval: None,
1895 };
1896
1897 let mut handle = ChannelHandle::new(self.id, event_tx);
1898 handle.request_tx = request_tx;
1899
1900 (runner, handle)
1901 }
1902}
1903
1904impl ChannelRunner {
1905 #[must_use]
1918 pub fn builder(
1919 id: ChannelId,
1920 world_tx: mpsc::Sender<WorldCommand>,
1921 world: Arc<RwLock<World>>,
1922 signal_rx: broadcast::Receiver<Signal>,
1923 component: Box<dyn Component>,
1924 ) -> ChannelRunnerBuilder {
1925 ChannelRunnerBuilder::new(id, world_tx, world, signal_rx, component)
1926 }
1927}
1928
1929#[cfg(test)]
1930mod tests {
1931 use super::*;
1932 use crate::channel::manager::WorldManager;
1933 use crate::channel::ChannelConfig;
1934 use orcs_component::{ComponentError, Status};
1935 use orcs_event::{EventCategory, SignalResponse};
1936 use orcs_types::{ComponentId, Principal};
1937 use serde_json::Value;
1938
1939 struct MockComponent {
1941 id: ComponentId,
1942 status: Status,
1943 }
1944
1945 impl MockComponent {
1946 fn new(name: &str) -> Self {
1947 Self {
1948 id: ComponentId::builtin(name),
1949 status: Status::Idle,
1950 }
1951 }
1952 }
1953
1954 impl Component for MockComponent {
1955 fn id(&self) -> &ComponentId {
1956 &self.id
1957 }
1958
1959 fn status(&self) -> Status {
1960 self.status
1961 }
1962
1963 fn subscriptions(&self) -> &[EventCategory] {
1964 &[EventCategory::Echo, EventCategory::Lifecycle]
1965 }
1966
1967 fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
1968 Ok(request.payload.clone())
1969 }
1970
1971 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
1972 if signal.is_veto() {
1973 self.status = Status::Aborted;
1974 SignalResponse::Abort
1975 } else {
1976 SignalResponse::Handled
1977 }
1978 }
1979
1980 fn abort(&mut self) {
1981 self.status = Status::Aborted;
1982 }
1983 }
1984
1985 fn mock_component() -> Box<dyn Component> {
1986 Box::new(MockComponent::new("test"))
1987 }
1988
1989 async fn setup() -> (
1990 tokio::task::JoinHandle<()>,
1991 mpsc::Sender<WorldCommand>,
1992 Arc<RwLock<World>>,
1993 broadcast::Sender<Signal>,
1994 ChannelId,
1995 ) {
1996 let mut world = World::new();
1997 let io = world.create_channel(ChannelConfig::interactive());
1998
1999 let (manager, world_tx) = WorldManager::with_world(world);
2000 let world_handle = manager.world();
2001
2002 let manager_task = tokio::spawn(manager.run());
2003
2004 let (signal_tx, _) = broadcast::channel(64);
2005
2006 (manager_task, world_tx, world_handle, signal_tx, io)
2007 }
2008
2009 async fn teardown(
2010 manager_task: tokio::task::JoinHandle<()>,
2011 world_tx: mpsc::Sender<WorldCommand>,
2012 ) {
2013 let _ = world_tx.send(WorldCommand::Shutdown).await;
2014 let _ = manager_task.await;
2015 }
2016
2017 #[tokio::test]
2018 async fn runner_creation() {
2019 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2020
2021 let signal_rx = signal_tx.subscribe();
2022 let (runner, handle) = ChannelRunner::builder(
2023 primary,
2024 world_tx.clone(),
2025 world,
2026 signal_rx,
2027 mock_component(),
2028 )
2029 .build();
2030
2031 assert_eq!(runner.id(), primary);
2032 assert_eq!(handle.id, primary);
2033
2034 teardown(manager_task, world_tx).await;
2035 }
2036
2037 #[tokio::test]
2038 async fn runner_receives_events() {
2039 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2040
2041 let signal_rx = signal_tx.subscribe();
2042 let (runner, handle) = ChannelRunner::builder(
2043 primary,
2044 world_tx.clone(),
2045 world,
2046 signal_rx,
2047 mock_component(),
2048 )
2049 .build();
2050
2051 let runner_task = tokio::spawn(runner.run());
2052
2053 let event = Event {
2054 category: EventCategory::Echo,
2055 operation: "echo".to_string(),
2056 source: ComponentId::builtin("test"),
2057 payload: serde_json::json!({"test": true}),
2058 };
2059 handle
2060 .inject(event)
2061 .await
2062 .expect("inject echo event into runner");
2063
2064 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2065
2066 signal_tx
2067 .send(Signal::cancel(primary, Principal::System))
2068 .expect("send cancel signal");
2069
2070 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2071
2072 teardown(manager_task, world_tx).await;
2073 }
2074
2075 #[tokio::test]
2076 async fn runner_handles_veto() {
2077 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2078
2079 let signal_rx = signal_tx.subscribe();
2080 let (runner, _handle) = ChannelRunner::builder(
2081 primary,
2082 world_tx.clone(),
2083 world.clone(),
2084 signal_rx,
2085 mock_component(),
2086 )
2087 .build();
2088
2089 let runner_task = tokio::spawn(runner.run());
2090
2091 tokio::task::yield_now().await;
2092
2093 signal_tx
2094 .send(Signal::veto(Principal::System))
2095 .expect("send veto signal");
2096
2097 let result = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2098 assert!(result.is_ok());
2099
2100 teardown(manager_task, world_tx).await;
2101 }
2102
2103 #[tokio::test]
2104 async fn channel_handle_clone() {
2105 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2106
2107 let signal_rx = signal_tx.subscribe();
2108 let (_runner, handle) = ChannelRunner::builder(
2109 primary,
2110 world_tx.clone(),
2111 world,
2112 signal_rx,
2113 mock_component(),
2114 )
2115 .build();
2116
2117 let handle2 = handle.clone();
2118 assert_eq!(handle.id, handle2.id);
2119
2120 teardown(manager_task, world_tx).await;
2121 }
2122
2123 #[tokio::test]
2124 async fn runner_with_emitter_creation() {
2125 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2126
2127 let signal_rx = signal_tx.subscribe();
2128 let (runner, handle) = ChannelRunner::builder(
2129 primary,
2130 world_tx.clone(),
2131 world,
2132 signal_rx,
2133 mock_component(),
2134 )
2135 .with_emitter(signal_tx.clone())
2136 .build();
2137
2138 assert_eq!(runner.id(), primary);
2139 assert_eq!(handle.id, primary);
2140
2141 teardown(manager_task, world_tx).await;
2142 }
2143
2144 #[tokio::test]
2145 async fn runner_with_emitter_receives_emitted_events() {
2146 use std::sync::atomic::{AtomicUsize, Ordering};
2147 use std::sync::Arc as StdArc;
2148
2149 struct EmittingComponent {
2151 id: ComponentId,
2152 emitter: Option<Box<dyn orcs_component::Emitter>>,
2153 call_count: StdArc<AtomicUsize>,
2154 }
2155
2156 impl EmittingComponent {
2157 fn new(call_count: StdArc<AtomicUsize>) -> Self {
2158 Self {
2159 id: ComponentId::builtin("emitting"),
2160 emitter: None,
2161 call_count,
2162 }
2163 }
2164 }
2165
2166 impl Component for EmittingComponent {
2167 fn id(&self) -> &ComponentId {
2168 &self.id
2169 }
2170
2171 fn status(&self) -> Status {
2172 Status::Idle
2173 }
2174
2175 fn subscriptions(&self) -> &[EventCategory] {
2176 &[EventCategory::Echo]
2177 }
2178
2179 fn on_request(&mut self, request: &Request) -> Result<Value, ComponentError> {
2180 self.call_count.fetch_add(1, Ordering::SeqCst);
2181 if let Some(emitter) = &self.emitter {
2183 emitter.emit_output("Response from component");
2184 }
2185 Ok(request.payload.clone())
2186 }
2187
2188 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2189 if signal.is_veto() {
2190 SignalResponse::Abort
2191 } else {
2192 SignalResponse::Handled
2193 }
2194 }
2195
2196 fn abort(&mut self) {}
2197
2198 fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2199 self.emitter = Some(emitter);
2200 }
2201 }
2202
2203 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2204
2205 let call_count = StdArc::new(AtomicUsize::new(0));
2206 let component = Box::new(EmittingComponent::new(StdArc::clone(&call_count)));
2207
2208 let signal_rx = signal_tx.subscribe();
2209 let (runner, handle) =
2210 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2211 .with_emitter(signal_tx.clone())
2212 .build();
2213
2214 let runner_task = tokio::spawn(runner.run());
2215
2216 let event = Event {
2218 category: EventCategory::Echo,
2219 operation: "test".to_string(),
2220 source: ComponentId::builtin("test"),
2221 payload: serde_json::json!({"trigger": true}),
2222 };
2223 handle
2224 .inject(event)
2225 .await
2226 .expect("inject event into emitting runner");
2227
2228 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2230
2231 assert!(
2233 call_count.load(Ordering::SeqCst) >= 1,
2234 "Component should have received the event"
2235 );
2236
2237 signal_tx
2239 .send(Signal::cancel(primary, Principal::System))
2240 .expect("send cancel signal to emitting runner");
2241
2242 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2243
2244 teardown(manager_task, world_tx).await;
2245 }
2246
2247 #[tokio::test]
2250 async fn builder_basic() {
2251 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2252
2253 let signal_rx = signal_tx.subscribe();
2254 let (runner, handle) = ChannelRunner::builder(
2255 primary,
2256 world_tx.clone(),
2257 world,
2258 signal_rx,
2259 mock_component(),
2260 )
2261 .build();
2262
2263 assert_eq!(runner.id(), primary);
2264 assert_eq!(handle.id, primary);
2265 assert!(runner.child_spawner().is_none());
2266
2267 teardown(manager_task, world_tx).await;
2268 }
2269
2270 #[tokio::test]
2271 async fn builder_with_emitter() {
2272 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2273
2274 let signal_rx = signal_tx.subscribe();
2275 let (runner, handle) = ChannelRunner::builder(
2276 primary,
2277 world_tx.clone(),
2278 world,
2279 signal_rx,
2280 mock_component(),
2281 )
2282 .with_emitter(signal_tx.clone())
2283 .build();
2284
2285 assert_eq!(runner.id(), primary);
2286 assert_eq!(handle.id, primary);
2287
2288 teardown(manager_task, world_tx).await;
2289 }
2290
2291 #[tokio::test]
2292 async fn builder_with_child_spawner() {
2293 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2294
2295 let signal_rx = signal_tx.subscribe();
2296 let (runner, handle) = ChannelRunner::builder(
2297 primary,
2298 world_tx.clone(),
2299 world,
2300 signal_rx,
2301 mock_component(),
2302 )
2303 .with_child_spawner(None)
2304 .build();
2305
2306 assert_eq!(runner.id(), primary);
2307 assert_eq!(handle.id, primary);
2308 assert!(runner.child_spawner().is_some());
2309
2310 teardown(manager_task, world_tx).await;
2311 }
2312
2313 #[tokio::test]
2314 async fn builder_with_full_support() {
2315 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2316
2317 let signal_rx = signal_tx.subscribe();
2318 let (runner, handle) = ChannelRunner::builder(
2319 primary,
2320 world_tx.clone(),
2321 world,
2322 signal_rx,
2323 mock_component(),
2324 )
2325 .with_emitter(signal_tx.clone())
2326 .with_child_spawner(None)
2327 .build();
2328
2329 assert_eq!(runner.id(), primary);
2330 assert_eq!(handle.id, primary);
2331 assert!(runner.child_spawner().is_some());
2332
2333 teardown(manager_task, world_tx).await;
2334 }
2335
2336 #[tokio::test]
2337 async fn builder_creates_child_context() {
2338 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2339
2340 let signal_rx = signal_tx.subscribe();
2341 let (runner, _handle) = ChannelRunner::builder(
2342 primary,
2343 world_tx.clone(),
2344 world,
2345 signal_rx,
2346 mock_component(),
2347 )
2348 .with_child_spawner(None)
2349 .build();
2350
2351 let ctx = runner.create_child_context("child-1");
2353 assert!(ctx.is_some());
2354
2355 teardown(manager_task, world_tx).await;
2356 }
2357
2358 #[tokio::test]
2359 async fn builder_no_child_context_without_spawner() {
2360 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2361
2362 let signal_rx = signal_tx.subscribe();
2363 let (runner, _handle) = ChannelRunner::builder(
2364 primary,
2365 world_tx.clone(),
2366 world,
2367 signal_rx,
2368 mock_component(),
2369 )
2370 .build();
2371
2372 let ctx = runner.create_child_context("child-1");
2374 assert!(ctx.is_none());
2375
2376 teardown(manager_task, world_tx).await;
2377 }
2378
2379 #[tokio::test]
2382 async fn builder_with_request_channel_enables_accepts_requests() {
2383 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2384
2385 let signal_rx = signal_tx.subscribe();
2386 let (_runner, handle) = ChannelRunner::builder(
2387 primary,
2388 world_tx.clone(),
2389 world,
2390 signal_rx,
2391 mock_component(),
2392 )
2393 .with_request_channel()
2394 .build();
2395
2396 assert!(handle.accepts_requests());
2397
2398 teardown(manager_task, world_tx).await;
2399 }
2400
2401 #[tokio::test]
2402 async fn builder_without_request_channel_rejects_requests() {
2403 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2404
2405 let signal_rx = signal_tx.subscribe();
2406 let (_runner, handle) = ChannelRunner::builder(
2407 primary,
2408 world_tx.clone(),
2409 world,
2410 signal_rx,
2411 mock_component(),
2412 )
2413 .build();
2414
2415 assert!(!handle.accepts_requests());
2416
2417 teardown(manager_task, world_tx).await;
2418 }
2419
2420 #[tokio::test]
2421 async fn rpc_request_routed_to_runner_and_responded() {
2422 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2423
2424 let signal_rx = signal_tx.subscribe();
2425 let (runner, handle) = ChannelRunner::builder(
2426 primary,
2427 world_tx.clone(),
2428 world,
2429 signal_rx,
2430 mock_component(),
2431 )
2432 .with_request_channel()
2433 .build();
2434
2435 let runner_task = tokio::spawn(runner.run());
2436
2437 let source = ComponentId::builtin("caller");
2439 let target = ComponentId::builtin("test");
2440 let req = Request::new(
2441 EventCategory::Echo,
2442 "echo",
2443 source,
2444 primary,
2445 Value::String("rpc_payload".into()),
2446 )
2447 .with_target(target);
2448
2449 let (reply_tx, reply_rx) = oneshot::channel();
2450 handle
2451 .send_request(req, reply_tx)
2452 .await
2453 .expect("send RPC request to runner");
2454
2455 let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2457 .await
2458 .expect("reply should arrive within timeout")
2459 .expect("reply channel should not be dropped");
2460
2461 assert_eq!(
2462 result.expect("RPC response should be Ok"),
2463 Value::String("rpc_payload".into())
2464 );
2465
2466 signal_tx
2468 .send(Signal::cancel(primary, Principal::System))
2469 .expect("send cancel signal after RPC test");
2470 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2471 teardown(manager_task, world_tx).await;
2472 }
2473
2474 #[tokio::test]
2475 async fn rpc_request_component_error_returned_as_string() {
2476 struct FailingComponent {
2478 id: ComponentId,
2479 }
2480
2481 impl Component for FailingComponent {
2482 fn id(&self) -> &ComponentId {
2483 &self.id
2484 }
2485 fn status(&self) -> Status {
2486 Status::Idle
2487 }
2488 fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2489 Err(ComponentError::ExecutionFailed(
2490 "deliberate test failure".into(),
2491 ))
2492 }
2493 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2494 if signal.is_veto() {
2495 SignalResponse::Abort
2496 } else {
2497 SignalResponse::Handled
2498 }
2499 }
2500 fn abort(&mut self) {}
2501 }
2502
2503 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2504
2505 let component: Box<dyn Component> = Box::new(FailingComponent {
2506 id: ComponentId::builtin("failing"),
2507 });
2508 let signal_rx = signal_tx.subscribe();
2509 let (runner, handle) =
2510 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2511 .with_request_channel()
2512 .build();
2513
2514 let runner_task = tokio::spawn(runner.run());
2515
2516 let source = ComponentId::builtin("caller");
2517 let target = ComponentId::builtin("failing");
2518 let req = Request::new(EventCategory::Echo, "op", source, primary, Value::Null)
2519 .with_target(target);
2520
2521 let (reply_tx, reply_rx) = oneshot::channel();
2522 handle
2523 .send_request(req, reply_tx)
2524 .await
2525 .expect("send RPC request to failing runner");
2526
2527 let result = tokio::time::timeout(std::time::Duration::from_millis(200), reply_rx)
2528 .await
2529 .expect("reply should arrive")
2530 .expect("channel should not be dropped");
2531
2532 assert!(result.is_err(), "should return Err for component failure");
2533 let err_msg = result.expect_err("expected Err variant for component failure");
2534 assert!(
2535 err_msg.contains("deliberate test failure"),
2536 "error message should contain original error, got: {err_msg}"
2537 );
2538
2539 signal_tx
2540 .send(Signal::cancel(primary, Principal::System))
2541 .expect("send cancel signal after failing RPC test");
2542 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2543 teardown(manager_task, world_tx).await;
2544 }
2545
2546 #[tokio::test]
2554 async fn emitter_output_routes_to_output_channel_via_builder() {
2555 use std::sync::atomic::{AtomicUsize, Ordering};
2556 use std::sync::Arc as StdArc;
2557
2558 struct OutputRoutingComponent {
2560 id: ComponentId,
2561 emitter: Option<Box<dyn orcs_component::Emitter>>,
2562 call_count: StdArc<AtomicUsize>,
2563 }
2564
2565 impl OutputRoutingComponent {
2566 fn new(call_count: StdArc<AtomicUsize>) -> Self {
2567 Self {
2568 id: ComponentId::builtin("output-routing"),
2569 emitter: None,
2570 call_count,
2571 }
2572 }
2573 }
2574
2575 impl Component for OutputRoutingComponent {
2576 fn id(&self) -> &ComponentId {
2577 &self.id
2578 }
2579 fn status(&self) -> Status {
2580 Status::Idle
2581 }
2582 fn subscriptions(&self) -> &[EventCategory] {
2583 &[EventCategory::Echo]
2584 }
2585 fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2586 self.call_count.fetch_add(1, Ordering::SeqCst);
2587 if let Some(emitter) = &self.emitter {
2588 emitter.emit_output("routed output message");
2589 }
2590 Ok(serde_json::json!({"success": true}))
2591 }
2592 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2593 if signal.is_veto() {
2594 SignalResponse::Abort
2595 } else {
2596 SignalResponse::Handled
2597 }
2598 }
2599 fn abort(&mut self) {}
2600 fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2601 self.emitter = Some(emitter);
2602 }
2603 }
2604
2605 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2606
2607 let call_count = StdArc::new(AtomicUsize::new(0));
2608 let component = Box::new(OutputRoutingComponent::new(StdArc::clone(&call_count)));
2609
2610 let (output_tx, mut output_rx) = OutputSender::channel(64);
2612
2613 let signal_rx = signal_tx.subscribe();
2614 let (runner, handle) =
2615 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2616 .with_emitter(signal_tx.clone())
2617 .with_output_channel(output_tx)
2618 .build();
2619
2620 let runner_task = tokio::spawn(runner.run());
2621
2622 let event = Event {
2624 category: EventCategory::Echo,
2625 operation: "test".to_string(),
2626 source: ComponentId::builtin("test"),
2627 payload: serde_json::json!({"trigger": true}),
2628 };
2629 handle.inject(event).await.expect("inject event");
2630
2631 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2633
2634 assert!(
2636 call_count.load(Ordering::SeqCst) >= 1,
2637 "Component should have received the event"
2638 );
2639
2640 let output_event = output_rx
2642 .try_recv()
2643 .expect("Output event should arrive on output channel");
2644 assert_eq!(
2645 output_event.category,
2646 EventCategory::Output,
2647 "Event category should be Output"
2648 );
2649 assert_eq!(
2650 output_event.operation, "display",
2651 "Event operation should be 'display'"
2652 );
2653 assert_eq!(
2654 output_event.payload["message"], "routed output message",
2655 "Event payload message should match what the component emitted"
2656 );
2657 assert_eq!(
2658 output_event.payload["level"], "info",
2659 "Event payload level should be 'info'"
2660 );
2661
2662 signal_tx
2664 .send(Signal::cancel(primary, Principal::System))
2665 .expect("send cancel");
2666 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2667 teardown(manager_task, world_tx).await;
2668 }
2669
2670 #[tokio::test]
2671 async fn emitter_output_records_to_board_and_routes_to_output_channel() {
2672 use std::sync::atomic::{AtomicUsize, Ordering};
2673 use std::sync::Arc as StdArc;
2674
2675 struct BoardOutputComponent {
2676 id: ComponentId,
2677 emitter: Option<Box<dyn orcs_component::Emitter>>,
2678 call_count: StdArc<AtomicUsize>,
2679 }
2680
2681 impl BoardOutputComponent {
2682 fn new(call_count: StdArc<AtomicUsize>) -> Self {
2683 Self {
2684 id: ComponentId::builtin("board-output"),
2685 emitter: None,
2686 call_count,
2687 }
2688 }
2689 }
2690
2691 impl Component for BoardOutputComponent {
2692 fn id(&self) -> &ComponentId {
2693 &self.id
2694 }
2695 fn status(&self) -> Status {
2696 Status::Idle
2697 }
2698 fn subscriptions(&self) -> &[EventCategory] {
2699 &[EventCategory::Echo]
2700 }
2701 fn on_request(&mut self, _request: &Request) -> Result<Value, ComponentError> {
2702 self.call_count.fetch_add(1, Ordering::SeqCst);
2703 if let Some(emitter) = &self.emitter {
2704 emitter.emit_output("board and io message");
2705 }
2706 Ok(serde_json::json!({"success": true}))
2707 }
2708 fn on_signal(&mut self, signal: &Signal) -> SignalResponse {
2709 if signal.is_veto() {
2710 SignalResponse::Abort
2711 } else {
2712 SignalResponse::Handled
2713 }
2714 }
2715 fn abort(&mut self) {}
2716 fn set_emitter(&mut self, emitter: Box<dyn orcs_component::Emitter>) {
2717 self.emitter = Some(emitter);
2718 }
2719 }
2720
2721 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
2722
2723 let call_count = StdArc::new(AtomicUsize::new(0));
2724 let component = Box::new(BoardOutputComponent::new(StdArc::clone(&call_count)));
2725
2726 let (output_tx, mut output_rx) = OutputSender::channel(64);
2727 let board = crate::board::shared_board();
2728
2729 let signal_rx = signal_tx.subscribe();
2730 let (runner, handle) =
2731 ChannelRunner::builder(primary, world_tx.clone(), world, signal_rx, component)
2732 .with_emitter(signal_tx.clone())
2733 .with_output_channel(output_tx)
2734 .with_board(Arc::clone(&board))
2735 .build();
2736
2737 let runner_task = tokio::spawn(runner.run());
2738
2739 let event = Event {
2741 category: EventCategory::Echo,
2742 operation: "test".to_string(),
2743 source: ComponentId::builtin("test"),
2744 payload: serde_json::json!({"trigger": true}),
2745 };
2746 handle.inject(event).await.expect("inject event");
2747
2748 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2750
2751 assert!(call_count.load(Ordering::SeqCst) >= 1);
2753
2754 let output_event = output_rx
2756 .try_recv()
2757 .expect("Output event should arrive on output channel");
2758 assert_eq!(output_event.payload["message"], "board and io message");
2759
2760 let b = board.read();
2762 assert!(b.len() >= 1, "Board should have at least 1 entry");
2763 let entries = b.recent(1);
2764 assert_eq!(
2765 entries[0].payload["message"], "board and io message",
2766 "Board entry should match the emitted message"
2767 );
2768
2769 signal_tx
2771 .send(Signal::cancel(primary, Principal::System))
2772 .expect("send cancel");
2773 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
2774 teardown(manager_task, world_tx).await;
2775 }
2776
2777 #[test]
2780 fn handle_is_alive_while_receiver_exists() {
2781 let (tx, rx) = tokio::sync::mpsc::channel::<InboundEvent>(32);
2782 let handle = ChannelHandle::new(ChannelId::new(), tx);
2783
2784 assert!(
2785 handle.is_alive(),
2786 "handle should be alive while receiver exists"
2787 );
2788 drop(rx);
2789 assert!(
2790 !handle.is_alive(),
2791 "handle should not be alive after receiver is dropped"
2792 );
2793 }
2794
2795 #[test]
2798 fn exit_reason_as_str_covers_all_variants() {
2799 let cases = [
2800 (ExitReason::Signal, "signal"),
2801 (ExitReason::EventChannelClosed, "event_channel_closed"),
2802 (ExitReason::SignalChannelClosed, "signal_channel_closed"),
2803 (ExitReason::ChannelInactive, "channel_inactive"),
2804 (ExitReason::ComponentStopped, "component_stopped"),
2805 (ExitReason::IoChannelClosed, "io_channel_closed"),
2806 (ExitReason::UserQuit, "user_quit"),
2807 ];
2808 for (variant, expected) in &cases {
2809 assert_eq!(
2810 variant.as_str(),
2811 *expected,
2812 "ExitReason::{:?}.as_str() mismatch",
2813 variant
2814 );
2815 }
2816 }
2817
2818 #[test]
2819 fn exit_reason_display_matches_as_str() {
2820 let variants = [
2821 ExitReason::Signal,
2822 ExitReason::EventChannelClosed,
2823 ExitReason::SignalChannelClosed,
2824 ExitReason::ChannelInactive,
2825 ExitReason::ComponentStopped,
2826 ExitReason::IoChannelClosed,
2827 ExitReason::UserQuit,
2828 ];
2829 for variant in &variants {
2830 assert_eq!(
2831 variant.to_string(),
2832 variant.as_str(),
2833 "Display and as_str() should match for {:?}",
2834 variant
2835 );
2836 }
2837 }
2838
2839 #[test]
2840 fn exit_reason_clone_and_eq() {
2841 let original = ExitReason::ComponentStopped;
2842 let cloned = original.clone();
2843 assert_eq!(original, cloned, "Clone should produce equal value");
2844 }
2845}