1use agentzero_core::{Tool, ToolContext, ToolResult};
2use agentzero_storage::EncryptedJsonStore;
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use std::path::{Path, PathBuf};
8use std::time::{SystemTime, UNIX_EPOCH};
9
/// File name of the persisted message queue, passed to the store together with
/// the directory returned by `ipc_dir`.
const IPC_STORE_FILE: &str = "ipc.json";
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13struct IpcMessage {
14 from: String,
15 to: String,
16 payload: String,
17 created_at_epoch_secs: u64,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21#[serde(tag = "op", rename_all = "snake_case")]
22enum IpcRequest {
23 Send {
24 from: String,
25 to: String,
26 payload: String,
27 },
28 Recv {
29 to: String,
30 },
31 List {
32 to: Option<String>,
33 from: Option<String>,
34 limit: Option<usize>,
35 },
36 Clear {
37 to: Option<String>,
38 from: Option<String>,
39 },
40}
41
/// Tool that lets agents exchange messages through a queue persisted under the
/// workspace's `.agentzero` directory.
///
/// The type is stateless; all data lives in the on-disk store, so it is cheap
/// to copy and can be created with [`Default::default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct AgentsIpcTool;
43
44#[async_trait]
45impl Tool for AgentsIpcTool {
46 fn name(&self) -> &'static str {
47 "agents_ipc"
48 }
49
50 fn description(&self) -> &'static str {
51 "Inter-process communication between agents: send messages and receive responses."
52 }
53
54 fn input_schema(&self) -> Option<serde_json::Value> {
55 Some(serde_json::json!({
56 "type": "object",
57 "properties": {
58 "op": { "type": "string", "enum": ["send", "recv", "list", "clear"], "description": "The IPC operation to perform" },
59 "from": { "type": "string", "description": "Sender agent name (required for send)" },
60 "to": { "type": "string", "description": "Recipient agent name (required for send/recv)" },
61 "payload": { "type": "string", "description": "Message payload (required for send)" },
62 "limit": { "type": "integer", "description": "Max messages to return (for list)" }
63 },
64 "required": ["op"],
65 "additionalProperties": false
66 }))
67 }
68
69 async fn execute(&self, input: &str, ctx: &ToolContext) -> anyhow::Result<ToolResult> {
70 let req: IpcRequest =
71 serde_json::from_str(input).context("agents_ipc input must be valid JSON request")?;
72 let ipc_dir = ipc_dir(&ctx.workspace_root);
73 let store = EncryptedJsonStore::in_config_dir(&ipc_dir, IPC_STORE_FILE)?;
74 let mut messages: Vec<IpcMessage> = store.load_or_default()?;
75
76 let output = match req {
77 IpcRequest::Send { from, to, payload } => {
78 if from.trim().is_empty() || to.trim().is_empty() {
79 return Err(anyhow!("`from` and `to` must not be empty"));
80 }
81 messages.push(IpcMessage {
82 from,
83 to,
84 payload,
85 created_at_epoch_secs: now_epoch_secs(),
86 });
87 store.save(&messages)?;
88 json!({
89 "queued": messages.len(),
90 "status": "ok"
91 })
92 }
93 IpcRequest::Recv { to } => {
94 if to.trim().is_empty() {
95 return Err(anyhow!("`to` must not be empty"));
96 }
97 let idx = messages.iter().position(|msg| msg.to == to);
98 let received = idx.map(|index| messages.remove(index));
99 store.save(&messages)?;
100 json!({
101 "message": received,
102 "remaining": messages.len()
103 })
104 }
105 IpcRequest::List { to, from, limit } => {
106 let iter = messages.iter().filter(|msg| {
107 to.as_ref()
108 .map(|expected| &msg.to == expected)
109 .unwrap_or(true)
110 && from
111 .as_ref()
112 .map(|expected| &msg.from == expected)
113 .unwrap_or(true)
114 });
115 let listed = if let Some(limit) = limit {
116 iter.take(limit).cloned().collect::<Vec<_>>()
117 } else {
118 iter.cloned().collect::<Vec<_>>()
119 };
120 json!({
121 "messages": listed,
122 "count": listed.len()
123 })
124 }
125 IpcRequest::Clear { to, from } => {
126 let before = messages.len();
127 messages.retain(|msg| {
128 let to_match = to
129 .as_ref()
130 .map(|expected| &msg.to == expected)
131 .unwrap_or(true);
132 let from_match = from
133 .as_ref()
134 .map(|expected| &msg.from == expected)
135 .unwrap_or(true);
136 !(to_match && from_match)
137 });
138 let removed = before.saturating_sub(messages.len());
139 store.save(&messages)?;
140 json!({
141 "removed": removed,
142 "remaining": messages.len()
143 })
144 }
145 };
146
147 Ok(ToolResult {
148 output: serde_json::to_string_pretty(&output)?,
149 })
150 }
151}
152
/// Builds the path of the `.agentzero` directory directly beneath `workspace_root`.
///
/// Pure path arithmetic: nothing is created or checked on disk.
fn ipc_dir(workspace_root: &str) -> PathBuf {
    let root: &Path = workspace_root.as_ref();
    let mut dir = root.to_path_buf();
    dir.push(".agentzero");
    dir
}
156
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned rather
/// than panicking: the value is only an informational timestamp on a message,
/// and a misconfigured clock should not abort the caller.
fn now_epoch_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
163
#[cfg(test)]
mod tests {
    use super::AgentsIpcTool;
    use agentzero_core::{Tool, ToolContext};
    use std::fs;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Per-process sequence number so tests created within the same nanosecond
    /// still get distinct directories.
    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Creates a fresh, uniquely named directory under the system temp dir.
    fn temp_dir() -> std::path::PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time should move forward")
            .as_nanos();
        let seq = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir = std::env::temp_dir().join(format!(
            "agentzero-ipc-tool-test-{}-{nanos}-{seq}",
            std::process::id()
        ));
        fs::create_dir_all(&dir).expect("temp dir should be created");
        dir
    }

    #[tokio::test]
    async fn agents_ipc_send_and_recv_success_path() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        tool.execute(
            r#"{"op":"send","from":"planner","to":"worker","payload":"do task"}"#,
            &ctx,
        )
        .await
        .expect("send should succeed");

        // Inspect the store while the message is still queued: after `recv`
        // the queue is empty and the check could never fail.
        let ipc_file = dir.join(".agentzero").join("ipc.json");
        if ipc_file.exists() {
            // Read raw bytes so non-UTF-8 ciphertext is still inspected.
            let bytes = fs::read(&ipc_file).unwrap_or_default();
            let text = String::from_utf8_lossy(&bytes);
            assert!(
                !text.contains("\"planner\""),
                "IPC store should be encrypted, not plaintext"
            );
        }

        let recv = tool
            .execute(r#"{"op":"recv","to":"worker"}"#, &ctx)
            .await
            .expect("recv should succeed");
        assert!(recv.output.contains("\"payload\": \"do task\""));

        fs::remove_dir_all(dir).ok();
    }

    #[tokio::test]
    async fn agents_ipc_rejects_invalid_json_negative_path() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        let err = tool
            .execute("not-json", &ctx)
            .await
            .expect_err("invalid json should fail");
        assert!(err.to_string().contains("valid JSON"));

        fs::remove_dir_all(dir).ok();
    }

    #[tokio::test]
    async fn agents_ipc_recv_missing_returns_no_messages() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        let result = tool
            .execute(r#"{"op":"recv","to":"nobody"}"#, &ctx)
            .await
            .expect("recv for empty mailbox should succeed");
        // Both facts must hold; checking either one alone could mask a regression.
        assert!(
            result.output.contains("\"message\": null"),
            "should report a null message, got: {}",
            result.output
        );
        assert!(
            result.output.contains("\"remaining\": 0"),
            "should report an empty queue, got: {}",
            result.output
        );

        fs::remove_dir_all(dir).ok();
    }

    #[tokio::test]
    async fn agents_ipc_message_round_trip() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        tool.execute(
            r#"{"op":"send","from":"alice","to":"bob","payload":"msg-1"}"#,
            &ctx,
        )
        .await
        .expect("send 1");
        tool.execute(
            r#"{"op":"send","from":"alice","to":"bob","payload":"msg-2"}"#,
            &ctx,
        )
        .await
        .expect("send 2");

        let list = tool
            .execute(r#"{"op":"list","to":"bob"}"#, &ctx)
            .await
            .expect("list");
        assert!(list.output.contains("msg-1"), "list should contain msg-1");
        assert!(list.output.contains("msg-2"), "list should contain msg-2");

        // Complete the round trip: messages come back oldest first.
        let first = tool
            .execute(r#"{"op":"recv","to":"bob"}"#, &ctx)
            .await
            .expect("recv 1");
        assert!(first.output.contains("msg-1"), "first recv should be msg-1");
        assert!(
            !first.output.contains("msg-2"),
            "first recv should not return msg-2"
        );

        let second = tool
            .execute(r#"{"op":"recv","to":"bob"}"#, &ctx)
            .await
            .expect("recv 2");
        assert!(second.output.contains("msg-2"), "second recv should be msg-2");

        fs::remove_dir_all(dir).ok();
    }
}