Skip to main content

clawft_kernel/
a2a.rs

//! Agent-to-agent IPC protocol.
//!
//! The [`A2ARouter`] provides direct PID-to-PID messaging with
//! capability-checked routing, per-agent inboxes, and request-response
//! patterns with timeout support. It integrates with the
//! [`TopicRouter`] for pub/sub delivery.
//!
//! # Message Flow
//!
//! ```text
//! Agent A (PID 1)       A2ARouter          Agent B (PID 7)
//!      |                    |                    |
//!      |-- send(msg) ------>|                    |
//!      |                    |-- check_scope ---->|
//!      |                    |<-- Ok -------------|
//!      |                    |-- inbox.send(7) -->|
//!      |                    |                    |-- recv msg
//! ```
19
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use dashmap::DashMap;
24use tokio::sync::{mpsc, oneshot};
25use tracing::{debug, warn};
26
27use crate::capability::CapabilityChecker;
28use crate::error::{KernelError, KernelResult};
29use crate::ipc::{KernelMessage, MessageTarget};
30use crate::process::{Pid, ProcessState, ProcessTable};
31use crate::service::ServiceRegistry;
32use crate::topic::TopicRouter;
33
34#[cfg(feature = "exochain")]
35use crate::chain::ChainManager;
36
37#[cfg(feature = "mesh")]
38use crate::mesh_ipc::MeshIpcEnvelope;
39#[cfg(feature = "mesh")]
40use crate::mesh_runtime::MeshRuntime;
41
/// Bounded capacity (in messages) of each per-agent inbox channel.
const DEFAULT_INBOX_CAPACITY: usize = 1 << 10;
44
/// Upper bound on a serialized message (16 MiB), guarding against DoS via
/// oversized payloads.
///
/// Kept in step with [`crate::mesh::MAX_MESSAGE_SIZE`]. The limit is enforced
/// at the mesh boundary in `mesh_framing.rs`, where raw bytes arrive from
/// remote nodes. Inside one kernel, messages move as typed `KernelMessage`
/// values over `mpsc` channels, so the in-process hot path does not check it.
// Not referenced in this file; retained as the documented limit.
#[allow(dead_code)]
const MAX_A2A_MESSAGE_SIZE: usize = 16 << 20;
54
55/// A pending request awaiting a correlated response.
56struct PendingRequest {
57    /// Sender to deliver the response on.
58    response_tx: oneshot::Sender<KernelMessage>,
59    /// When the request was sent (for timeout tracking / diagnostics).
60    #[allow(dead_code)]
61    sent_at: Instant,
62}
63
64/// Agent-to-agent message router.
65///
66/// Manages per-agent inboxes (bounded `mpsc` channels), validates
67/// IPC scope through the capability checker, and routes messages
68/// to their targets (direct PID, topic, broadcast, service).
69pub struct A2ARouter {
70    /// Process table for state validation.
71    process_table: Arc<ProcessTable>,
72
73    /// Capability checker for IPC scope enforcement.
74    capability_checker: Arc<CapabilityChecker>,
75
76    /// Topic router for pub/sub delivery.
77    topic_router: Arc<TopicRouter>,
78
79    /// Service registry for service-based routing (D1, D19, K2.1).
80    service_registry: Option<Arc<ServiceRegistry>>,
81
82    /// Per-agent inboxes: PID -> sender half of inbox channel.
83    inboxes: DashMap<Pid, mpsc::Sender<KernelMessage>>,
84
85    /// Pending request-response tracking: request_id -> PendingRequest.
86    pending_requests: DashMap<String, PendingRequest>,
87
88    /// Optional routing-time gate backend (C4 dual-layer governance).
89    ///
90    /// Uses `OnceLock` to support post-construction wiring from boot
91    /// (the governance gate is created after the A2ARouter is wrapped
92    /// in `Arc`). `with_gate()` and `set_gate()` both target this field.
93    #[cfg(feature = "exochain")]
94    gate: std::sync::OnceLock<Arc<dyn crate::gate::GateBackend>>,
95
96    /// Dead letter queue for undeliverable messages (os-patterns).
97    #[cfg(feature = "os-patterns")]
98    dead_letter_queue: std::sync::OnceLock<Arc<crate::dead_letter::DeadLetterQueue>>,
99
100    /// Optional mesh runtime for cross-node message delivery (K6).
101    #[cfg(feature = "mesh")]
102    mesh_runtime: std::sync::OnceLock<Arc<MeshRuntime>>,
103}
104
105impl A2ARouter {
106    /// Create a new A2A router.
107    pub fn new(
108        process_table: Arc<ProcessTable>,
109        capability_checker: Arc<CapabilityChecker>,
110        topic_router: Arc<TopicRouter>,
111    ) -> Self {
112        Self {
113            process_table,
114            capability_checker,
115            topic_router,
116            service_registry: None,
117            inboxes: DashMap::new(),
118            pending_requests: DashMap::new(),
119            #[cfg(feature = "exochain")]
120            gate: std::sync::OnceLock::new(),
121            #[cfg(feature = "os-patterns")]
122            dead_letter_queue: std::sync::OnceLock::new(),
123            #[cfg(feature = "mesh")]
124            mesh_runtime: std::sync::OnceLock::new(),
125        }
126    }
127
128    /// Attach a service registry for service-based routing (D1, D19).
129    pub fn with_service_registry(mut self, registry: Arc<ServiceRegistry>) -> Self {
130        self.service_registry = Some(registry);
131        self
132    }
133
134    /// Attach a routing-time gate for dual-layer governance (C4).
135    ///
136    /// When set, every message routed through `send()` is checked against
137    /// the gate *before* inbox delivery. A `Deny` decision blocks the
138    /// message; `Defer` still delivers (the handler-time gate decides).
139    #[cfg(feature = "exochain")]
140    pub fn with_gate(self, gate: Arc<dyn crate::gate::GateBackend>) -> Self {
141        let _ = self.gate.set(gate);
142        self
143    }
144
145    /// Set the routing-time gate after construction (for boot wiring).
146    ///
147    /// This allows the governance gate to be attached after the router
148    /// is already wrapped in an `Arc`, since the `OnceLock` provides
149    /// interior mutability for the first (and only) write.
150    #[cfg(feature = "exochain")]
151    pub fn set_gate(&self, gate: Arc<dyn crate::gate::GateBackend>) {
152        let _ = self.gate.set(gate);
153    }
154
155    /// Set the dead letter queue after construction (for boot wiring).
156    ///
157    /// Like `set_gate()`, uses `OnceLock` for interior mutability so
158    /// the DLQ can be attached after the router is wrapped in `Arc`.
159    #[cfg(feature = "os-patterns")]
160    pub fn set_dead_letter_queue(&self, dlq: Arc<crate::dead_letter::DeadLetterQueue>) {
161        let _ = self.dead_letter_queue.set(dlq);
162    }
163
164    /// Get the dead letter queue (if configured).
165    #[cfg(feature = "os-patterns")]
166    pub fn dead_letter_queue(&self) -> Option<&Arc<crate::dead_letter::DeadLetterQueue>> {
167        self.dead_letter_queue.get()
168    }
169
170    /// Attach the mesh runtime for cross-node message delivery (K6).
171    ///
172    /// Uses `OnceLock` so the runtime can be attached after the router
173    /// is already wrapped in `Arc`.
174    #[cfg(feature = "mesh")]
175    pub fn set_mesh_runtime(&self, runtime: Arc<MeshRuntime>) {
176        let _ = self.mesh_runtime.set(runtime);
177    }
178
179    /// Get the mesh runtime (if configured).
180    #[cfg(feature = "mesh")]
181    pub fn mesh_runtime(&self) -> Option<&Arc<MeshRuntime>> {
182        self.mesh_runtime.get()
183    }
184
185    /// Get the service registry (if configured).
186    pub fn service_registry(&self) -> Option<&Arc<ServiceRegistry>> {
187        self.service_registry.as_ref()
188    }
189
190    /// Create an inbox for a process.
191    ///
192    /// Returns the receiver half that the agent should poll for
193    /// incoming messages. The sender half is stored internally
194    /// for routing.
195    ///
196    /// If an inbox already exists for this PID, the old one is
197    /// replaced (existing messages are lost).
198    pub fn create_inbox(&self, pid: Pid) -> mpsc::Receiver<KernelMessage> {
199        let (tx, rx) = mpsc::channel(DEFAULT_INBOX_CAPACITY);
200        self.inboxes.insert(pid, tx);
201        debug!(pid, "created inbox");
202        rx
203    }
204
205    /// Remove an inbox (used during process cleanup).
206    pub fn remove_inbox(&self, pid: Pid) {
207        self.inboxes.remove(&pid);
208        debug!(pid, "removed inbox");
209    }
210
211    /// Send a message, routing it to the appropriate target.
212    ///
213    /// Validates that the sender exists and is running, checks
214    /// IPC scope via the capability checker, then delivers the
215    /// message to the target.
216    ///
217    /// # Routing
218    ///
219    /// - `Process(pid)`: delivers directly to the target's inbox
220    /// - `Topic(name)`: publishes to all topic subscribers
221    /// - `Broadcast`: delivers to all inboxes except the sender
222    /// - `Service(name)`: logs a warning (service routing is a
223    ///   future extension)
224    /// - `Kernel`: logs a warning (kernel messages are internal)
225    ///
226    /// # Errors
227    ///
228    /// Returns `KernelError::ProcessNotFound` if the sender PID
229    /// is not in the process table, or `KernelError::CapabilityDenied`
230    /// if the sender's IPC scope does not permit the target.
231    pub async fn send(&self, msg: KernelMessage) -> KernelResult<()> {
232        let from = msg.from;
233
234        // Validate sender exists and is running
235        let sender = self
236            .process_table
237            .get(from)
238            .ok_or(KernelError::ProcessNotFound { pid: from })?;
239
240        if !matches!(sender.state, ProcessState::Running | ProcessState::Suspended) {
241            return Err(KernelError::Ipc(format!(
242                "sender PID {from} is not running (state: {})",
243                sender.state
244            )));
245        }
246
247        // C4: Routing-time gate check (first layer of dual-layer governance).
248        // A Deny blocks the message before it reaches any inbox. Defer
249        // still delivers — the handler-time gate makes the final call.
250        #[cfg(feature = "exochain")]
251        if let Some(gate) = self.gate.get() {
252            let action = match &msg.payload {
253                crate::ipc::MessagePayload::ToolCall { name, .. } => format!("tool.{name}"),
254                crate::ipc::MessagePayload::Signal(_) => "ipc.signal".to_string(),
255                _ => "ipc.send".to_string(),
256            };
257            let context = serde_json::json!({
258                "from": from,
259                "target": format!("{:?}", msg.target),
260                "layer": "routing",
261            });
262            match gate.check(&from.to_string(), &action, &context) {
263                crate::gate::GateDecision::Deny { reason, .. } => {
264                    return Err(KernelError::CapabilityDenied {
265                        pid: from,
266                        action,
267                        reason: format!("routing gate denied: {reason}"),
268                    });
269                }
270                crate::gate::GateDecision::Defer { .. }
271                | crate::gate::GateDecision::Permit { .. } => {
272                    // Permitted or deferred — continue to delivery.
273                }
274            }
275        }
276
277        // Route based on target
278        match &msg.target {
279            MessageTarget::Process(target_pid) => {
280                // Check IPC scope
281                self.capability_checker
282                    .check_ipc_target(from, *target_pid)?;
283
284                self.deliver_to_inbox(*target_pid, msg).await
285            }
286            MessageTarget::Topic(topic) => {
287                let subscribers = self.topic_router.live_subscribers(topic);
288                let mut delivered = 0u32;
289                for &sub_pid in &subscribers {
290                    if sub_pid != from {
291                        let msg_clone = msg.clone();
292                        if self.deliver_to_inbox(sub_pid, msg_clone).await.is_ok() {
293                            delivered += 1;
294                        }
295                    }
296                }
297                debug!(from, topic, delivered, "published to topic");
298                Ok(())
299            }
300            MessageTarget::Broadcast => {
301                let mut delivered = 0u32;
302                let pids: Vec<Pid> = self.inboxes.iter().map(|entry| *entry.key()).collect();
303
304                for pid in pids {
305                    if pid != from {
306                        // Check IPC scope for each target
307                        if self.capability_checker.check_ipc_target(from, pid).is_ok() {
308                            let msg_clone = msg.clone();
309                            if self.deliver_to_inbox(pid, msg_clone).await.is_ok() {
310                                delivered += 1;
311                            }
312                        }
313                    }
314                }
315                debug!(from, delivered, "broadcast sent");
316                Ok(())
317            }
318            MessageTarget::Service(name) => {
319                let name = name.clone();
320                self.route_to_service(from, &name, msg).await
321            }
322            MessageTarget::ServiceMethod { service, .. } => {
323                let service_name = service.clone();
324                self.route_to_service(from, &service_name, msg).await
325            }
326            MessageTarget::Kernel => {
327                debug!(from, "kernel message routing not yet implemented");
328                Ok(())
329            }
330            MessageTarget::RemoteNode { node_id, .. } => {
331                #[cfg(feature = "mesh")]
332                {
333                    if let Some(runtime) = self.mesh_runtime.get() {
334                        let node_id = node_id.clone();
335                        debug!(from, %node_id, "routing message to remote node via mesh");
336                        let envelope = MeshIpcEnvelope::new(
337                            runtime.node_id().to_string(),
338                            node_id.clone(),
339                            msg,
340                        );
341                        runtime
342                            .send_to_peer(&node_id, envelope)
343                            .await
344                            .map_err(|e| KernelError::Mesh(format!(
345                                "failed to send to remote node '{node_id}': {e}"
346                            )))
347                    } else {
348                        debug!(from, %node_id, "remote node routing: no mesh runtime attached");
349                        Err(KernelError::Mesh(format!(
350                            "remote routing to node '{node_id}' not yet implemented"
351                        )))
352                    }
353                }
354                #[cfg(not(feature = "mesh"))]
355                {
356                    debug!(from, %node_id, "remote node routing not available (mesh feature disabled)");
357                    Err(KernelError::Mesh(format!(
358                        "remote routing to node '{node_id}' not yet implemented"
359                    )))
360                }
361            }
362        }
363    }
364
365    /// Deliver a message to a specific PID's inbox.
366    ///
367    /// If the inbox does not exist or is full, the message is routed to
368    /// the dead letter queue (when os-patterns is enabled) and an error
369    /// is returned.
370    async fn deliver_to_inbox(&self, pid: Pid, msg: KernelMessage) -> KernelResult<()> {
371        // Clone the sender so we release the DashMap read lock before
372        // any potential remove() call (which needs a write lock on the
373        // same shard — holding both would deadlock).
374        let tx = match self.inboxes.get(&pid) {
375            Some(tx) => tx.clone(),
376            None => {
377                warn!(pid, "no inbox for PID, dead-lettering");
378                #[cfg(feature = "os-patterns")]
379                if let Some(dlq) = self.dead_letter_queue.get() {
380                    dlq.intake(
381                        msg,
382                        crate::dead_letter::DeadLetterReason::TargetNotFound { pid },
383                    );
384                }
385                return Err(KernelError::Ipc(format!("no inbox for PID {pid}")));
386            }
387        };
388
389        match tx.try_send(msg) {
390            Ok(()) => {
391                debug!(pid, "message delivered to inbox");
392                Ok(())
393            }
394            Err(mpsc::error::TrySendError::Full(rejected_msg)) => {
395                warn!(pid, "inbox full, dead-lettering");
396                #[cfg(feature = "os-patterns")]
397                if let Some(dlq) = self.dead_letter_queue.get() {
398                    dlq.intake(
399                        rejected_msg,
400                        crate::dead_letter::DeadLetterReason::InboxFull { pid },
401                    );
402                }
403                Err(KernelError::Ipc(format!("inbox full for PID {pid}")))
404            }
405            Err(mpsc::error::TrySendError::Closed(rejected_msg)) => {
406                warn!(pid, "inbox closed, removing and dead-lettering");
407                self.inboxes.remove(&pid);
408                #[cfg(feature = "os-patterns")]
409                if let Some(dlq) = self.dead_letter_queue.get() {
410                    dlq.intake(
411                        rejected_msg,
412                        crate::dead_letter::DeadLetterReason::AgentExited { pid },
413                    );
414                }
415                Err(KernelError::Ipc(format!("inbox closed for PID {pid}")))
416            }
417        }
418    }
419
420    /// Route a message to a named service via the ServiceRegistry.
421    ///
422    /// Resolves the service name to an owning agent PID, then delivers
423    /// the message to that agent's inbox.
424    async fn route_to_service(
425        &self,
426        from: Pid,
427        service_name: &str,
428        msg: KernelMessage,
429    ) -> KernelResult<()> {
430        let registry = self.service_registry.as_ref().ok_or_else(|| {
431            KernelError::Ipc(format!(
432                "no service registry configured; cannot route to service '{service_name}'"
433            ))
434        })?;
435
436        let target_pid = registry.resolve_target(service_name).ok_or_else(|| {
437            KernelError::Ipc(format!("service not found: '{service_name}'"))
438        })?;
439
440        // Check IPC scope
441        self.capability_checker
442            .check_ipc_target(from, target_pid)?;
443
444        self.deliver_to_inbox(target_pid, msg).await
445    }
446
447    /// Send a message with chain-event logging.
448    ///
449    /// This mirrors `KernelIpc::send_checked` but for the A2ARouter:
450    /// every routed message is logged as an `ipc.send` chain event with
451    /// sender, target, payload type, and message ID — forming a
452    /// tamper-evident IPC audit trail in the exochain.
453    ///
454    /// When the `exochain` feature is disabled this is equivalent to
455    /// a plain `send()`.
456    #[cfg(feature = "exochain")]
457    pub async fn send_checked(
458        &self,
459        msg: KernelMessage,
460        chain: Option<&ChainManager>,
461    ) -> KernelResult<()> {
462        // Log the IPC event before delivery so the chain records intent
463        // even if the inbox is full or closed.
464        if let Some(cm) = chain {
465            cm.append(
466                "ipc",
467                "ipc.send",
468                Some(serde_json::json!({
469                    "from": msg.from,
470                    "target": format!("{:?}", msg.target),
471                    "payload_type": msg.payload.type_name(),
472                    "msg_id": msg.id,
473                })),
474            );
475        }
476        self.send(msg).await
477    }
478
479    /// Get the topic router.
480    pub fn topic_router(&self) -> &Arc<TopicRouter> {
481        &self.topic_router
482    }
483
484    /// Get the number of active inboxes.
485    pub fn inbox_count(&self) -> usize {
486        self.inboxes.len()
487    }
488
489    /// Check whether a PID has an inbox.
490    pub fn has_inbox(&self, pid: Pid) -> bool {
491        self.inboxes.contains_key(&pid)
492    }
493
494    /// Send a request and wait for a correlated response with timeout.
495    ///
496    /// The request message is sent normally, but its `id` is registered
497    /// as a pending request. When a response arrives with a matching
498    /// `correlation_id`, it is delivered to the returned future instead
499    /// of the sender's inbox.
500    ///
501    /// # Errors
502    ///
503    /// Returns `KernelError::Timeout` if no response arrives within the
504    /// specified duration, or `KernelError::Ipc` if the response channel
505    /// is closed before a response arrives.
506    pub async fn request(
507        &self,
508        msg: KernelMessage,
509        timeout: Duration,
510    ) -> KernelResult<KernelMessage> {
511        let request_id = msg.id.clone();
512        let (tx, rx) = oneshot::channel();
513
514        // Register pending request before sending so there is no race.
515        self.pending_requests.insert(
516            request_id.clone(),
517            PendingRequest {
518                response_tx: tx,
519                sent_at: Instant::now(),
520            },
521        );
522
523        // Send the message; clean up on failure.
524        if let Err(e) = self.send(msg).await {
525            self.pending_requests.remove(&request_id);
526            return Err(e);
527        }
528
529        // Wait for response with timeout.
530        match tokio::time::timeout(timeout, rx).await {
531            Ok(Ok(response)) => Ok(response),
532            Ok(Err(_)) => {
533                self.pending_requests.remove(&request_id);
534                Err(KernelError::Ipc("response channel closed".into()))
535            }
536            Err(_) => {
537                self.pending_requests.remove(&request_id);
538                Err(KernelError::Timeout {
539                    operation: format!("request {request_id}"),
540                    duration_ms: timeout.as_millis() as u64,
541                })
542            }
543        }
544    }
545
546    /// Try to complete a pending request with a correlated response.
547    ///
548    /// If the message has a `correlation_id` that matches a pending
549    /// request, the response is delivered to the waiting future and
550    /// `true` is returned. Otherwise returns `false`.
551    pub fn try_complete_request(&self, msg: KernelMessage) -> bool {
552        if let Some(ref corr_id) = msg.correlation_id
553            && let Some((_, pending)) = self.pending_requests.remove(corr_id)
554        {
555            let _ = pending.response_tx.send(msg);
556            return true;
557        }
558        false
559    }
560
561    /// Get the number of pending requests.
562    pub fn pending_request_count(&self) -> usize {
563        self.pending_requests.len()
564    }
565}
566
567#[cfg(test)]
568mod tests {
569    use super::*;
570    use crate::capability::AgentCapabilities;
571    use crate::ipc::MessagePayload;
572    use crate::process::{ProcessEntry, ResourceUsage};
573    use tokio_util::sync::CancellationToken;
574
575    fn setup_router(
576        agent_count: usize,
577    ) -> (A2ARouter, Vec<Pid>, Vec<mpsc::Receiver<KernelMessage>>) {
578        let table = Arc::new(ProcessTable::new(64));
579        let mut pids = Vec::new();
580
581        for i in 0..agent_count {
582            let entry = ProcessEntry {
583                pid: 0,
584                agent_id: format!("agent-{i}"),
585                state: ProcessState::Running,
586                capabilities: AgentCapabilities::default(),
587                resource_usage: ResourceUsage::default(),
588                cancel_token: CancellationToken::new(),
589                parent_pid: None,
590            };
591            let pid = table.insert(entry).unwrap();
592            pids.push(pid);
593        }
594
595        let checker = Arc::new(CapabilityChecker::new(table.clone()));
596        let topic_router = Arc::new(TopicRouter::new(table.clone()));
597        let router = A2ARouter::new(table, checker, topic_router);
598
599        let mut receivers = Vec::new();
600        for &pid in &pids {
601            let rx = router.create_inbox(pid);
602            receivers.push(rx);
603        }
604
605        (router, pids, receivers)
606    }
607
608    #[tokio::test]
609    async fn direct_message_delivery() {
610        let (router, pids, mut receivers) = setup_router(2);
611
612        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "hello");
613        router.send(msg).await.unwrap();
614
615        let received = receivers[1].try_recv().unwrap();
616        assert_eq!(received.from, pids[0]);
617        assert!(matches!(
618            received.payload,
619            MessagePayload::Text(ref t) if t == "hello"
620        ));
621    }
622
623    #[tokio::test]
624    async fn message_to_self_works() {
625        let (router, pids, mut receivers) = setup_router(1);
626
627        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[0]), "self-msg");
628        router.send(msg).await.unwrap();
629
630        let received = receivers[0].try_recv().unwrap();
631        assert!(matches!(
632            received.payload,
633            MessagePayload::Text(ref t) if t == "self-msg"
634        ));
635    }
636
637    #[tokio::test]
638    async fn broadcast_delivers_to_all_except_sender() {
639        let (router, pids, mut receivers) = setup_router(3);
640
641        let msg = KernelMessage::text(pids[0], MessageTarget::Broadcast, "broadcast");
642        router.send(msg).await.unwrap();
643
644        // Sender should not receive
645        assert!(receivers[0].try_recv().is_err());
646
647        // Others should receive
648        let r1 = receivers[1].try_recv().unwrap();
649        assert!(matches!(
650            r1.payload,
651            MessagePayload::Text(ref t) if t == "broadcast"
652        ));
653        let r2 = receivers[2].try_recv().unwrap();
654        assert!(matches!(
655            r2.payload,
656            MessagePayload::Text(ref t) if t == "broadcast"
657        ));
658    }
659
660    #[tokio::test]
661    async fn topic_publish_delivers_to_subscribers() {
662        let (router, pids, mut receivers) = setup_router(3);
663
664        // Subscribe pids[1] and pids[2] to "build"
665        router.topic_router().subscribe(pids[1], "build");
666        router.topic_router().subscribe(pids[2], "build");
667
668        let msg = KernelMessage::text(pids[0], MessageTarget::Topic("build".into()), "build done");
669        router.send(msg).await.unwrap();
670
671        // Sender not subscribed, should not receive
672        assert!(receivers[0].try_recv().is_err());
673
674        // Subscribers should receive
675        assert!(receivers[1].try_recv().is_ok());
676        assert!(receivers[2].try_recv().is_ok());
677    }
678
679    #[tokio::test]
680    async fn topic_publish_excludes_sender_if_subscribed() {
681        let (router, pids, mut receivers) = setup_router(2);
682
683        // Both subscribe
684        router.topic_router().subscribe(pids[0], "build");
685        router.topic_router().subscribe(pids[1], "build");
686
687        let msg = KernelMessage::text(pids[0], MessageTarget::Topic("build".into()), "done");
688        router.send(msg).await.unwrap();
689
690        // Sender should not receive their own publish
691        assert!(receivers[0].try_recv().is_err());
692        // Other subscriber should receive
693        assert!(receivers[1].try_recv().is_ok());
694    }
695
696    #[tokio::test]
697    async fn send_from_nonexistent_pid_fails() {
698        let (router, _pids, _receivers) = setup_router(1);
699
700        let msg = KernelMessage::text(999, MessageTarget::Process(1), "hello");
701        let result = router.send(msg).await;
702        assert!(result.is_err());
703    }
704
705    #[tokio::test]
706    async fn send_to_pid_without_inbox_fails() {
707        let table = Arc::new(ProcessTable::new(64));
708
709        // Create sender (running)
710        let sender_entry = ProcessEntry {
711            pid: 0,
712            agent_id: "sender".to_owned(),
713            state: ProcessState::Running,
714            capabilities: AgentCapabilities::default(),
715            resource_usage: ResourceUsage::default(),
716            cancel_token: CancellationToken::new(),
717            parent_pid: None,
718        };
719        let sender_pid = table.insert(sender_entry).unwrap();
720
721        // Create target (running) but don't create inbox
722        let target_entry = ProcessEntry {
723            pid: 0,
724            agent_id: "target".to_owned(),
725            state: ProcessState::Running,
726            capabilities: AgentCapabilities::default(),
727            resource_usage: ResourceUsage::default(),
728            cancel_token: CancellationToken::new(),
729            parent_pid: None,
730        };
731        let target_pid = table.insert(target_entry).unwrap();
732
733        let checker = Arc::new(CapabilityChecker::new(table.clone()));
734        let topic_router = Arc::new(TopicRouter::new(table.clone()));
735        let router = A2ARouter::new(table, checker, topic_router);
736
737        // Create inbox only for sender
738        let _rx = router.create_inbox(sender_pid);
739
740        let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "hello");
741        let result = router.send(msg).await;
742        assert!(result.is_err());
743    }
744
745    #[tokio::test]
746    async fn ipc_scope_restricts_messaging() {
747        let table = Arc::new(ProcessTable::new(64));
748
749        // Create sender with restricted IPC scope
750        use crate::capability::IpcScope;
751        let sender_entry = ProcessEntry {
752            pid: 0,
753            agent_id: "restricted".to_owned(),
754            state: ProcessState::Running,
755            capabilities: AgentCapabilities {
756                ipc_scope: IpcScope::Restricted(vec![]), // No allowed PIDs
757                ..Default::default()
758            },
759            resource_usage: ResourceUsage::default(),
760            cancel_token: CancellationToken::new(),
761            parent_pid: None,
762        };
763        let sender_pid = table.insert(sender_entry).unwrap();
764
765        // Create target
766        let target_entry = ProcessEntry {
767            pid: 0,
768            agent_id: "target".to_owned(),
769            state: ProcessState::Running,
770            capabilities: AgentCapabilities::default(),
771            resource_usage: ResourceUsage::default(),
772            cancel_token: CancellationToken::new(),
773            parent_pid: None,
774        };
775        let target_pid = table.insert(target_entry).unwrap();
776
777        let checker = Arc::new(CapabilityChecker::new(table.clone()));
778        let topic_router = Arc::new(TopicRouter::new(table.clone()));
779        let router = A2ARouter::new(table, checker, topic_router);
780        let _rx1 = router.create_inbox(sender_pid);
781        let _rx2 = router.create_inbox(target_pid);
782
783        let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "blocked");
784        let result = router.send(msg).await;
785        assert!(result.is_err());
786    }
787
788    #[tokio::test]
789    async fn ipc_scope_none_blocks_all() {
790        let table = Arc::new(ProcessTable::new(64));
791
792        use crate::capability::IpcScope;
793        let sender_entry = ProcessEntry {
794            pid: 0,
795            agent_id: "no-ipc".to_owned(),
796            state: ProcessState::Running,
797            capabilities: AgentCapabilities {
798                can_ipc: false,
799                ipc_scope: IpcScope::None,
800                ..Default::default()
801            },
802            resource_usage: ResourceUsage::default(),
803            cancel_token: CancellationToken::new(),
804            parent_pid: None,
805        };
806        let sender_pid = table.insert(sender_entry).unwrap();
807
808        let target_entry = ProcessEntry {
809            pid: 0,
810            agent_id: "target".to_owned(),
811            state: ProcessState::Running,
812            capabilities: AgentCapabilities::default(),
813            resource_usage: ResourceUsage::default(),
814            cancel_token: CancellationToken::new(),
815            parent_pid: None,
816        };
817        let target_pid = table.insert(target_entry).unwrap();
818
819        let checker = Arc::new(CapabilityChecker::new(table.clone()));
820        let topic_router = Arc::new(TopicRouter::new(table.clone()));
821        let router = A2ARouter::new(table, checker, topic_router);
822        let _rx1 = router.create_inbox(sender_pid);
823        let _rx2 = router.create_inbox(target_pid);
824
825        let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "blocked");
826        let result = router.send(msg).await;
827        assert!(result.is_err());
828    }
829
830    #[test]
831    fn create_and_remove_inbox() {
832        let table = Arc::new(ProcessTable::new(64));
833        let checker = Arc::new(CapabilityChecker::new(table.clone()));
834        let topic_router = Arc::new(TopicRouter::new(table.clone()));
835        let router = A2ARouter::new(table, checker, topic_router);
836
837        let _rx = router.create_inbox(42);
838        assert!(router.has_inbox(42));
839        assert_eq!(router.inbox_count(), 1);
840
841        router.remove_inbox(42);
842        assert!(!router.has_inbox(42));
843        assert_eq!(router.inbox_count(), 0);
844    }
845
846    #[tokio::test]
847    async fn tool_call_message_routes() {
848        let (router, pids, mut receivers) = setup_router(2);
849
850        let msg = KernelMessage::tool_call(
851            pids[0],
852            MessageTarget::Process(pids[1]),
853            "read_file",
854            serde_json::json!({"path": "/test"}),
855        );
856        router.send(msg).await.unwrap();
857
858        let received = receivers[1].try_recv().unwrap();
859        assert!(matches!(
860            received.payload,
861            MessagePayload::ToolCall { ref name, .. } if name == "read_file"
862        ));
863    }
864
865    #[tokio::test]
866    async fn tool_result_message_routes() {
867        let (router, pids, mut receivers) = setup_router(2);
868
869        let msg = KernelMessage::tool_result(
870            pids[1],
871            MessageTarget::Process(pids[0]),
872            "call-1",
873            serde_json::json!({"content": "data"}),
874        );
875        router.send(msg).await.unwrap();
876
877        let received = receivers[0].try_recv().unwrap();
878        assert!(matches!(
879            received.payload,
880            MessagePayload::ToolResult { ref call_id, .. } if call_id == "call-1"
881        ));
882    }
883
884    #[cfg(feature = "exochain")]
885    #[tokio::test]
886    async fn send_checked_logs_chain_event() {
887        let (router, pids, mut receivers) = setup_router(2);
888
889        let chain = crate::chain::ChainManager::new(0, 1000);
890        let initial_seq = chain.sequence();
891
892        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "audited");
893        router.send_checked(msg, Some(&chain)).await.unwrap();
894
895        // Message should still be delivered
896        let received = receivers[1].try_recv().unwrap();
897        assert!(matches!(
898            received.payload,
899            MessagePayload::Text(ref t) if t == "audited"
900        ));
901
902        // Chain should have a new ipc.send event
903        assert_eq!(chain.sequence(), initial_seq + 1);
904        let events = chain.tail(1);
905        assert_eq!(events[0].kind, "ipc.send");
906        assert_eq!(events[0].source, "ipc");
907        let payload = events[0].payload.as_ref().unwrap();
908        assert_eq!(payload["from"], pids[0]);
909        assert_eq!(payload["payload_type"], "text");
910    }
911
912    #[cfg(feature = "exochain")]
913    #[tokio::test]
914    async fn send_checked_without_chain_still_delivers() {
915        let (router, pids, mut receivers) = setup_router(2);
916
917        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "no-chain");
918        router.send_checked(msg, None).await.unwrap();
919
920        let received = receivers[1].try_recv().unwrap();
921        assert!(matches!(
922            received.payload,
923            MessagePayload::Text(ref t) if t == "no-chain"
924        ));
925    }
926
927    #[tokio::test]
928    async fn rvf_payload_routes() {
929        let (router, pids, mut receivers) = setup_router(2);
930
931        let msg = KernelMessage::new(
932            pids[0],
933            MessageTarget::Process(pids[1]),
934            MessagePayload::Rvf {
935                segment_type: 0x40,
936                data: vec![0xCA, 0xFE],
937            },
938        );
939        router.send(msg).await.unwrap();
940
941        let received = receivers[1].try_recv().unwrap();
942        assert!(matches!(
943            received.payload,
944            MessagePayload::Rvf { segment_type: 0x40, .. }
945        ));
946    }
947
948    #[tokio::test]
949    async fn closed_inbox_auto_removed() {
950        let (router, pids, receivers) = setup_router(2);
951
952        // Drop receiver for pids[1] — closes the inbox channel
953        drop(receivers);
954
955        assert!(router.has_inbox(pids[1]));
956
957        // Sending should fail with "inbox closed" and auto-remove the entry
958        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "gone");
959        let result = router.send(msg).await;
960        assert!(result.is_err());
961        let err_msg = format!("{}", result.unwrap_err());
962        assert!(err_msg.contains("inbox closed"), "expected 'inbox closed', got: {err_msg}");
963
964        // Inbox should have been removed automatically
965        assert!(!router.has_inbox(pids[1]));
966    }
967
968    #[tokio::test]
969    async fn inbox_overflow_returns_error() {
970        // Create a router with 2 agents, but use a small inbox
971        // We can't change DEFAULT_INBOX_CAPACITY, so we fill a normal inbox.
972        // Instead, create a custom channel with capacity 2 for a targeted test.
973        let table = Arc::new(ProcessTable::new(64));
974
975        let sender_entry = ProcessEntry {
976            pid: 0,
977            agent_id: "sender".to_owned(),
978            state: ProcessState::Running,
979            capabilities: AgentCapabilities::default(),
980            resource_usage: ResourceUsage::default(),
981            cancel_token: CancellationToken::new(),
982            parent_pid: None,
983        };
984        let sender_pid = table.insert(sender_entry).unwrap();
985
986        let target_entry = ProcessEntry {
987            pid: 0,
988            agent_id: "target".to_owned(),
989            state: ProcessState::Running,
990            capabilities: AgentCapabilities::default(),
991            resource_usage: ResourceUsage::default(),
992            cancel_token: CancellationToken::new(),
993            parent_pid: None,
994        };
995        let target_pid = table.insert(target_entry).unwrap();
996
997        let checker = Arc::new(CapabilityChecker::new(table.clone()));
998        let topic_router = Arc::new(TopicRouter::new(table.clone()));
999        let router = A2ARouter::new(table, checker, topic_router);
1000
1001        // Create sender inbox normally
1002        let _rx_sender = router.create_inbox(sender_pid);
1003
1004        // Manually insert a tiny-capacity channel for target (capacity=2)
1005        let (tx, _rx_target) = mpsc::channel(2);
1006        router.inboxes.insert(target_pid, tx);
1007
1008        // Fill it up
1009        let m1 = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "msg1");
1010        let m2 = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "msg2");
1011        router.send(m1).await.unwrap();
1012        router.send(m2).await.unwrap();
1013
1014        // Third message should fail — inbox full
1015        let m3 = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "overflow");
1016        let result = router.send(m3).await;
1017        assert!(result.is_err());
1018        let err_msg = format!("{}", result.unwrap_err());
1019        assert!(err_msg.contains("inbox full"), "expected 'inbox full', got: {err_msg}");
1020
1021        // Inbox should still exist (not removed on full, only on closed)
1022        assert!(router.has_inbox(target_pid));
1023    }
1024
1025    #[tokio::test]
1026    async fn concurrent_sends_to_same_pid() {
1027        let (router, pids, mut receivers) = setup_router(4);
1028        let router = Arc::new(router);
1029        let target = pids[0];
1030
1031        // 3 senders each send a message to the same target concurrently
1032        let mut handles = Vec::new();
1033        for &sender_pid in &pids[1..] {
1034            let r = Arc::clone(&router);
1035            let msg = KernelMessage::text(
1036                sender_pid,
1037                MessageTarget::Process(target),
1038                &format!("from-{sender_pid}"),
1039            );
1040            handles.push(tokio::spawn(async move { r.send(msg).await }));
1041        }
1042
1043        // All sends should succeed
1044        for h in handles {
1045            h.await.unwrap().unwrap();
1046        }
1047
1048        // Target should have received all 3 messages
1049        let mut received = Vec::new();
1050        while let Ok(msg) = receivers[0].try_recv() {
1051            if let MessagePayload::Text(t) = &msg.payload {
1052                received.push(t.clone());
1053            }
1054        }
1055        assert_eq!(received.len(), 3);
1056        // All senders represented (order may vary)
1057        for &sender_pid in &pids[1..] {
1058            assert!(
1059                received.iter().any(|t| t == &format!("from-{sender_pid}")),
1060                "missing message from PID {sender_pid}"
1061            );
1062        }
1063    }
1064
1065    #[tokio::test]
1066    async fn send_from_non_running_process_fails() {
1067        let table = Arc::new(ProcessTable::new(64));
1068
1069        // Create sender in Exited state
1070        let sender_entry = ProcessEntry {
1071            pid: 0,
1072            agent_id: "exited-sender".to_owned(),
1073            state: ProcessState::Exited(0),
1074            capabilities: AgentCapabilities::default(),
1075            resource_usage: ResourceUsage::default(),
1076            cancel_token: CancellationToken::new(),
1077            parent_pid: None,
1078        };
1079        let sender_pid = table.insert(sender_entry).unwrap();
1080
1081        // Create target in Running state
1082        let target_entry = ProcessEntry {
1083            pid: 0,
1084            agent_id: "target".to_owned(),
1085            state: ProcessState::Running,
1086            capabilities: AgentCapabilities::default(),
1087            resource_usage: ResourceUsage::default(),
1088            cancel_token: CancellationToken::new(),
1089            parent_pid: None,
1090        };
1091        let target_pid = table.insert(target_entry).unwrap();
1092
1093        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1094        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1095        let router = A2ARouter::new(table, checker, topic_router);
1096        let _rx1 = router.create_inbox(sender_pid);
1097        let _rx2 = router.create_inbox(target_pid);
1098
1099        let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "from-dead");
1100        let result = router.send(msg).await;
1101        assert!(result.is_err(), "send from non-Running process should fail");
1102    }
1103
1104    // ── Service routing tests (K2.1 T3: D19 + D1) ──────────────
1105
1106    fn setup_router_with_registry(
1107        agent_count: usize,
1108    ) -> (
1109        A2ARouter,
1110        Vec<Pid>,
1111        Vec<mpsc::Receiver<KernelMessage>>,
1112        Arc<crate::service::ServiceRegistry>,
1113    ) {
1114        let table = Arc::new(ProcessTable::new(64));
1115        let mut pids = Vec::new();
1116
1117        for i in 0..agent_count {
1118            let entry = ProcessEntry {
1119                pid: 0,
1120                agent_id: format!("agent-{i}"),
1121                state: ProcessState::Running,
1122                capabilities: AgentCapabilities::default(),
1123                resource_usage: ResourceUsage::default(),
1124                cancel_token: CancellationToken::new(),
1125                parent_pid: None,
1126            };
1127            let pid = table.insert(entry).unwrap();
1128            pids.push(pid);
1129        }
1130
1131        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1132        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1133        let registry = Arc::new(crate::service::ServiceRegistry::new());
1134        let router = A2ARouter::new(table, checker, topic_router)
1135            .with_service_registry(registry.clone());
1136
1137        let mut receivers = Vec::new();
1138        for &pid in &pids {
1139            let rx = router.create_inbox(pid);
1140            receivers.push(rx);
1141        }
1142
1143        (router, pids, receivers, registry)
1144    }
1145
1146    #[tokio::test]
1147    async fn route_to_service_by_name() {
1148        let (router, pids, mut receivers, registry) = setup_router_with_registry(2);
1149
1150        // Register a service owned by pids[1]
1151        registry
1152            .register_entry(crate::service::ServiceEntry {
1153                name: "auth".into(),
1154                owner_pid: Some(pids[1]),
1155                endpoint: crate::service::ServiceEndpoint::AgentInbox(pids[1]),
1156                audit_level: crate::service::ServiceAuditLevel::Full,
1157                registered_at: chrono::Utc::now(),
1158            })
1159            .unwrap();
1160
1161        let msg = KernelMessage::text(pids[0], MessageTarget::Service("auth".into()), "validate");
1162        router.send(msg).await.unwrap();
1163
1164        let received = receivers[1].try_recv().unwrap();
1165        assert_eq!(received.from, pids[0]);
1166        assert!(matches!(
1167            received.payload,
1168            MessagePayload::Text(ref t) if t == "validate"
1169        ));
1170    }
1171
1172    #[tokio::test]
1173    async fn route_service_method() {
1174        let (router, pids, mut receivers, registry) = setup_router_with_registry(2);
1175
1176        registry
1177            .register_entry(crate::service::ServiceEntry {
1178                name: "auth".into(),
1179                owner_pid: Some(pids[1]),
1180                endpoint: crate::service::ServiceEndpoint::AgentInbox(pids[1]),
1181                audit_level: crate::service::ServiceAuditLevel::Full,
1182                registered_at: chrono::Utc::now(),
1183            })
1184            .unwrap();
1185
1186        let msg = KernelMessage::text(
1187            pids[0],
1188            MessageTarget::ServiceMethod {
1189                service: "auth".into(),
1190                method: "validate_token".into(),
1191            },
1192            "token-123",
1193        );
1194        router.send(msg).await.unwrap();
1195
1196        let received = receivers[1].try_recv().unwrap();
1197        assert_eq!(received.from, pids[0]);
1198        assert!(matches!(
1199            received.target,
1200            MessageTarget::ServiceMethod { ref service, ref method }
1201            if service == "auth" && method == "validate_token"
1202        ));
1203    }
1204
1205    #[tokio::test]
1206    async fn service_not_found_returns_error() {
1207        let (router, pids, _receivers, _registry) = setup_router_with_registry(2);
1208
1209        let msg = KernelMessage::text(
1210            pids[0],
1211            MessageTarget::Service("nonexistent".into()),
1212            "hello",
1213        );
1214        let result = router.send(msg).await;
1215        assert!(result.is_err());
1216        let err_msg = format!("{}", result.unwrap_err());
1217        assert!(
1218            err_msg.contains("service not found"),
1219            "expected 'service not found', got: {err_msg}"
1220        );
1221    }
1222
1223    #[tokio::test]
1224    async fn service_entry_registration() {
1225        let registry = crate::service::ServiceRegistry::new();
1226
1227        let entry = crate::service::ServiceEntry {
1228            name: "cache".into(),
1229            owner_pid: Some(42),
1230            endpoint: crate::service::ServiceEndpoint::AgentInbox(42),
1231            audit_level: crate::service::ServiceAuditLevel::Full,
1232            registered_at: chrono::Utc::now(),
1233        };
1234        registry.register_entry(entry).unwrap();
1235
1236        let retrieved = registry.get_entry("cache").unwrap();
1237        assert_eq!(retrieved.name, "cache");
1238        assert_eq!(retrieved.owner_pid, Some(42));
1239    }
1240
1241    #[tokio::test]
1242    async fn service_entry_with_audit_level() {
1243        let registry = crate::service::ServiceRegistry::new();
1244
1245        let entry = crate::service::ServiceEntry {
1246            name: "metrics".into(),
1247            owner_pid: Some(10),
1248            endpoint: crate::service::ServiceEndpoint::AgentInbox(10),
1249            audit_level: crate::service::ServiceAuditLevel::GateOnly,
1250            registered_at: chrono::Utc::now(),
1251        };
1252        registry.register_entry(entry).unwrap();
1253
1254        let retrieved = registry.get_entry("metrics").unwrap();
1255        assert_eq!(retrieved.audit_level, crate::service::ServiceAuditLevel::GateOnly);
1256    }
1257
1258    #[tokio::test]
1259    async fn resolve_target_finds_owner_pid() {
1260        let registry = crate::service::ServiceRegistry::new();
1261
1262        let entry = crate::service::ServiceEntry {
1263            name: "search".into(),
1264            owner_pid: Some(77),
1265            endpoint: crate::service::ServiceEndpoint::AgentInbox(77),
1266            audit_level: crate::service::ServiceAuditLevel::Full,
1267            registered_at: chrono::Utc::now(),
1268        };
1269        registry.register_entry(entry).unwrap();
1270
1271        assert_eq!(registry.resolve_target("search"), Some(77));
1272        assert_eq!(registry.resolve_target("nonexistent"), None);
1273    }
1274
1275    #[tokio::test]
1276    async fn service_without_registry_returns_error() {
1277        // Router without service registry should fail on Service target
1278        let (router_no_reg, pids, _receivers) = setup_router(2);
1279
1280        let msg = KernelMessage::text(
1281            pids[0],
1282            MessageTarget::Service("missing".into()),
1283            "hello",
1284        );
1285        let result = router_no_reg.send(msg).await;
1286        assert!(result.is_err());
1287        let err_msg = format!("{}", result.unwrap_err());
1288        assert!(
1289            err_msg.contains("no service registry"),
1290            "expected 'no service registry', got: {err_msg}"
1291        );
1292    }
1293
1294    #[tokio::test]
1295    async fn external_service_no_pid_returns_error() {
1296        let (router, pids, _receivers, registry) = setup_router_with_registry(2);
1297
1298        // Register external service with no owner_pid
1299        registry
1300            .register_entry(crate::service::ServiceEntry {
1301                name: "redis".into(),
1302                owner_pid: None,
1303                endpoint: crate::service::ServiceEndpoint::External {
1304                    url: "redis://localhost:6379".into(),
1305                },
1306                audit_level: crate::service::ServiceAuditLevel::GateOnly,
1307                registered_at: chrono::Utc::now(),
1308            })
1309            .unwrap();
1310
1311        let msg = KernelMessage::text(
1312            pids[0],
1313            MessageTarget::Service("redis".into()),
1314            "ping",
1315        );
1316        let result = router.send(msg).await;
1317        assert!(result.is_err(), "external service with no PID should fail routing");
1318    }
1319
1320    // ── Request-response tests ──────────────────────────────────
1321
1322    #[tokio::test]
1323    async fn request_response_completes() {
1324        let (router, pids, mut receivers) = setup_router(2);
1325        let router = Arc::new(router);
1326        let router2 = Arc::clone(&router);
1327
1328        // Spawn a responder that reads from pids[1]'s inbox and replies.
1329        let from_pid = pids[0];
1330        let to_pid = pids[1];
1331        tokio::spawn(async move {
1332            let msg = receivers[1].recv().await.unwrap();
1333            // Build a correlated response back to the sender.
1334            let reply = KernelMessage::with_correlation(
1335                to_pid,
1336                MessageTarget::Process(from_pid),
1337                MessagePayload::Text("pong".into()),
1338                msg.id.clone(),
1339            );
1340            router2.try_complete_request(reply);
1341        });
1342
1343        let request = KernelMessage::text(from_pid, MessageTarget::Process(to_pid), "ping");
1344        let response = router
1345            .request(request, Duration::from_secs(5))
1346            .await
1347            .unwrap();
1348        assert!(matches!(
1349            response.payload,
1350            MessagePayload::Text(ref t) if t == "pong"
1351        ));
1352        assert_eq!(router.pending_request_count(), 0);
1353    }
1354
1355    #[tokio::test]
1356    async fn request_response_timeout() {
1357        let (router, pids, _receivers) = setup_router(2);
1358
1359        let request =
1360            KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "no-reply");
1361        let result = router.request(request, Duration::from_millis(50)).await;
1362        assert!(result.is_err());
1363        let err_msg = format!("{}", result.unwrap_err());
1364        assert!(
1365            err_msg.contains("timeout"),
1366            "expected timeout error, got: {err_msg}"
1367        );
1368        assert_eq!(router.pending_request_count(), 0);
1369    }
1370
1371    #[tokio::test]
1372    async fn try_complete_request_matching() {
1373        let (router, pids, _receivers) = setup_router(2);
1374
1375        // Manually register a pending request.
1376        let (tx, rx) = oneshot::channel();
1377        let request_id = "req-42".to_string();
1378        router.pending_requests.insert(
1379            request_id.clone(),
1380            PendingRequest {
1381                response_tx: tx,
1382                sent_at: Instant::now(),
1383            },
1384        );
1385
1386        // Build a response with matching correlation_id.
1387        let reply = KernelMessage::with_correlation(
1388            pids[1],
1389            MessageTarget::Process(pids[0]),
1390            MessagePayload::Text("reply".into()),
1391            request_id,
1392        );
1393        let completed = router.try_complete_request(reply);
1394        assert!(completed, "try_complete_request should return true for matching id");
1395
1396        let response = rx.await.unwrap();
1397        assert!(matches!(
1398            response.payload,
1399            MessagePayload::Text(ref t) if t == "reply"
1400        ));
1401    }
1402
1403    #[tokio::test]
1404    async fn try_complete_request_no_match() {
1405        let (router, pids, _receivers) = setup_router(2);
1406
1407        // No pending requests registered.
1408        let reply = KernelMessage::with_correlation(
1409            pids[1],
1410            MessageTarget::Process(pids[0]),
1411            MessagePayload::Text("orphan".into()),
1412            "nonexistent-id".into(),
1413        );
1414        let completed = router.try_complete_request(reply);
1415        assert!(!completed, "try_complete_request should return false for non-matching id");
1416
1417        // Also test message with no correlation_id.
1418        let plain = KernelMessage::text(pids[1], MessageTarget::Process(pids[0]), "plain");
1419        assert!(!router.try_complete_request(plain));
1420    }
1421
1422    #[tokio::test]
1423    async fn pending_request_count_tracks_correctly() {
1424        let (router, _pids, _receivers) = setup_router(2);
1425        assert_eq!(router.pending_request_count(), 0);
1426
1427        // Insert two pending requests.
1428        let (tx1, _rx1) = oneshot::channel();
1429        let (tx2, _rx2) = oneshot::channel();
1430        router.pending_requests.insert(
1431            "req-a".into(),
1432            PendingRequest {
1433                response_tx: tx1,
1434                sent_at: Instant::now(),
1435            },
1436        );
1437        assert_eq!(router.pending_request_count(), 1);
1438
1439        router.pending_requests.insert(
1440            "req-b".into(),
1441            PendingRequest {
1442                response_tx: tx2,
1443                sent_at: Instant::now(),
1444            },
1445        );
1446        assert_eq!(router.pending_request_count(), 2);
1447
1448        // Complete one via try_complete_request.
1449        let reply = KernelMessage::with_correlation(
1450            1,
1451            MessageTarget::Process(2),
1452            MessagePayload::Text("done".into()),
1453            "req-a".into(),
1454        );
1455        router.try_complete_request(reply);
1456        assert_eq!(router.pending_request_count(), 1);
1457
1458        // Remove the other manually.
1459        router.pending_requests.remove("req-b");
1460        assert_eq!(router.pending_request_count(), 0);
1461    }
1462
1463    // ── C4: Routing-time gate tests ────────────────────────────────
1464
1465    #[cfg(feature = "exochain")]
1466    #[tokio::test]
1467    async fn routing_gate_denies_message() {
1468        struct DenyGate;
1469        impl crate::gate::GateBackend for DenyGate {
1470            fn check(
1471                &self,
1472                _agent: &str,
1473                _action: &str,
1474                _ctx: &serde_json::Value,
1475            ) -> crate::gate::GateDecision {
1476                crate::gate::GateDecision::Deny {
1477                    reason: "blocked by policy".into(),
1478                    receipt: None,
1479                }
1480            }
1481        }
1482
1483        let (router, pids, _receivers) = setup_router(2);
1484        let router = router.with_gate(Arc::new(DenyGate));
1485
1486        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "hello");
1487        let result = router.send(msg).await;
1488        assert!(result.is_err());
1489        let err_msg = format!("{}", result.unwrap_err());
1490        assert!(
1491            err_msg.contains("routing gate denied"),
1492            "expected 'routing gate denied', got: {err_msg}"
1493        );
1494    }
1495
1496    #[cfg(feature = "exochain")]
1497    #[tokio::test]
1498    async fn routing_gate_permits_message() {
1499        struct PermitGate;
1500        impl crate::gate::GateBackend for PermitGate {
1501            fn check(
1502                &self,
1503                _agent: &str,
1504                _action: &str,
1505                _ctx: &serde_json::Value,
1506            ) -> crate::gate::GateDecision {
1507                crate::gate::GateDecision::Permit { token: None }
1508            }
1509        }
1510
1511        let (router, pids, mut receivers) = setup_router(2);
1512        let router = router.with_gate(Arc::new(PermitGate));
1513
1514        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "hello");
1515        router.send(msg).await.unwrap();
1516
1517        let received = receivers[1].try_recv().unwrap();
1518        assert_eq!(received.from, pids[0]);
1519        assert!(matches!(
1520            received.payload,
1521            MessagePayload::Text(ref t) if t == "hello"
1522        ));
1523    }
1524
1525    #[cfg(feature = "exochain")]
1526    #[tokio::test]
1527    async fn routing_gate_defer_still_delivers() {
1528        struct DeferGate;
1529        impl crate::gate::GateBackend for DeferGate {
1530            fn check(
1531                &self,
1532                _agent: &str,
1533                _action: &str,
1534                _ctx: &serde_json::Value,
1535            ) -> crate::gate::GateDecision {
1536                crate::gate::GateDecision::Defer {
1537                    reason: "pending review".into(),
1538                }
1539            }
1540        }
1541
1542        let (router, pids, mut receivers) = setup_router(2);
1543        let router = router.with_gate(Arc::new(DeferGate));
1544
1545        let msg = KernelMessage::text(pids[0], MessageTarget::Process(pids[1]), "deferred");
1546        router.send(msg).await.unwrap(); // should succeed (defer = deliver)
1547        assert!(receivers[1].try_recv().is_ok());
1548    }
1549
1550    // ── W5: Test hardening — additional coverage ─────────────────
1551
1552    #[tokio::test]
1553    async fn multiple_messages_queued_in_order() {
1554        let (router, pids, mut receivers) = setup_router(2);
1555
1556        for i in 0..5 {
1557            let msg = KernelMessage::text(
1558                pids[0],
1559                MessageTarget::Process(pids[1]),
1560                &format!("msg-{i}"),
1561            );
1562            router.send(msg).await.unwrap();
1563        }
1564
1565        // Messages should arrive in FIFO order
1566        for i in 0..5 {
1567            let received = receivers[1].try_recv().unwrap();
1568            let expected = format!("msg-{i}");
1569            assert!(
1570                matches!(received.payload, MessagePayload::Text(ref t) if t == &expected),
1571                "expected '{expected}' but got {:?}",
1572                received.payload,
1573            );
1574        }
1575    }
1576
1577    #[tokio::test]
1578    async fn unsubscribe_stops_topic_delivery() {
1579        let (router, pids, mut receivers) = setup_router(3);
1580
1581        router.topic_router().subscribe(pids[1], "events");
1582        router.topic_router().subscribe(pids[2], "events");
1583
1584        // Unsubscribe pids[1]
1585        router.topic_router().unsubscribe(pids[1], "events");
1586
1587        let msg = KernelMessage::text(
1588            pids[0],
1589            MessageTarget::Topic("events".into()),
1590            "after-unsub",
1591        );
1592        router.send(msg).await.unwrap();
1593
1594        // pids[1] should NOT receive (unsubscribed)
1595        assert!(receivers[1].try_recv().is_err(), "unsubscribed agent should not receive");
1596
1597        // pids[2] should still receive
1598        let received = receivers[2].try_recv().unwrap();
1599        assert!(matches!(
1600            received.payload,
1601            MessagePayload::Text(ref t) if t == "after-unsub"
1602        ));
1603    }
1604
1605    #[tokio::test]
1606    async fn publish_to_empty_topic_succeeds() {
1607        let (router, pids, _receivers) = setup_router(2);
1608
1609        // Publish to a topic with no subscribers — should succeed with no error
1610        let msg = KernelMessage::text(
1611            pids[0],
1612            MessageTarget::Topic("empty-topic".into()),
1613            "nobody-listening",
1614        );
1615        let result = router.send(msg).await;
1616        assert!(result.is_ok(), "publish to empty topic should succeed");
1617    }
1618
1619    #[tokio::test]
1620    async fn broadcast_with_zero_other_agents() {
1621        let (router, pids, mut receivers) = setup_router(1);
1622
1623        let msg = KernelMessage::text(pids[0], MessageTarget::Broadcast, "alone");
1624        let result = router.send(msg).await;
1625        assert!(result.is_ok(), "broadcast with only sender should succeed");
1626
1627        // Sender should not receive their own broadcast
1628        assert!(receivers[0].try_recv().is_err());
1629    }
1630
1631    #[tokio::test]
1632    async fn ipc_scope_restricted_allows_listed_pids() {
1633        let table = Arc::new(ProcessTable::new(64));
1634
1635        use crate::capability::IpcScope;
1636        // Create target first so we know its PID for the restricted list
1637        let target_entry = ProcessEntry {
1638            pid: 0,
1639            agent_id: "target".to_owned(),
1640            state: ProcessState::Running,
1641            capabilities: AgentCapabilities::default(),
1642            resource_usage: ResourceUsage::default(),
1643            cancel_token: CancellationToken::new(),
1644            parent_pid: None,
1645        };
1646        let target_pid = table.insert(target_entry).unwrap();
1647
1648        // Create sender with restricted IPC scope that includes the target
1649        let sender_entry = ProcessEntry {
1650            pid: 0,
1651            agent_id: "restricted-sender".to_owned(),
1652            state: ProcessState::Running,
1653            capabilities: AgentCapabilities {
1654                ipc_scope: IpcScope::Restricted(vec![target_pid]),
1655                ..Default::default()
1656            },
1657            resource_usage: ResourceUsage::default(),
1658            cancel_token: CancellationToken::new(),
1659            parent_pid: None,
1660        };
1661        let sender_pid = table.insert(sender_entry).unwrap();
1662
1663        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1664        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1665        let router = A2ARouter::new(table, checker, topic_router);
1666        let _rx_sender = router.create_inbox(sender_pid);
1667        let mut rx_target = router.create_inbox(target_pid);
1668
1669        let msg = KernelMessage::text(
1670            sender_pid,
1671            MessageTarget::Process(target_pid),
1672            "allowed",
1673        );
1674        let result = router.send(msg).await;
1675        assert!(result.is_ok(), "restricted sender should reach allowed PID");
1676
1677        let received = rx_target.try_recv().unwrap();
1678        assert!(matches!(
1679            received.payload,
1680            MessagePayload::Text(ref t) if t == "allowed"
1681        ));
1682    }
1683
1684    #[tokio::test]
1685    async fn ipc_scope_restricted_blocks_unlisted_pids() {
1686        let table = Arc::new(ProcessTable::new(64));
1687
1688        use crate::capability::IpcScope;
1689        let target_entry = ProcessEntry {
1690            pid: 0,
1691            agent_id: "target".to_owned(),
1692            state: ProcessState::Running,
1693            capabilities: AgentCapabilities::default(),
1694            resource_usage: ResourceUsage::default(),
1695            cancel_token: CancellationToken::new(),
1696            parent_pid: None,
1697        };
1698        let target_pid = table.insert(target_entry).unwrap();
1699
1700        // Restricted to PID 999 (not the target)
1701        let sender_entry = ProcessEntry {
1702            pid: 0,
1703            agent_id: "restricted-sender".to_owned(),
1704            state: ProcessState::Running,
1705            capabilities: AgentCapabilities {
1706                ipc_scope: IpcScope::Restricted(vec![999]),
1707                ..Default::default()
1708            },
1709            resource_usage: ResourceUsage::default(),
1710            cancel_token: CancellationToken::new(),
1711            parent_pid: None,
1712        };
1713        let sender_pid = table.insert(sender_entry).unwrap();
1714
1715        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1716        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1717        let router = A2ARouter::new(table, checker, topic_router);
1718        let _rx_sender = router.create_inbox(sender_pid);
1719        let _rx_target = router.create_inbox(target_pid);
1720
1721        let msg = KernelMessage::text(
1722            sender_pid,
1723            MessageTarget::Process(target_pid),
1724            "blocked",
1725        );
1726        let result = router.send(msg).await;
1727        assert!(result.is_err(), "restricted sender should be blocked from unlisted PID");
1728    }
1729
1730    #[test]
1731    fn create_inbox_returns_receiver() {
1732        let table = Arc::new(ProcessTable::new(64));
1733        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1734        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1735        let router = A2ARouter::new(table, checker, topic_router);
1736
1737        let rx = router.create_inbox(10);
1738        assert!(router.has_inbox(10));
1739        assert_eq!(router.inbox_count(), 1);
1740        drop(rx);
1741    }
1742
1743    #[test]
1744    fn create_inbox_replaces_existing() {
1745        let table = Arc::new(ProcessTable::new(64));
1746        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1747        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1748        let router = A2ARouter::new(table, checker, topic_router);
1749
1750        let _rx1 = router.create_inbox(10);
1751        assert_eq!(router.inbox_count(), 1);
1752
1753        // Replace — old receiver is invalidated
1754        let _rx2 = router.create_inbox(10);
1755        assert_eq!(router.inbox_count(), 1);
1756        assert!(router.has_inbox(10));
1757    }
1758
1759    #[test]
1760    fn remove_nonexistent_inbox_is_noop() {
1761        let table = Arc::new(ProcessTable::new(64));
1762        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1763        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1764        let router = A2ARouter::new(table, checker, topic_router);
1765
1766        // Should not panic
1767        router.remove_inbox(999);
1768        assert_eq!(router.inbox_count(), 0);
1769    }
1770
1771    #[test]
1772    fn inbox_count_tracks_multiple_inboxes() {
1773        let table = Arc::new(ProcessTable::new(64));
1774        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1775        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1776        let router = A2ARouter::new(table, checker, topic_router);
1777
1778        let _rx1 = router.create_inbox(1);
1779        let _rx2 = router.create_inbox(2);
1780        let _rx3 = router.create_inbox(3);
1781        assert_eq!(router.inbox_count(), 3);
1782
1783        router.remove_inbox(2);
1784        assert_eq!(router.inbox_count(), 2);
1785        assert!(!router.has_inbox(2));
1786        assert!(router.has_inbox(1));
1787        assert!(router.has_inbox(3));
1788    }
1789
1790    #[tokio::test]
1791    async fn send_to_kernel_target_succeeds() {
1792        let (router, pids, _receivers) = setup_router(1);
1793
1794        let msg = KernelMessage::text(pids[0], MessageTarget::Kernel, "kernel-msg");
1795        let result = router.send(msg).await;
1796        assert!(result.is_ok(), "kernel target routing should succeed (even if no-op)");
1797    }
1798
1799    #[tokio::test]
1800    async fn send_to_remote_node_fails() {
1801        let (router, pids, _receivers) = setup_router(1);
1802
1803        let msg = KernelMessage::text(
1804            pids[0],
1805            MessageTarget::RemoteNode {
1806                node_id: "remote-1".into(),
1807                target: Box::new(MessageTarget::Process(42)),
1808            },
1809            "remote-msg",
1810        );
1811        let result = router.send(msg).await;
1812        assert!(result.is_err(), "remote node routing should fail (not yet implemented)");
1813        let err_msg = format!("{}", result.unwrap_err());
1814        assert!(err_msg.contains("remote routing"), "expected remote routing error, got: {err_msg}");
1815    }
1816
1817    #[tokio::test]
1818    async fn suspended_process_can_still_send() {
1819        let table = Arc::new(ProcessTable::new(64));
1820
1821        let sender_entry = ProcessEntry {
1822            pid: 0,
1823            agent_id: "suspended-sender".to_owned(),
1824            state: ProcessState::Suspended,
1825            capabilities: AgentCapabilities::default(),
1826            resource_usage: ResourceUsage::default(),
1827            cancel_token: CancellationToken::new(),
1828            parent_pid: None,
1829        };
1830        let sender_pid = table.insert(sender_entry).unwrap();
1831
1832        let target_entry = ProcessEntry {
1833            pid: 0,
1834            agent_id: "target".to_owned(),
1835            state: ProcessState::Running,
1836            capabilities: AgentCapabilities::default(),
1837            resource_usage: ResourceUsage::default(),
1838            cancel_token: CancellationToken::new(),
1839            parent_pid: None,
1840        };
1841        let target_pid = table.insert(target_entry).unwrap();
1842
1843        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1844        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1845        let router = A2ARouter::new(table, checker, topic_router);
1846        let _rx_sender = router.create_inbox(sender_pid);
1847        let mut rx_target = router.create_inbox(target_pid);
1848
1849        let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "from-suspended");
1850        let result = router.send(msg).await;
1851        assert!(result.is_ok(), "suspended process should be allowed to send");
1852
1853        let received = rx_target.try_recv().unwrap();
1854        assert!(matches!(
1855            received.payload,
1856            MessagePayload::Text(ref t) if t == "from-suspended"
1857        ));
1858    }
1859
1860    #[tokio::test]
1861    async fn topic_multiple_subscribers_receive_same_message() {
1862        let (router, pids, mut receivers) = setup_router(5);
1863
1864        // Subscribe pids[1..5] to the topic
1865        for &pid in &pids[1..] {
1866            router.topic_router().subscribe(pid, "news");
1867        }
1868
1869        let msg = KernelMessage::text(
1870            pids[0],
1871            MessageTarget::Topic("news".into()),
1872            "breaking",
1873        );
1874        router.send(msg).await.unwrap();
1875
1876        // All subscribers should receive the same payload
1877        for (i, rx) in receivers[1..].iter_mut().enumerate() {
1878            let received = rx.try_recv().unwrap_or_else(|_| {
1879                panic!("subscriber {} (pid {}) should have received message", i + 1, pids[i + 1])
1880            });
1881            assert!(matches!(
1882                received.payload,
1883                MessagePayload::Text(ref t) if t == "breaking"
1884            ));
1885            assert_eq!(received.from, pids[0]);
1886        }
1887    }
1888
1889    #[tokio::test]
1890    async fn broadcast_skips_scope_restricted_targets() {
1891        let table = Arc::new(ProcessTable::new(64));
1892
1893        use crate::capability::IpcScope;
1894        // Sender with restricted scope (empty list = nobody allowed)
1895        let sender_entry = ProcessEntry {
1896            pid: 0,
1897            agent_id: "restricted-broadcaster".to_owned(),
1898            state: ProcessState::Running,
1899            capabilities: AgentCapabilities {
1900                ipc_scope: IpcScope::Restricted(vec![]),
1901                ..Default::default()
1902            },
1903            resource_usage: ResourceUsage::default(),
1904            cancel_token: CancellationToken::new(),
1905            parent_pid: None,
1906        };
1907        let sender_pid = table.insert(sender_entry).unwrap();
1908
1909        let target_entry = ProcessEntry {
1910            pid: 0,
1911            agent_id: "target".to_owned(),
1912            state: ProcessState::Running,
1913            capabilities: AgentCapabilities::default(),
1914            resource_usage: ResourceUsage::default(),
1915            cancel_token: CancellationToken::new(),
1916            parent_pid: None,
1917        };
1918        let target_pid = table.insert(target_entry).unwrap();
1919
1920        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1921        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1922        let router = A2ARouter::new(table, checker, topic_router);
1923        let _rx_sender = router.create_inbox(sender_pid);
1924        let mut rx_target = router.create_inbox(target_pid);
1925
1926        let msg = KernelMessage::text(sender_pid, MessageTarget::Broadcast, "restricted-broadcast");
1927        let result = router.send(msg).await;
1928        assert!(result.is_ok(), "broadcast itself should succeed even if all targets are blocked");
1929
1930        // Target should NOT receive (scope blocks it)
1931        assert!(rx_target.try_recv().is_err(), "restricted broadcast should not reach any target");
1932    }
1933
1934    #[test]
1935    fn service_registry_accessor() {
1936        let table = Arc::new(ProcessTable::new(64));
1937        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1938        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1939
1940        let router = A2ARouter::new(table.clone(), checker.clone(), topic_router.clone());
1941        assert!(router.service_registry().is_none(), "no registry by default");
1942
1943        let registry = Arc::new(crate::service::ServiceRegistry::new());
1944        let router = router.with_service_registry(registry);
1945        assert!(router.service_registry().is_some(), "registry should be present after with_service_registry");
1946    }
1947
1948    #[test]
1949    fn topic_router_accessor() {
1950        let table = Arc::new(ProcessTable::new(64));
1951        let checker = Arc::new(CapabilityChecker::new(table.clone()));
1952        let topic_router = Arc::new(TopicRouter::new(table.clone()));
1953        let expected = Arc::clone(&topic_router);
1954
1955        let router = A2ARouter::new(table, checker, topic_router);
1956        assert!(Arc::ptr_eq(router.topic_router(), &expected));
1957    }
1958
1959    #[tokio::test]
1960    async fn json_payload_routes() {
1961        let (router, pids, mut receivers) = setup_router(2);
1962
1963        let msg = KernelMessage::new(
1964            pids[0],
1965            MessageTarget::Process(pids[1]),
1966            MessagePayload::Json(serde_json::json!({"key": "value"})),
1967        );
1968        router.send(msg).await.unwrap();
1969
1970        let received = receivers[1].try_recv().unwrap();
1971        assert!(matches!(
1972            received.payload,
1973            MessagePayload::Json(ref v) if v["key"] == "value"
1974        ));
1975    }
1976
1977    #[tokio::test]
1978    async fn signal_payload_routes() {
1979        use crate::ipc::KernelSignal;
1980        let (router, pids, mut receivers) = setup_router(2);
1981
1982        let msg = KernelMessage::signal(
1983            pids[0],
1984            MessageTarget::Process(pids[1]),
1985            KernelSignal::Shutdown,
1986        );
1987        router.send(msg).await.unwrap();
1988
1989        let received = receivers[1].try_recv().unwrap();
1990        assert!(matches!(
1991            received.payload,
1992            MessagePayload::Signal(KernelSignal::Shutdown)
1993        ));
1994    }
1995
1996    #[tokio::test]
1997    async fn request_to_nonexistent_target_cleans_up_pending() {
1998        let table = Arc::new(ProcessTable::new(64));
1999
2000        let sender_entry = ProcessEntry {
2001            pid: 0,
2002            agent_id: "sender".to_owned(),
2003            state: ProcessState::Running,
2004            capabilities: AgentCapabilities::default(),
2005            resource_usage: ResourceUsage::default(),
2006            cancel_token: CancellationToken::new(),
2007            parent_pid: None,
2008        };
2009        let sender_pid = table.insert(sender_entry).unwrap();
2010
2011        // Create target in process table but no inbox
2012        let target_entry = ProcessEntry {
2013            pid: 0,
2014            agent_id: "no-inbox-target".to_owned(),
2015            state: ProcessState::Running,
2016            capabilities: AgentCapabilities::default(),
2017            resource_usage: ResourceUsage::default(),
2018            cancel_token: CancellationToken::new(),
2019            parent_pid: None,
2020        };
2021        let target_pid = table.insert(target_entry).unwrap();
2022
2023        let checker = Arc::new(CapabilityChecker::new(table.clone()));
2024        let topic_router = Arc::new(TopicRouter::new(table.clone()));
2025        let router = A2ARouter::new(table, checker, topic_router);
2026        let _rx_sender = router.create_inbox(sender_pid);
2027        // Intentionally no inbox for target
2028
2029        let msg = KernelMessage::text(sender_pid, MessageTarget::Process(target_pid), "request");
2030        let result = router.request(msg, Duration::from_millis(100)).await;
2031        assert!(result.is_err(), "request to PID without inbox should fail");
2032        assert_eq!(router.pending_request_count(), 0, "pending request should be cleaned up on send failure");
2033    }
2034
2035    #[tokio::test]
2036    async fn service_method_not_found_returns_error() {
2037        let (router, pids, _receivers, _registry) = setup_router_with_registry(2);
2038
2039        let msg = KernelMessage::text(
2040            pids[0],
2041            MessageTarget::ServiceMethod {
2042                service: "nonexistent-svc".into(),
2043                method: "do_thing".into(),
2044            },
2045            "call",
2046        );
2047        let result = router.send(msg).await;
2048        assert!(result.is_err());
2049        let err_msg = format!("{}", result.unwrap_err());
2050        assert!(
2051            err_msg.contains("service not found"),
2052            "expected 'service not found' for missing ServiceMethod target, got: {err_msg}"
2053        );
2054    }
2055}