// agentzero_tools/agents_ipc.rs

1use agentzero_core::{Tool, ToolContext, ToolResult};
2use agentzero_storage::EncryptedJsonStore;
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use std::path::{Path, PathBuf};
8use std::time::{SystemTime, UNIX_EPOCH};
9
/// File name of the store that holds the queued IPC messages. It is passed to
/// `EncryptedJsonStore::in_config_dir` together with the workspace IPC directory.
const IPC_STORE_FILE: &str = "ipc.json";
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13struct IpcMessage {
14    from: String,
15    to: String,
16    payload: String,
17    created_at_epoch_secs: u64,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21#[serde(tag = "op", rename_all = "snake_case")]
22enum IpcRequest {
23    Send {
24        from: String,
25        to: String,
26        payload: String,
27    },
28    Recv {
29        to: String,
30    },
31    List {
32        to: Option<String>,
33        from: Option<String>,
34        limit: Option<usize>,
35    },
36    Clear {
37        to: Option<String>,
38        from: Option<String>,
39    },
40}
41
42pub struct AgentsIpcTool;
43
44#[async_trait]
45impl Tool for AgentsIpcTool {
46    fn name(&self) -> &'static str {
47        "agents_ipc"
48    }
49
50    fn description(&self) -> &'static str {
51        "Inter-process communication between agents: send messages and receive responses."
52    }
53
54    fn input_schema(&self) -> Option<serde_json::Value> {
55        Some(serde_json::json!({
56            "type": "object",
57            "properties": {
58                "op": { "type": "string", "enum": ["send", "recv", "list", "clear"], "description": "The IPC operation to perform" },
59                "from": { "type": "string", "description": "Sender agent name (required for send)" },
60                "to": { "type": "string", "description": "Recipient agent name (required for send/recv)" },
61                "payload": { "type": "string", "description": "Message payload (required for send)" },
62                "limit": { "type": "integer", "description": "Max messages to return (for list)" }
63            },
64            "required": ["op"],
65            "additionalProperties": false
66        }))
67    }
68
69    async fn execute(&self, input: &str, ctx: &ToolContext) -> anyhow::Result<ToolResult> {
70        let req: IpcRequest =
71            serde_json::from_str(input).context("agents_ipc input must be valid JSON request")?;
72        let ipc_dir = ipc_dir(&ctx.workspace_root);
73        let store = EncryptedJsonStore::in_config_dir(&ipc_dir, IPC_STORE_FILE)?;
74        let mut messages: Vec<IpcMessage> = store.load_or_default()?;
75
76        let output = match req {
77            IpcRequest::Send { from, to, payload } => {
78                if from.trim().is_empty() || to.trim().is_empty() {
79                    return Err(anyhow!("`from` and `to` must not be empty"));
80                }
81                messages.push(IpcMessage {
82                    from,
83                    to,
84                    payload,
85                    created_at_epoch_secs: now_epoch_secs(),
86                });
87                store.save(&messages)?;
88                json!({
89                    "queued": messages.len(),
90                    "status": "ok"
91                })
92            }
93            IpcRequest::Recv { to } => {
94                if to.trim().is_empty() {
95                    return Err(anyhow!("`to` must not be empty"));
96                }
97                let idx = messages.iter().position(|msg| msg.to == to);
98                let received = idx.map(|index| messages.remove(index));
99                store.save(&messages)?;
100                json!({
101                    "message": received,
102                    "remaining": messages.len()
103                })
104            }
105            IpcRequest::List { to, from, limit } => {
106                let iter = messages.iter().filter(|msg| {
107                    to.as_ref()
108                        .map(|expected| &msg.to == expected)
109                        .unwrap_or(true)
110                        && from
111                            .as_ref()
112                            .map(|expected| &msg.from == expected)
113                            .unwrap_or(true)
114                });
115                let listed = if let Some(limit) = limit {
116                    iter.take(limit).cloned().collect::<Vec<_>>()
117                } else {
118                    iter.cloned().collect::<Vec<_>>()
119                };
120                json!({
121                    "messages": listed,
122                    "count": listed.len()
123                })
124            }
125            IpcRequest::Clear { to, from } => {
126                let before = messages.len();
127                messages.retain(|msg| {
128                    let to_match = to
129                        .as_ref()
130                        .map(|expected| &msg.to == expected)
131                        .unwrap_or(true);
132                    let from_match = from
133                        .as_ref()
134                        .map(|expected| &msg.from == expected)
135                        .unwrap_or(true);
136                    !(to_match && from_match)
137                });
138                let removed = before.saturating_sub(messages.len());
139                store.save(&messages)?;
140                json!({
141                    "removed": removed,
142                    "remaining": messages.len()
143                })
144            }
145        };
146
147        Ok(ToolResult {
148            output: serde_json::to_string_pretty(&output)?,
149        })
150    }
151}
152
/// Returns the directory that holds the IPC store for a workspace:
/// `<workspace_root>/.agentzero`.
///
/// This only builds the path; it does not touch the file system.
fn ipc_dir(workspace_root: &str) -> PathBuf {
    Path::new(workspace_root).join(".agentzero")
}
156
/// Returns the current wall-clock time in whole seconds since the Unix epoch.
///
/// If the system clock is set before the epoch, `duration_since` fails; in
/// that case this returns `0` instead of panicking, so a misconfigured clock
/// cannot crash the tool while it is stamping a message.
fn now_epoch_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
163
#[cfg(test)]
mod tests {
    use super::AgentsIpcTool;
    use agentzero_core::{Tool, ToolContext};
    use std::fs;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Per-process sequence number that keeps temp directory names distinct
    /// even when two tests start within the same nanosecond.
    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Creates a fresh directory under the system temp dir, named from the
    /// process id, the current time in nanoseconds and a sequence number.
    fn temp_dir() -> std::path::PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time should move forward")
            .as_nanos();
        let seq = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir = std::env::temp_dir().join(format!(
            "agentzero-ipc-tool-test-{}-{nanos}-{seq}",
            std::process::id()
        ));
        fs::create_dir_all(&dir).expect("temp dir should be created");
        dir
    }

    #[tokio::test]
    async fn agents_ipc_send_and_recv_success_path() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        tool.execute(
            r#"{"op":"send","from":"planner","to":"worker","payload":"do task"}"#,
            &ctx,
        )
        .await
        .expect("send should succeed");

        let recv = tool
            .execute(r#"{"op":"recv","to":"worker"}"#, &ctx)
            .await
            .expect("recv should succeed");
        assert!(recv.output.contains("\"payload\": \"do task\""));

        // Verify stored data is encrypted (not readable as plain JSON).
        // The check only runs when the store file exists at this path.
        let ipc_file = dir.join(".agentzero").join("ipc.json");
        if ipc_file.exists() {
            let raw = fs::read_to_string(&ipc_file).unwrap_or_default();
            assert!(
                !raw.contains("\"planner\""),
                "IPC store should be encrypted, not plaintext"
            );
        }

        fs::remove_dir_all(dir).expect("temp dir should be removed");
    }

    #[tokio::test]
    async fn agents_ipc_rejects_invalid_json_negative_path() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        let err = tool
            .execute("not-json", &ctx)
            .await
            .expect_err("invalid json should fail");
        assert!(err.to_string().contains("valid JSON"));

        fs::remove_dir_all(dir).expect("temp dir should be removed");
    }

    #[tokio::test]
    async fn agents_ipc_recv_missing_returns_no_messages() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        let result = tool
            .execute(r#"{"op":"recv","to":"nobody"}"#, &ctx)
            .await
            .expect("recv for empty mailbox should succeed");
        // Both facts must hold: no message was returned and the queue is
        // empty. Checking them separately keeps either one from masking the
        // other.
        assert!(
            result.output.contains("\"message\": null"),
            "should return no message, got: {}",
            result.output
        );
        assert!(
            result.output.contains("\"remaining\": 0"),
            "queue should be empty, got: {}",
            result.output
        );
        fs::remove_dir_all(dir).ok();
    }

    #[tokio::test]
    async fn agents_ipc_message_round_trip() {
        let dir = temp_dir();
        let ctx = ToolContext::new(dir.to_string_lossy().to_string());
        let tool = AgentsIpcTool;

        // Send two messages.
        tool.execute(
            r#"{"op":"send","from":"alice","to":"bob","payload":"msg-1"}"#,
            &ctx,
        )
        .await
        .expect("send 1");
        tool.execute(
            r#"{"op":"send","from":"alice","to":"bob","payload":"msg-2"}"#,
            &ctx,
        )
        .await
        .expect("send 2");

        // List messages for bob.
        let list = tool
            .execute(r#"{"op":"list","to":"bob"}"#, &ctx)
            .await
            .expect("list");
        assert!(list.output.contains("msg-1"), "list should contain msg-1");
        assert!(list.output.contains("msg-2"), "list should contain msg-2");

        fs::remove_dir_all(dir).ok();
    }
}