Skip to main content

clawft_kernel/
agent_loop.rs

1//! Built-in kernel agent work loop.
2//!
3//! Every daemon-spawned agent runs this loop. It receives messages
4//! from the A2ARouter inbox and processes built-in commands.
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, warn};
12
13use crate::a2a::A2ARouter;
14use crate::cron::CronService;
15use crate::ipc::{KernelMessage, MessagePayload, MessageTarget};
16use crate::process::{Pid, ProcessState, ProcessTable, ResourceUsage};
17
18/// Run the built-in kernel agent work loop.
19///
20/// The agent:
21/// 1. Receives messages from its A2ARouter inbox
22/// 2. Processes built-in commands dispatched as JSON `{"cmd": "..."}` payloads
23/// 3. Sends responses back via A2ARouter
24/// 4. Tracks resource usage (messages_sent, tool_calls, cpu_time_ms)
25/// 5. Supports suspend/resume via `{"cmd":"suspend"}` / `{"cmd":"resume"}`
26/// 6. Enforces gate checks before exec/cron commands (when gate is provided)
27/// 7. Exits when the cancellation token is triggered
28///
29/// Returns an exit code (0 = normal shutdown).
30#[allow(clippy::too_many_arguments)]
31pub async fn kernel_agent_loop(
32    pid: Pid,
33    cancel: CancellationToken,
34    mut inbox: mpsc::Receiver<KernelMessage>,
35    a2a: Arc<A2ARouter>,
36    cron: Arc<CronService>,
37    process_table: Arc<ProcessTable>,
38    tool_registry: Option<Arc<crate::wasm_runner::ToolRegistry>>,
39    #[cfg(feature = "exochain")] chain: Option<Arc<crate::chain::ChainManager>>,
40    #[cfg(feature = "exochain")] gate: Option<Arc<dyn crate::gate::GateBackend>>,
41) -> i32 {
42    let started = Instant::now();
43    debug!(pid, "agent loop started");
44
45    // Extract agent_id once before the loop (used by gate checks)
46    #[cfg(feature = "exochain")]
47    let agent_id = process_table
48        .get(pid)
49        .map(|e| e.agent_id.clone())
50        .unwrap_or_else(|| format!("pid-{pid}"));
51
52    let mut usage = ResourceUsage::default();
53
54    loop {
55        tokio::select! {
56            _ = cancel.cancelled() => {
57                debug!(pid, "agent loop cancelled");
58                // Final resource update
59                usage.cpu_time_ms = started.elapsed().as_millis() as u64;
60                let _ = process_table.update_resources(pid, usage);
61                return 0;
62            }
63            msg = inbox.recv() => {
64                match msg {
65                    Some(message) => {
66                        let cmd = extract_cmd(&message);
67
68                        // Log message receipt on chain
69                        #[cfg(feature = "exochain")]
70                        if let Some(ref cm) = chain {
71                            cm.append(
72                                "ipc",
73                                "ipc.recv",
74                                Some(serde_json::json!({
75                                    "pid": pid,
76                                    "from": message.from,
77                                    "msg_id": &message.id,
78                                    "cmd": cmd.as_deref().unwrap_or("none"),
79                                })),
80                            );
81                        }
82
83                        // Handle suspend command
84                        if cmd.as_deref() == Some("suspend") {
85                            // Send acknowledgement BEFORE transitioning state,
86                            // because A2ARouter.send() checks sender is Running.
87                            let reply = KernelMessage::with_correlation(
88                                pid,
89                                MessageTarget::Process(message.from),
90                                MessagePayload::Json(serde_json::json!({
91                                    "status": "suspended",
92                                    "pid": pid,
93                                })),
94                                message.id.clone(),
95                            );
96                            send_reply(&a2a, reply, #[cfg(feature = "exochain")] chain.as_deref()).await;
97                            usage.messages_sent += 1;
98
99                            // Transition to Suspended
100                            let _ = process_table.update_state(pid, ProcessState::Suspended);
101                            debug!(pid, "agent suspended");
102
103                            #[cfg(feature = "exochain")]
104                            if let Some(ref cm) = chain {
105                                cm.append(
106                                    "supervisor",
107                                    "agent.suspend",
108                                    Some(serde_json::json!({
109                                        "pid": pid,
110                                        "from": message.from,
111                                        "msg_id": &message.id,
112                                    })),
113                                );
114                            }
115
116                            // Enter parking loop
117                            let resumed = parking_loop(
118                                pid,
119                                &cancel,
120                                &mut inbox,
121                                &a2a,
122                                &process_table,
123                                #[cfg(feature = "exochain")] chain.as_deref(),
124                                &mut usage,
125                            ).await;
126
127                            if !resumed {
128                                // Cancelled during suspend
129                                usage.cpu_time_ms = started.elapsed().as_millis() as u64;
130                                let _ = process_table.update_resources(pid, usage);
131                                return 0;
132                            }
133                            // Resumed — continue main loop
134                            continue;
135                        }
136
137                        // Gate check for protected commands
138                        #[cfg(feature = "exochain")]
139                        if let Some(ref gate_backend) = gate
140                            && let Some(ref cmd_str) = cmd
141                        {
142                            let action = match cmd_str.as_str() {
143                                "exec" => Some("tool.exec"),
144                                "cron.add" => Some("service.cron.add"),
145                                "cron.remove" => Some("service.cron.remove"),
146                                _ => None,
147                            };
148                            if let Some(action_str) = action {
149                                // Build enriched gate context with tool name and effect vector (K4 A2)
150                                let context = if cmd_str == "exec" {
151                                    let tool_name = extract_tool_name(&message);
152                                    let effect = tool_name.as_deref().and_then(|tn| {
153                                        tool_registry.as_ref().and_then(|reg| {
154                                            reg.get(tn).map(|t| &t.spec().effect)
155                                        })
156                                    });
157                                    let mut ctx = serde_json::json!({"pid": pid});
158                                    if let Some(tn) = &tool_name {
159                                        ctx["tool"] = serde_json::json!(tn);
160                                    }
161                                    if let Some(ev) = effect {
162                                        ctx["effect"] = serde_json::json!({
163                                            "risk": ev.risk,
164                                            "security": ev.security,
165                                            "privacy": ev.privacy,
166                                        });
167                                    }
168                                    ctx
169                                } else {
170                                    serde_json::json!({"pid": pid})
171                                };
172                                let decision = gate_backend.check(&agent_id, action_str, &context);
173                                match decision {
174                                    crate::gate::GateDecision::Deny { reason, .. } => {
175                                        let reply = KernelMessage::with_correlation(
176                                            pid,
177                                            MessageTarget::Process(message.from),
178                                            MessagePayload::Json(serde_json::json!({
179                                                "error": reason,
180                                                "denied": true,
181                                            })),
182                                            message.id.clone(),
183                                        );
184                                        send_reply(&a2a, reply, chain.as_deref()).await;
185                                        usage.messages_sent += 1;
186                                        continue;
187                                    }
188                                    crate::gate::GateDecision::Defer { reason } => {
189                                        let reply = KernelMessage::with_correlation(
190                                            pid,
191                                            MessageTarget::Process(message.from),
192                                            MessagePayload::Json(serde_json::json!({
193                                                "deferred": true,
194                                                "reason": reason,
195                                            })),
196                                            message.id.clone(),
197                                        );
198                                        send_reply(&a2a, reply, chain.as_deref()).await;
199                                        usage.messages_sent += 1;
200                                        continue;
201                                    }
202                                    crate::gate::GateDecision::Permit { .. } => {
203                                        // Permitted — continue with normal handling
204                                    }
205                                }
206                            }
207                        }
208
209                        // Track tool_calls for exec command and log sudo usage
210                        if cmd.as_deref() == Some("exec") {
211                            usage.tool_calls += 1;
212
213                            // K4 B1: Log sudo override usage to chain
214                            #[cfg(feature = "exochain")]
215                            {
216                                let sudo_flag = match &message.payload {
217                                    MessagePayload::Json(v) => v.get("sudo").and_then(|s| s.as_bool()).unwrap_or(false),
218                                    _ => false,
219                                };
220                                if sudo_flag
221                                    && let Some(ref cm) = chain
222                                {
223                                    cm.append(
224                                        "security",
225                                        "sudo.override",
226                                        Some(serde_json::json!({
227                                            "pid": pid,
228                                            "agent_id": &agent_id,
229                                            "tool": extract_tool_name(&message).unwrap_or_default(),
230                                        })),
231                                    );
232                                }
233                            }
234                        }
235
236                        handle_message(
237                            pid,
238                            &message,
239                            &a2a,
240                            &cron,
241                            tool_registry.as_deref(),
242                            #[cfg(feature = "exochain")]
243                            chain.as_deref(),
244                            &started,
245                        ).await;
246
247                        usage.messages_sent += 1;
248
249                        // Log message acknowledgement on chain
250                        #[cfg(feature = "exochain")]
251                        if let Some(ref cm) = chain {
252                            cm.append(
253                                "ipc",
254                                "ipc.ack",
255                                Some(serde_json::json!({
256                                    "pid": pid,
257                                    "msg_id": &message.id,
258                                    "cmd": cmd.as_deref().unwrap_or("none"),
259                                    "status": "processed",
260                                })),
261                            );
262                        }
263
264                        // Update resource counters every 10 messages and periodically
265                        if usage.messages_sent % 10 == 0 {
266                            usage.cpu_time_ms = started.elapsed().as_millis() as u64;
267                            let _ = process_table.update_resources(pid, usage.clone());
268                        }
269                    }
270                    None => {
271                        // Inbox closed — shutdown
272                        debug!(pid, "inbox closed, exiting");
273                        usage.cpu_time_ms = started.elapsed().as_millis() as u64;
274                        let _ = process_table.update_resources(pid, usage);
275                        return 0;
276                    }
277                }
278            }
279        }
280    }
281}
282
283/// Extract the command string from a message payload.
284fn extract_cmd(msg: &KernelMessage) -> Option<String> {
285    match &msg.payload {
286        MessagePayload::Json(v) => v.get("cmd").and_then(|c| c.as_str()).map(String::from),
287        MessagePayload::Text(text) => {
288            if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
289                v.get("cmd").and_then(|c| c.as_str()).map(String::from)
290            } else {
291                None
292            }
293        }
294        _ => None,
295    }
296}
297
298/// Extract the tool name from an exec message payload.
299#[cfg(feature = "exochain")]
300fn extract_tool_name(msg: &KernelMessage) -> Option<String> {
301    match &msg.payload {
302        MessagePayload::Json(v) => v.get("tool").and_then(|t| t.as_str()).map(String::from),
303        MessagePayload::Text(text) => {
304            serde_json::from_str::<serde_json::Value>(text)
305                .ok()
306                .and_then(|v| v.get("tool").and_then(|t| t.as_str()).map(String::from))
307        }
308        _ => None,
309    }
310}
311
312/// Parking loop for suspended agents.
313///
314/// Waits for either a resume command or cancellation. Returns `true`
315/// if resumed, `false` if cancelled.
316async fn parking_loop(
317    pid: Pid,
318    cancel: &CancellationToken,
319    inbox: &mut mpsc::Receiver<KernelMessage>,
320    a2a: &A2ARouter,
321    process_table: &ProcessTable,
322    #[cfg(feature = "exochain")] chain: Option<&crate::chain::ChainManager>,
323    usage: &mut ResourceUsage,
324) -> bool {
325    loop {
326        tokio::select! {
327            _ = cancel.cancelled() => {
328                debug!(pid, "cancelled while suspended");
329                return false;
330            }
331            msg = inbox.recv() => {
332                match msg {
333                    Some(message) => {
334                        let cmd = extract_cmd(&message);
335                        if cmd.as_deref() == Some("resume") {
336                            // Transition back to Running
337                            let _ = process_table.update_state(pid, ProcessState::Running);
338                            debug!(pid, "agent resumed");
339
340                            #[cfg(feature = "exochain")]
341                            if let Some(cm) = chain {
342                                cm.append(
343                                    "supervisor",
344                                    "agent.resume",
345                                    Some(serde_json::json!({
346                                        "pid": pid,
347                                        "from": message.from,
348                                        "msg_id": &message.id,
349                                    })),
350                                );
351                            }
352
353                            let reply = KernelMessage::with_correlation(
354                                pid,
355                                MessageTarget::Process(message.from),
356                                MessagePayload::Json(serde_json::json!({
357                                    "status": "resumed",
358                                    "pid": pid,
359                                })),
360                                message.id.clone(),
361                            );
362                            send_reply(a2a, reply, #[cfg(feature = "exochain")] chain).await;
363                            usage.messages_sent += 1;
364                            return true;
365                        }
366
367                        // All other commands while suspended get an error
368                        let reply = KernelMessage::with_correlation(
369                            pid,
370                            MessageTarget::Process(message.from),
371                            MessagePayload::Json(serde_json::json!({
372                                "error": "agent suspended",
373                                "pid": pid,
374                            })),
375                            message.id.clone(),
376                        );
377                        send_reply(a2a, reply, #[cfg(feature = "exochain")] chain).await;
378                        usage.messages_sent += 1;
379                    }
380                    None => {
381                        // Inbox closed while suspended
382                        return false;
383                    }
384                }
385            }
386        }
387    }
388}
389
390/// Send a reply message through the A2ARouter.
391async fn send_reply(
392    a2a: &A2ARouter,
393    reply: KernelMessage,
394    #[cfg(feature = "exochain")] chain: Option<&crate::chain::ChainManager>,
395) {
396    #[cfg(feature = "exochain")]
397    {
398        if let Err(e) = a2a.send_checked(reply, chain).await {
399            warn!(error = %e, "failed to send reply");
400        }
401    }
402    #[cfg(not(feature = "exochain"))]
403    {
404        if let Err(e) = a2a.send(reply).await {
405            warn!(error = %e, "failed to send reply");
406        }
407    }
408}
409
410/// Handle a single inbound message.
411async fn handle_message(
412    pid: Pid,
413    msg: &KernelMessage,
414    a2a: &A2ARouter,
415    cron: &CronService,
416    tool_registry: Option<&crate::wasm_runner::ToolRegistry>,
417    #[cfg(feature = "exochain")] chain: Option<&crate::chain::ChainManager>,
418    started: &Instant,
419) {
420    // Extract command from payload — supports JSON, Text, and RVF envelopes.
421    let cmd_value = match &msg.payload {
422        MessagePayload::Json(v) => v.clone(),
423        MessagePayload::Text(text) => {
424            // Try parsing text as JSON, otherwise treat as plain text
425            match serde_json::from_str::<serde_json::Value>(text) {
426                Ok(v) => v,
427                Err(_) => serde_json::json!({"cmd": "echo", "text": text}),
428            }
429        }
430        MessagePayload::Rvf { segment_type, data } => {
431            // Decode RVF-typed payloads:
432            //   0x40 (ExochainEvent) — treat inner CBOR/JSON as command
433            //   Other — wrap as a typed envelope for the agent
434            debug!(pid, segment_type, data_len = data.len(), "received RVF payload");
435
436            // With exochain: try CBOR decode first (rvf-wire format), then JSON
437            #[cfg(feature = "exochain")]
438            {
439                if let Ok(val) = ciborium::from_reader::<ciborium::Value, _>(&data[..]) {
440                    let json_str = serde_json::to_string(&val).unwrap_or_default();
441                    match serde_json::from_str::<serde_json::Value>(&json_str) {
442                        Ok(v) => v,
443                        Err(_) => serde_json::json!({
444                            "cmd": "rvf.recv",
445                            "segment_type": segment_type,
446                            "data_len": data.len(),
447                        }),
448                    }
449                } else if let Ok(v) = serde_json::from_slice::<serde_json::Value>(data) {
450                    v
451                } else {
452                    serde_json::json!({
453                        "cmd": "rvf.recv",
454                        "segment_type": segment_type,
455                        "data_len": data.len(),
456                    })
457                }
458            }
459            // Without exochain: try JSON decode, fall back to rvf.recv
460            #[cfg(not(feature = "exochain"))]
461            {
462                if let Ok(v) = serde_json::from_slice::<serde_json::Value>(data) {
463                    v
464                } else {
465                    serde_json::json!({
466                        "cmd": "rvf.recv",
467                        "segment_type": segment_type,
468                        "data_len": data.len(),
469                    })
470                }
471            }
472        }
473        _ => {
474            debug!(pid, "ignoring signal message");
475            return;
476        }
477    };
478
479    let cmd = cmd_value
480        .get("cmd")
481        .and_then(|v| v.as_str())
482        .unwrap_or("unknown");
483
484    let response = match cmd {
485        "ping" => {
486            let uptime_ms = started.elapsed().as_millis() as u64;
487            serde_json::json!({
488                "status": "ok",
489                "pid": pid,
490                "uptime_ms": uptime_ms,
491            })
492        }
493        "cron.add" => {
494            let name = cmd_value
495                .get("name")
496                .and_then(|v| v.as_str())
497                .unwrap_or("unnamed")
498                .to_string();
499            let interval_secs = cmd_value
500                .get("interval_secs")
501                .and_then(|v| v.as_u64())
502                .unwrap_or(60);
503            let command = cmd_value
504                .get("command")
505                .and_then(|v| v.as_str())
506                .unwrap_or("ping")
507                .to_string();
508            let target_pid = cmd_value
509                .get("target_pid")
510                .and_then(|v| v.as_u64());
511
512            let job = cron.add_job(name, interval_secs, command, target_pid);
513
514            #[cfg(feature = "exochain")]
515            if let Some(cm) = chain {
516                cm.append(
517                    "cron",
518                    "cron.add",
519                    Some(serde_json::json!({
520                        "job_id": job.id,
521                        "name": job.name,
522                        "interval_secs": job.interval_secs,
523                        "via_agent": pid,
524                    })),
525                );
526            }
527
528            serde_json::to_value(&job).unwrap_or_default()
529        }
530        "cron.list" => {
531            let jobs = cron.list_jobs();
532            serde_json::to_value(&jobs).unwrap_or_default()
533        }
534        "cron.remove" => {
535            let id = cmd_value
536                .get("id")
537                .and_then(|v| v.as_str())
538                .unwrap_or("");
539            match cron.remove_job(id) {
540                Some(job) => {
541                    #[cfg(feature = "exochain")]
542                    if let Some(cm) = chain {
543                        cm.append(
544                            "cron",
545                            "cron.remove",
546                            Some(serde_json::json!({
547                                "job_id": job.id,
548                                "name": job.name,
549                                "via_agent": pid,
550                            })),
551                        );
552                    }
553                    serde_json::json!({"removed": true, "job_id": job.id})
554                }
555                None => serde_json::json!({"removed": false, "error": "job not found"}),
556            }
557        }
558        "exec" => {
559            // K3 tool dispatch via ToolRegistry
560            let tool_name = cmd_value
561                .get("tool")
562                .and_then(|v| v.as_str())
563                .unwrap_or("");
564            let args = cmd_value
565                .get("args")
566                .cloned()
567                .unwrap_or(serde_json::json!({}));
568
569            if tool_name.is_empty() {
570                // Backwards compat: echo mode when no tool specified
571                let text = cmd_value
572                    .get("text")
573                    .and_then(|v| v.as_str())
574                    .unwrap_or("(no input)");
575                serde_json::json!({
576                    "status": "ok",
577                    "echo": text,
578                    "pid": pid,
579                })
580            } else if let Some(registry) = tool_registry {
581                match registry.execute(tool_name, args) {
582                    Ok(result) => {
583                        #[cfg(feature = "exochain")]
584                        if let Some(cm) = chain {
585                            cm.append(
586                                "tool",
587                                "tool.exec",
588                                Some(serde_json::json!({
589                                    "tool": tool_name,
590                                    "pid": pid,
591                                    "status": "ok",
592                                })),
593                            );
594                        }
595                        serde_json::json!({
596                            "status": "ok",
597                            "tool": tool_name,
598                            "result": result,
599                            "pid": pid,
600                        })
601                    }
602                    Err(e) => {
603                        #[cfg(feature = "exochain")]
604                        if let Some(cm) = chain {
605                            cm.append(
606                                "tool",
607                                "tool.exec",
608                                Some(serde_json::json!({
609                                    "tool": tool_name,
610                                    "pid": pid,
611                                    "status": "error",
612                                    "error": e.to_string(),
613                                })),
614                            );
615                        }
616                        serde_json::json!({
617                            "error": e.to_string(),
618                            "tool": tool_name,
619                            "pid": pid,
620                        })
621                    }
622                }
623            } else {
624                serde_json::json!({
625                    "error": "tool registry not available",
626                    "tool": tool_name,
627                    "pid": pid,
628                })
629            }
630        }
631        "echo" => {
632            let text = cmd_value
633                .get("text")
634                .and_then(|v| v.as_str())
635                .unwrap_or("");
636            serde_json::json!({"echo": text, "pid": pid})
637        }
638        "rvf.recv" => {
639            // Acknowledge receipt of an RVF-typed payload
640            let seg_type = cmd_value
641                .get("segment_type")
642                .and_then(|v| v.as_u64())
643                .unwrap_or(0);
644            let data_len = cmd_value
645                .get("data_len")
646                .and_then(|v| v.as_u64())
647                .unwrap_or(0);
648            serde_json::json!({
649                "status": "ok",
650                "cmd": "rvf.recv",
651                "segment_type": seg_type,
652                "data_len": data_len,
653                "pid": pid,
654            })
655        }
656        unknown => {
657            serde_json::json!({
658                "error": format!("unknown command: {unknown}"),
659                "pid": pid,
660            })
661        }
662    };
663
664    // Send response back to sender via chain-logged path
665    let reply = KernelMessage::with_correlation(
666        pid,
667        MessageTarget::Process(msg.from),
668        MessagePayload::Json(response),
669        msg.id.clone(),
670    );
671
672    #[cfg(feature = "exochain")]
673    {
674        if let Err(e) = a2a.send_checked(reply, chain).await {
675            warn!(pid, error = %e, "failed to send reply");
676        }
677    }
678    #[cfg(not(feature = "exochain"))]
679    {
680        if let Err(e) = a2a.send(reply).await {
681            warn!(pid, error = %e, "failed to send reply");
682        }
683    }
684}
685
686#[cfg(test)]
687mod tests {
688    use super::*;
689    use crate::capability::{AgentCapabilities, CapabilityChecker};
690    use crate::process::{ProcessEntry, ProcessState, ProcessTable, ResourceUsage};
691    use crate::topic::TopicRouter;
692
693    fn setup() -> (Arc<A2ARouter>, Arc<CronService>, Arc<ProcessTable>) {
694        let pt = Arc::new(ProcessTable::new(64));
695
696        // Insert a "kernel" process at PID 0 for message routing
697        let kernel_entry = ProcessEntry {
698            pid: 0,
699            agent_id: "kernel".into(),
700            state: ProcessState::Running,
701            capabilities: AgentCapabilities::default(),
702            resource_usage: ResourceUsage::default(),
703            cancel_token: CancellationToken::new(),
704            parent_pid: None,
705        };
706        pt.insert_with_pid(kernel_entry).unwrap();
707
708        let checker = Arc::new(CapabilityChecker::new(pt.clone()));
709        let topic_router = Arc::new(TopicRouter::new(pt.clone()));
710        let a2a = Arc::new(A2ARouter::new(pt.clone(), checker, topic_router));
711        let cron = Arc::new(CronService::new());
712        (a2a, cron, pt)
713    }
714
715    fn spawn_agent(
716        pt: &ProcessTable,
717        a2a: &A2ARouter,
718        agent_id: &str,
719    ) -> (Pid, mpsc::Receiver<KernelMessage>) {
720        let entry = ProcessEntry {
721            pid: 0,
722            agent_id: agent_id.into(),
723            state: ProcessState::Running,
724            capabilities: AgentCapabilities::default(),
725            resource_usage: ResourceUsage::default(),
726            cancel_token: CancellationToken::new(),
727            parent_pid: None,
728        };
729        let pid = pt.insert(entry).unwrap();
730        let rx = a2a.create_inbox(pid);
731        (pid, rx)
732    }
733
734    /// Helper to spawn the agent loop with the new parameters.
735    fn spawn_loop(
736        agent_pid: Pid,
737        cancel: CancellationToken,
738        inbox: mpsc::Receiver<KernelMessage>,
739        a2a: Arc<A2ARouter>,
740        cron: Arc<CronService>,
741        pt: Arc<ProcessTable>,
742    ) -> tokio::task::JoinHandle<i32> {
743        tokio::spawn(async move {
744            kernel_agent_loop(
745                agent_pid,
746                cancel,
747                inbox,
748                a2a,
749                cron,
750                pt,
751                None, // tool_registry
752                #[cfg(feature = "exochain")]
753                None,
754                #[cfg(feature = "exochain")]
755                None,
756            )
757            .await
758        })
759    }
760
761    #[tokio::test]
762    async fn ping_command() {
763        let (a2a, cron, pt) = setup();
764        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
765        let mut kernel_inbox = a2a.create_inbox(0);
766
767        let cancel = CancellationToken::new();
768        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
769
770        // Send ping from kernel (PID 0)
771        let msg = KernelMessage::new(
772            0,
773            MessageTarget::Process(agent_pid),
774            MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
775        );
776        a2a.send(msg).await.unwrap();
777
778        // Wait for reply
779        let reply = tokio::time::timeout(
780            std::time::Duration::from_secs(1),
781            kernel_inbox.recv(),
782        )
783        .await
784        .unwrap()
785        .unwrap();
786
787        if let MessagePayload::Json(v) = &reply.payload {
788            assert_eq!(v["status"], "ok");
789            assert_eq!(v["pid"], agent_pid);
790        } else {
791            panic!("expected JSON reply");
792        }
793
794        cancel.cancel();
795        let code = handle.await.unwrap();
796        assert_eq!(code, 0);
797    }
798
799    #[tokio::test]
800    async fn unknown_command() {
801        let (a2a, cron, pt) = setup();
802        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
803        let mut kernel_inbox = a2a.create_inbox(0);
804
805        let cancel = CancellationToken::new();
806        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
807
808        let msg = KernelMessage::new(
809            0,
810            MessageTarget::Process(agent_pid),
811            MessagePayload::Json(serde_json::json!({"cmd": "nosuch"})),
812        );
813        a2a.send(msg).await.unwrap();
814
815        let reply = tokio::time::timeout(
816            std::time::Duration::from_secs(1),
817            kernel_inbox.recv(),
818        )
819        .await
820        .unwrap()
821        .unwrap();
822
823        if let MessagePayload::Json(v) = &reply.payload {
824            assert!(v["error"].as_str().unwrap().contains("unknown command"));
825        } else {
826            panic!("expected JSON reply");
827        }
828
829        cancel.cancel();
830        handle.await.unwrap();
831    }
832
833    #[tokio::test]
834    async fn cron_add_via_agent() {
835        let (a2a, cron, pt) = setup();
836        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
837        let mut kernel_inbox = a2a.create_inbox(0);
838
839        let cancel = CancellationToken::new();
840        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron.clone(), pt);
841
842        let msg = KernelMessage::new(
843            0,
844            MessageTarget::Process(agent_pid),
845            MessagePayload::Json(serde_json::json!({
846                "cmd": "cron.add",
847                "name": "test-job",
848                "interval_secs": 30,
849                "command": "health",
850            })),
851        );
852        a2a.send(msg).await.unwrap();
853
854        let reply = tokio::time::timeout(
855            std::time::Duration::from_secs(1),
856            kernel_inbox.recv(),
857        )
858        .await
859        .unwrap()
860        .unwrap();
861
862        if let MessagePayload::Json(v) = &reply.payload {
863            assert_eq!(v["name"], "test-job");
864            assert!(v["id"].as_str().is_some());
865        } else {
866            panic!("expected JSON reply");
867        }
868
869        // Verify job was actually added
870        assert_eq!(cron.job_count(), 1);
871
872        cancel.cancel();
873        handle.await.unwrap();
874    }
875
876    #[tokio::test]
877    async fn cancellation_exits_cleanly() {
878        let (a2a, cron, pt) = setup();
879        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
880
881        let cancel = CancellationToken::new();
882        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a, cron, pt);
883
884        cancel.cancel();
885        let code = handle.await.unwrap();
886        assert_eq!(code, 0);
887    }
888
889    #[tokio::test]
890    async fn rvf_json_payload_processed() {
891        let (a2a, cron, pt) = setup();
892        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
893        let mut kernel_inbox = a2a.create_inbox(0);
894
895        let cancel = CancellationToken::new();
896        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
897
898        // Send an RVF payload containing JSON bytes (e.g. `{"cmd":"ping"}`)
899        let json_bytes = serde_json::to_vec(&serde_json::json!({"cmd": "ping"})).unwrap();
900        let msg = KernelMessage::new(
901            0,
902            MessageTarget::Process(agent_pid),
903            MessagePayload::Rvf {
904                segment_type: 0x40,
905                data: json_bytes,
906            },
907        );
908        a2a.send(msg).await.unwrap();
909
910        let reply = tokio::time::timeout(
911            std::time::Duration::from_secs(1),
912            kernel_inbox.recv(),
913        )
914        .await
915        .unwrap()
916        .unwrap();
917
918        if let MessagePayload::Json(v) = &reply.payload {
919            assert_eq!(v["status"], "ok");
920            assert_eq!(v["pid"], agent_pid);
921        } else {
922            panic!("expected JSON reply to RVF-wrapped ping");
923        }
924
925        cancel.cancel();
926        handle.await.unwrap();
927    }
928
929    #[tokio::test]
930    async fn rvf_opaque_binary_acknowledged() {
931        let (a2a, cron, pt) = setup();
932        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
933        let mut kernel_inbox = a2a.create_inbox(0);
934
935        let cancel = CancellationToken::new();
936        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
937
938        // Send raw binary that isn't valid JSON or CBOR
939        let msg = KernelMessage::new(
940            0,
941            MessageTarget::Process(agent_pid),
942            MessagePayload::Rvf {
943                segment_type: 0x42,
944                data: vec![0xDE, 0xAD, 0xBE, 0xEF],
945            },
946        );
947        a2a.send(msg).await.unwrap();
948
949        let reply = tokio::time::timeout(
950            std::time::Duration::from_secs(1),
951            kernel_inbox.recv(),
952        )
953        .await
954        .unwrap()
955        .unwrap();
956
957        if let MessagePayload::Json(v) = &reply.payload {
958            assert_eq!(v["cmd"], "rvf.recv");
959            assert_eq!(v["segment_type"], 0x42);
960            assert_eq!(v["data_len"], 4);
961        } else {
962            panic!("expected JSON reply acknowledging RVF binary");
963        }
964
965        cancel.cancel();
966        handle.await.unwrap();
967    }
968
969    #[tokio::test]
970    async fn resource_usage_increments() {
971        let (a2a, cron, pt) = setup();
972        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
973        let mut kernel_inbox = a2a.create_inbox(0);
974
975        let cancel = CancellationToken::new();
976        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt.clone());
977
978        // Send a ping
979        let msg = KernelMessage::new(
980            0,
981            MessageTarget::Process(agent_pid),
982            MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
983        );
984        a2a.send(msg).await.unwrap();
985
986        // Wait for reply
987        let _reply = tokio::time::timeout(
988            std::time::Duration::from_secs(1),
989            kernel_inbox.recv(),
990        )
991        .await
992        .unwrap()
993        .unwrap();
994
995        // Cancel and wait for the loop to exit (which triggers final resource update)
996        cancel.cancel();
997        let _code = handle.await.unwrap();
998
999        // Check resource usage was updated
1000        let entry = pt.get(agent_pid).unwrap();
1001        assert!(entry.resource_usage.messages_sent >= 1, "messages_sent should be at least 1");
1002    }
1003
1004    #[tokio::test]
1005    async fn suspend_resume_cycle() {
1006        let (a2a, cron, pt) = setup();
1007        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
1008        let mut kernel_inbox = a2a.create_inbox(0);
1009
1010        let cancel = CancellationToken::new();
1011        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt.clone());
1012
1013        // Send suspend
1014        let msg = KernelMessage::new(
1015            0,
1016            MessageTarget::Process(agent_pid),
1017            MessagePayload::Json(serde_json::json!({"cmd": "suspend"})),
1018        );
1019        a2a.send(msg).await.unwrap();
1020
1021        // Wait for suspended acknowledgement
1022        let reply = tokio::time::timeout(
1023            std::time::Duration::from_secs(1),
1024            kernel_inbox.recv(),
1025        )
1026        .await
1027        .unwrap()
1028        .unwrap();
1029
1030        if let MessagePayload::Json(v) = &reply.payload {
1031            assert_eq!(v["status"], "suspended");
1032        } else {
1033            panic!("expected JSON reply");
1034        }
1035
1036        // Verify process state is Suspended
1037        let entry = pt.get(agent_pid).unwrap();
1038        assert_eq!(entry.state, ProcessState::Suspended);
1039
1040        // Send a ping while suspended — should get error
1041        let msg = KernelMessage::new(
1042            0,
1043            MessageTarget::Process(agent_pid),
1044            MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1045        );
1046        a2a.send(msg).await.unwrap();
1047
1048        let reply = tokio::time::timeout(
1049            std::time::Duration::from_secs(1),
1050            kernel_inbox.recv(),
1051        )
1052        .await
1053        .unwrap()
1054        .unwrap();
1055
1056        if let MessagePayload::Json(v) = &reply.payload {
1057            assert_eq!(v["error"], "agent suspended");
1058        } else {
1059            panic!("expected JSON error reply");
1060        }
1061
1062        // Send resume
1063        let msg = KernelMessage::new(
1064            0,
1065            MessageTarget::Process(agent_pid),
1066            MessagePayload::Json(serde_json::json!({"cmd": "resume"})),
1067        );
1068        a2a.send(msg).await.unwrap();
1069
1070        let reply = tokio::time::timeout(
1071            std::time::Duration::from_secs(1),
1072            kernel_inbox.recv(),
1073        )
1074        .await
1075        .unwrap()
1076        .unwrap();
1077
1078        if let MessagePayload::Json(v) = &reply.payload {
1079            assert_eq!(v["status"], "resumed");
1080        } else {
1081            panic!("expected JSON reply");
1082        }
1083
1084        // Verify process state is Running again
1085        let entry = pt.get(agent_pid).unwrap();
1086        assert_eq!(entry.state, ProcessState::Running);
1087
1088        // Ping should work again after resume
1089        let msg = KernelMessage::new(
1090            0,
1091            MessageTarget::Process(agent_pid),
1092            MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1093        );
1094        a2a.send(msg).await.unwrap();
1095
1096        let reply = tokio::time::timeout(
1097            std::time::Duration::from_secs(1),
1098            kernel_inbox.recv(),
1099        )
1100        .await
1101        .unwrap()
1102        .unwrap();
1103
1104        if let MessagePayload::Json(v) = &reply.payload {
1105            assert_eq!(v["status"], "ok");
1106        } else {
1107            panic!("expected JSON reply");
1108        }
1109
1110        cancel.cancel();
1111        handle.await.unwrap();
1112    }
1113
1114    #[cfg(feature = "exochain")]
1115    #[tokio::test]
1116    async fn gate_deny_blocks_exec() {
1117        use crate::gate::{GateBackend, GateDecision};
1118
1119        // Gate that always denies
1120        struct AlwaysDeny;
1121        impl GateBackend for AlwaysDeny {
1122            fn check(
1123                &self,
1124                _agent_id: &str,
1125                _action: &str,
1126                _context: &serde_json::Value,
1127            ) -> GateDecision {
1128                GateDecision::Deny {
1129                    reason: "test deny".into(),
1130                    receipt: None,
1131                }
1132            }
1133        }
1134
1135        let (a2a, cron, pt) = setup();
1136        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
1137        let mut kernel_inbox = a2a.create_inbox(0);
1138
1139        let cancel = CancellationToken::new();
1140        let cancel2 = cancel.clone();
1141        let a2a2 = a2a.clone();
1142        let pt2 = pt.clone();
1143
1144        let handle = tokio::spawn(async move {
1145            kernel_agent_loop(
1146                agent_pid,
1147                cancel2,
1148                inbox,
1149                a2a2,
1150                cron,
1151                pt2,
1152                None, // tool_registry
1153                None, // chain
1154                Some(Arc::new(AlwaysDeny) as Arc<dyn GateBackend>),
1155            )
1156            .await
1157        });
1158
1159        // Send exec — should be denied by gate
1160        let msg = KernelMessage::new(
1161            0,
1162            MessageTarget::Process(agent_pid),
1163            MessagePayload::Json(serde_json::json!({"cmd": "exec", "text": "hello"})),
1164        );
1165        a2a.send(msg).await.unwrap();
1166
1167        let reply = tokio::time::timeout(
1168            std::time::Duration::from_secs(1),
1169            kernel_inbox.recv(),
1170        )
1171        .await
1172        .unwrap()
1173        .unwrap();
1174
1175        if let MessagePayload::Json(v) = &reply.payload {
1176            assert_eq!(v["denied"], true);
1177            assert_eq!(v["error"], "test deny");
1178        } else {
1179            panic!("expected JSON deny reply");
1180        }
1181
1182        cancel.cancel();
1183        handle.await.unwrap();
1184    }
1185
1186    #[cfg(feature = "exochain")]
1187    #[tokio::test]
1188    async fn gate_permit_allows_exec() {
1189        use crate::gate::{GateBackend, GateDecision};
1190
1191        // Gate that always permits
1192        struct AlwaysPermit;
1193        impl GateBackend for AlwaysPermit {
1194            fn check(
1195                &self,
1196                _agent_id: &str,
1197                _action: &str,
1198                _context: &serde_json::Value,
1199            ) -> GateDecision {
1200                GateDecision::Permit { token: None }
1201            }
1202        }
1203
1204        let (a2a, cron, pt) = setup();
1205        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
1206        let mut kernel_inbox = a2a.create_inbox(0);
1207
1208        let cancel = CancellationToken::new();
1209        let cancel2 = cancel.clone();
1210        let a2a2 = a2a.clone();
1211        let pt2 = pt.clone();
1212
1213        let handle = tokio::spawn(async move {
1214            kernel_agent_loop(
1215                agent_pid,
1216                cancel2,
1217                inbox,
1218                a2a2,
1219                cron,
1220                pt2,
1221                None, // tool_registry
1222                None, // chain
1223                Some(Arc::new(AlwaysPermit) as Arc<dyn GateBackend>),
1224            )
1225            .await
1226        });
1227
1228        // Send exec — should be permitted
1229        let msg = KernelMessage::new(
1230            0,
1231            MessageTarget::Process(agent_pid),
1232            MessagePayload::Json(serde_json::json!({"cmd": "exec", "text": "hello"})),
1233        );
1234        a2a.send(msg).await.unwrap();
1235
1236        let reply = tokio::time::timeout(
1237            std::time::Duration::from_secs(1),
1238            kernel_inbox.recv(),
1239        )
1240        .await
1241        .unwrap()
1242        .unwrap();
1243
1244        if let MessagePayload::Json(v) = &reply.payload {
1245            assert_eq!(v["status"], "ok");
1246            assert_eq!(v["echo"], "hello");
1247        } else {
1248            panic!("expected JSON reply");
1249        }
1250
1251        cancel.cancel();
1252        handle.await.unwrap();
1253    }
1254
1255    #[cfg(feature = "exochain")]
1256    #[tokio::test]
1257    async fn chain_logs_ipc_recv_ack() {
1258        let (a2a, cron, pt) = setup();
1259        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "chain-test");
1260        let mut kernel_inbox = a2a.create_inbox(0);
1261
1262        let cm = Arc::new(crate::chain::ChainManager::new(0, 1000));
1263        let cancel = CancellationToken::new();
1264        let cancel2 = cancel.clone();
1265        let a2a2 = a2a.clone();
1266        let pt2 = pt.clone();
1267        let cm2 = cm.clone();
1268
1269        let handle = tokio::spawn(async move {
1270            kernel_agent_loop(
1271                agent_pid,
1272                cancel2,
1273                inbox,
1274                a2a2,
1275                cron,
1276                pt2,
1277                None, // tool_registry
1278                Some(cm2),
1279                None, // gate
1280            )
1281            .await
1282        });
1283
1284        // Send ping from kernel (PID 0)
1285        let msg = KernelMessage::new(
1286            0,
1287            MessageTarget::Process(agent_pid),
1288            MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1289        );
1290        let msg_id = msg.id.clone();
1291        a2a.send(msg).await.unwrap();
1292
1293        // Wait for reply
1294        let _reply = tokio::time::timeout(
1295            std::time::Duration::from_secs(1),
1296            kernel_inbox.recv(),
1297        )
1298        .await
1299        .unwrap()
1300        .unwrap();
1301
1302        cancel.cancel();
1303        handle.await.unwrap();
1304
1305        // Verify chain events: ipc.recv + ipc.ack (plus ipc.send from reply)
1306        let events = cm.tail(10);
1307        let recv_evt = events.iter().find(|e| e.kind == "ipc.recv");
1308        let ack_evt = events.iter().find(|e| e.kind == "ipc.ack");
1309
1310        assert!(recv_evt.is_some(), "expected ipc.recv event on chain");
1311        assert!(ack_evt.is_some(), "expected ipc.ack event on chain");
1312
1313        let recv_payload = recv_evt.unwrap().payload.as_ref().unwrap();
1314        assert_eq!(recv_payload["pid"], agent_pid);
1315        assert_eq!(recv_payload["from"], 0);
1316        assert_eq!(recv_payload["msg_id"], msg_id);
1317        assert_eq!(recv_payload["cmd"], "ping");
1318
1319        let ack_payload = ack_evt.unwrap().payload.as_ref().unwrap();
1320        assert_eq!(ack_payload["pid"], agent_pid);
1321        assert_eq!(ack_payload["msg_id"], msg_id);
1322        assert_eq!(ack_payload["cmd"], "ping");
1323        assert_eq!(ack_payload["status"], "processed");
1324    }
1325
1326    #[cfg(feature = "exochain")]
1327    #[tokio::test]
1328    async fn chain_logs_suspend_resume() {
1329        let (a2a, cron, pt) = setup();
1330        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "suspend-test");
1331        let mut kernel_inbox = a2a.create_inbox(0);
1332
1333        let cm = Arc::new(crate::chain::ChainManager::new(0, 1000));
1334        let cancel = CancellationToken::new();
1335        let cancel2 = cancel.clone();
1336        let a2a2 = a2a.clone();
1337        let pt2 = pt.clone();
1338        let cm2 = cm.clone();
1339
1340        let handle = tokio::spawn(async move {
1341            kernel_agent_loop(
1342                agent_pid,
1343                cancel2,
1344                inbox,
1345                a2a2,
1346                cron,
1347                pt2,
1348                None, // tool_registry
1349                Some(cm2),
1350                None, // gate
1351            )
1352            .await
1353        });
1354
1355        // Send suspend
1356        let suspend_msg = KernelMessage::new(
1357            0,
1358            MessageTarget::Process(agent_pid),
1359            MessagePayload::Json(serde_json::json!({"cmd": "suspend"})),
1360        );
1361        let suspend_id = suspend_msg.id.clone();
1362        a2a.send(suspend_msg).await.unwrap();
1363
1364        // Wait for suspended ack
1365        let _reply = tokio::time::timeout(
1366            std::time::Duration::from_secs(1),
1367            kernel_inbox.recv(),
1368        )
1369        .await
1370        .unwrap()
1371        .unwrap();
1372
1373        // Send resume
1374        let resume_msg = KernelMessage::new(
1375            0,
1376            MessageTarget::Process(agent_pid),
1377            MessagePayload::Json(serde_json::json!({"cmd": "resume"})),
1378        );
1379        let resume_id = resume_msg.id.clone();
1380        a2a.send(resume_msg).await.unwrap();
1381
1382        // Wait for resumed ack
1383        let _reply = tokio::time::timeout(
1384            std::time::Duration::from_secs(1),
1385            kernel_inbox.recv(),
1386        )
1387        .await
1388        .unwrap()
1389        .unwrap();
1390
1391        cancel.cancel();
1392        handle.await.unwrap();
1393
1394        // Verify chain events
1395        let events = cm.tail(20);
1396        let suspend_evt = events.iter().find(|e| e.kind == "agent.suspend");
1397        let resume_evt = events.iter().find(|e| e.kind == "agent.resume");
1398
1399        assert!(suspend_evt.is_some(), "expected agent.suspend event on chain");
1400        assert!(resume_evt.is_some(), "expected agent.resume event on chain");
1401
1402        let sp = suspend_evt.unwrap().payload.as_ref().unwrap();
1403        assert_eq!(sp["pid"], agent_pid);
1404        assert_eq!(sp["from"], 0);
1405        assert_eq!(sp["msg_id"], suspend_id);
1406
1407        let rp = resume_evt.unwrap().payload.as_ref().unwrap();
1408        assert_eq!(rp["pid"], agent_pid);
1409        assert_eq!(rp["from"], 0);
1410        assert_eq!(rp["msg_id"], resume_id);
1411    }
1412
1413    // ── Additional agent_loop coverage tests ─────────────────────
1414
1415    #[tokio::test]
1416    async fn echo_command() {
1417        let (a2a, cron, pt) = setup();
1418        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "echo-agent");
1419        let mut kernel_inbox = a2a.create_inbox(0);
1420
1421        let cancel = CancellationToken::new();
1422        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1423
1424        let msg = KernelMessage::new(
1425            0,
1426            MessageTarget::Process(agent_pid),
1427            MessagePayload::Json(serde_json::json!({"cmd": "echo", "text": "hello world"})),
1428        );
1429        a2a.send(msg).await.unwrap();
1430
1431        let reply = tokio::time::timeout(
1432            std::time::Duration::from_secs(1),
1433            kernel_inbox.recv(),
1434        )
1435        .await
1436        .unwrap()
1437        .unwrap();
1438
1439        if let MessagePayload::Json(v) = &reply.payload {
1440            assert_eq!(v["echo"], "hello world");
1441            assert_eq!(v["pid"], agent_pid);
1442        } else {
1443            panic!("expected JSON reply");
1444        }
1445
1446        cancel.cancel();
1447        handle.await.unwrap();
1448    }
1449
1450    #[tokio::test]
1451    async fn text_payload_parsed_as_json() {
1452        let (a2a, cron, pt) = setup();
1453        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "text-agent");
1454        let mut kernel_inbox = a2a.create_inbox(0);
1455
1456        let cancel = CancellationToken::new();
1457        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1458
1459        // Send text payload that is valid JSON
1460        let msg = KernelMessage::new(
1461            0,
1462            MessageTarget::Process(agent_pid),
1463            MessagePayload::Text(r#"{"cmd": "ping"}"#.into()),
1464        );
1465        a2a.send(msg).await.unwrap();
1466
1467        let reply = tokio::time::timeout(
1468            std::time::Duration::from_secs(1),
1469            kernel_inbox.recv(),
1470        )
1471        .await
1472        .unwrap()
1473        .unwrap();
1474
1475        if let MessagePayload::Json(v) = &reply.payload {
1476            assert_eq!(v["status"], "ok");
1477        } else {
1478            panic!("expected JSON reply");
1479        }
1480
1481        cancel.cancel();
1482        handle.await.unwrap();
1483    }
1484
1485    #[tokio::test]
1486    async fn text_payload_non_json_becomes_echo() {
1487        let (a2a, cron, pt) = setup();
1488        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "text-echo-agent");
1489        let mut kernel_inbox = a2a.create_inbox(0);
1490
1491        let cancel = CancellationToken::new();
1492        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1493
1494        // Send plain text that isn't JSON
1495        let msg = KernelMessage::new(
1496            0,
1497            MessageTarget::Process(agent_pid),
1498            MessagePayload::Text("just plain text".into()),
1499        );
1500        a2a.send(msg).await.unwrap();
1501
1502        let reply = tokio::time::timeout(
1503            std::time::Duration::from_secs(1),
1504            kernel_inbox.recv(),
1505        )
1506        .await
1507        .unwrap()
1508        .unwrap();
1509
1510        if let MessagePayload::Json(v) = &reply.payload {
1511            // Plain text becomes {"cmd": "echo", "text": "just plain text"}
1512            assert_eq!(v["echo"], "just plain text");
1513        } else {
1514            panic!("expected JSON reply");
1515        }
1516
1517        cancel.cancel();
1518        handle.await.unwrap();
1519    }
1520
1521    #[tokio::test]
1522    async fn inbox_close_causes_clean_exit() {
1523        let pt = Arc::new(ProcessTable::new(64));
1524
1525        // Insert kernel process
1526        let kernel_entry = ProcessEntry {
1527            pid: 0,
1528            agent_id: "kernel".into(),
1529            state: ProcessState::Running,
1530            capabilities: AgentCapabilities::default(),
1531            resource_usage: ResourceUsage::default(),
1532            cancel_token: CancellationToken::new(),
1533            parent_pid: None,
1534        };
1535        pt.insert_with_pid(kernel_entry).unwrap();
1536
1537        let checker = Arc::new(CapabilityChecker::new(pt.clone()));
1538        let topic_router = Arc::new(TopicRouter::new(pt.clone()));
1539        let a2a = Arc::new(A2ARouter::new(pt.clone(), checker, topic_router));
1540        let cron = Arc::new(CronService::new());
1541
1542        // Create agent with a manually controlled inbox
1543        let (tx, rx) = mpsc::channel(32);
1544        let agent_entry = ProcessEntry {
1545            pid: 0,
1546            agent_id: "close-agent".into(),
1547            state: ProcessState::Running,
1548            capabilities: AgentCapabilities::default(),
1549            resource_usage: ResourceUsage::default(),
1550            cancel_token: CancellationToken::new(),
1551            parent_pid: None,
1552        };
1553        let agent_pid = pt.insert(agent_entry).unwrap();
1554
1555        let cancel = CancellationToken::new();
1556        let handle = spawn_loop(agent_pid, cancel.clone(), rx, a2a, cron, pt);
1557
1558        // Drop the sender to close the inbox
1559        drop(tx);
1560
1561        let code = tokio::time::timeout(
1562            std::time::Duration::from_secs(2),
1563            handle,
1564        )
1565        .await
1566        .unwrap()
1567        .unwrap();
1568
1569        assert_eq!(code, 0, "should exit cleanly when inbox closes");
1570    }
1571
1572    #[tokio::test]
1573    async fn cron_list_returns_empty() {
1574        let (a2a, cron, pt) = setup();
1575        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "cron-list-agent");
1576        let mut kernel_inbox = a2a.create_inbox(0);
1577
1578        let cancel = CancellationToken::new();
1579        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1580
1581        let msg = KernelMessage::new(
1582            0,
1583            MessageTarget::Process(agent_pid),
1584            MessagePayload::Json(serde_json::json!({"cmd": "cron.list"})),
1585        );
1586        a2a.send(msg).await.unwrap();
1587
1588        let reply = tokio::time::timeout(
1589            std::time::Duration::from_secs(1),
1590            kernel_inbox.recv(),
1591        )
1592        .await
1593        .unwrap()
1594        .unwrap();
1595
1596        if let MessagePayload::Json(v) = &reply.payload {
1597            assert!(v.is_array(), "cron.list should return array");
1598            assert_eq!(v.as_array().unwrap().len(), 0, "should be empty");
1599        } else {
1600            panic!("expected JSON reply");
1601        }
1602
1603        cancel.cancel();
1604        handle.await.unwrap();
1605    }
1606
1607    #[tokio::test]
1608    async fn cron_remove_nonexistent() {
1609        let (a2a, cron, pt) = setup();
1610        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "cron-rm-agent");
1611        let mut kernel_inbox = a2a.create_inbox(0);
1612
1613        let cancel = CancellationToken::new();
1614        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1615
1616        let msg = KernelMessage::new(
1617            0,
1618            MessageTarget::Process(agent_pid),
1619            MessagePayload::Json(serde_json::json!({"cmd": "cron.remove", "id": "nonexistent"})),
1620        );
1621        a2a.send(msg).await.unwrap();
1622
1623        let reply = tokio::time::timeout(
1624            std::time::Duration::from_secs(1),
1625            kernel_inbox.recv(),
1626        )
1627        .await
1628        .unwrap()
1629        .unwrap();
1630
1631        if let MessagePayload::Json(v) = &reply.payload {
1632            assert_eq!(v["removed"], false);
1633            assert!(v["error"].as_str().is_some());
1634        } else {
1635            panic!("expected JSON reply");
1636        }
1637
1638        cancel.cancel();
1639        handle.await.unwrap();
1640    }
1641
1642    #[tokio::test]
1643    async fn exec_without_tool_echoes() {
1644        let (a2a, cron, pt) = setup();
1645        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "exec-agent");
1646        let mut kernel_inbox = a2a.create_inbox(0);
1647
1648        let cancel = CancellationToken::new();
1649        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1650
1651        // exec without tool name falls back to echo mode
1652        let msg = KernelMessage::new(
1653            0,
1654            MessageTarget::Process(agent_pid),
1655            MessagePayload::Json(serde_json::json!({"cmd": "exec", "text": "fallback"})),
1656        );
1657        a2a.send(msg).await.unwrap();
1658
1659        let reply = tokio::time::timeout(
1660            std::time::Duration::from_secs(1),
1661            kernel_inbox.recv(),
1662        )
1663        .await
1664        .unwrap()
1665        .unwrap();
1666
1667        if let MessagePayload::Json(v) = &reply.payload {
1668            assert_eq!(v["status"], "ok");
1669            assert_eq!(v["echo"], "fallback");
1670        } else {
1671            panic!("expected JSON reply");
1672        }
1673
1674        cancel.cancel();
1675        handle.await.unwrap();
1676    }
1677
1678    #[tokio::test]
1679    async fn exec_with_tool_name_no_registry() {
1680        let (a2a, cron, pt) = setup();
1681        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "exec-noreg-agent");
1682        let mut kernel_inbox = a2a.create_inbox(0);
1683
1684        let cancel = CancellationToken::new();
1685        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1686
1687        // exec with tool name but no registry
1688        let msg = KernelMessage::new(
1689            0,
1690            MessageTarget::Process(agent_pid),
1691            MessagePayload::Json(serde_json::json!({"cmd": "exec", "tool": "fs.read", "args": {}})),
1692        );
1693        a2a.send(msg).await.unwrap();
1694
1695        let reply = tokio::time::timeout(
1696            std::time::Duration::from_secs(1),
1697            kernel_inbox.recv(),
1698        )
1699        .await
1700        .unwrap()
1701        .unwrap();
1702
1703        if let MessagePayload::Json(v) = &reply.payload {
1704            assert!(
1705                v["error"].as_str().unwrap().contains("tool registry not available"),
1706                "should report tool registry unavailable"
1707            );
1708        } else {
1709            panic!("expected JSON reply");
1710        }
1711
1712        cancel.cancel();
1713        handle.await.unwrap();
1714    }
1715
1716    #[tokio::test]
1717    async fn multiple_messages_increment_usage() {
1718        let (a2a, cron, pt) = setup();
1719        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "multi-msg-agent");
1720        let mut kernel_inbox = a2a.create_inbox(0);
1721
1722        let cancel = CancellationToken::new();
1723        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt.clone());
1724
1725        // Send 3 pings
1726        for _ in 0..3 {
1727            let msg = KernelMessage::new(
1728                0,
1729                MessageTarget::Process(agent_pid),
1730                MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1731            );
1732            a2a.send(msg).await.unwrap();
1733            let _reply = tokio::time::timeout(
1734                std::time::Duration::from_secs(1),
1735                kernel_inbox.recv(),
1736            )
1737            .await
1738            .unwrap()
1739            .unwrap();
1740        }
1741
1742        cancel.cancel();
1743        let _code = handle.await.unwrap();
1744
1745        let entry = pt.get(agent_pid).unwrap();
1746        assert!(
1747            entry.resource_usage.messages_sent >= 3,
1748            "should have sent at least 3 messages, got {}",
1749            entry.resource_usage.messages_sent
1750        );
1751    }
1752
1753    #[tokio::test]
1754    async fn cancel_during_suspend_exits_cleanly() {
1755        let (a2a, cron, pt) = setup();
1756        let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "cancel-suspend-agent");
1757        let mut kernel_inbox = a2a.create_inbox(0);
1758
1759        let cancel = CancellationToken::new();
1760        let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1761
1762        // Send suspend
1763        let msg = KernelMessage::new(
1764            0,
1765            MessageTarget::Process(agent_pid),
1766            MessagePayload::Json(serde_json::json!({"cmd": "suspend"})),
1767        );
1768        a2a.send(msg).await.unwrap();
1769
1770        // Wait for suspended ack
1771        let _reply = tokio::time::timeout(
1772            std::time::Duration::from_secs(1),
1773            kernel_inbox.recv(),
1774        )
1775        .await
1776        .unwrap()
1777        .unwrap();
1778
1779        // Cancel while suspended
1780        cancel.cancel();
1781        let code = tokio::time::timeout(
1782            std::time::Duration::from_secs(2),
1783            handle,
1784        )
1785        .await
1786        .unwrap()
1787        .unwrap();
1788
1789        assert_eq!(code, 0, "should exit cleanly when cancelled during suspend");
1790    }
1791
1792    #[tokio::test]
1793    async fn extract_cmd_from_json() {
1794        let msg = KernelMessage::new(
1795            0,
1796            MessageTarget::Process(1),
1797            MessagePayload::Json(serde_json::json!({"cmd": "test_cmd"})),
1798        );
1799        assert_eq!(extract_cmd(&msg), Some("test_cmd".to_string()));
1800    }
1801
1802    #[tokio::test]
1803    async fn extract_cmd_from_text() {
1804        let msg = KernelMessage::new(
1805            0,
1806            MessageTarget::Process(1),
1807            MessagePayload::Text(r#"{"cmd": "from_text"}"#.into()),
1808        );
1809        assert_eq!(extract_cmd(&msg), Some("from_text".to_string()));
1810    }
1811
1812    #[tokio::test]
1813    async fn extract_cmd_from_plain_text_returns_none() {
1814        let msg = KernelMessage::new(
1815            0,
1816            MessageTarget::Process(1),
1817            MessagePayload::Text("not json".into()),
1818        );
1819        assert_eq!(extract_cmd(&msg), None);
1820    }
1821
1822    #[tokio::test]
1823    async fn extract_cmd_from_signal_returns_none() {
1824        let msg = KernelMessage::new(
1825            0,
1826            MessageTarget::Process(1),
1827            MessagePayload::Signal(crate::ipc::KernelSignal::Shutdown),
1828        );
1829        assert_eq!(extract_cmd(&msg), None);
1830    }
1831}