1use std::sync::Arc;
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, warn};
12
13use crate::a2a::A2ARouter;
14use crate::cron::CronService;
15use crate::ipc::{KernelMessage, MessagePayload, MessageTarget};
16use crate::process::{Pid, ProcessState, ProcessTable, ResourceUsage};
17
18#[allow(clippy::too_many_arguments)]
31pub async fn kernel_agent_loop(
32 pid: Pid,
33 cancel: CancellationToken,
34 mut inbox: mpsc::Receiver<KernelMessage>,
35 a2a: Arc<A2ARouter>,
36 cron: Arc<CronService>,
37 process_table: Arc<ProcessTable>,
38 tool_registry: Option<Arc<crate::wasm_runner::ToolRegistry>>,
39 #[cfg(feature = "exochain")] chain: Option<Arc<crate::chain::ChainManager>>,
40 #[cfg(feature = "exochain")] gate: Option<Arc<dyn crate::gate::GateBackend>>,
41) -> i32 {
42 let started = Instant::now();
43 debug!(pid, "agent loop started");
44
45 #[cfg(feature = "exochain")]
47 let agent_id = process_table
48 .get(pid)
49 .map(|e| e.agent_id.clone())
50 .unwrap_or_else(|| format!("pid-{pid}"));
51
52 let mut usage = ResourceUsage::default();
53
54 loop {
55 tokio::select! {
56 _ = cancel.cancelled() => {
57 debug!(pid, "agent loop cancelled");
58 usage.cpu_time_ms = started.elapsed().as_millis() as u64;
60 let _ = process_table.update_resources(pid, usage);
61 return 0;
62 }
63 msg = inbox.recv() => {
64 match msg {
65 Some(message) => {
66 let cmd = extract_cmd(&message);
67
68 #[cfg(feature = "exochain")]
70 if let Some(ref cm) = chain {
71 cm.append(
72 "ipc",
73 "ipc.recv",
74 Some(serde_json::json!({
75 "pid": pid,
76 "from": message.from,
77 "msg_id": &message.id,
78 "cmd": cmd.as_deref().unwrap_or("none"),
79 })),
80 );
81 }
82
83 if cmd.as_deref() == Some("suspend") {
85 let reply = KernelMessage::with_correlation(
88 pid,
89 MessageTarget::Process(message.from),
90 MessagePayload::Json(serde_json::json!({
91 "status": "suspended",
92 "pid": pid,
93 })),
94 message.id.clone(),
95 );
96 send_reply(&a2a, reply, #[cfg(feature = "exochain")] chain.as_deref()).await;
97 usage.messages_sent += 1;
98
99 let _ = process_table.update_state(pid, ProcessState::Suspended);
101 debug!(pid, "agent suspended");
102
103 #[cfg(feature = "exochain")]
104 if let Some(ref cm) = chain {
105 cm.append(
106 "supervisor",
107 "agent.suspend",
108 Some(serde_json::json!({
109 "pid": pid,
110 "from": message.from,
111 "msg_id": &message.id,
112 })),
113 );
114 }
115
116 let resumed = parking_loop(
118 pid,
119 &cancel,
120 &mut inbox,
121 &a2a,
122 &process_table,
123 #[cfg(feature = "exochain")] chain.as_deref(),
124 &mut usage,
125 ).await;
126
127 if !resumed {
128 usage.cpu_time_ms = started.elapsed().as_millis() as u64;
130 let _ = process_table.update_resources(pid, usage);
131 return 0;
132 }
133 continue;
135 }
136
137 #[cfg(feature = "exochain")]
139 if let Some(ref gate_backend) = gate
140 && let Some(ref cmd_str) = cmd
141 {
142 let action = match cmd_str.as_str() {
143 "exec" => Some("tool.exec"),
144 "cron.add" => Some("service.cron.add"),
145 "cron.remove" => Some("service.cron.remove"),
146 _ => None,
147 };
148 if let Some(action_str) = action {
149 let context = if cmd_str == "exec" {
151 let tool_name = extract_tool_name(&message);
152 let effect = tool_name.as_deref().and_then(|tn| {
153 tool_registry.as_ref().and_then(|reg| {
154 reg.get(tn).map(|t| &t.spec().effect)
155 })
156 });
157 let mut ctx = serde_json::json!({"pid": pid});
158 if let Some(tn) = &tool_name {
159 ctx["tool"] = serde_json::json!(tn);
160 }
161 if let Some(ev) = effect {
162 ctx["effect"] = serde_json::json!({
163 "risk": ev.risk,
164 "security": ev.security,
165 "privacy": ev.privacy,
166 });
167 }
168 ctx
169 } else {
170 serde_json::json!({"pid": pid})
171 };
172 let decision = gate_backend.check(&agent_id, action_str, &context);
173 match decision {
174 crate::gate::GateDecision::Deny { reason, .. } => {
175 let reply = KernelMessage::with_correlation(
176 pid,
177 MessageTarget::Process(message.from),
178 MessagePayload::Json(serde_json::json!({
179 "error": reason,
180 "denied": true,
181 })),
182 message.id.clone(),
183 );
184 send_reply(&a2a, reply, chain.as_deref()).await;
185 usage.messages_sent += 1;
186 continue;
187 }
188 crate::gate::GateDecision::Defer { reason } => {
189 let reply = KernelMessage::with_correlation(
190 pid,
191 MessageTarget::Process(message.from),
192 MessagePayload::Json(serde_json::json!({
193 "deferred": true,
194 "reason": reason,
195 })),
196 message.id.clone(),
197 );
198 send_reply(&a2a, reply, chain.as_deref()).await;
199 usage.messages_sent += 1;
200 continue;
201 }
202 crate::gate::GateDecision::Permit { .. } => {
203 }
205 }
206 }
207 }
208
209 if cmd.as_deref() == Some("exec") {
211 usage.tool_calls += 1;
212
213 #[cfg(feature = "exochain")]
215 {
216 let sudo_flag = match &message.payload {
217 MessagePayload::Json(v) => v.get("sudo").and_then(|s| s.as_bool()).unwrap_or(false),
218 _ => false,
219 };
220 if sudo_flag
221 && let Some(ref cm) = chain
222 {
223 cm.append(
224 "security",
225 "sudo.override",
226 Some(serde_json::json!({
227 "pid": pid,
228 "agent_id": &agent_id,
229 "tool": extract_tool_name(&message).unwrap_or_default(),
230 })),
231 );
232 }
233 }
234 }
235
236 handle_message(
237 pid,
238 &message,
239 &a2a,
240 &cron,
241 tool_registry.as_deref(),
242 #[cfg(feature = "exochain")]
243 chain.as_deref(),
244 &started,
245 ).await;
246
247 usage.messages_sent += 1;
248
249 #[cfg(feature = "exochain")]
251 if let Some(ref cm) = chain {
252 cm.append(
253 "ipc",
254 "ipc.ack",
255 Some(serde_json::json!({
256 "pid": pid,
257 "msg_id": &message.id,
258 "cmd": cmd.as_deref().unwrap_or("none"),
259 "status": "processed",
260 })),
261 );
262 }
263
264 if usage.messages_sent % 10 == 0 {
266 usage.cpu_time_ms = started.elapsed().as_millis() as u64;
267 let _ = process_table.update_resources(pid, usage.clone());
268 }
269 }
270 None => {
271 debug!(pid, "inbox closed, exiting");
273 usage.cpu_time_ms = started.elapsed().as_millis() as u64;
274 let _ = process_table.update_resources(pid, usage);
275 return 0;
276 }
277 }
278 }
279 }
280 }
281}
282
283fn extract_cmd(msg: &KernelMessage) -> Option<String> {
285 match &msg.payload {
286 MessagePayload::Json(v) => v.get("cmd").and_then(|c| c.as_str()).map(String::from),
287 MessagePayload::Text(text) => {
288 if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
289 v.get("cmd").and_then(|c| c.as_str()).map(String::from)
290 } else {
291 None
292 }
293 }
294 _ => None,
295 }
296}
297
298#[cfg(feature = "exochain")]
300fn extract_tool_name(msg: &KernelMessage) -> Option<String> {
301 match &msg.payload {
302 MessagePayload::Json(v) => v.get("tool").and_then(|t| t.as_str()).map(String::from),
303 MessagePayload::Text(text) => {
304 serde_json::from_str::<serde_json::Value>(text)
305 .ok()
306 .and_then(|v| v.get("tool").and_then(|t| t.as_str()).map(String::from))
307 }
308 _ => None,
309 }
310}
311
312async fn parking_loop(
317 pid: Pid,
318 cancel: &CancellationToken,
319 inbox: &mut mpsc::Receiver<KernelMessage>,
320 a2a: &A2ARouter,
321 process_table: &ProcessTable,
322 #[cfg(feature = "exochain")] chain: Option<&crate::chain::ChainManager>,
323 usage: &mut ResourceUsage,
324) -> bool {
325 loop {
326 tokio::select! {
327 _ = cancel.cancelled() => {
328 debug!(pid, "cancelled while suspended");
329 return false;
330 }
331 msg = inbox.recv() => {
332 match msg {
333 Some(message) => {
334 let cmd = extract_cmd(&message);
335 if cmd.as_deref() == Some("resume") {
336 let _ = process_table.update_state(pid, ProcessState::Running);
338 debug!(pid, "agent resumed");
339
340 #[cfg(feature = "exochain")]
341 if let Some(cm) = chain {
342 cm.append(
343 "supervisor",
344 "agent.resume",
345 Some(serde_json::json!({
346 "pid": pid,
347 "from": message.from,
348 "msg_id": &message.id,
349 })),
350 );
351 }
352
353 let reply = KernelMessage::with_correlation(
354 pid,
355 MessageTarget::Process(message.from),
356 MessagePayload::Json(serde_json::json!({
357 "status": "resumed",
358 "pid": pid,
359 })),
360 message.id.clone(),
361 );
362 send_reply(a2a, reply, #[cfg(feature = "exochain")] chain).await;
363 usage.messages_sent += 1;
364 return true;
365 }
366
367 let reply = KernelMessage::with_correlation(
369 pid,
370 MessageTarget::Process(message.from),
371 MessagePayload::Json(serde_json::json!({
372 "error": "agent suspended",
373 "pid": pid,
374 })),
375 message.id.clone(),
376 );
377 send_reply(a2a, reply, #[cfg(feature = "exochain")] chain).await;
378 usage.messages_sent += 1;
379 }
380 None => {
381 return false;
383 }
384 }
385 }
386 }
387 }
388}
389
390async fn send_reply(
392 a2a: &A2ARouter,
393 reply: KernelMessage,
394 #[cfg(feature = "exochain")] chain: Option<&crate::chain::ChainManager>,
395) {
396 #[cfg(feature = "exochain")]
397 {
398 if let Err(e) = a2a.send_checked(reply, chain).await {
399 warn!(error = %e, "failed to send reply");
400 }
401 }
402 #[cfg(not(feature = "exochain"))]
403 {
404 if let Err(e) = a2a.send(reply).await {
405 warn!(error = %e, "failed to send reply");
406 }
407 }
408}
409
410async fn handle_message(
412 pid: Pid,
413 msg: &KernelMessage,
414 a2a: &A2ARouter,
415 cron: &CronService,
416 tool_registry: Option<&crate::wasm_runner::ToolRegistry>,
417 #[cfg(feature = "exochain")] chain: Option<&crate::chain::ChainManager>,
418 started: &Instant,
419) {
420 let cmd_value = match &msg.payload {
422 MessagePayload::Json(v) => v.clone(),
423 MessagePayload::Text(text) => {
424 match serde_json::from_str::<serde_json::Value>(text) {
426 Ok(v) => v,
427 Err(_) => serde_json::json!({"cmd": "echo", "text": text}),
428 }
429 }
430 MessagePayload::Rvf { segment_type, data } => {
431 debug!(pid, segment_type, data_len = data.len(), "received RVF payload");
435
436 #[cfg(feature = "exochain")]
438 {
439 if let Ok(val) = ciborium::from_reader::<ciborium::Value, _>(&data[..]) {
440 let json_str = serde_json::to_string(&val).unwrap_or_default();
441 match serde_json::from_str::<serde_json::Value>(&json_str) {
442 Ok(v) => v,
443 Err(_) => serde_json::json!({
444 "cmd": "rvf.recv",
445 "segment_type": segment_type,
446 "data_len": data.len(),
447 }),
448 }
449 } else if let Ok(v) = serde_json::from_slice::<serde_json::Value>(data) {
450 v
451 } else {
452 serde_json::json!({
453 "cmd": "rvf.recv",
454 "segment_type": segment_type,
455 "data_len": data.len(),
456 })
457 }
458 }
459 #[cfg(not(feature = "exochain"))]
461 {
462 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(data) {
463 v
464 } else {
465 serde_json::json!({
466 "cmd": "rvf.recv",
467 "segment_type": segment_type,
468 "data_len": data.len(),
469 })
470 }
471 }
472 }
473 _ => {
474 debug!(pid, "ignoring signal message");
475 return;
476 }
477 };
478
479 let cmd = cmd_value
480 .get("cmd")
481 .and_then(|v| v.as_str())
482 .unwrap_or("unknown");
483
484 let response = match cmd {
485 "ping" => {
486 let uptime_ms = started.elapsed().as_millis() as u64;
487 serde_json::json!({
488 "status": "ok",
489 "pid": pid,
490 "uptime_ms": uptime_ms,
491 })
492 }
493 "cron.add" => {
494 let name = cmd_value
495 .get("name")
496 .and_then(|v| v.as_str())
497 .unwrap_or("unnamed")
498 .to_string();
499 let interval_secs = cmd_value
500 .get("interval_secs")
501 .and_then(|v| v.as_u64())
502 .unwrap_or(60);
503 let command = cmd_value
504 .get("command")
505 .and_then(|v| v.as_str())
506 .unwrap_or("ping")
507 .to_string();
508 let target_pid = cmd_value
509 .get("target_pid")
510 .and_then(|v| v.as_u64());
511
512 let job = cron.add_job(name, interval_secs, command, target_pid);
513
514 #[cfg(feature = "exochain")]
515 if let Some(cm) = chain {
516 cm.append(
517 "cron",
518 "cron.add",
519 Some(serde_json::json!({
520 "job_id": job.id,
521 "name": job.name,
522 "interval_secs": job.interval_secs,
523 "via_agent": pid,
524 })),
525 );
526 }
527
528 serde_json::to_value(&job).unwrap_or_default()
529 }
530 "cron.list" => {
531 let jobs = cron.list_jobs();
532 serde_json::to_value(&jobs).unwrap_or_default()
533 }
534 "cron.remove" => {
535 let id = cmd_value
536 .get("id")
537 .and_then(|v| v.as_str())
538 .unwrap_or("");
539 match cron.remove_job(id) {
540 Some(job) => {
541 #[cfg(feature = "exochain")]
542 if let Some(cm) = chain {
543 cm.append(
544 "cron",
545 "cron.remove",
546 Some(serde_json::json!({
547 "job_id": job.id,
548 "name": job.name,
549 "via_agent": pid,
550 })),
551 );
552 }
553 serde_json::json!({"removed": true, "job_id": job.id})
554 }
555 None => serde_json::json!({"removed": false, "error": "job not found"}),
556 }
557 }
558 "exec" => {
559 let tool_name = cmd_value
561 .get("tool")
562 .and_then(|v| v.as_str())
563 .unwrap_or("");
564 let args = cmd_value
565 .get("args")
566 .cloned()
567 .unwrap_or(serde_json::json!({}));
568
569 if tool_name.is_empty() {
570 let text = cmd_value
572 .get("text")
573 .and_then(|v| v.as_str())
574 .unwrap_or("(no input)");
575 serde_json::json!({
576 "status": "ok",
577 "echo": text,
578 "pid": pid,
579 })
580 } else if let Some(registry) = tool_registry {
581 match registry.execute(tool_name, args) {
582 Ok(result) => {
583 #[cfg(feature = "exochain")]
584 if let Some(cm) = chain {
585 cm.append(
586 "tool",
587 "tool.exec",
588 Some(serde_json::json!({
589 "tool": tool_name,
590 "pid": pid,
591 "status": "ok",
592 })),
593 );
594 }
595 serde_json::json!({
596 "status": "ok",
597 "tool": tool_name,
598 "result": result,
599 "pid": pid,
600 })
601 }
602 Err(e) => {
603 #[cfg(feature = "exochain")]
604 if let Some(cm) = chain {
605 cm.append(
606 "tool",
607 "tool.exec",
608 Some(serde_json::json!({
609 "tool": tool_name,
610 "pid": pid,
611 "status": "error",
612 "error": e.to_string(),
613 })),
614 );
615 }
616 serde_json::json!({
617 "error": e.to_string(),
618 "tool": tool_name,
619 "pid": pid,
620 })
621 }
622 }
623 } else {
624 serde_json::json!({
625 "error": "tool registry not available",
626 "tool": tool_name,
627 "pid": pid,
628 })
629 }
630 }
631 "echo" => {
632 let text = cmd_value
633 .get("text")
634 .and_then(|v| v.as_str())
635 .unwrap_or("");
636 serde_json::json!({"echo": text, "pid": pid})
637 }
638 "rvf.recv" => {
639 let seg_type = cmd_value
641 .get("segment_type")
642 .and_then(|v| v.as_u64())
643 .unwrap_or(0);
644 let data_len = cmd_value
645 .get("data_len")
646 .and_then(|v| v.as_u64())
647 .unwrap_or(0);
648 serde_json::json!({
649 "status": "ok",
650 "cmd": "rvf.recv",
651 "segment_type": seg_type,
652 "data_len": data_len,
653 "pid": pid,
654 })
655 }
656 unknown => {
657 serde_json::json!({
658 "error": format!("unknown command: {unknown}"),
659 "pid": pid,
660 })
661 }
662 };
663
664 let reply = KernelMessage::with_correlation(
666 pid,
667 MessageTarget::Process(msg.from),
668 MessagePayload::Json(response),
669 msg.id.clone(),
670 );
671
672 #[cfg(feature = "exochain")]
673 {
674 if let Err(e) = a2a.send_checked(reply, chain).await {
675 warn!(pid, error = %e, "failed to send reply");
676 }
677 }
678 #[cfg(not(feature = "exochain"))]
679 {
680 if let Err(e) = a2a.send(reply).await {
681 warn!(pid, error = %e, "failed to send reply");
682 }
683 }
684}
685
686#[cfg(test)]
687mod tests {
688 use super::*;
689 use crate::capability::{AgentCapabilities, CapabilityChecker};
690 use crate::process::{ProcessEntry, ProcessState, ProcessTable, ResourceUsage};
691 use crate::topic::TopicRouter;
692
693 fn setup() -> (Arc<A2ARouter>, Arc<CronService>, Arc<ProcessTable>) {
694 let pt = Arc::new(ProcessTable::new(64));
695
696 let kernel_entry = ProcessEntry {
698 pid: 0,
699 agent_id: "kernel".into(),
700 state: ProcessState::Running,
701 capabilities: AgentCapabilities::default(),
702 resource_usage: ResourceUsage::default(),
703 cancel_token: CancellationToken::new(),
704 parent_pid: None,
705 };
706 pt.insert_with_pid(kernel_entry).unwrap();
707
708 let checker = Arc::new(CapabilityChecker::new(pt.clone()));
709 let topic_router = Arc::new(TopicRouter::new(pt.clone()));
710 let a2a = Arc::new(A2ARouter::new(pt.clone(), checker, topic_router));
711 let cron = Arc::new(CronService::new());
712 (a2a, cron, pt)
713 }
714
715 fn spawn_agent(
716 pt: &ProcessTable,
717 a2a: &A2ARouter,
718 agent_id: &str,
719 ) -> (Pid, mpsc::Receiver<KernelMessage>) {
720 let entry = ProcessEntry {
721 pid: 0,
722 agent_id: agent_id.into(),
723 state: ProcessState::Running,
724 capabilities: AgentCapabilities::default(),
725 resource_usage: ResourceUsage::default(),
726 cancel_token: CancellationToken::new(),
727 parent_pid: None,
728 };
729 let pid = pt.insert(entry).unwrap();
730 let rx = a2a.create_inbox(pid);
731 (pid, rx)
732 }
733
734 fn spawn_loop(
736 agent_pid: Pid,
737 cancel: CancellationToken,
738 inbox: mpsc::Receiver<KernelMessage>,
739 a2a: Arc<A2ARouter>,
740 cron: Arc<CronService>,
741 pt: Arc<ProcessTable>,
742 ) -> tokio::task::JoinHandle<i32> {
743 tokio::spawn(async move {
744 kernel_agent_loop(
745 agent_pid,
746 cancel,
747 inbox,
748 a2a,
749 cron,
750 pt,
751 None, #[cfg(feature = "exochain")]
753 None,
754 #[cfg(feature = "exochain")]
755 None,
756 )
757 .await
758 })
759 }
760
761 #[tokio::test]
762 async fn ping_command() {
763 let (a2a, cron, pt) = setup();
764 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
765 let mut kernel_inbox = a2a.create_inbox(0);
766
767 let cancel = CancellationToken::new();
768 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
769
770 let msg = KernelMessage::new(
772 0,
773 MessageTarget::Process(agent_pid),
774 MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
775 );
776 a2a.send(msg).await.unwrap();
777
778 let reply = tokio::time::timeout(
780 std::time::Duration::from_secs(1),
781 kernel_inbox.recv(),
782 )
783 .await
784 .unwrap()
785 .unwrap();
786
787 if let MessagePayload::Json(v) = &reply.payload {
788 assert_eq!(v["status"], "ok");
789 assert_eq!(v["pid"], agent_pid);
790 } else {
791 panic!("expected JSON reply");
792 }
793
794 cancel.cancel();
795 let code = handle.await.unwrap();
796 assert_eq!(code, 0);
797 }
798
799 #[tokio::test]
800 async fn unknown_command() {
801 let (a2a, cron, pt) = setup();
802 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
803 let mut kernel_inbox = a2a.create_inbox(0);
804
805 let cancel = CancellationToken::new();
806 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
807
808 let msg = KernelMessage::new(
809 0,
810 MessageTarget::Process(agent_pid),
811 MessagePayload::Json(serde_json::json!({"cmd": "nosuch"})),
812 );
813 a2a.send(msg).await.unwrap();
814
815 let reply = tokio::time::timeout(
816 std::time::Duration::from_secs(1),
817 kernel_inbox.recv(),
818 )
819 .await
820 .unwrap()
821 .unwrap();
822
823 if let MessagePayload::Json(v) = &reply.payload {
824 assert!(v["error"].as_str().unwrap().contains("unknown command"));
825 } else {
826 panic!("expected JSON reply");
827 }
828
829 cancel.cancel();
830 handle.await.unwrap();
831 }
832
833 #[tokio::test]
834 async fn cron_add_via_agent() {
835 let (a2a, cron, pt) = setup();
836 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
837 let mut kernel_inbox = a2a.create_inbox(0);
838
839 let cancel = CancellationToken::new();
840 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron.clone(), pt);
841
842 let msg = KernelMessage::new(
843 0,
844 MessageTarget::Process(agent_pid),
845 MessagePayload::Json(serde_json::json!({
846 "cmd": "cron.add",
847 "name": "test-job",
848 "interval_secs": 30,
849 "command": "health",
850 })),
851 );
852 a2a.send(msg).await.unwrap();
853
854 let reply = tokio::time::timeout(
855 std::time::Duration::from_secs(1),
856 kernel_inbox.recv(),
857 )
858 .await
859 .unwrap()
860 .unwrap();
861
862 if let MessagePayload::Json(v) = &reply.payload {
863 assert_eq!(v["name"], "test-job");
864 assert!(v["id"].as_str().is_some());
865 } else {
866 panic!("expected JSON reply");
867 }
868
869 assert_eq!(cron.job_count(), 1);
871
872 cancel.cancel();
873 handle.await.unwrap();
874 }
875
876 #[tokio::test]
877 async fn cancellation_exits_cleanly() {
878 let (a2a, cron, pt) = setup();
879 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
880
881 let cancel = CancellationToken::new();
882 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a, cron, pt);
883
884 cancel.cancel();
885 let code = handle.await.unwrap();
886 assert_eq!(code, 0);
887 }
888
889 #[tokio::test]
890 async fn rvf_json_payload_processed() {
891 let (a2a, cron, pt) = setup();
892 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
893 let mut kernel_inbox = a2a.create_inbox(0);
894
895 let cancel = CancellationToken::new();
896 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
897
898 let json_bytes = serde_json::to_vec(&serde_json::json!({"cmd": "ping"})).unwrap();
900 let msg = KernelMessage::new(
901 0,
902 MessageTarget::Process(agent_pid),
903 MessagePayload::Rvf {
904 segment_type: 0x40,
905 data: json_bytes,
906 },
907 );
908 a2a.send(msg).await.unwrap();
909
910 let reply = tokio::time::timeout(
911 std::time::Duration::from_secs(1),
912 kernel_inbox.recv(),
913 )
914 .await
915 .unwrap()
916 .unwrap();
917
918 if let MessagePayload::Json(v) = &reply.payload {
919 assert_eq!(v["status"], "ok");
920 assert_eq!(v["pid"], agent_pid);
921 } else {
922 panic!("expected JSON reply to RVF-wrapped ping");
923 }
924
925 cancel.cancel();
926 handle.await.unwrap();
927 }
928
929 #[tokio::test]
930 async fn rvf_opaque_binary_acknowledged() {
931 let (a2a, cron, pt) = setup();
932 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
933 let mut kernel_inbox = a2a.create_inbox(0);
934
935 let cancel = CancellationToken::new();
936 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
937
938 let msg = KernelMessage::new(
940 0,
941 MessageTarget::Process(agent_pid),
942 MessagePayload::Rvf {
943 segment_type: 0x42,
944 data: vec![0xDE, 0xAD, 0xBE, 0xEF],
945 },
946 );
947 a2a.send(msg).await.unwrap();
948
949 let reply = tokio::time::timeout(
950 std::time::Duration::from_secs(1),
951 kernel_inbox.recv(),
952 )
953 .await
954 .unwrap()
955 .unwrap();
956
957 if let MessagePayload::Json(v) = &reply.payload {
958 assert_eq!(v["cmd"], "rvf.recv");
959 assert_eq!(v["segment_type"], 0x42);
960 assert_eq!(v["data_len"], 4);
961 } else {
962 panic!("expected JSON reply acknowledging RVF binary");
963 }
964
965 cancel.cancel();
966 handle.await.unwrap();
967 }
968
969 #[tokio::test]
970 async fn resource_usage_increments() {
971 let (a2a, cron, pt) = setup();
972 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
973 let mut kernel_inbox = a2a.create_inbox(0);
974
975 let cancel = CancellationToken::new();
976 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt.clone());
977
978 let msg = KernelMessage::new(
980 0,
981 MessageTarget::Process(agent_pid),
982 MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
983 );
984 a2a.send(msg).await.unwrap();
985
986 let _reply = tokio::time::timeout(
988 std::time::Duration::from_secs(1),
989 kernel_inbox.recv(),
990 )
991 .await
992 .unwrap()
993 .unwrap();
994
995 cancel.cancel();
997 let _code = handle.await.unwrap();
998
999 let entry = pt.get(agent_pid).unwrap();
1001 assert!(entry.resource_usage.messages_sent >= 1, "messages_sent should be at least 1");
1002 }
1003
1004 #[tokio::test]
1005 async fn suspend_resume_cycle() {
1006 let (a2a, cron, pt) = setup();
1007 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
1008 let mut kernel_inbox = a2a.create_inbox(0);
1009
1010 let cancel = CancellationToken::new();
1011 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt.clone());
1012
1013 let msg = KernelMessage::new(
1015 0,
1016 MessageTarget::Process(agent_pid),
1017 MessagePayload::Json(serde_json::json!({"cmd": "suspend"})),
1018 );
1019 a2a.send(msg).await.unwrap();
1020
1021 let reply = tokio::time::timeout(
1023 std::time::Duration::from_secs(1),
1024 kernel_inbox.recv(),
1025 )
1026 .await
1027 .unwrap()
1028 .unwrap();
1029
1030 if let MessagePayload::Json(v) = &reply.payload {
1031 assert_eq!(v["status"], "suspended");
1032 } else {
1033 panic!("expected JSON reply");
1034 }
1035
1036 let entry = pt.get(agent_pid).unwrap();
1038 assert_eq!(entry.state, ProcessState::Suspended);
1039
1040 let msg = KernelMessage::new(
1042 0,
1043 MessageTarget::Process(agent_pid),
1044 MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1045 );
1046 a2a.send(msg).await.unwrap();
1047
1048 let reply = tokio::time::timeout(
1049 std::time::Duration::from_secs(1),
1050 kernel_inbox.recv(),
1051 )
1052 .await
1053 .unwrap()
1054 .unwrap();
1055
1056 if let MessagePayload::Json(v) = &reply.payload {
1057 assert_eq!(v["error"], "agent suspended");
1058 } else {
1059 panic!("expected JSON error reply");
1060 }
1061
1062 let msg = KernelMessage::new(
1064 0,
1065 MessageTarget::Process(agent_pid),
1066 MessagePayload::Json(serde_json::json!({"cmd": "resume"})),
1067 );
1068 a2a.send(msg).await.unwrap();
1069
1070 let reply = tokio::time::timeout(
1071 std::time::Duration::from_secs(1),
1072 kernel_inbox.recv(),
1073 )
1074 .await
1075 .unwrap()
1076 .unwrap();
1077
1078 if let MessagePayload::Json(v) = &reply.payload {
1079 assert_eq!(v["status"], "resumed");
1080 } else {
1081 panic!("expected JSON reply");
1082 }
1083
1084 let entry = pt.get(agent_pid).unwrap();
1086 assert_eq!(entry.state, ProcessState::Running);
1087
1088 let msg = KernelMessage::new(
1090 0,
1091 MessageTarget::Process(agent_pid),
1092 MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1093 );
1094 a2a.send(msg).await.unwrap();
1095
1096 let reply = tokio::time::timeout(
1097 std::time::Duration::from_secs(1),
1098 kernel_inbox.recv(),
1099 )
1100 .await
1101 .unwrap()
1102 .unwrap();
1103
1104 if let MessagePayload::Json(v) = &reply.payload {
1105 assert_eq!(v["status"], "ok");
1106 } else {
1107 panic!("expected JSON reply");
1108 }
1109
1110 cancel.cancel();
1111 handle.await.unwrap();
1112 }
1113
1114 #[cfg(feature = "exochain")]
1115 #[tokio::test]
1116 async fn gate_deny_blocks_exec() {
1117 use crate::gate::{GateBackend, GateDecision};
1118
1119 struct AlwaysDeny;
1121 impl GateBackend for AlwaysDeny {
1122 fn check(
1123 &self,
1124 _agent_id: &str,
1125 _action: &str,
1126 _context: &serde_json::Value,
1127 ) -> GateDecision {
1128 GateDecision::Deny {
1129 reason: "test deny".into(),
1130 receipt: None,
1131 }
1132 }
1133 }
1134
1135 let (a2a, cron, pt) = setup();
1136 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
1137 let mut kernel_inbox = a2a.create_inbox(0);
1138
1139 let cancel = CancellationToken::new();
1140 let cancel2 = cancel.clone();
1141 let a2a2 = a2a.clone();
1142 let pt2 = pt.clone();
1143
1144 let handle = tokio::spawn(async move {
1145 kernel_agent_loop(
1146 agent_pid,
1147 cancel2,
1148 inbox,
1149 a2a2,
1150 cron,
1151 pt2,
1152 None, None, Some(Arc::new(AlwaysDeny) as Arc<dyn GateBackend>),
1155 )
1156 .await
1157 });
1158
1159 let msg = KernelMessage::new(
1161 0,
1162 MessageTarget::Process(agent_pid),
1163 MessagePayload::Json(serde_json::json!({"cmd": "exec", "text": "hello"})),
1164 );
1165 a2a.send(msg).await.unwrap();
1166
1167 let reply = tokio::time::timeout(
1168 std::time::Duration::from_secs(1),
1169 kernel_inbox.recv(),
1170 )
1171 .await
1172 .unwrap()
1173 .unwrap();
1174
1175 if let MessagePayload::Json(v) = &reply.payload {
1176 assert_eq!(v["denied"], true);
1177 assert_eq!(v["error"], "test deny");
1178 } else {
1179 panic!("expected JSON deny reply");
1180 }
1181
1182 cancel.cancel();
1183 handle.await.unwrap();
1184 }
1185
1186 #[cfg(feature = "exochain")]
1187 #[tokio::test]
1188 async fn gate_permit_allows_exec() {
1189 use crate::gate::{GateBackend, GateDecision};
1190
1191 struct AlwaysPermit;
1193 impl GateBackend for AlwaysPermit {
1194 fn check(
1195 &self,
1196 _agent_id: &str,
1197 _action: &str,
1198 _context: &serde_json::Value,
1199 ) -> GateDecision {
1200 GateDecision::Permit { token: None }
1201 }
1202 }
1203
1204 let (a2a, cron, pt) = setup();
1205 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "test-agent");
1206 let mut kernel_inbox = a2a.create_inbox(0);
1207
1208 let cancel = CancellationToken::new();
1209 let cancel2 = cancel.clone();
1210 let a2a2 = a2a.clone();
1211 let pt2 = pt.clone();
1212
1213 let handle = tokio::spawn(async move {
1214 kernel_agent_loop(
1215 agent_pid,
1216 cancel2,
1217 inbox,
1218 a2a2,
1219 cron,
1220 pt2,
1221 None, None, Some(Arc::new(AlwaysPermit) as Arc<dyn GateBackend>),
1224 )
1225 .await
1226 });
1227
1228 let msg = KernelMessage::new(
1230 0,
1231 MessageTarget::Process(agent_pid),
1232 MessagePayload::Json(serde_json::json!({"cmd": "exec", "text": "hello"})),
1233 );
1234 a2a.send(msg).await.unwrap();
1235
1236 let reply = tokio::time::timeout(
1237 std::time::Duration::from_secs(1),
1238 kernel_inbox.recv(),
1239 )
1240 .await
1241 .unwrap()
1242 .unwrap();
1243
1244 if let MessagePayload::Json(v) = &reply.payload {
1245 assert_eq!(v["status"], "ok");
1246 assert_eq!(v["echo"], "hello");
1247 } else {
1248 panic!("expected JSON reply");
1249 }
1250
1251 cancel.cancel();
1252 handle.await.unwrap();
1253 }
1254
1255 #[cfg(feature = "exochain")]
1256 #[tokio::test]
1257 async fn chain_logs_ipc_recv_ack() {
1258 let (a2a, cron, pt) = setup();
1259 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "chain-test");
1260 let mut kernel_inbox = a2a.create_inbox(0);
1261
1262 let cm = Arc::new(crate::chain::ChainManager::new(0, 1000));
1263 let cancel = CancellationToken::new();
1264 let cancel2 = cancel.clone();
1265 let a2a2 = a2a.clone();
1266 let pt2 = pt.clone();
1267 let cm2 = cm.clone();
1268
1269 let handle = tokio::spawn(async move {
1270 kernel_agent_loop(
1271 agent_pid,
1272 cancel2,
1273 inbox,
1274 a2a2,
1275 cron,
1276 pt2,
1277 None, Some(cm2),
1279 None, )
1281 .await
1282 });
1283
1284 let msg = KernelMessage::new(
1286 0,
1287 MessageTarget::Process(agent_pid),
1288 MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1289 );
1290 let msg_id = msg.id.clone();
1291 a2a.send(msg).await.unwrap();
1292
1293 let _reply = tokio::time::timeout(
1295 std::time::Duration::from_secs(1),
1296 kernel_inbox.recv(),
1297 )
1298 .await
1299 .unwrap()
1300 .unwrap();
1301
1302 cancel.cancel();
1303 handle.await.unwrap();
1304
1305 let events = cm.tail(10);
1307 let recv_evt = events.iter().find(|e| e.kind == "ipc.recv");
1308 let ack_evt = events.iter().find(|e| e.kind == "ipc.ack");
1309
1310 assert!(recv_evt.is_some(), "expected ipc.recv event on chain");
1311 assert!(ack_evt.is_some(), "expected ipc.ack event on chain");
1312
1313 let recv_payload = recv_evt.unwrap().payload.as_ref().unwrap();
1314 assert_eq!(recv_payload["pid"], agent_pid);
1315 assert_eq!(recv_payload["from"], 0);
1316 assert_eq!(recv_payload["msg_id"], msg_id);
1317 assert_eq!(recv_payload["cmd"], "ping");
1318
1319 let ack_payload = ack_evt.unwrap().payload.as_ref().unwrap();
1320 assert_eq!(ack_payload["pid"], agent_pid);
1321 assert_eq!(ack_payload["msg_id"], msg_id);
1322 assert_eq!(ack_payload["cmd"], "ping");
1323 assert_eq!(ack_payload["status"], "processed");
1324 }
1325
1326 #[cfg(feature = "exochain")]
1327 #[tokio::test]
1328 async fn chain_logs_suspend_resume() {
1329 let (a2a, cron, pt) = setup();
1330 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "suspend-test");
1331 let mut kernel_inbox = a2a.create_inbox(0);
1332
1333 let cm = Arc::new(crate::chain::ChainManager::new(0, 1000));
1334 let cancel = CancellationToken::new();
1335 let cancel2 = cancel.clone();
1336 let a2a2 = a2a.clone();
1337 let pt2 = pt.clone();
1338 let cm2 = cm.clone();
1339
1340 let handle = tokio::spawn(async move {
1341 kernel_agent_loop(
1342 agent_pid,
1343 cancel2,
1344 inbox,
1345 a2a2,
1346 cron,
1347 pt2,
1348 None, Some(cm2),
1350 None, )
1352 .await
1353 });
1354
1355 let suspend_msg = KernelMessage::new(
1357 0,
1358 MessageTarget::Process(agent_pid),
1359 MessagePayload::Json(serde_json::json!({"cmd": "suspend"})),
1360 );
1361 let suspend_id = suspend_msg.id.clone();
1362 a2a.send(suspend_msg).await.unwrap();
1363
1364 let _reply = tokio::time::timeout(
1366 std::time::Duration::from_secs(1),
1367 kernel_inbox.recv(),
1368 )
1369 .await
1370 .unwrap()
1371 .unwrap();
1372
1373 let resume_msg = KernelMessage::new(
1375 0,
1376 MessageTarget::Process(agent_pid),
1377 MessagePayload::Json(serde_json::json!({"cmd": "resume"})),
1378 );
1379 let resume_id = resume_msg.id.clone();
1380 a2a.send(resume_msg).await.unwrap();
1381
1382 let _reply = tokio::time::timeout(
1384 std::time::Duration::from_secs(1),
1385 kernel_inbox.recv(),
1386 )
1387 .await
1388 .unwrap()
1389 .unwrap();
1390
1391 cancel.cancel();
1392 handle.await.unwrap();
1393
1394 let events = cm.tail(20);
1396 let suspend_evt = events.iter().find(|e| e.kind == "agent.suspend");
1397 let resume_evt = events.iter().find(|e| e.kind == "agent.resume");
1398
1399 assert!(suspend_evt.is_some(), "expected agent.suspend event on chain");
1400 assert!(resume_evt.is_some(), "expected agent.resume event on chain");
1401
1402 let sp = suspend_evt.unwrap().payload.as_ref().unwrap();
1403 assert_eq!(sp["pid"], agent_pid);
1404 assert_eq!(sp["from"], 0);
1405 assert_eq!(sp["msg_id"], suspend_id);
1406
1407 let rp = resume_evt.unwrap().payload.as_ref().unwrap();
1408 assert_eq!(rp["pid"], agent_pid);
1409 assert_eq!(rp["from"], 0);
1410 assert_eq!(rp["msg_id"], resume_id);
1411 }
1412
1413 #[tokio::test]
1416 async fn echo_command() {
1417 let (a2a, cron, pt) = setup();
1418 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "echo-agent");
1419 let mut kernel_inbox = a2a.create_inbox(0);
1420
1421 let cancel = CancellationToken::new();
1422 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1423
1424 let msg = KernelMessage::new(
1425 0,
1426 MessageTarget::Process(agent_pid),
1427 MessagePayload::Json(serde_json::json!({"cmd": "echo", "text": "hello world"})),
1428 );
1429 a2a.send(msg).await.unwrap();
1430
1431 let reply = tokio::time::timeout(
1432 std::time::Duration::from_secs(1),
1433 kernel_inbox.recv(),
1434 )
1435 .await
1436 .unwrap()
1437 .unwrap();
1438
1439 if let MessagePayload::Json(v) = &reply.payload {
1440 assert_eq!(v["echo"], "hello world");
1441 assert_eq!(v["pid"], agent_pid);
1442 } else {
1443 panic!("expected JSON reply");
1444 }
1445
1446 cancel.cancel();
1447 handle.await.unwrap();
1448 }
1449
1450 #[tokio::test]
1451 async fn text_payload_parsed_as_json() {
1452 let (a2a, cron, pt) = setup();
1453 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "text-agent");
1454 let mut kernel_inbox = a2a.create_inbox(0);
1455
1456 let cancel = CancellationToken::new();
1457 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1458
1459 let msg = KernelMessage::new(
1461 0,
1462 MessageTarget::Process(agent_pid),
1463 MessagePayload::Text(r#"{"cmd": "ping"}"#.into()),
1464 );
1465 a2a.send(msg).await.unwrap();
1466
1467 let reply = tokio::time::timeout(
1468 std::time::Duration::from_secs(1),
1469 kernel_inbox.recv(),
1470 )
1471 .await
1472 .unwrap()
1473 .unwrap();
1474
1475 if let MessagePayload::Json(v) = &reply.payload {
1476 assert_eq!(v["status"], "ok");
1477 } else {
1478 panic!("expected JSON reply");
1479 }
1480
1481 cancel.cancel();
1482 handle.await.unwrap();
1483 }
1484
1485 #[tokio::test]
1486 async fn text_payload_non_json_becomes_echo() {
1487 let (a2a, cron, pt) = setup();
1488 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "text-echo-agent");
1489 let mut kernel_inbox = a2a.create_inbox(0);
1490
1491 let cancel = CancellationToken::new();
1492 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1493
1494 let msg = KernelMessage::new(
1496 0,
1497 MessageTarget::Process(agent_pid),
1498 MessagePayload::Text("just plain text".into()),
1499 );
1500 a2a.send(msg).await.unwrap();
1501
1502 let reply = tokio::time::timeout(
1503 std::time::Duration::from_secs(1),
1504 kernel_inbox.recv(),
1505 )
1506 .await
1507 .unwrap()
1508 .unwrap();
1509
1510 if let MessagePayload::Json(v) = &reply.payload {
1511 assert_eq!(v["echo"], "just plain text");
1513 } else {
1514 panic!("expected JSON reply");
1515 }
1516
1517 cancel.cancel();
1518 handle.await.unwrap();
1519 }
1520
1521 #[tokio::test]
1522 async fn inbox_close_causes_clean_exit() {
1523 let pt = Arc::new(ProcessTable::new(64));
1524
1525 let kernel_entry = ProcessEntry {
1527 pid: 0,
1528 agent_id: "kernel".into(),
1529 state: ProcessState::Running,
1530 capabilities: AgentCapabilities::default(),
1531 resource_usage: ResourceUsage::default(),
1532 cancel_token: CancellationToken::new(),
1533 parent_pid: None,
1534 };
1535 pt.insert_with_pid(kernel_entry).unwrap();
1536
1537 let checker = Arc::new(CapabilityChecker::new(pt.clone()));
1538 let topic_router = Arc::new(TopicRouter::new(pt.clone()));
1539 let a2a = Arc::new(A2ARouter::new(pt.clone(), checker, topic_router));
1540 let cron = Arc::new(CronService::new());
1541
1542 let (tx, rx) = mpsc::channel(32);
1544 let agent_entry = ProcessEntry {
1545 pid: 0,
1546 agent_id: "close-agent".into(),
1547 state: ProcessState::Running,
1548 capabilities: AgentCapabilities::default(),
1549 resource_usage: ResourceUsage::default(),
1550 cancel_token: CancellationToken::new(),
1551 parent_pid: None,
1552 };
1553 let agent_pid = pt.insert(agent_entry).unwrap();
1554
1555 let cancel = CancellationToken::new();
1556 let handle = spawn_loop(agent_pid, cancel.clone(), rx, a2a, cron, pt);
1557
1558 drop(tx);
1560
1561 let code = tokio::time::timeout(
1562 std::time::Duration::from_secs(2),
1563 handle,
1564 )
1565 .await
1566 .unwrap()
1567 .unwrap();
1568
1569 assert_eq!(code, 0, "should exit cleanly when inbox closes");
1570 }
1571
1572 #[tokio::test]
1573 async fn cron_list_returns_empty() {
1574 let (a2a, cron, pt) = setup();
1575 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "cron-list-agent");
1576 let mut kernel_inbox = a2a.create_inbox(0);
1577
1578 let cancel = CancellationToken::new();
1579 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1580
1581 let msg = KernelMessage::new(
1582 0,
1583 MessageTarget::Process(agent_pid),
1584 MessagePayload::Json(serde_json::json!({"cmd": "cron.list"})),
1585 );
1586 a2a.send(msg).await.unwrap();
1587
1588 let reply = tokio::time::timeout(
1589 std::time::Duration::from_secs(1),
1590 kernel_inbox.recv(),
1591 )
1592 .await
1593 .unwrap()
1594 .unwrap();
1595
1596 if let MessagePayload::Json(v) = &reply.payload {
1597 assert!(v.is_array(), "cron.list should return array");
1598 assert_eq!(v.as_array().unwrap().len(), 0, "should be empty");
1599 } else {
1600 panic!("expected JSON reply");
1601 }
1602
1603 cancel.cancel();
1604 handle.await.unwrap();
1605 }
1606
1607 #[tokio::test]
1608 async fn cron_remove_nonexistent() {
1609 let (a2a, cron, pt) = setup();
1610 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "cron-rm-agent");
1611 let mut kernel_inbox = a2a.create_inbox(0);
1612
1613 let cancel = CancellationToken::new();
1614 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1615
1616 let msg = KernelMessage::new(
1617 0,
1618 MessageTarget::Process(agent_pid),
1619 MessagePayload::Json(serde_json::json!({"cmd": "cron.remove", "id": "nonexistent"})),
1620 );
1621 a2a.send(msg).await.unwrap();
1622
1623 let reply = tokio::time::timeout(
1624 std::time::Duration::from_secs(1),
1625 kernel_inbox.recv(),
1626 )
1627 .await
1628 .unwrap()
1629 .unwrap();
1630
1631 if let MessagePayload::Json(v) = &reply.payload {
1632 assert_eq!(v["removed"], false);
1633 assert!(v["error"].as_str().is_some());
1634 } else {
1635 panic!("expected JSON reply");
1636 }
1637
1638 cancel.cancel();
1639 handle.await.unwrap();
1640 }
1641
1642 #[tokio::test]
1643 async fn exec_without_tool_echoes() {
1644 let (a2a, cron, pt) = setup();
1645 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "exec-agent");
1646 let mut kernel_inbox = a2a.create_inbox(0);
1647
1648 let cancel = CancellationToken::new();
1649 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1650
1651 let msg = KernelMessage::new(
1653 0,
1654 MessageTarget::Process(agent_pid),
1655 MessagePayload::Json(serde_json::json!({"cmd": "exec", "text": "fallback"})),
1656 );
1657 a2a.send(msg).await.unwrap();
1658
1659 let reply = tokio::time::timeout(
1660 std::time::Duration::from_secs(1),
1661 kernel_inbox.recv(),
1662 )
1663 .await
1664 .unwrap()
1665 .unwrap();
1666
1667 if let MessagePayload::Json(v) = &reply.payload {
1668 assert_eq!(v["status"], "ok");
1669 assert_eq!(v["echo"], "fallback");
1670 } else {
1671 panic!("expected JSON reply");
1672 }
1673
1674 cancel.cancel();
1675 handle.await.unwrap();
1676 }
1677
1678 #[tokio::test]
1679 async fn exec_with_tool_name_no_registry() {
1680 let (a2a, cron, pt) = setup();
1681 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "exec-noreg-agent");
1682 let mut kernel_inbox = a2a.create_inbox(0);
1683
1684 let cancel = CancellationToken::new();
1685 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1686
1687 let msg = KernelMessage::new(
1689 0,
1690 MessageTarget::Process(agent_pid),
1691 MessagePayload::Json(serde_json::json!({"cmd": "exec", "tool": "fs.read", "args": {}})),
1692 );
1693 a2a.send(msg).await.unwrap();
1694
1695 let reply = tokio::time::timeout(
1696 std::time::Duration::from_secs(1),
1697 kernel_inbox.recv(),
1698 )
1699 .await
1700 .unwrap()
1701 .unwrap();
1702
1703 if let MessagePayload::Json(v) = &reply.payload {
1704 assert!(
1705 v["error"].as_str().unwrap().contains("tool registry not available"),
1706 "should report tool registry unavailable"
1707 );
1708 } else {
1709 panic!("expected JSON reply");
1710 }
1711
1712 cancel.cancel();
1713 handle.await.unwrap();
1714 }
1715
1716 #[tokio::test]
1717 async fn multiple_messages_increment_usage() {
1718 let (a2a, cron, pt) = setup();
1719 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "multi-msg-agent");
1720 let mut kernel_inbox = a2a.create_inbox(0);
1721
1722 let cancel = CancellationToken::new();
1723 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt.clone());
1724
1725 for _ in 0..3 {
1727 let msg = KernelMessage::new(
1728 0,
1729 MessageTarget::Process(agent_pid),
1730 MessagePayload::Json(serde_json::json!({"cmd": "ping"})),
1731 );
1732 a2a.send(msg).await.unwrap();
1733 let _reply = tokio::time::timeout(
1734 std::time::Duration::from_secs(1),
1735 kernel_inbox.recv(),
1736 )
1737 .await
1738 .unwrap()
1739 .unwrap();
1740 }
1741
1742 cancel.cancel();
1743 let _code = handle.await.unwrap();
1744
1745 let entry = pt.get(agent_pid).unwrap();
1746 assert!(
1747 entry.resource_usage.messages_sent >= 3,
1748 "should have sent at least 3 messages, got {}",
1749 entry.resource_usage.messages_sent
1750 );
1751 }
1752
1753 #[tokio::test]
1754 async fn cancel_during_suspend_exits_cleanly() {
1755 let (a2a, cron, pt) = setup();
1756 let (agent_pid, inbox) = spawn_agent(&pt, &a2a, "cancel-suspend-agent");
1757 let mut kernel_inbox = a2a.create_inbox(0);
1758
1759 let cancel = CancellationToken::new();
1760 let handle = spawn_loop(agent_pid, cancel.clone(), inbox, a2a.clone(), cron, pt);
1761
1762 let msg = KernelMessage::new(
1764 0,
1765 MessageTarget::Process(agent_pid),
1766 MessagePayload::Json(serde_json::json!({"cmd": "suspend"})),
1767 );
1768 a2a.send(msg).await.unwrap();
1769
1770 let _reply = tokio::time::timeout(
1772 std::time::Duration::from_secs(1),
1773 kernel_inbox.recv(),
1774 )
1775 .await
1776 .unwrap()
1777 .unwrap();
1778
1779 cancel.cancel();
1781 let code = tokio::time::timeout(
1782 std::time::Duration::from_secs(2),
1783 handle,
1784 )
1785 .await
1786 .unwrap()
1787 .unwrap();
1788
1789 assert_eq!(code, 0, "should exit cleanly when cancelled during suspend");
1790 }
1791
1792 #[tokio::test]
1793 async fn extract_cmd_from_json() {
1794 let msg = KernelMessage::new(
1795 0,
1796 MessageTarget::Process(1),
1797 MessagePayload::Json(serde_json::json!({"cmd": "test_cmd"})),
1798 );
1799 assert_eq!(extract_cmd(&msg), Some("test_cmd".to_string()));
1800 }
1801
1802 #[tokio::test]
1803 async fn extract_cmd_from_text() {
1804 let msg = KernelMessage::new(
1805 0,
1806 MessageTarget::Process(1),
1807 MessagePayload::Text(r#"{"cmd": "from_text"}"#.into()),
1808 );
1809 assert_eq!(extract_cmd(&msg), Some("from_text".to_string()));
1810 }
1811
1812 #[tokio::test]
1813 async fn extract_cmd_from_plain_text_returns_none() {
1814 let msg = KernelMessage::new(
1815 0,
1816 MessageTarget::Process(1),
1817 MessagePayload::Text("not json".into()),
1818 );
1819 assert_eq!(extract_cmd(&msg), None);
1820 }
1821
1822 #[tokio::test]
1823 async fn extract_cmd_from_signal_returns_none() {
1824 let msg = KernelMessage::new(
1825 0,
1826 MessageTarget::Process(1),
1827 MessagePayload::Signal(crate::ipc::KernelSignal::Shutdown),
1828 );
1829 assert_eq!(extract_cmd(&msg), None);
1830 }
1831}