1use super::base::{ChannelHandle, Event, InboundEvent, RunnerResult};
33use super::common::{
34 determine_channel_action, is_channel_active, is_channel_paused, send_abort, send_transition,
35 SignalAction,
36};
37use super::paused_queue::PausedEventQueue;
38use crate::channel::command::{StateTransition, WorldCommand};
39use crate::channel::World;
40use crate::components::IOBridge;
41use crate::engine::SharedChannelHandles;
42use crate::io::{IOPort, InputCommand};
43use orcs_event::{EventCategory, Signal, SignalKind};
44use orcs_types::{ChannelId, ComponentId, Principal};
45use std::sync::Arc;
46use tokio::sync::{broadcast, mpsc, RwLock};
47use tracing::{debug, info, warn};
48
49const EVENT_BUFFER_SIZE: usize = 64;
51
52const CLIENT_RUNNER_FQN: &str = "io_bridge";
54
55#[allow(dead_code)]
59mod response_fields {
60 pub const STATUS: &str = "status";
61 pub const PENDING_APPROVAL: &str = "pending_approval";
62 pub const APPROVAL_ID: &str = "approval_id";
63 pub const MESSAGE: &str = "message";
64 pub const RESPONSE: &str = "response";
65 pub const DATA: &str = "data";
66}
67
68#[allow(dead_code)]
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub enum ComponentResponse<'a> {
72 PendingApproval {
74 approval_id: &'a str,
75 description: &'a str,
76 },
77 TextResponse(&'a str),
79 ErrorResponse(&'a str),
83 Empty,
85}
86
87#[allow(dead_code)]
88impl<'a> ComponentResponse<'a> {
89 #[must_use]
97 pub fn from_json(value: &'a serde_json::Value) -> Self {
98 use response_fields::*;
99
100 if value.get(STATUS).and_then(|v| v.as_str()) == Some(PENDING_APPROVAL) {
102 if let Some(approval_id) = value.get(APPROVAL_ID).and_then(|v| v.as_str()) {
103 let description = value
104 .get(MESSAGE)
105 .and_then(|v| v.as_str())
106 .unwrap_or("Awaiting approval");
107 return Self::PendingApproval {
108 approval_id,
109 description,
110 };
111 }
112 }
113
114 if value.get("success").and_then(|v| v.as_bool()) == Some(false) {
116 if let Some(error_msg) = value.get("error").and_then(|v| v.as_str()) {
117 return Self::ErrorResponse(error_msg);
118 }
119 return Self::ErrorResponse("Unknown error");
120 }
121
122 if let Some(text) = value.get(RESPONSE).and_then(|v| v.as_str()) {
124 return Self::TextResponse(text);
125 }
126
127 if let Some(text) = value
129 .get(DATA)
130 .and_then(|d| d.get(RESPONSE))
131 .and_then(|v| v.as_str())
132 {
133 return Self::TextResponse(text);
134 }
135
136 Self::Empty
137 }
138}
139pub struct ClientRunnerConfig {
143 pub world_tx: mpsc::Sender<WorldCommand>,
145 pub world: Arc<RwLock<World>>,
147 pub signal_rx: broadcast::Receiver<Signal>,
149 pub channel_handles: SharedChannelHandles,
151}
152
153pub struct ClientRunner {
173 id: ChannelId,
175 event_tx: mpsc::Sender<InboundEvent>,
177 event_rx: mpsc::Receiver<InboundEvent>,
179 signal_rx: broadcast::Receiver<Signal>,
181 world_tx: mpsc::Sender<WorldCommand>,
183 world: Arc<RwLock<World>>,
185 channel_handles: SharedChannelHandles,
187 paused_queue: PausedEventQueue,
189
190 io_bridge: IOBridge,
193 principal: Principal,
195 source_id: ComponentId,
197}
198
199impl ClientRunner {
200 #[must_use]
212 pub fn new(
213 id: ChannelId,
214 config: ClientRunnerConfig,
215 io_port: IOPort,
216 principal: Principal,
217 ) -> (Self, ChannelHandle) {
218 let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE);
219
220 let runner = Self {
221 id,
222 event_tx: event_tx.clone(),
223 event_rx,
224 signal_rx: config.signal_rx,
225 world_tx: config.world_tx,
226 world: config.world,
227 channel_handles: config.channel_handles,
228 paused_queue: PausedEventQueue::new(),
229 io_bridge: IOBridge::new(io_port),
230 principal,
231 source_id: ComponentId::builtin("io_bridge"),
232 };
233
234 let handle = ChannelHandle::new(id, event_tx);
235 (runner, handle)
236 }
237
238 #[must_use]
240 pub fn id(&self) -> ChannelId {
241 self.id
242 }
243
244 #[must_use]
248 pub(crate) fn event_tx(&self) -> &mpsc::Sender<InboundEvent> {
249 &self.event_tx
250 }
251
252 #[must_use]
254 pub fn io_bridge(&self) -> &IOBridge {
255 &self.io_bridge
256 }
257
258 #[must_use]
260 pub fn io_bridge_mut(&mut self) -> &mut IOBridge {
261 &mut self.io_bridge
262 }
263
264 pub async fn run(mut self) -> RunnerResult {
271 info!("ClientRunner {} started", self.id);
272
273 loop {
274 tokio::select! {
275 biased;
277
278 signal = self.signal_rx.recv() => {
279 match signal {
280 Ok(sig) => {
281 if !self.handle_signal(sig).await {
282 break;
283 }
284 }
285 Err(broadcast::error::RecvError::Closed) => {
286 info!("ClientRunner {}: signal channel closed", self.id);
287 break;
288 }
289 Err(broadcast::error::RecvError::Lagged(n)) => {
290 warn!("ClientRunner {}: lagged {} signals", self.id, n);
291 }
292 }
293 }
294
295 event = self.event_rx.recv() => {
297 match event {
298 Some(evt) => {
299 if !self.handle_event(evt).await {
300 break;
301 }
302 }
303 None => {
304 info!("ClientRunner {}: event channel closed", self.id);
305 break;
306 }
307 }
308 }
309
310 io_result = self.io_bridge.recv_input(&self.principal) => {
312 match io_result {
313 Some(Ok(signal)) => {
314 debug!("ClientRunner {}: IO input → {:?}", self.id, signal.kind);
316 if !self.handle_signal(signal).await {
317 break;
318 }
319 }
320 Some(Err(cmd)) => {
321 if !self.handle_io_command(cmd).await {
323 break;
324 }
325 }
326 None => {
327 info!("ClientRunner {}: IO closed", self.id);
328 break;
329 }
330 }
331 }
332 }
333
334 if !is_channel_active(&self.world, self.id).await {
336 debug!("ClientRunner {}: channel no longer active", self.id);
337 break;
338 }
339 }
340
341 info!("ClientRunner {} stopped", self.id);
342
343 RunnerResult {
344 channel_id: self.id,
345 component_fqn: std::borrow::Cow::Borrowed(CLIENT_RUNNER_FQN),
346 snapshot: None,
347 }
348 }
349
350 async fn handle_io_command(&mut self, cmd: InputCommand) -> bool {
360 match cmd {
361 InputCommand::Quit => {
362 info!("ClientRunner {}: quit requested", self.id);
363 send_abort(&self.world_tx, self.id, "user quit").await;
364 false
365 }
366 InputCommand::Unknown { input } => {
367 self.handle_user_message(&input);
369 true
370 }
371 InputCommand::Empty => {
372 true
374 }
375 _ => true,
376 }
377 }
378
379 fn handle_user_message(&self, message: &str) {
385 debug!("ClientRunner {}: user message: {}", self.id, message);
386
387 let event = Event {
389 category: EventCategory::UserInput,
390 operation: "input".to_string(),
391 source: self.source_id.clone(),
392 payload: serde_json::json!({
393 "message": message
394 }),
395 };
396
397 let handles = self.channel_handles.read();
399 let mut delivered = 0;
400 for (channel_id, handle) in handles.iter() {
401 if *channel_id == self.id {
403 continue;
404 }
405 if handle.try_inject(event.clone()).is_ok() {
406 delivered += 1;
407 }
408 }
409 debug!(
410 "ClientRunner {}: broadcast UserInput to {} channels",
411 self.id, delivered
412 );
413 }
414
415 async fn handle_signal(&mut self, signal: Signal) -> bool {
420 debug!(
421 "ClientRunner {}: received signal {:?}",
422 self.id, signal.kind
423 );
424
425 if !signal.affects_channel(self.id) {
427 return true;
428 }
429
430 let action = determine_channel_action(&signal.kind);
432 match action {
433 SignalAction::Stop { reason } => {
434 if let SignalKind::Reject {
436 approval_id,
437 reason: rej_reason,
438 } = &signal.kind
439 {
440 let _ = self
441 .io_bridge
442 .show_rejected(approval_id, rej_reason.as_deref())
443 .await;
444 }
445 info!("ClientRunner {}: stopping - {}", self.id, reason);
446 send_abort(&self.world_tx, self.id, &reason).await;
447 return false;
448 }
449 SignalAction::Transition(transition) => {
450 if let SignalKind::Approve { approval_id } = &signal.kind {
452 let _ = self.io_bridge.show_approved(approval_id).await;
453 }
454
455 send_transition(&self.world_tx, self.id, transition.clone()).await;
456
457 if matches!(transition, StateTransition::Resume) {
459 self.drain_paused_queue().await;
460 }
461 }
462 SignalAction::Continue => {}
463 }
464
465 true
466 }
467
468 async fn handle_event(&mut self, inbound: InboundEvent) -> bool {
473 let event = inbound.into_event();
474
475 debug!(
476 "ClientRunner {}: received event {:?} op={}",
477 self.id, event.category, event.operation
478 );
479
480 if is_channel_paused(&self.world, self.id).await {
482 self.paused_queue
483 .try_enqueue(event, "ClientRunner", self.id);
484 return true;
485 }
486
487 self.process_event(event).await;
488 true
489 }
490
491 async fn process_event(&self, event: Event) {
496 if event.category == EventCategory::Output {
498 self.handle_output_event(&event).await;
499 return;
500 }
501
502 debug!(
504 "ClientRunner {}: ignoring non-Output event {:?}",
505 self.id, event.category
506 );
507 }
508
509 async fn handle_output_event(&self, event: &Event) {
511 if event.payload.get("type").and_then(|v| v.as_str()) == Some("processing") {
513 let component = event
514 .payload
515 .get("component")
516 .and_then(|v| v.as_str())
517 .unwrap_or("unknown");
518 let operation = event
519 .payload
520 .get("operation")
521 .and_then(|v| v.as_str())
522 .unwrap_or("request");
523 let _ = self.io_bridge.show_processing(component, operation).await;
524 return;
525 }
526
527 if event.payload.get("type").and_then(|v| v.as_str()) == Some("approval_request") {
529 let approval_id = event.payload["approval_id"].as_str().unwrap_or("");
530 let operation = event.payload["operation"].as_str().unwrap_or("exec");
531 let description = event.payload["description"].as_str().unwrap_or("");
532
533 let request = crate::components::ApprovalRequest::with_id(
534 approval_id,
535 operation,
536 description,
537 event.payload.clone(),
538 );
539 let _ = self.io_bridge.show_approval_request(&request).await;
540 return;
541 }
542
543 let message = event
544 .payload
545 .get("message")
546 .and_then(|v| v.as_str())
547 .unwrap_or("");
548
549 let level = event
550 .payload
551 .get("level")
552 .and_then(|v| v.as_str())
553 .unwrap_or("info");
554
555 match level {
556 "warn" | "warning" => {
557 let _ = self.io_bridge.warn(message).await;
558 }
559 "error" => {
560 let _ = self.io_bridge.error(message).await;
561 }
562 _ => {
563 let _ = self.io_bridge.info(message).await;
564 }
565 }
566 }
567
568 async fn drain_paused_queue(&mut self) {
570 let events: Vec<_> = self.paused_queue.drain("ClientRunner", self.id).collect();
572
573 for event in events {
574 self.process_event(event).await;
575 }
576 }
577}
578
579#[cfg(test)]
580mod tests {
581 use super::*;
582 use crate::channel::config::ChannelConfig;
583 use crate::channel::manager::WorldManager;
584 use crate::io::IOPort;
585 use orcs_types::PrincipalId;
586 use parking_lot::RwLock as ParkingRwLock;
587 use std::collections::HashMap;
588
589 fn test_principal() -> Principal {
590 Principal::User(PrincipalId::new())
591 }
592
593 async fn setup() -> (
594 tokio::task::JoinHandle<()>,
595 mpsc::Sender<WorldCommand>,
596 Arc<RwLock<World>>,
597 broadcast::Sender<Signal>,
598 ChannelId,
599 ) {
600 let mut world = World::new();
601 let io = world.create_channel(ChannelConfig::interactive());
602
603 let (manager, world_tx) = WorldManager::with_world(world);
604 let world_handle = manager.world();
605
606 let manager_task = tokio::spawn(manager.run());
607
608 let (signal_tx, _) = broadcast::channel(64);
609
610 (manager_task, world_tx, world_handle, signal_tx, io)
611 }
612
613 async fn teardown(
614 manager_task: tokio::task::JoinHandle<()>,
615 world_tx: mpsc::Sender<WorldCommand>,
616 ) {
617 let _ = world_tx.send(WorldCommand::Shutdown).await;
618 let _ = manager_task.await;
619 }
620
621 fn make_config(
622 world_tx: mpsc::Sender<WorldCommand>,
623 world: Arc<RwLock<World>>,
624 signal_tx: &broadcast::Sender<Signal>,
625 ) -> ClientRunnerConfig {
626 let channel_handles: SharedChannelHandles = Arc::new(ParkingRwLock::new(HashMap::new()));
627 ClientRunnerConfig {
628 world_tx,
629 world,
630 signal_rx: signal_tx.subscribe(),
631 channel_handles,
632 }
633 }
634
635 #[tokio::test]
636 async fn client_runner_creation() {
637 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
638
639 let (port, _input, _output) = IOPort::with_defaults(primary);
640 let config = make_config(world_tx.clone(), world, &signal_tx);
641 let (runner, handle) = ClientRunner::new(primary, config, port, test_principal());
642
643 assert_eq!(runner.id(), primary);
644 assert_eq!(handle.id, primary);
645
646 teardown(manager_task, world_tx).await;
647 }
648
649 #[tokio::test]
650 async fn client_runner_handles_io_input() {
651 use crate::io::IOInput;
652
653 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
654
655 let (port, input_handle, _output_handle) = IOPort::with_defaults(primary);
656 let config = make_config(world_tx.clone(), world, &signal_tx);
657 let (runner, _handle) = ClientRunner::new(primary, config, port, test_principal());
658
659 let runner_task = tokio::spawn(runner.run());
660 tokio::task::yield_now().await;
661
662 input_handle
664 .send(IOInput::line("q"))
665 .await
666 .expect("send quit command via IO");
667
668 let result = tokio::time::timeout(std::time::Duration::from_millis(200), runner_task).await;
670 assert!(result.is_ok());
671
672 teardown(manager_task, world_tx).await;
673 }
674
675 #[tokio::test]
676 async fn client_runner_handles_approval() {
677 use crate::io::{IOInput, InputContext};
678
679 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
680
681 let (port, input_handle, mut output_handle) = IOPort::with_defaults(primary);
682 let config = make_config(world_tx.clone(), world, &signal_tx);
683 let (runner, _handle) = ClientRunner::new(primary, config, port, test_principal());
684
685 let runner_task = tokio::spawn(runner.run());
686 tokio::task::yield_now().await;
687
688 let ctx = InputContext::with_approval_id("req-123");
690 input_handle
691 .send(IOInput::line_with_context("y", ctx))
692 .await
693 .expect("send approval input via IO");
694
695 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
697
698 if let Some(output) = output_handle.try_recv() {
700 debug!("Received output: {:?}", output);
702 }
703
704 signal_tx
706 .send(Signal::cancel(primary, Principal::System))
707 .expect("send cancel signal for cleanup");
708 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
709
710 teardown(manager_task, world_tx).await;
711 }
712
713 #[tokio::test]
714 async fn client_runner_handles_veto() {
715 let (manager_task, world_tx, world, signal_tx, primary) = setup().await;
716
717 let (port, _input, _output) = IOPort::with_defaults(primary);
718 let config = make_config(world_tx.clone(), world, &signal_tx);
719 let (runner, _handle) = ClientRunner::new(primary, config, port, test_principal());
720
721 let runner_task = tokio::spawn(runner.run());
722 tokio::task::yield_now().await;
723
724 signal_tx
726 .send(Signal::veto(Principal::System))
727 .expect("send veto signal");
728
729 let result = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
731 assert!(result.is_ok());
732
733 teardown(manager_task, world_tx).await;
734 }
735
736 mod component_response_tests {
744 use super::*;
745 use serde_json::json;
746
747 #[test]
748 fn from_json_pending_approval() {
749 let json = json!({
750 "status": "pending_approval",
751 "approval_id": "req-123",
752 "message": "Confirm action?"
753 });
754
755 let response = ComponentResponse::from_json(&json);
756
757 assert_eq!(
758 response,
759 ComponentResponse::PendingApproval {
760 approval_id: "req-123",
761 description: "Confirm action?"
762 }
763 );
764 }
765
766 #[test]
767 fn from_json_pending_approval_default_message() {
768 let json = json!({
769 "status": "pending_approval",
770 "approval_id": "req-456"
771 });
772
773 let response = ComponentResponse::from_json(&json);
774
775 assert_eq!(
776 response,
777 ComponentResponse::PendingApproval {
778 approval_id: "req-456",
779 description: "Awaiting approval"
780 }
781 );
782 }
783
784 #[test]
785 fn from_json_pending_approval_missing_id_returns_empty() {
786 let json = json!({
787 "status": "pending_approval"
788 });
790
791 let response = ComponentResponse::from_json(&json);
792
793 assert_eq!(response, ComponentResponse::Empty);
794 }
795
796 #[test]
797 fn from_json_direct_response() {
798 let json = json!({
799 "response": "Hello, world!"
800 });
801
802 let response = ComponentResponse::from_json(&json);
803
804 assert_eq!(response, ComponentResponse::TextResponse("Hello, world!"));
805 }
806
807 #[test]
808 fn from_json_nested_data_response() {
809 let json = json!({
810 "data": {
811 "response": "Nested response",
812 "source": "test"
813 }
814 });
815
816 let response = ComponentResponse::from_json(&json);
817
818 assert_eq!(response, ComponentResponse::TextResponse("Nested response"));
819 }
820
821 #[test]
822 fn from_json_empty_object() {
823 let json = json!({});
824
825 let response = ComponentResponse::from_json(&json);
826
827 assert_eq!(response, ComponentResponse::Empty);
828 }
829
830 #[test]
831 fn from_json_unrelated_fields() {
832 let json = json!({
833 "status": "completed",
834 "result": 42
835 });
836
837 let response = ComponentResponse::from_json(&json);
838
839 assert_eq!(response, ComponentResponse::Empty);
840 }
841
842 #[test]
843 fn from_json_priority_pending_over_response() {
844 let json = json!({
845 "status": "pending_approval",
846 "approval_id": "req-789",
847 "response": "This should be ignored"
848 });
849
850 let response = ComponentResponse::from_json(&json);
851
852 assert_eq!(
853 response,
854 ComponentResponse::PendingApproval {
855 approval_id: "req-789",
856 description: "Awaiting approval"
857 }
858 );
859 }
860
861 #[test]
862 fn from_json_response_priority_over_nested() {
863 let json = json!({
864 "response": "Direct",
865 "data": {
866 "response": "Nested"
867 }
868 });
869
870 let response = ComponentResponse::from_json(&json);
871
872 assert_eq!(response, ComponentResponse::TextResponse("Direct"));
873 }
874
875 #[test]
876 fn from_json_error_response() {
877 let json = json!({
878 "success": false,
879 "error": "Command failed"
880 });
881
882 let response = ComponentResponse::from_json(&json);
883
884 assert_eq!(response, ComponentResponse::ErrorResponse("Command failed"));
885 }
886
887 #[test]
888 fn from_json_error_response_no_message() {
889 let json = json!({
890 "success": false
891 });
892
893 let response = ComponentResponse::from_json(&json);
894
895 assert_eq!(response, ComponentResponse::ErrorResponse("Unknown error"));
896 }
897
898 #[test]
899 fn from_json_success_true_not_error() {
900 let json = json!({
901 "success": true,
902 "response": "Operation succeeded"
903 });
904
905 let response = ComponentResponse::from_json(&json);
906
907 assert_eq!(
908 response,
909 ComponentResponse::TextResponse("Operation succeeded")
910 );
911 }
912
913 #[test]
914 fn from_json_pending_priority_over_error() {
915 let json = json!({
916 "status": "pending_approval",
917 "approval_id": "req-999",
918 "success": false,
919 "error": "This should be ignored"
920 });
921
922 let response = ComponentResponse::from_json(&json);
923
924 assert_eq!(
925 response,
926 ComponentResponse::PendingApproval {
927 approval_id: "req-999",
928 description: "Awaiting approval"
929 }
930 );
931 }
932 }
933}