1use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use dashmap::DashMap;
24use tokio::sync::{mpsc, oneshot};
25use tracing::{debug, warn};
26
27use crate::capability::CapabilityChecker;
28use crate::error::{KernelError, KernelResult};
29use crate::ipc::{KernelMessage, MessageTarget};
30use crate::process::{Pid, ProcessState, ProcessTable};
31use crate::service::ServiceRegistry;
32use crate::topic::TopicRouter;
33
34#[cfg(feature = "exochain")]
35use crate::chain::ChainManager;
36
37#[cfg(feature = "mesh")]
38use crate::mesh_ipc::MeshIpcEnvelope;
39#[cfg(feature = "mesh")]
40use crate::mesh_runtime::MeshRuntime;
41
42const DEFAULT_INBOX_CAPACITY: usize = 1024;
44
45#[allow(dead_code)]
53const MAX_A2A_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
54
55struct PendingRequest {
57 response_tx: oneshot::Sender<KernelMessage>,
59 #[allow(dead_code)]
61 sent_at: Instant,
62}
63
64pub struct A2ARouter {
70 process_table: Arc<ProcessTable>,
72
73 capability_checker: Arc<CapabilityChecker>,
75
76 topic_router: Arc<TopicRouter>,
78
79 service_registry: Option<Arc<ServiceRegistry>>,
81
82 inboxes: DashMap<Pid, mpsc::Sender<KernelMessage>>,
84
85 pending_requests: DashMap<String, PendingRequest>,
87
88 #[cfg(feature = "exochain")]
94 gate: std::sync::OnceLock<Arc<dyn crate::gate::GateBackend>>,
95
96 #[cfg(feature = "os-patterns")]
98 dead_letter_queue: std::sync::OnceLock<Arc<crate::dead_letter::DeadLetterQueue>>,
99
100 #[cfg(feature = "mesh")]
102 mesh_runtime: std::sync::OnceLock<Arc<MeshRuntime>>,
103}
104
105impl A2ARouter {
106 pub fn new(
108 process_table: Arc<ProcessTable>,
109 capability_checker: Arc<CapabilityChecker>,
110 topic_router: Arc<TopicRouter>,
111 ) -> Self {
112 Self {
113 process_table,
114 capability_checker,
115 topic_router,
116 service_registry: None,
117 inboxes: DashMap::new(),
118 pending_requests: DashMap::new(),
119 #[cfg(feature = "exochain")]
120 gate: std::sync::OnceLock::new(),
121 #[cfg(feature = "os-patterns")]
122 dead_letter_queue: std::sync::OnceLock::new(),
123 #[cfg(feature = "mesh")]
124 mesh_runtime: std::sync::OnceLock::new(),
125 }
126 }
127
128 pub fn with_service_registry(mut self, registry: Arc<ServiceRegistry>) -> Self {
130 self.service_registry = Some(registry);
131 self
132 }
133
134 #[cfg(feature = "exochain")]
140 pub fn with_gate(self, gate: Arc<dyn crate::gate::GateBackend>) -> Self {
141 let _ = self.gate.set(gate);
142 self
143 }
144
145 #[cfg(feature = "exochain")]
151 pub fn set_gate(&self, gate: Arc<dyn crate::gate::GateBackend>) {
152 let _ = self.gate.set(gate);
153 }
154
155 #[cfg(feature = "os-patterns")]
160 pub fn set_dead_letter_queue(&self, dlq: Arc<crate::dead_letter::DeadLetterQueue>) {
161 let _ = self.dead_letter_queue.set(dlq);
162 }
163
164 #[cfg(feature = "os-patterns")]
166 pub fn dead_letter_queue(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
167 self.dead_letter_queue.get()
168 }
169
170 #[cfg(feature = "mesh")]
175 pub fn set_mesh_runtime(&self, runtime: Arc<MeshRuntime>) {
176 let _ = self.mesh_runtime.set(runtime);
177 }
178
179 #[cfg(feature = "mesh")]
181 pub fn mesh_runtime(&self) -> Option<&Arc<MeshRuntime>> {
182 self.mesh_runtime.get()
183 }
184
185 pub fn service_registry(&self) -> Option<&Arc<ServiceRegistry>> {
187 self.service_registry.as_ref()
188 }
189
190 pub fn create_inbox(&self, pid: Pid) -> mpsc::Receiver<KernelMessage> {
199 let (tx, rx) = mpsc::channel(DEFAULT_INBOX_CAPACITY);
200 self.inboxes.insert(pid, tx);
201 debug!(pid, "created inbox");
202 rx
203 }
204
205 pub fn remove_inbox(&self, pid: Pid) {
207 self.inboxes.remove(&pid);
208 debug!(pid, "removed inbox");
209 }
210
211 pub async fn send(&self, msg: KernelMessage) -> KernelResult<()> {
232 let from = msg.from;
233
234 let sender = self
236 .process_table
237 .get(from)
238 .ok_or(KernelError::ProcessNotFound { pid: from })?;
239
240 if !matches!(sender.state, ProcessState::Running | ProcessState::Suspended) {
241 return Err(KernelError::Ipc(format!(
242 "sender PID {from} is not running (state: {})",
243 sender.state
244 )));
245 }
246
247 #[cfg(feature = "exochain")]
251 if let Some(gate) = self.gate.get() {
252 let action = match &msg.payload {
253 crate::ipc::MessagePayload::ToolCall { name, .. } => format!("tool.{name}"),
254 crate::ipc::MessagePayload::Signal(_) => "ipc.signal".to_string(),
255 _ => "ipc.send".to_string(),
256 };
257 let context = serde_json::json!({
258 "from": from,
259 "target": format!("{:?}", msg.target),
260 "layer": "routing",
261 });
262 match gate.check(&from.to_string(), &action, &context) {
263 crate::gate::GateDecision::Deny { reason, .. } => {
264 return Err(KernelError::CapabilityDenied {
265 pid: from,
266 action,
267 reason: format!("routing gate denied: {reason}"),
268 });
269 }
270 crate::gate::GateDecision::Defer { .. }
271 | crate::gate::GateDecision::Permit { .. } => {
272 }
274 }
275 }
276
277 match &msg.target {
279 MessageTarget::Process(target_pid) => {
280 self.capability_checker
282 .check_ipc_target(from, *target_pid)?;
283
284 self.deliver_to_inbox(*target_pid, msg).await
285 }
286 MessageTarget::Topic(topic) => {
287 let subscribers = self.topic_router.live_subscribers(topic);
288 let mut delivered = 0u32;
289 for &sub_pid in &subscribers {
290 if sub_pid != from {
291 let msg_clone = msg.clone();
292 if self.deliver_to_inbox(sub_pid, msg_clone).await.is_ok() {
293 delivered += 1;
294 }
295 }
296 }
297 debug!(from, topic, delivered, "published to topic");
298 Ok(())
299 }
300 MessageTarget::Broadcast => {
301 let mut delivered = 0u32;
302 let pids: Vec<Pid> = self.inboxes.iter().map(|entry| *entry.key()).collect();
303
304 for pid in pids {
305 if pid != from {
306 if self.capability_checker.check_ipc_target(from, pid).is_ok() {
308 let msg_clone = msg.clone();
309 if self.deliver_to_inbox(pid, msg_clone).await.is_ok() {
310 delivered += 1;
311 }
312 }
313 }
314 }
315 debug!(from, delivered, "broadcast sent");
316 Ok(())
317 }
318 MessageTarget::Service(name) => {
319 let name = name.clone();
320 self.route_to_service(from, &name, msg).await
321 }
322 MessageTarget::ServiceMethod { service, .. } => {
323 let service_name = service.clone();
324 self.route_to_service(from, &service_name, msg).await
325 }
326 MessageTarget::Kernel => {
327 debug!(from, "kernel message routing not yet implemented");
328 Ok(())
329 }
330 MessageTarget::RemoteNode { node_id, .. } => {
331 #[cfg(feature = "mesh")]
332 {
333 if let Some(runtime) = self.mesh_runtime.get() {
334 let node_id = node_id.clone();
335 debug!(from, %node_id, "routing message to remote node via mesh");
336 let envelope = MeshIpcEnvelope::new(
337 runtime.node_id().to_string(),
338 node_id.clone(),
339 msg,
340 );
341 runtime
342 .send_to_peer(&node_id, envelope)
343 .await
344 .map_err(|e| KernelError::Mesh(format!(
345 "failed to send to remote node '{node_id}': {e}"
346 )))
347 } else {
348 debug!(from, %node_id, "remote node routing: no mesh runtime attached");
349 Err(KernelError::Mesh(format!(
350 "remote routing to node '{node_id}' not yet implemented"
351 )))
352 }
353 }
354 #[cfg(not(feature = "mesh"))]
355 {
356 debug!(from, %node_id, "remote node routing not available (mesh feature disabled)");
357 Err(KernelError::Mesh(format!(
358 "remote routing to node '{node_id}' not yet implemented"
359 )))
360 }
361 }
362 }
363 }
364
365 async fn deliver_to_inbox(&self, pid: Pid, msg: KernelMessage) -> KernelResult<()> {
371 let tx = match self.inboxes.get(&pid) {
375 Some(tx) => tx.clone(),
376 None => {
377 warn!(pid, "no inbox for PID, dead-lettering");
378 #[cfg(feature = "os-patterns")]
379 if let Some(dlq) = self.dead_letter_queue.get() {
380 dlq.intake(
381 msg,
382 crate::dead_letter::DeadLetterReason::TargetNotFound { pid },
383 );
384 }
385 return Err(KernelError::Ipc(format!("no inbox for PID {pid}")));
386 }
387 };
388
389 match tx.try_send(msg) {
390 Ok(()) => {
391 debug!(pid, "message delivered to inbox");
392 Ok(())
393 }
394 Err(mpsc::error::TrySendError::Full(rejected_msg)) => {
395 warn!(pid, "inbox full, dead-lettering");
396 #[cfg(feature = "os-patterns")]
397 if let Some(dlq) = self.dead_letter_queue.get() {
398 dlq.intake(
399 rejected_msg,
400 crate::dead_letter::DeadLetterReason::InboxFull { pid },
401 );
402 }
403 Err(KernelError::Ipc(format!("inbox full for PID {pid}")))
404 }
405 Err(mpsc::error::TrySendError::Closed(rejected_msg)) => {
406 warn!(pid, "inbox closed, removing and dead-lettering");
407 self.inboxes.remove(&pid);
408 #[cfg(feature = "os-patterns")]
409 if let Some(dlq) = self.dead_letter_queue.get() {
410 dlq.intake(
411 rejected_msg,
412 crate::dead_letter::DeadLetterReason::AgentExited { pid },
413 );
414 }
415 Err(KernelError::Ipc(format!("inbox closed for PID {pid}")))
416 }
417 }
418 }
419
420 async fn route_to_service(
425 &self,
426 from: Pid,
427 service_name: &str,
428 msg: KernelMessage,
429 ) -> KernelResult<()> {
430 let registry = self.service_registry.as_ref().ok_or_else(|| {
431 KernelError::Ipc(format!(
432 "no service registry configured; cannot route to service '{service_name}'"
433 ))
434 })?;
435
436 let target_pid = registry.resolve_target(service_name).ok_or_else(|| {
437 KernelError::Ipc(format!("service not found: '{service_name}'"))
438 })?;
439
440 self.capability_checker
442 .check_ipc_target(from, target_pid)?;
443
444 self.deliver_to_inbox(target_pid, msg).await
445 }
446
447 #[cfg(feature = "exochain")]
457 pub async fn send_checked(
458 &self,
459 msg: KernelMessage,
460 chain: Option<&ChainManager>,
461 ) -> KernelResult<()> {
462 if let Some(cm) = chain {
465 cm.append(
466 "ipc",
467 "ipc.send",
468 Some(serde_json::json!({
469 "from": msg.from,
470 "target": format!("{:?}", msg.target),
471 "payload_type": msg.payload.type_name(),
472 "msg_id": msg.id,
473 })),
474 );
475 }
476 self.send(msg).await
477 }
478
479 pub fn topic_router(&self) -> &Arc<TopicRouter> {
481 &self.topic_router
482 }
483
484 pub fn inbox_count(&self) -> usize {
486 self.inboxes.len()
487 }
488
489 pub fn has_inbox(&self, pid: Pid) -> bool {
491 self.inboxes.contains_key(&pid)
492 }
493
494 pub async fn request(
507 &self,
508 msg: KernelMessage,
509 timeout: Duration,
510 ) -> KernelResult<KernelMessage> {
511 let request_id = msg.id.clone();
512 let (tx, rx) = oneshot::channel();
513
514 self.pending_requests.insert(
516 request_id.clone(),
517 PendingRequest {
518 response_tx: tx,
519 sent_at: Instant::now(),
520 },
521 );
522
523 if let Err(e) = self.send(msg).await {
525 self.pending_requests.remove(&request_id);
526 return Err(e);
527 }
528
529 match tokio::time::timeout(timeout, rx).await {
531 Ok(Ok(response)) => Ok(response),
532 Ok(Err(_)) => {
533 self.pending_requests.remove(&request_id);
534 Err(KernelError::Ipc("response channel closed".into()))
535 }
536 Err(_) => {
537 self.pending_requests.remove(&request_id);
538 Err(KernelError::Timeout {
539 operation: format!("request {request_id}"),
540 duration_ms: timeout.as_millis() as u64,
541 })
542 }
543 }
544 }
545
546 pub fn try_complete_request(&self, msg: KernelMessage) -> bool {
552 if let Some(ref corr_id) = msg.correlation_id
553 && let Some((_, pending)) = self.pending_requests.remove(corr_id)
554 {
555 let _ = pending.response_tx.send(msg);
556 return true;
557 }
558 false
559 }
560
561 pub fn pending_request_count(&self) -> usize {
563 self.pending_requests.len()
564 }
565}
566
567#[cfg(test)]
568mod tests {
569 use super::*;
570 use crate::capability::AgentCapabilities;
571 use crate::ipc::MessagePayload;
572 use crate::process::{ProcessEntry, ResourceUsage};
573 use tokio_util::sync::CancellationToken;
574
575 fn setup_router(
576 agent_count: usize,
577 ) -> (A2ARouter, Vec<Pid>, Vec<mpsc::Receiver<KernelMessage>>) {
578 let table = Arc::new(ProcessTable::new(64));
579 let mut pids = Vec::new();
580
581 for i in 0..agent_count {
582 let entry = ProcessEntry {
583 pid: 0,
584 agent_id: format!("agent-{i}"),
585 state: ProcessState::Running,
586 capabilities: AgentCapabilities::default(),
587 resource_usage: ResourceUsage::default(),
588 cancel_token: CancellationToken::new(),
589 parent_pid: None,
590 };
591 let pid = table.insert(entry).unwrap();
592 pids.push(pid);
593 }
594
595 let checker = Arc::new(CapabilityChecker::new(table.clone()));
596 let topic_router = Arc::new(TopicRouter::new(table.clone()));
597 let router = A2ARouter::new(table, checker, topic_router);
598
599 let mut receivers = Vec::new();
600 for &pid in &pids {
601 let rx = router.create_inbox(pid);
602 receivers.push(rx);
603 }
604
605 (router, pids, receivers)
606 }
607
608 #[tokio::test]
609 async fn direct_message_delivery() {
610 let (router, pids, mut receivers) = setup_router(2);
611
612 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "hello");
613 router.send(msg).await.unwrap();
614
615 let received = receivers[1].try_recv().unwrap();
616 assert_eq!(received.from, pids[0]);
617 assert!(matches!(
618 received.payload,
619 MessagePayload::Text(ref t) if t == "hello"
620 ));
621 }
622
623 #[tokio::test]
624 async fn message_to_self_works() {
625 let (router, pids, mut receivers) = setup_router(1);
626
627 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[0]), "self-msg");
628 router.send(msg).await.unwrap();
629
630 let received = receivers[0].try_recv().unwrap();
631 assert!(matches!(
632 received.payload,
633 MessagePayload::Text(ref t) if t == "self-msg"
634 ));
635 }
636
637 #[tokio::test]
638 async fn broadcast_delivers_to_all_except_sender() {
639 let (router, pids, mut receivers) = setup_router(3);
640
641 let msg = KernelMessage::text(pids[0], MessageTarget::Broadcast, "broadcast");
642 router.send(msg).await.unwrap();
643
644 assert!(receivers[0].try_recv().is_err());
646
647 let r1 = receivers[1].try_recv().unwrap();
649 assert!(matches!(
650 r1.payload,
651 MessagePayload::Text(ref t) if t == "broadcast"
652 ));
653 let r2 = receivers[2].try_recv().unwrap();
654 assert!(matches!(
655 r2.payload,
656 MessagePayload::Text(ref t) if t == "broadcast"
657 ));
658 }
659
660 #[tokio::test]
661 async fn topic_publish_delivers_to_subscribers() {
662 let (router, pids, mut receivers) = setup_router(3);
663
664 router.topic_router().subscribe(pids[1], "build");
666 router.topic_router().subscribe(pids[2], "build");
667
668 let msg = KernelMessage::text(pids[0], MessageTarget::Topic("build".into()), "build done");
669 router.send(msg).await.unwrap();
670
671 assert!(receivers[0].try_recv().is_err());
673
674 assert!(receivers[1].try_recv().is_ok());
676 assert!(receivers[2].try_recv().is_ok());
677 }
678
679 #[tokio::test]
680 async fn topic_publish_excludes_sender_if_subscribed() {
681 let (router, pids, mut receivers) = setup_router(2);
682
683 router.topic_router().subscribe(pids[0], "build");
685 router.topic_router().subscribe(pids[1], "build");
686
687 let msg = KernelMessage::text(pids[0], MessageTarget::Topic("build".into()), "done");
688 router.send(msg).await.unwrap();
689
690 assert!(receivers[0].try_recv().is_err());
692 assert!(receivers[1].try_recv().is_ok());
694 }
695
696 #[tokio::test]
697 async fn send_from_nonexistent_pid_fails() {
698 let (router, _pids, _receivers) = setup_router(1);
699
700 let msg = KernelMessage::text(999, MessageTarget::Process(1), "hello");
701 let result = router.send(msg).await;
702 assert!(result.is_err());
703 }
704
705 #[tokio::test]
706 async fn send_to_pid_without_inbox_fails() {
707 let table = Arc::new(ProcessTable::new(64));
708
709 let sender_entry = ProcessEntry {
711 pid: 0,
712 agent_id: "sender".to_owned(),
713 state: ProcessState::Running,
714 capabilities: AgentCapabilities::default(),
715 resource_usage: ResourceUsage::default(),
716 cancel_token: CancellationToken::new(),
717 parent_pid: None,
718 };
719 let sender_pid = table.insert(sender_entry).unwrap();
720
721 let target_entry = ProcessEntry {
723 pid: 0,
724 agent_id: "target".to_owned(),
725 state: ProcessState::Running,
726 capabilities: AgentCapabilities::default(),
727 resource_usage: ResourceUsage::default(),
728 cancel_token: CancellationToken::new(),
729 parent_pid: None,
730 };
731 let target_pid = table.insert(target_entry).unwrap();
732
733 let checker = Arc::new(CapabilityChecker::new(table.clone()));
734 let topic_router = Arc::new(TopicRouter::new(table.clone()));
735 let router = A2ARouter::new(table, checker, topic_router);
736
737 let _rx = router.create_inbox(sender_pid);
739
740 let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "hello");
741 let result = router.send(msg).await;
742 assert!(result.is_err());
743 }
744
745 #[tokio::test]
746 async fn ipc_scope_restricts_messaging() {
747 let table = Arc::new(ProcessTable::new(64));
748
749 use crate::capability::IpcScope;
751 let sender_entry = ProcessEntry {
752 pid: 0,
753 agent_id: "restricted".to_owned(),
754 state: ProcessState::Running,
755 capabilities: AgentCapabilities {
756 ipc_scope: IpcScope::Restricted(vec![]), ..Default::default()
758 },
759 resource_usage: ResourceUsage::default(),
760 cancel_token: CancellationToken::new(),
761 parent_pid: None,
762 };
763 let sender_pid = table.insert(sender_entry).unwrap();
764
765 let target_entry = ProcessEntry {
767 pid: 0,
768 agent_id: "target".to_owned(),
769 state: ProcessState::Running,
770 capabilities: AgentCapabilities::default(),
771 resource_usage: ResourceUsage::default(),
772 cancel_token: CancellationToken::new(),
773 parent_pid: None,
774 };
775 let target_pid = table.insert(target_entry).unwrap();
776
777 let checker = Arc::new(CapabilityChecker::new(table.clone()));
778 let topic_router = Arc::new(TopicRouter::new(table.clone()));
779 let router = A2ARouter::new(table, checker, topic_router);
780 let _rx1 = router.create_inbox(sender_pid);
781 let _rx2 = router.create_inbox(target_pid);
782
783 let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "blocked");
784 let result = router.send(msg).await;
785 assert!(result.is_err());
786 }
787
788 #[tokio::test]
789 async fn ipc_scope_none_blocks_all() {
790 let table = Arc::new(ProcessTable::new(64));
791
792 use crate::capability::IpcScope;
793 let sender_entry = ProcessEntry {
794 pid: 0,
795 agent_id: "no-ipc".to_owned(),
796 state: ProcessState::Running,
797 capabilities: AgentCapabilities {
798 can_ipc: false,
799 ipc_scope: IpcScope::None,
800 ..Default::default()
801 },
802 resource_usage: ResourceUsage::default(),
803 cancel_token: CancellationToken::new(),
804 parent_pid: None,
805 };
806 let sender_pid = table.insert(sender_entry).unwrap();
807
808 let target_entry = ProcessEntry {
809 pid: 0,
810 agent_id: "target".to_owned(),
811 state: ProcessState::Running,
812 capabilities: AgentCapabilities::default(),
813 resource_usage: ResourceUsage::default(),
814 cancel_token: CancellationToken::new(),
815 parent_pid: None,
816 };
817 let target_pid = table.insert(target_entry).unwrap();
818
819 let checker = Arc::new(CapabilityChecker::new(table.clone()));
820 let topic_router = Arc::new(TopicRouter::new(table.clone()));
821 let router = A2ARouter::new(table, checker, topic_router);
822 let _rx1 = router.create_inbox(sender_pid);
823 let _rx2 = router.create_inbox(target_pid);
824
825 let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "blocked");
826 let result = router.send(msg).await;
827 assert!(result.is_err());
828 }
829
830 #[test]
831 fn create_and_remove_inbox() {
832 let table = Arc::new(ProcessTable::new(64));
833 let checker = Arc::new(CapabilityChecker::new(table.clone()));
834 let topic_router = Arc::new(TopicRouter::new(table.clone()));
835 let router = A2ARouter::new(table, checker, topic_router);
836
837 let _rx = router.create_inbox(42);
838 assert!(router.has_inbox(42));
839 assert_eq!(router.inbox_count(), 1);
840
841 router.remove_inbox(42);
842 assert!(!router.has_inbox(42));
843 assert_eq!(router.inbox_count(), 0);
844 }
845
846 #[tokio::test]
847 async fn tool_call_message_routes() {
848 let (router, pids, mut receivers) = setup_router(2);
849
850 let msg = KernelMessage::tool_call(
851 pids[0],
852 MessageTarget::Process(pids[1]),
853 "read_file",
854 serde_json::json!({"path": "/test"}),
855 );
856 router.send(msg).await.unwrap();
857
858 let received = receivers[1].try_recv().unwrap();
859 assert!(matches!(
860 received.payload,
861 MessagePayload::ToolCall { ref name, .. } if name == "read_file"
862 ));
863 }
864
865 #[tokio::test]
866 async fn tool_result_message_routes() {
867 let (router, pids, mut receivers) = setup_router(2);
868
869 let msg = KernelMessage::tool_result(
870 pids[1],
871 MessageTarget::Process(pids[0]),
872 "call-1",
873 serde_json::json!({"content": "data"}),
874 );
875 router.send(msg).await.unwrap();
876
877 let received = receivers[0].try_recv().unwrap();
878 assert!(matches!(
879 received.payload,
880 MessagePayload::ToolResult { ref call_id, .. } if call_id == "call-1"
881 ));
882 }
883
884 #[cfg(feature = "exochain")]
885 #[tokio::test]
886 async fn send_checked_logs_chain_event() {
887 let (router, pids, mut receivers) = setup_router(2);
888
889 let chain = crate::chain::ChainManager::new(0, 1000);
890 let initial_seq = chain.sequence();
891
892 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "audited");
893 router.send_checked(msg, Some(&chain)).await.unwrap();
894
895 let received = receivers[1].try_recv().unwrap();
897 assert!(matches!(
898 received.payload,
899 MessagePayload::Text(ref t) if t == "audited"
900 ));
901
902 assert_eq!(chain.sequence(), initial_seq + 1);
904 let events = chain.tail(1);
905 assert_eq!(events[0].kind, "ipc.send");
906 assert_eq!(events[0].source, "ipc");
907 let payload = events[0].payload.as_ref().unwrap();
908 assert_eq!(payload["from"], pids[0]);
909 assert_eq!(payload["payload_type"], "text");
910 }
911
912 #[cfg(feature = "exochain")]
913 #[tokio::test]
914 async fn send_checked_without_chain_still_delivers() {
915 let (router, pids, mut receivers) = setup_router(2);
916
917 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "no-chain");
918 router.send_checked(msg, None).await.unwrap();
919
920 let received = receivers[1].try_recv().unwrap();
921 assert!(matches!(
922 received.payload,
923 MessagePayload::Text(ref t) if t == "no-chain"
924 ));
925 }
926
927 #[tokio::test]
928 async fn rvf_payload_routes() {
929 let (router, pids, mut receivers) = setup_router(2);
930
931 let msg = KernelMessage::new(
932 pids[0],
933 MessageTarget::Process(pids[1]),
934 MessagePayload::Rvf {
935 segment_type: 0x40,
936 data: vec![0xCA, 0xFE],
937 },
938 );
939 router.send(msg).await.unwrap();
940
941 let received = receivers[1].try_recv().unwrap();
942 assert!(matches!(
943 received.payload,
944 MessagePayload::Rvf { segment_type: 0x40, .. }
945 ));
946 }
947
948 #[tokio::test]
949 async fn closed_inbox_auto_removed() {
950 let (router, pids, receivers) = setup_router(2);
951
952 drop(receivers);
954
955 assert!(router.has_inbox(pids[1]));
956
957 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "gone");
959 let result = router.send(msg).await;
960 assert!(result.is_err());
961 let err_msg = format!("{}", result.unwrap_err());
962 assert!(err_msg.contains("inbox closed"), "expected 'inbox closed', got: {err_msg}");
963
964 assert!(!router.has_inbox(pids[1]));
966 }
967
968 #[tokio::test]
969 async fn inbox_overflow_returns_error() {
970 let table = Arc::new(ProcessTable::new(64));
974
975 let sender_entry = ProcessEntry {
976 pid: 0,
977 agent_id: "sender".to_owned(),
978 state: ProcessState::Running,
979 capabilities: AgentCapabilities::default(),
980 resource_usage: ResourceUsage::default(),
981 cancel_token: CancellationToken::new(),
982 parent_pid: None,
983 };
984 let sender_pid = table.insert(sender_entry).unwrap();
985
986 let target_entry = ProcessEntry {
987 pid: 0,
988 agent_id: "target".to_owned(),
989 state: ProcessState::Running,
990 capabilities: AgentCapabilities::default(),
991 resource_usage: ResourceUsage::default(),
992 cancel_token: CancellationToken::new(),
993 parent_pid: None,
994 };
995 let target_pid = table.insert(target_entry).unwrap();
996
997 let checker = Arc::new(CapabilityChecker::new(table.clone()));
998 let topic_router = Arc::new(TopicRouter::new(table.clone()));
999 let router = A2ARouter::new(table, checker, topic_router);
1000
1001 let _rx_sender = router.create_inbox(sender_pid);
1003
1004 let (tx, _rx_target) = mpsc::channel(2);
1006 router.inboxes.insert(target_pid, tx);
1007
1008 let m1 = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "msg1");
1010 let m2 = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "msg2");
1011 router.send(m1).await.unwrap();
1012 router.send(m2).await.unwrap();
1013
1014 let m3 = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "overflow");
1016 let result = router.send(m3).await;
1017 assert!(result.is_err());
1018 let err_msg = format!("{}", result.unwrap_err());
1019 assert!(err_msg.contains("inbox full"), "expected 'inbox full', got: {err_msg}");
1020
1021 assert!(router.has_inbox(target_pid));
1023 }
1024
1025 #[tokio::test]
1026 async fn concurrent_sends_to_same_pid() {
1027 let (router, pids, mut receivers) = setup_router(4);
1028 let router = Arc::new(router);
1029 let target = pids[0];
1030
1031 let mut handles = Vec::new();
1033 for &sender_pid in &pids[1..] {
1034 let r = Arc::clone(&router);
1035 let msg = KernelMessage::text(
1036 sender_pid,
1037 MessageTarget::Process(target),
1038 &format!("from-{sender_pid}"),
1039 );
1040 handles.push(tokio::spawn(async move { r.send(msg).await }));
1041 }
1042
1043 for h in handles {
1045 h.await.unwrap().unwrap();
1046 }
1047
1048 let mut received = Vec::new();
1050 while let Ok(msg) = receivers[0].try_recv() {
1051 if let MessagePayload::Text(t) = &msg.payload {
1052 received.push(t.clone());
1053 }
1054 }
1055 assert_eq!(received.len(), 3);
1056 for &sender_pid in &pids[1..] {
1058 assert!(
1059 received.iter().any(|t| t == &format!("from-{sender_pid}")),
1060 "missing message from PID {sender_pid}"
1061 );
1062 }
1063 }
1064
1065 #[tokio::test]
1066 async fn send_from_non_running_process_fails() {
1067 let table = Arc::new(ProcessTable::new(64));
1068
1069 let sender_entry = ProcessEntry {
1071 pid: 0,
1072 agent_id: "exited-sender".to_owned(),
1073 state: ProcessState::Exited(0),
1074 capabilities: AgentCapabilities::default(),
1075 resource_usage: ResourceUsage::default(),
1076 cancel_token: CancellationToken::new(),
1077 parent_pid: None,
1078 };
1079 let sender_pid = table.insert(sender_entry).unwrap();
1080
1081 let target_entry = ProcessEntry {
1083 pid: 0,
1084 agent_id: "target".to_owned(),
1085 state: ProcessState::Running,
1086 capabilities: AgentCapabilities::default(),
1087 resource_usage: ResourceUsage::default(),
1088 cancel_token: CancellationToken::new(),
1089 parent_pid: None,
1090 };
1091 let target_pid = table.insert(target_entry).unwrap();
1092
1093 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1094 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1095 let router = A2ARouter::new(table, checker, topic_router);
1096 let _rx1 = router.create_inbox(sender_pid);
1097 let _rx2 = router.create_inbox(target_pid);
1098
1099 let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "from-dead");
1100 let result = router.send(msg).await;
1101 assert!(result.is_err(), "send from non-Running process should fail");
1102 }
1103
1104 fn setup_router_with_registry(
1107 agent_count: usize,
1108 ) -> (
1109 A2ARouter,
1110 Vec<Pid>,
1111 Vec<mpsc::Receiver<KernelMessage>>,
1112 Arc<crate::service::ServiceRegistry>,
1113 ) {
1114 let table = Arc::new(ProcessTable::new(64));
1115 let mut pids = Vec::new();
1116
1117 for i in 0..agent_count {
1118 let entry = ProcessEntry {
1119 pid: 0,
1120 agent_id: format!("agent-{i}"),
1121 state: ProcessState::Running,
1122 capabilities: AgentCapabilities::default(),
1123 resource_usage: ResourceUsage::default(),
1124 cancel_token: CancellationToken::new(),
1125 parent_pid: None,
1126 };
1127 let pid = table.insert(entry).unwrap();
1128 pids.push(pid);
1129 }
1130
1131 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1132 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1133 let registry = Arc::new(crate::service::ServiceRegistry::new());
1134 let router = A2ARouter::new(table, checker, topic_router)
1135 .with_service_registry(registry.clone());
1136
1137 let mut receivers = Vec::new();
1138 for &pid in &pids {
1139 let rx = router.create_inbox(pid);
1140 receivers.push(rx);
1141 }
1142
1143 (router, pids, receivers, registry)
1144 }
1145
1146 #[tokio::test]
1147 async fn route_to_service_by_name() {
1148 let (router, pids, mut receivers, registry) = setup_router_with_registry(2);
1149
1150 registry
1152 .register_entry(crate::service::ServiceEntry {
1153 name: "auth".into(),
1154 owner_pid: Some(pids[1]),
1155 endpoint: crate::service::ServiceEndpoint::AgentInbox(pids[1]),
1156 audit_level: crate::service::ServiceAuditLevel::Full,
1157 registered_at: chrono::Utc::now(),
1158 })
1159 .unwrap();
1160
1161 let msg = KernelMessage::text(pids[0], MessageTarget::Service("auth".into()), "validate");
1162 router.send(msg).await.unwrap();
1163
1164 let received = receivers[1].try_recv().unwrap();
1165 assert_eq!(received.from, pids[0]);
1166 assert!(matches!(
1167 received.payload,
1168 MessagePayload::Text(ref t) if t == "validate"
1169 ));
1170 }
1171
1172 #[tokio::test]
1173 async fn route_service_method() {
1174 let (router, pids, mut receivers, registry) = setup_router_with_registry(2);
1175
1176 registry
1177 .register_entry(crate::service::ServiceEntry {
1178 name: "auth".into(),
1179 owner_pid: Some(pids[1]),
1180 endpoint: crate::service::ServiceEndpoint::AgentInbox(pids[1]),
1181 audit_level: crate::service::ServiceAuditLevel::Full,
1182 registered_at: chrono::Utc::now(),
1183 })
1184 .unwrap();
1185
1186 let msg = KernelMessage::text(
1187 pids[0],
1188 MessageTarget::ServiceMethod {
1189 service: "auth".into(),
1190 method: "validate_token".into(),
1191 },
1192 "token-123",
1193 );
1194 router.send(msg).await.unwrap();
1195
1196 let received = receivers[1].try_recv().unwrap();
1197 assert_eq!(received.from, pids[0]);
1198 assert!(matches!(
1199 received.target,
1200 MessageTarget::ServiceMethod { ref service, ref method }
1201 if service == "auth" && method == "validate_token"
1202 ));
1203 }
1204
1205 #[tokio::test]
1206 async fn service_not_found_returns_error() {
1207 let (router, pids, _receivers, _registry) = setup_router_with_registry(2);
1208
1209 let msg = KernelMessage::text(
1210 pids[0],
1211 MessageTarget::Service("nonexistent".into()),
1212 "hello",
1213 );
1214 let result = router.send(msg).await;
1215 assert!(result.is_err());
1216 let err_msg = format!("{}", result.unwrap_err());
1217 assert!(
1218 err_msg.contains("service not found"),
1219 "expected 'service not found', got: {err_msg}"
1220 );
1221 }
1222
1223 #[tokio::test]
1224 async fn service_entry_registration() {
1225 let registry = crate::service::ServiceRegistry::new();
1226
1227 let entry = crate::service::ServiceEntry {
1228 name: "cache".into(),
1229 owner_pid: Some(42),
1230 endpoint: crate::service::ServiceEndpoint::AgentInbox(42),
1231 audit_level: crate::service::ServiceAuditLevel::Full,
1232 registered_at: chrono::Utc::now(),
1233 };
1234 registry.register_entry(entry).unwrap();
1235
1236 let retrieved = registry.get_entry("cache").unwrap();
1237 assert_eq!(retrieved.name, "cache");
1238 assert_eq!(retrieved.owner_pid, Some(42));
1239 }
1240
1241 #[tokio::test]
1242 async fn service_entry_with_audit_level() {
1243 let registry = crate::service::ServiceRegistry::new();
1244
1245 let entry = crate::service::ServiceEntry {
1246 name: "metrics".into(),
1247 owner_pid: Some(10),
1248 endpoint: crate::service::ServiceEndpoint::AgentInbox(10),
1249 audit_level: crate::service::ServiceAuditLevel::GateOnly,
1250 registered_at: chrono::Utc::now(),
1251 };
1252 registry.register_entry(entry).unwrap();
1253
1254 let retrieved = registry.get_entry("metrics").unwrap();
1255 assert_eq!(retrieved.audit_level, crate::service::ServiceAuditLevel::GateOnly);
1256 }
1257
1258 #[tokio::test]
1259 async fn resolve_target_finds_owner_pid() {
1260 let registry = crate::service::ServiceRegistry::new();
1261
1262 let entry = crate::service::ServiceEntry {
1263 name: "search".into(),
1264 owner_pid: Some(77),
1265 endpoint: crate::service::ServiceEndpoint::AgentInbox(77),
1266 audit_level: crate::service::ServiceAuditLevel::Full,
1267 registered_at: chrono::Utc::now(),
1268 };
1269 registry.register_entry(entry).unwrap();
1270
1271 assert_eq!(registry.resolve_target("search"), Some(77));
1272 assert_eq!(registry.resolve_target("nonexistent"), None);
1273 }
1274
1275 #[tokio::test]
1276 async fn service_without_registry_returns_error() {
1277 let (router_no_reg, pids, _receivers) = setup_router(2);
1279
1280 let msg = KernelMessage::text(
1281 pids[0],
1282 MessageTarget::Service("missing".into()),
1283 "hello",
1284 );
1285 let result = router_no_reg.send(msg).await;
1286 assert!(result.is_err());
1287 let err_msg = format!("{}", result.unwrap_err());
1288 assert!(
1289 err_msg.contains("no service registry"),
1290 "expected 'no service registry', got: {err_msg}"
1291 );
1292 }
1293
1294 #[tokio::test]
1295 async fn external_service_no_pid_returns_error() {
1296 let (router, pids, _receivers, registry) = setup_router_with_registry(2);
1297
1298 registry
1300 .register_entry(crate::service::ServiceEntry {
1301 name: "redis".into(),
1302 owner_pid: None,
1303 endpoint: crate::service::ServiceEndpoint::External {
1304 url: "redis://localhost:6379".into(),
1305 },
1306 audit_level: crate::service::ServiceAuditLevel::GateOnly,
1307 registered_at: chrono::Utc::now(),
1308 })
1309 .unwrap();
1310
1311 let msg = KernelMessage::text(
1312 pids[0],
1313 MessageTarget::Service("redis".into()),
1314 "ping",
1315 );
1316 let result = router.send(msg).await;
1317 assert!(result.is_err(), "external service with no PID should fail routing");
1318 }
1319
1320 #[tokio::test]
1323 async fn request_response_completes() {
1324 let (router, pids, mut receivers) = setup_router(2);
1325 let router = Arc::new(router);
1326 let router2 = Arc::clone(&router);
1327
1328 let from_pid = pids[0];
1330 let to_pid = pids[1];
1331 tokio::spawn(async move {
1332 let msg = receivers[1].recv().await.unwrap();
1333 let reply = KernelMessage::with_correlation(
1335 to_pid,
1336 MessageTarget::Process(from_pid),
1337 MessagePayload::Text("pong".into()),
1338 msg.id.clone(),
1339 );
1340 router2.try_complete_request(reply);
1341 });
1342
1343 let request = KernelMessage::text(from_pid, MessageTarget::Process(to_pid), "ping");
1344 let response = router
1345 .request(request, Duration::from_secs(5))
1346 .await
1347 .unwrap();
1348 assert!(matches!(
1349 response.payload,
1350 MessagePayload::Text(ref t) if t == "pong"
1351 ));
1352 assert_eq!(router.pending_request_count(), 0);
1353 }
1354
1355 #[tokio::test]
1356 async fn request_response_timeout() {
1357 let (router, pids, _receivers) = setup_router(2);
1358
1359 let request =
1360 KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "no-reply");
1361 let result = router.request(request, Duration::from_millis(50)).await;
1362 assert!(result.is_err());
1363 let err_msg = format!("{}", result.unwrap_err());
1364 assert!(
1365 err_msg.contains("timeout"),
1366 "expected timeout error, got: {err_msg}"
1367 );
1368 assert_eq!(router.pending_request_count(), 0);
1369 }
1370
1371 #[tokio::test]
1372 async fn try_complete_request_matching() {
1373 let (router, pids, _receivers) = setup_router(2);
1374
1375 let (tx, rx) = oneshot::channel();
1377 let request_id = "req-42".to_string();
1378 router.pending_requests.insert(
1379 request_id.clone(),
1380 PendingRequest {
1381 response_tx: tx,
1382 sent_at: Instant::now(),
1383 },
1384 );
1385
1386 let reply = KernelMessage::with_correlation(
1388 pids[1],
1389 MessageTarget::Process(pids[0]),
1390 MessagePayload::Text("reply".into()),
1391 request_id,
1392 );
1393 let completed = router.try_complete_request(reply);
1394 assert!(completed, "try_complete_request should return true for matching id");
1395
1396 let response = rx.await.unwrap();
1397 assert!(matches!(
1398 response.payload,
1399 MessagePayload::Text(ref t) if t == "reply"
1400 ));
1401 }
1402
1403 #[tokio::test]
1404 async fn try_complete_request_no_match() {
1405 let (router, pids, _receivers) = setup_router(2);
1406
1407 let reply = KernelMessage::with_correlation(
1409 pids[1],
1410 MessageTarget::Process(pids[0]),
1411 MessagePayload::Text("orphan".into()),
1412 "nonexistent-id".into(),
1413 );
1414 let completed = router.try_complete_request(reply);
1415 assert!(!completed, "try_complete_request should return false for non-matching id");
1416
1417 let plain = KernelMessage::text(pids[1], MessageTarget::Process(pids[0]), "plain");
1419 assert!(!router.try_complete_request(plain));
1420 }
1421
1422 #[tokio::test]
1423 async fn pending_request_count_tracks_correctly() {
1424 let (router, _pids, _receivers) = setup_router(2);
1425 assert_eq!(router.pending_request_count(), 0);
1426
1427 let (tx1, _rx1) = oneshot::channel();
1429 let (tx2, _rx2) = oneshot::channel();
1430 router.pending_requests.insert(
1431 "req-a".into(),
1432 PendingRequest {
1433 response_tx: tx1,
1434 sent_at: Instant::now(),
1435 },
1436 );
1437 assert_eq!(router.pending_request_count(), 1);
1438
1439 router.pending_requests.insert(
1440 "req-b".into(),
1441 PendingRequest {
1442 response_tx: tx2,
1443 sent_at: Instant::now(),
1444 },
1445 );
1446 assert_eq!(router.pending_request_count(), 2);
1447
1448 let reply = KernelMessage::with_correlation(
1450 1,
1451 MessageTarget::Process(2),
1452 MessagePayload::Text("done".into()),
1453 "req-a".into(),
1454 );
1455 router.try_complete_request(reply);
1456 assert_eq!(router.pending_request_count(), 1);
1457
1458 router.pending_requests.remove("req-b");
1460 assert_eq!(router.pending_request_count(), 0);
1461 }
1462
1463 #[cfg(feature = "exochain")]
1466 #[tokio::test]
1467 async fn routing_gate_denies_message() {
1468 struct DenyGate;
1469 impl crate::gate::GateBackend for DenyGate {
1470 fn check(
1471 &self,
1472 _agent: &str,
1473 _action: &str,
1474 _ctx: &serde_json::Value,
1475 ) -> crate::gate::GateDecision {
1476 crate::gate::GateDecision::Deny {
1477 reason: "blocked by policy".into(),
1478 receipt: None,
1479 }
1480 }
1481 }
1482
1483 let (router, pids, _receivers) = setup_router(2);
1484 let router = router.with_gate(Arc::new(DenyGate));
1485
1486 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "hello");
1487 let result = router.send(msg).await;
1488 assert!(result.is_err());
1489 let err_msg = format!("{}", result.unwrap_err());
1490 assert!(
1491 err_msg.contains("routing gate denied"),
1492 "expected 'routing gate denied', got: {err_msg}"
1493 );
1494 }
1495
1496 #[cfg(feature = "exochain")]
1497 #[tokio::test]
1498 async fn routing_gate_permits_message() {
1499 struct PermitGate;
1500 impl crate::gate::GateBackend for PermitGate {
1501 fn check(
1502 &self,
1503 _agent: &str,
1504 _action: &str,
1505 _ctx: &serde_json::Value,
1506 ) -> crate::gate::GateDecision {
1507 crate::gate::GateDecision::Permit { token: None }
1508 }
1509 }
1510
1511 let (router, pids, mut receivers) = setup_router(2);
1512 let router = router.with_gate(Arc::new(PermitGate));
1513
1514 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "hello");
1515 router.send(msg).await.unwrap();
1516
1517 let received = receivers[1].try_recv().unwrap();
1518 assert_eq!(received.from, pids[0]);
1519 assert!(matches!(
1520 received.payload,
1521 MessagePayload::Text(ref t) if t == "hello"
1522 ));
1523 }
1524
1525 #[cfg(feature = "exochain")]
1526 #[tokio::test]
1527 async fn routing_gate_defer_still_delivers() {
1528 struct DeferGate;
1529 impl crate::gate::GateBackend for DeferGate {
1530 fn check(
1531 &self,
1532 _agent: &str,
1533 _action: &str,
1534 _ctx: &serde_json::Value,
1535 ) -> crate::gate::GateDecision {
1536 crate::gate::GateDecision::Defer {
1537 reason: "pending review".into(),
1538 }
1539 }
1540 }
1541
1542 let (router, pids, mut receivers) = setup_router(2);
1543 let router = router.with_gate(Arc::new(DeferGate));
1544
1545 let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "deferred");
1546 router.send(msg).await.unwrap(); assert!(receivers[1].try_recv().is_ok());
1548 }
1549
1550 #[tokio::test]
1553 async fn multiple_messages_queued_in_order() {
1554 let (router, pids, mut receivers) = setup_router(2);
1555
1556 for i in 0..5 {
1557 let msg = KernelMessage::text(
1558 pids[0],
1559 MessageTarget::Process(pids[1]),
1560 &format!("msg-{i}"),
1561 );
1562 router.send(msg).await.unwrap();
1563 }
1564
1565 for i in 0..5 {
1567 let received = receivers[1].try_recv().unwrap();
1568 let expected = format!("msg-{i}");
1569 assert!(
1570 matches!(received.payload, MessagePayload::Text(ref t) if t == &expected),
1571 "expected '{expected}' but got {:?}",
1572 received.payload,
1573 );
1574 }
1575 }
1576
1577 #[tokio::test]
1578 async fn unsubscribe_stops_topic_delivery() {
1579 let (router, pids, mut receivers) = setup_router(3);
1580
1581 router.topic_router().subscribe(pids[1], "events");
1582 router.topic_router().subscribe(pids[2], "events");
1583
1584 router.topic_router().unsubscribe(pids[1], "events");
1586
1587 let msg = KernelMessage::text(
1588 pids[0],
1589 MessageTarget::Topic("events".into()),
1590 "after-unsub",
1591 );
1592 router.send(msg).await.unwrap();
1593
1594 assert!(receivers[1].try_recv().is_err(), "unsubscribed agent should not receive");
1596
1597 let received = receivers[2].try_recv().unwrap();
1599 assert!(matches!(
1600 received.payload,
1601 MessagePayload::Text(ref t) if t == "after-unsub"
1602 ));
1603 }
1604
1605 #[tokio::test]
1606 async fn publish_to_empty_topic_succeeds() {
1607 let (router, pids, _receivers) = setup_router(2);
1608
1609 let msg = KernelMessage::text(
1611 pids[0],
1612 MessageTarget::Topic("empty-topic".into()),
1613 "nobody-listening",
1614 );
1615 let result = router.send(msg).await;
1616 assert!(result.is_ok(), "publish to empty topic should succeed");
1617 }
1618
1619 #[tokio::test]
1620 async fn broadcast_with_zero_other_agents() {
1621 let (router, pids, mut receivers) = setup_router(1);
1622
1623 let msg = KernelMessage::text(pids[0], MessageTarget::Broadcast, "alone");
1624 let result = router.send(msg).await;
1625 assert!(result.is_ok(), "broadcast with only sender should succeed");
1626
1627 assert!(receivers[0].try_recv().is_err());
1629 }
1630
1631 #[tokio::test]
1632 async fn ipc_scope_restricted_allows_listed_pids() {
1633 let table = Arc::new(ProcessTable::new(64));
1634
1635 use crate::capability::IpcScope;
1636 let target_entry = ProcessEntry {
1638 pid: 0,
1639 agent_id: "target".to_owned(),
1640 state: ProcessState::Running,
1641 capabilities: AgentCapabilities::default(),
1642 resource_usage: ResourceUsage::default(),
1643 cancel_token: CancellationToken::new(),
1644 parent_pid: None,
1645 };
1646 let target_pid = table.insert(target_entry).unwrap();
1647
1648 let sender_entry = ProcessEntry {
1650 pid: 0,
1651 agent_id: "restricted-sender".to_owned(),
1652 state: ProcessState::Running,
1653 capabilities: AgentCapabilities {
1654 ipc_scope: IpcScope::Restricted(vec![target_pid]),
1655 ..Default::default()
1656 },
1657 resource_usage: ResourceUsage::default(),
1658 cancel_token: CancellationToken::new(),
1659 parent_pid: None,
1660 };
1661 let sender_pid = table.insert(sender_entry).unwrap();
1662
1663 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1664 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1665 let router = A2ARouter::new(table, checker, topic_router);
1666 let _rx_sender = router.create_inbox(sender_pid);
1667 let mut rx_target = router.create_inbox(target_pid);
1668
1669 let msg = KernelMessage::text(
1670 sender_pid,
1671 MessageTarget::Process(target_pid),
1672 "allowed",
1673 );
1674 let result = router.send(msg).await;
1675 assert!(result.is_ok(), "restricted sender should reach allowed PID");
1676
1677 let received = rx_target.try_recv().unwrap();
1678 assert!(matches!(
1679 received.payload,
1680 MessagePayload::Text(ref t) if t == "allowed"
1681 ));
1682 }
1683
1684 #[tokio::test]
1685 async fn ipc_scope_restricted_blocks_unlisted_pids() {
1686 let table = Arc::new(ProcessTable::new(64));
1687
1688 use crate::capability::IpcScope;
1689 let target_entry = ProcessEntry {
1690 pid: 0,
1691 agent_id: "target".to_owned(),
1692 state: ProcessState::Running,
1693 capabilities: AgentCapabilities::default(),
1694 resource_usage: ResourceUsage::default(),
1695 cancel_token: CancellationToken::new(),
1696 parent_pid: None,
1697 };
1698 let target_pid = table.insert(target_entry).unwrap();
1699
1700 let sender_entry = ProcessEntry {
1702 pid: 0,
1703 agent_id: "restricted-sender".to_owned(),
1704 state: ProcessState::Running,
1705 capabilities: AgentCapabilities {
1706 ipc_scope: IpcScope::Restricted(vec![999]),
1707 ..Default::default()
1708 },
1709 resource_usage: ResourceUsage::default(),
1710 cancel_token: CancellationToken::new(),
1711 parent_pid: None,
1712 };
1713 let sender_pid = table.insert(sender_entry).unwrap();
1714
1715 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1716 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1717 let router = A2ARouter::new(table, checker, topic_router);
1718 let _rx_sender = router.create_inbox(sender_pid);
1719 let _rx_target = router.create_inbox(target_pid);
1720
1721 let msg = KernelMessage::text(
1722 sender_pid,
1723 MessageTarget::Process(target_pid),
1724 "blocked",
1725 );
1726 let result = router.send(msg).await;
1727 assert!(result.is_err(), "restricted sender should be blocked from unlisted PID");
1728 }
1729
1730 #[test]
1731 fn create_inbox_returns_receiver() {
1732 let table = Arc::new(ProcessTable::new(64));
1733 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1734 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1735 let router = A2ARouter::new(table, checker, topic_router);
1736
1737 let rx = router.create_inbox(10);
1738 assert!(router.has_inbox(10));
1739 assert_eq!(router.inbox_count(), 1);
1740 drop(rx);
1741 }
1742
1743 #[test]
1744 fn create_inbox_replaces_existing() {
1745 let table = Arc::new(ProcessTable::new(64));
1746 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1747 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1748 let router = A2ARouter::new(table, checker, topic_router);
1749
1750 let _rx1 = router.create_inbox(10);
1751 assert_eq!(router.inbox_count(), 1);
1752
1753 let _rx2 = router.create_inbox(10);
1755 assert_eq!(router.inbox_count(), 1);
1756 assert!(router.has_inbox(10));
1757 }
1758
1759 #[test]
1760 fn remove_nonexistent_inbox_is_noop() {
1761 let table = Arc::new(ProcessTable::new(64));
1762 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1763 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1764 let router = A2ARouter::new(table, checker, topic_router);
1765
1766 router.remove_inbox(999);
1768 assert_eq!(router.inbox_count(), 0);
1769 }
1770
1771 #[test]
1772 fn inbox_count_tracks_multiple_inboxes() {
1773 let table = Arc::new(ProcessTable::new(64));
1774 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1775 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1776 let router = A2ARouter::new(table, checker, topic_router);
1777
1778 let _rx1 = router.create_inbox(1);
1779 let _rx2 = router.create_inbox(2);
1780 let _rx3 = router.create_inbox(3);
1781 assert_eq!(router.inbox_count(), 3);
1782
1783 router.remove_inbox(2);
1784 assert_eq!(router.inbox_count(), 2);
1785 assert!(!router.has_inbox(2));
1786 assert!(router.has_inbox(1));
1787 assert!(router.has_inbox(3));
1788 }
1789
1790 #[tokio::test]
1791 async fn send_to_kernel_target_succeeds() {
1792 let (router, pids, _receivers) = setup_router(1);
1793
1794 let msg = KernelMessage::text(pids[0], MessageTarget::Kernel, "kernel-msg");
1795 let result = router.send(msg).await;
1796 assert!(result.is_ok(), "kernel target routing should succeed (even if no-op)");
1797 }
1798
1799 #[tokio::test]
1800 async fn send_to_remote_node_fails() {
1801 let (router, pids, _receivers) = setup_router(1);
1802
1803 let msg = KernelMessage::text(
1804 pids[0],
1805 MessageTarget::RemoteNode {
1806 node_id: "remote-1".into(),
1807 target: Box::new(MessageTarget::Process(42)),
1808 },
1809 "remote-msg",
1810 );
1811 let result = router.send(msg).await;
1812 assert!(result.is_err(), "remote node routing should fail (not yet implemented)");
1813 let err_msg = format!("{}", result.unwrap_err());
1814 assert!(err_msg.contains("remote routing"), "expected remote routing error, got: {err_msg}");
1815 }
1816
1817 #[tokio::test]
1818 async fn suspended_process_can_still_send() {
1819 let table = Arc::new(ProcessTable::new(64));
1820
1821 let sender_entry = ProcessEntry {
1822 pid: 0,
1823 agent_id: "suspended-sender".to_owned(),
1824 state: ProcessState::Suspended,
1825 capabilities: AgentCapabilities::default(),
1826 resource_usage: ResourceUsage::default(),
1827 cancel_token: CancellationToken::new(),
1828 parent_pid: None,
1829 };
1830 let sender_pid = table.insert(sender_entry).unwrap();
1831
1832 let target_entry = ProcessEntry {
1833 pid: 0,
1834 agent_id: "target".to_owned(),
1835 state: ProcessState::Running,
1836 capabilities: AgentCapabilities::default(),
1837 resource_usage: ResourceUsage::default(),
1838 cancel_token: CancellationToken::new(),
1839 parent_pid: None,
1840 };
1841 let target_pid = table.insert(target_entry).unwrap();
1842
1843 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1844 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1845 let router = A2ARouter::new(table, checker, topic_router);
1846 let _rx_sender = router.create_inbox(sender_pid);
1847 let mut rx_target = router.create_inbox(target_pid);
1848
1849 let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "from-suspended");
1850 let result = router.send(msg).await;
1851 assert!(result.is_ok(), "suspended process should be allowed to send");
1852
1853 let received = rx_target.try_recv().unwrap();
1854 assert!(matches!(
1855 received.payload,
1856 MessagePayload::Text(ref t) if t == "from-suspended"
1857 ));
1858 }
1859
1860 #[tokio::test]
1861 async fn topic_multiple_subscribers_receive_same_message() {
1862 let (router, pids, mut receivers) = setup_router(5);
1863
1864 for &pid in &pids[1..] {
1866 router.topic_router().subscribe(pid, "news");
1867 }
1868
1869 let msg = KernelMessage::text(
1870 pids[0],
1871 MessageTarget::Topic("news".into()),
1872 "breaking",
1873 );
1874 router.send(msg).await.unwrap();
1875
1876 for (i, rx) in receivers[1..].iter_mut().enumerate() {
1878 let received = rx.try_recv().unwrap_or_else(|_| {
1879 panic!("subscriber {} (pid {}) should have received message", i + 1, pids[i + 1])
1880 });
1881 assert!(matches!(
1882 received.payload,
1883 MessagePayload::Text(ref t) if t == "breaking"
1884 ));
1885 assert_eq!(received.from, pids[0]);
1886 }
1887 }
1888
1889 #[tokio::test]
1890 async fn broadcast_skips_scope_restricted_targets() {
1891 let table = Arc::new(ProcessTable::new(64));
1892
1893 use crate::capability::IpcScope;
1894 let sender_entry = ProcessEntry {
1896 pid: 0,
1897 agent_id: "restricted-broadcaster".to_owned(),
1898 state: ProcessState::Running,
1899 capabilities: AgentCapabilities {
1900 ipc_scope: IpcScope::Restricted(vec![]),
1901 ..Default::default()
1902 },
1903 resource_usage: ResourceUsage::default(),
1904 cancel_token: CancellationToken::new(),
1905 parent_pid: None,
1906 };
1907 let sender_pid = table.insert(sender_entry).unwrap();
1908
1909 let target_entry = ProcessEntry {
1910 pid: 0,
1911 agent_id: "target".to_owned(),
1912 state: ProcessState::Running,
1913 capabilities: AgentCapabilities::default(),
1914 resource_usage: ResourceUsage::default(),
1915 cancel_token: CancellationToken::new(),
1916 parent_pid: None,
1917 };
1918 let target_pid = table.insert(target_entry).unwrap();
1919
1920 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1921 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1922 let router = A2ARouter::new(table, checker, topic_router);
1923 let _rx_sender = router.create_inbox(sender_pid);
1924 let mut rx_target = router.create_inbox(target_pid);
1925
1926 let msg = KernelMessage::text(sender_pid, MessageTarget::Broadcast, "restricted-broadcast");
1927 let result = router.send(msg).await;
1928 assert!(result.is_ok(), "broadcast itself should succeed even if all targets are blocked");
1929
1930 assert!(rx_target.try_recv().is_err(), "restricted broadcast should not reach any target");
1932 }
1933
1934 #[test]
1935 fn service_registry_accessor() {
1936 let table = Arc::new(ProcessTable::new(64));
1937 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1938 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1939
1940 let router = A2ARouter::new(table.clone(), checker.clone(), topic_router.clone());
1941 assert!(router.service_registry().is_none(), "no registry by default");
1942
1943 let registry = Arc::new(crate::service::ServiceRegistry::new());
1944 let router = router.with_service_registry(registry);
1945 assert!(router.service_registry().is_some(), "registry should be present after with_service_registry");
1946 }
1947
1948 #[test]
1949 fn topic_router_accessor() {
1950 let table = Arc::new(ProcessTable::new(64));
1951 let checker = Arc::new(CapabilityChecker::new(table.clone()));
1952 let topic_router = Arc::new(TopicRouter::new(table.clone()));
1953 let expected = Arc::clone(&topic_router);
1954
1955 let router = A2ARouter::new(table, checker, topic_router);
1956 assert!(Arc::ptr_eq(router.topic_router(), &expected));
1957 }
1958
1959 #[tokio::test]
1960 async fn json_payload_routes() {
1961 let (router, pids, mut receivers) = setup_router(2);
1962
1963 let msg = KernelMessage::new(
1964 pids[0],
1965 MessageTarget::Process(pids[1]),
1966 MessagePayload::Json(serde_json::json!({"key": "value"})),
1967 );
1968 router.send(msg).await.unwrap();
1969
1970 let received = receivers[1].try_recv().unwrap();
1971 assert!(matches!(
1972 received.payload,
1973 MessagePayload::Json(ref v) if v["key"] == "value"
1974 ));
1975 }
1976
1977 #[tokio::test]
1978 async fn signal_payload_routes() {
1979 use crate::ipc::KernelSignal;
1980 let (router, pids, mut receivers) = setup_router(2);
1981
1982 let msg = KernelMessage::signal(
1983 pids[0],
1984 MessageTarget::Process(pids[1]),
1985 KernelSignal::Shutdown,
1986 );
1987 router.send(msg).await.unwrap();
1988
1989 let received = receivers[1].try_recv().unwrap();
1990 assert!(matches!(
1991 received.payload,
1992 MessagePayload::Signal(KernelSignal::Shutdown)
1993 ));
1994 }
1995
1996 #[tokio::test]
1997 async fn request_to_nonexistent_target_cleans_up_pending() {
1998 let table = Arc::new(ProcessTable::new(64));
1999
2000 let sender_entry = ProcessEntry {
2001 pid: 0,
2002 agent_id: "sender".to_owned(),
2003 state: ProcessState::Running,
2004 capabilities: AgentCapabilities::default(),
2005 resource_usage: ResourceUsage::default(),
2006 cancel_token: CancellationToken::new(),
2007 parent_pid: None,
2008 };
2009 let sender_pid = table.insert(sender_entry).unwrap();
2010
2011 let target_entry = ProcessEntry {
2013 pid: 0,
2014 agent_id: "no-inbox-target".to_owned(),
2015 state: ProcessState::Running,
2016 capabilities: AgentCapabilities::default(),
2017 resource_usage: ResourceUsage::default(),
2018 cancel_token: CancellationToken::new(),
2019 parent_pid: None,
2020 };
2021 let target_pid = table.insert(target_entry).unwrap();
2022
2023 let checker = Arc::new(CapabilityChecker::new(table.clone()));
2024 let topic_router = Arc::new(TopicRouter::new(table.clone()));
2025 let router = A2ARouter::new(table, checker, topic_router);
2026 let _rx_sender = router.create_inbox(sender_pid);
2027 let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "request");
2030 let result = router.request(msg, Duration::from_millis(100)).await;
2031 assert!(result.is_err(), "request to PID without inbox should fail");
2032 assert_eq!(router.pending_request_count(), 0, "pending request should be cleaned up on send failure");
2033 }
2034
2035 #[tokio::test]
2036 async fn service_method_not_found_returns_error() {
2037 let (router, pids, _receivers, _registry) = setup_router_with_registry(2);
2038
2039 let msg = KernelMessage::text(
2040 pids[0],
2041 MessageTarget::ServiceMethod {
2042 service: "nonexistent-svc".into(),
2043 method: "do_thing".into(),
2044 },
2045 "call",
2046 );
2047 let result = router.send(msg).await;
2048 assert!(result.is_err());
2049 let err_msg = format!("{}", result.unwrap_err());
2050 assert!(
2051 err_msg.contains("service not found"),
2052 "expected 'service not found' for missing ServiceMethod target, got: {err_msg}"
2053 );
2054 }
2055}