1use super::base::{ChannelHandle, Event, ExitReason, InboundEvent, RunnerResult};
33use super::common::{
34 determine_channel_action, is_channel_active, is_channel_paused, send_abort, send_transition,
35 SignalAction,
36};
37use super::paused_queue::PausedEventQueue;
38use crate::channel::command::{StateTransition, WorldCommand};
39use crate::channel::World;
40use crate::components::IOBridge;
41use crate::engine::SharedChannelHandles;
42use crate::io::{IOPort, InputCommand};
43use orcs_event::{EventCategory, Signal, SignalKind};
44use orcs_types::{ChannelId, ComponentId, Principal};
45use std::sync::Arc;
46use tokio::sync::{broadcast, mpsc, RwLock};
47use tracing::{debug, info, warn};
48
/// Capacity of the bounded `mpsc` channel that carries inbound events to the runner.
const EVENT_BUFFER_SIZE: usize = 64;

/// Component name reported in the `component_fqn` field of the runner's result.
const CLIENT_RUNNER_FQN: &str = "io_bridge";
54
55#[allow(dead_code)]
59mod response_fields {
60 pub const STATUS: &str = "status";
61 pub const PENDING_APPROVAL: &str = "pending_approval";
62 pub const APPROVAL_ID: &str = "approval_id";
63 pub const MESSAGE: &str = "message";
64 pub const RESPONSE: &str = "response";
65 pub const DATA: &str = "data";
66}
67
68#[allow(dead_code)]
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub enum ComponentResponse<'a> {
72 PendingApproval {
74 approval_id: &'a str,
75 description: &'a str,
76 },
77 TextResponse(&'a str),
79 ErrorResponse(&'a str),
83 Empty,
85}
86
87#[allow(dead_code)]
88impl<'a> ComponentResponse<'a> {
89 #[must_use]
97 pub fn from_json(value: &'a serde_json::Value) -> Self {
98 use response_fields::*;
99
100 if value.get(STATUS).and_then(|v| v.as_str()) == Some(PENDING_APPROVAL) {
102 if let Some(approval_id) = value.get(APPROVAL_ID).and_then(|v| v.as_str()) {
103 let description = value
104 .get(MESSAGE)
105 .and_then(|v| v.as_str())
106 .unwrap_or("Awaiting approval");
107 return Self::PendingApproval {
108 approval_id,
109 description,
110 };
111 }
112 }
113
114 if value.get("success").and_then(|v| v.as_bool()) == Some(false) {
116 if let Some(error_msg) = value.get("error").and_then(|v| v.as_str()) {
117 return Self::ErrorResponse(error_msg);
118 }
119 return Self::ErrorResponse("Unknown error");
120 }
121
122 if let Some(text) = value.get(RESPONSE).and_then(|v| v.as_str()) {
124 return Self::TextResponse(text);
125 }
126
127 if let Some(text) = value
129 .get(DATA)
130 .and_then(|d| d.get(RESPONSE))
131 .and_then(|v| v.as_str())
132 {
133 return Self::TextResponse(text);
134 }
135
136 Self::Empty
137 }
138}
139pub struct ClientRunnerConfig {
143 pub world_tx: mpsc::Sender<WorldCommand>,
145 pub world: Arc<RwLock<World>>,
147 pub signal_rx: broadcast::Receiver<Signal>,
149 pub channel_handles: SharedChannelHandles,
151}
152
153pub struct ClientRunner {
173 id: ChannelId,
175 event_tx: mpsc::Sender<InboundEvent>,
177 event_rx: mpsc::Receiver<InboundEvent>,
179 signal_rx: broadcast::Receiver<Signal>,
181 world_tx: mpsc::Sender<WorldCommand>,
183 world: Arc<RwLock<World>>,
185 channel_handles: SharedChannelHandles,
187 paused_queue: PausedEventQueue,
189
190 io_bridge: IOBridge,
193 principal: Principal,
195 source_id: ComponentId,
197}
198
199impl ClientRunner {
200 #[must_use]
212 pub fn new(
213 id: ChannelId,
214 config: ClientRunnerConfig,
215 io_port: IOPort,
216 principal: Principal,
217 ) -> (Self, ChannelHandle) {
218 let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE);
219
220 let runner = Self {
221 id,
222 event_tx: event_tx.clone(),
223 event_rx,
224 signal_rx: config.signal_rx,
225 world_tx: config.world_tx,
226 world: config.world,
227 channel_handles: config.channel_handles,
228 paused_queue: PausedEventQueue::new(),
229 io_bridge: IOBridge::new(io_port),
230 principal,
231 source_id: ComponentId::builtin("io_bridge"),
232 };
233
234 let handle = ChannelHandle::new(id, event_tx);
235 (runner, handle)
236 }
237
238 #[must_use]
240 pub fn id(&self) -> ChannelId {
241 self.id
242 }
243
244 #[must_use]
248 pub(crate) fn event_tx(&self) -> &mpsc::Sender<InboundEvent> {
249 &self.event_tx
250 }
251
252 #[must_use]
254 pub fn io_bridge(&self) -> &IOBridge {
255 &self.io_bridge
256 }
257
258 #[must_use]
260 pub fn io_bridge_mut(&mut self) -> &mut IOBridge {
261 &mut self.io_bridge
262 }
263
264 pub async fn run(mut self) -> RunnerResult {
271 info!("ClientRunner {} started", self.id);
272
273 #[allow(unused_assignments)]
275 let mut exit_reason = ExitReason::Signal;
276
277 loop {
278 tokio::select! {
279 biased;
281
282 signal = self.signal_rx.recv() => {
283 match signal {
284 Ok(sig) => {
285 if !self.handle_signal(sig).await {
286 exit_reason = ExitReason::Signal;
287 break;
288 }
289 }
290 Err(broadcast::error::RecvError::Closed) => {
291 info!("ClientRunner {}: signal channel closed", self.id);
292 exit_reason = ExitReason::SignalChannelClosed;
293 break;
294 }
295 Err(broadcast::error::RecvError::Lagged(n)) => {
296 warn!("ClientRunner {}: lagged {} signals", self.id, n);
297 }
298 }
299 }
300
301 event = self.event_rx.recv() => {
303 match event {
304 Some(evt) => {
305 if !self.handle_event(evt).await {
306 exit_reason = ExitReason::ComponentStopped;
307 break;
308 }
309 }
310 None => {
311 info!("ClientRunner {}: event channel closed", self.id);
312 exit_reason = ExitReason::EventChannelClosed;
313 break;
314 }
315 }
316 }
317
318 io_result = self.io_bridge.recv_input(&self.principal) => {
320 match io_result {
321 Some(Ok(signal)) => {
322 debug!("ClientRunner {}: IO input → {:?}", self.id, signal.kind);
324 if !self.handle_signal(signal).await {
325 exit_reason = ExitReason::Signal;
326 break;
327 }
328 }
329 Some(Err(cmd)) => {
330 if !self.handle_io_command(cmd).await {
332 exit_reason = ExitReason::UserQuit;
333 break;
334 }
335 }
336 None => {
337 info!("ClientRunner {}: IO closed", self.id);
338 exit_reason = ExitReason::IoChannelClosed;
339 break;
340 }
341 }
342 }
343 }
344
345 if !is_channel_active(&self.world, self.id).await {
347 debug!("ClientRunner {}: channel no longer active", self.id);
348 exit_reason = ExitReason::ChannelInactive;
349 break;
350 }
351 }
352
353 info!("ClientRunner {} stopped (reason={})", self.id, exit_reason);
354
355 RunnerResult {
356 channel_id: self.id,
357 component_fqn: std::borrow::Cow::Borrowed(CLIENT_RUNNER_FQN),
358 snapshot: None,
359 exit_reason,
360 }
361 }
362
363 async fn handle_io_command(&mut self, cmd: InputCommand) -> bool {
373 match cmd {
374 InputCommand::Quit => {
375 info!("ClientRunner {}: quit requested", self.id);
376 send_abort(&self.world_tx, self.id, "user quit").await;
377 false
378 }
379 InputCommand::Unknown { input } => {
380 self.handle_user_message(&input);
382 true
383 }
384 InputCommand::Empty => {
385 true
387 }
388 _ => true,
389 }
390 }
391
392 fn handle_user_message(&self, message: &str) {
398 debug!("ClientRunner {}: user message: {}", self.id, message);
399
400 let event = Event {
402 category: EventCategory::UserInput,
403 operation: "input".to_string(),
404 source: self.source_id.clone(),
405 payload: serde_json::json!({
406 "message": message
407 }),
408 };
409
410 let handles = self.channel_handles.read();
412 let mut delivered = 0;
413 for (channel_id, handle) in handles.iter() {
414 if *channel_id == self.id {
416 continue;
417 }
418 if handle.try_inject(event.clone()).is_ok() {
419 delivered += 1;
420 }
421 }
422 debug!(
423 "ClientRunner {}: broadcast UserInput to {} channels",
424 self.id, delivered
425 );
426 }
427
428 async fn handle_signal(&mut self, signal: Signal) -> bool {
433 debug!(
434 "ClientRunner {}: received signal {:?}",
435 self.id, signal.kind
436 );
437
438 if !signal.affects_channel(self.id) {
440 return true;
441 }
442
443 let action = determine_channel_action(&signal.kind);
445 match action {
446 SignalAction::Stop { reason } => {
447 if let SignalKind::Reject {
449 approval_id,
450 reason: rej_reason,
451 } = &signal.kind
452 {
453 let _ = self
454 .io_bridge
455 .show_rejected(approval_id, rej_reason.as_deref())
456 .await;
457 }
458 info!("ClientRunner {}: stopping - {}", self.id, reason);
459 send_abort(&self.world_tx, self.id, &reason).await;
460 return false;
461 }
462 SignalAction::Transition(transition) => {
463 if let SignalKind::Approve { approval_id } = &signal.kind {
465 let _ = self.io_bridge.show_approved(approval_id).await;
466 }
467
468 send_transition(&self.world_tx, self.id, transition.clone()).await;
469
470 if matches!(transition, StateTransition::Resume) {
472 self.drain_paused_queue().await;
473 }
474 }
475 SignalAction::Continue => {}
476 }
477
478 true
479 }
480
481 async fn handle_event(&mut self, inbound: InboundEvent) -> bool {
486 let event = inbound.into_event();
487
488 debug!(
489 "ClientRunner {}: received event {:?} op={}",
490 self.id, event.category, event.operation
491 );
492
493 if is_channel_paused(&self.world, self.id).await {
495 self.paused_queue
496 .try_enqueue(event, "ClientRunner", self.id);
497 return true;
498 }
499
500 self.process_event(event).await;
501 true
502 }
503
504 async fn process_event(&self, event: Event) {
509 if event.category == EventCategory::Output {
511 self.handle_output_event(&event).await;
512 return;
513 }
514
515 debug!(
517 "ClientRunner {}: ignoring non-Output event {:?}",
518 self.id, event.category
519 );
520 }
521
522 async fn handle_output_event(&self, event: &Event) {
524 if event.payload.get("type").and_then(|v| v.as_str()) == Some("processing") {
526 let component = event
527 .payload
528 .get("component")
529 .and_then(|v| v.as_str())
530 .unwrap_or("unknown");
531 let operation = event
532 .payload
533 .get("operation")
534 .and_then(|v| v.as_str())
535 .unwrap_or("request");
536 let _ = self.io_bridge.show_processing(component, operation).await;
537 return;
538 }
539
540 if event.payload.get("type").and_then(|v| v.as_str()) == Some("approval_request") {
542 let approval_id = event.payload["approval_id"].as_str().unwrap_or("");
543 let operation = event.payload["operation"].as_str().unwrap_or("exec");
544 let description = event.payload["description"].as_str().unwrap_or("");
545
546 let request = crate::components::ApprovalRequest::with_id(
547 approval_id,
548 operation,
549 description,
550 event.payload.clone(),
551 );
552 let _ = self.io_bridge.show_approval_request(&request).await;
553 return;
554 }
555
556 let message = event
557 .payload
558 .get("message")
559 .and_then(|v| v.as_str())
560 .unwrap_or("");
561
562 let level = event
563 .payload
564 .get("level")
565 .and_then(|v| v.as_str())
566 .unwrap_or("info");
567
568 match level {
569 "warn" | "warning" => {
570 let _ = self.io_bridge.warn(message).await;
571 }
572 "error" => {
573 let _ = self.io_bridge.error(message).await;
574 }
575 _ => {
576 let _ = self.io_bridge.info(message).await;
577 }
578 }
579 }
580
581 async fn drain_paused_queue(&mut self) {
583 let events: Vec<_> = self.paused_queue.drain("ClientRunner", self.id).collect();
585
586 for event in events {
587 self.process_event(event).await;
588 }
589 }
590}
591
592#[cfg(test)]
593mod tests {
594 use super::*;
595 use crate::channel::config::ChannelConfig;
596 use crate::channel::manager::WorldManager;
597 use crate::io::IOPort;
598 use orcs_types::PrincipalId;
599 use parking_lot::RwLock as ParkingRwLock;
600 use std::collections::HashMap;
601
602 fn test_principal() -> Principal {
603 Principal::User(PrincipalId::new())
604 }
605
606 async fn setup() -> (
607 tokio::task::JoinHandle<()>,
608 mpsc::Sender<WorldCommand>,
609 Arc<RwLock<World>>,
610 broadcast::Sender<Signal>,
611 ChannelId,
612 ) {
613 let mut world = World::new();
614 let io = world.create_channel(ChannelConfig::interactive());
615
616 let (manager, world_tx) = WorldManager::with_world(world);
617 let world_handle = manager.world();
618
619 let manager_task = tokio::spawn(manager.run());
620
621 let (signal_tx, _) = broadcast::channel(64);
622
623 (manager_task, world_tx, world_handle, signal_tx, io)
624 }
625
626 async fn teardown(
627 manager_task: tokio::task::JoinHandle<()>,
628 world_tx: mpsc::Sender<WorldCommand>,
629 ) {
630 let _ = world_tx.send(WorldCommand::Shutdown).await;
631 let _ = manager_task.await;
632 }
633
634 fn make_config(
635 world_tx: mpsc::Sender<WorldCommand>,
636 world: Arc<RwLock<World>>,
637 signal_tx: &broadcast::Sender<Signal>,
638 ) -> ClientRunnerConfig {
639 let channel_handles: SharedChannelHandles = Arc::new(ParkingRwLock::new(HashMap::new()));
640 ClientRunnerConfig {
641 world_tx,
642 world,
643 signal_rx: signal_tx.subscribe(),
644 channel_handles,
645 }
646 }
647
648 #[tokio::test]
649 async fn client_runner_creation() {
650 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
651
652 let (port, _input, _output) = IOPort::with_defaults(primary);
653 let config = make_config(world_tx.clone(), world, &signal_tx);
654 let (runner, handle) = ClientRunner::new(primary, config, port, test_principal());
655
656 assert_eq!(runner.id(), primary);
657 assert_eq!(handle.id, primary);
658
659 teardown(manager_task, world_tx).await;
660 }
661
662 #[tokio::test]
663 async fn client_runner_handles_io_input() {
664 use crate::io::IOInput;
665
666 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
667
668 let (port, input_handle, _output_handle) = IOPort::with_defaults(primary);
669 let config = make_config(world_tx.clone(), world, &signal_tx);
670 let (runner, _handle) = ClientRunner::new(primary, config, port, test_principal());
671
672 let runner_task = tokio::spawn(runner.run());
673 tokio::task::yield_now().await;
674
675 input_handle
677 .send(IOInput::line("q"))
678 .await
679 .expect("send quit command via IO");
680
681 let result = tokio::time::timeout(std::time::Duration::from_millis(200), runner_task).await;
683 assert!(result.is_ok());
684
685 teardown(manager_task, world_tx).await;
686 }
687
688 #[tokio::test]
689 async fn client_runner_handles_approval() {
690 use crate::io::{IOInput, InputContext};
691
692 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
693
694 let (port, input_handle, mut output_handle) = IOPort::with_defaults(primary);
695 let config = make_config(world_tx.clone(), world, &signal_tx);
696 let (runner, _handle) = ClientRunner::new(primary, config, port, test_principal());
697
698 let runner_task = tokio::spawn(runner.run());
699 tokio::task::yield_now().await;
700
701 let ctx = InputContext::with_approval_id("req-123");
703 input_handle
704 .send(IOInput::line_with_context("y", ctx))
705 .await
706 .expect("send approval input via IO");
707
708 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
710
711 if let Some(output) = output_handle.try_recv() {
713 debug!("Received output: {:?}", output);
715 }
716
717 signal_tx
719 .send(Signal::cancel(primary, Principal::System))
720 .expect("send cancel signal for cleanup");
721 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
722
723 teardown(manager_task, world_tx).await;
724 }
725
726 #[tokio::test]
727 async fn client_runner_handles_veto() {
728 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
729
730 let (port, _input, _output) = IOPort::with_defaults(primary);
731 let config = make_config(world_tx.clone(), world, &signal_tx);
732 let (runner, _handle) = ClientRunner::new(primary, config, port, test_principal());
733
734 let runner_task = tokio::spawn(runner.run());
735 tokio::task::yield_now().await;
736
737 signal_tx
739 .send(Signal::veto(Principal::System))
740 .expect("send veto signal");
741
742 let result = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
744 assert!(result.is_ok());
745
746 teardown(manager_task, world_tx).await;
747 }
748
749 mod component_response_tests {
757 use super::*;
758 use serde_json::json;
759
760 #[test]
761 fn from_json_pending_approval() {
762 let json = json!({
763 "status": "pending_approval",
764 "approval_id": "req-123",
765 "message": "Confirm action?"
766 });
767
768 let response = ComponentResponse::from_json(&json);
769
770 assert_eq!(
771 response,
772 ComponentResponse::PendingApproval {
773 approval_id: "req-123",
774 description: "Confirm action?"
775 }
776 );
777 }
778
779 #[test]
780 fn from_json_pending_approval_default_message() {
781 let json = json!({
782 "status": "pending_approval",
783 "approval_id": "req-456"
784 });
785
786 let response = ComponentResponse::from_json(&json);
787
788 assert_eq!(
789 response,
790 ComponentResponse::PendingApproval {
791 approval_id: "req-456",
792 description: "Awaiting approval"
793 }
794 );
795 }
796
797 #[test]
798 fn from_json_pending_approval_missing_id_returns_empty() {
799 let json = json!({
800 "status": "pending_approval"
801 });
803
804 let response = ComponentResponse::from_json(&json);
805
806 assert_eq!(response, ComponentResponse::Empty);
807 }
808
809 #[test]
810 fn from_json_direct_response() {
811 let json = json!({
812 "response": "Hello, world!"
813 });
814
815 let response = ComponentResponse::from_json(&json);
816
817 assert_eq!(response, ComponentResponse::TextResponse("Hello, world!"));
818 }
819
820 #[test]
821 fn from_json_nested_data_response() {
822 let json = json!({
823 "data": {
824 "response": "Nested response",
825 "source": "test"
826 }
827 });
828
829 let response = ComponentResponse::from_json(&json);
830
831 assert_eq!(response, ComponentResponse::TextResponse("Nested response"));
832 }
833
834 #[test]
835 fn from_json_empty_object() {
836 let json = json!({});
837
838 let response = ComponentResponse::from_json(&json);
839
840 assert_eq!(response, ComponentResponse::Empty);
841 }
842
843 #[test]
844 fn from_json_unrelated_fields() {
845 let json = json!({
846 "status": "completed",
847 "result": 42
848 });
849
850 let response = ComponentResponse::from_json(&json);
851
852 assert_eq!(response, ComponentResponse::Empty);
853 }
854
855 #[test]
856 fn from_json_priority_pending_over_response() {
857 let json = json!({
858 "status": "pending_approval",
859 "approval_id": "req-789",
860 "response": "This should be ignored"
861 });
862
863 let response = ComponentResponse::from_json(&json);
864
865 assert_eq!(
866 response,
867 ComponentResponse::PendingApproval {
868 approval_id: "req-789",
869 description: "Awaiting approval"
870 }
871 );
872 }
873
874 #[test]
875 fn from_json_response_priority_over_nested() {
876 let json = json!({
877 "response": "Direct",
878 "data": {
879 "response": "Nested"
880 }
881 });
882
883 let response = ComponentResponse::from_json(&json);
884
885 assert_eq!(response, ComponentResponse::TextResponse("Direct"));
886 }
887
888 #[test]
889 fn from_json_error_response() {
890 let json = json!({
891 "success": false,
892 "error": "Command failed"
893 });
894
895 let response = ComponentResponse::from_json(&json);
896
897 assert_eq!(response, ComponentResponse::ErrorResponse("Command failed"));
898 }
899
900 #[test]
901 fn from_json_error_response_no_message() {
902 let json = json!({
903 "success": false
904 });
905
906 let response = ComponentResponse::from_json(&json);
907
908 assert_eq!(response, ComponentResponse::ErrorResponse("Unknown error"));
909 }
910
911 #[test]
912 fn from_json_success_true_not_error() {
913 let json = json!({
914 "success": true,
915 "response": "Operation succeeded"
916 });
917
918 let response = ComponentResponse::from_json(&json);
919
920 assert_eq!(
921 response,
922 ComponentResponse::TextResponse("Operation succeeded")
923 );
924 }
925
926 #[test]
927 fn from_json_pending_priority_over_error() {
928 let json = json!({
929 "status": "pending_approval",
930 "approval_id": "req-999",
931 "success": false,
932 "error": "This should be ignored"
933 });
934
935 let response = ComponentResponse::from_json(&json);
936
937 assert_eq!(
938 response,
939 ComponentResponse::PendingApproval {
940 approval_id: "req-999",
941 description: "Awaiting approval"
942 }
943 );
944 }
945 }
946}