1pub mod git;
56
57use serde::{Deserialize, Serialize};
58use std::fs;
59use std::path::PathBuf;
60use std::time::{SystemTime, UNIX_EPOCH};
61
62#[derive(Serialize, Deserialize, Clone, Debug)]
65pub struct Message {
66 pub id: String,
67 pub from_session: String,
68 pub from_agent: String,
69 pub to_session: Option<String>,
70 pub content: String,
71 pub timestamp: u64,
72 pub read_by: Vec<String>,
73}
74
75#[derive(Serialize, Deserialize, Clone, Debug)]
76pub struct AgentRegistration {
77 pub session_id: String,
78 pub agent_id: String,
79 pub pid: u32,
80 pub registered_at: u64,
81 pub last_heartbeat: u64,
82 pub metadata: serde_json::Value,
83}
84
/// File-backed relay for exchanging messages between agent sessions.
///
/// All state lives under [`Relay::base_dir`]: messages in a `messages`
/// subdirectory and agent registrations in an `agents` subdirectory, one JSON
/// file per record.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Relay {
    /// Root directory under which the relay keeps its files.
    pub base_dir: PathBuf,
}
94
95impl Relay {
96 pub fn new(base_dir: PathBuf) -> Self {
101 Self { base_dir }
102 }
103
104 fn messages_dir(&self) -> PathBuf {
105 self.base_dir.join("messages")
106 }
107
108 fn agents_dir(&self) -> PathBuf {
109 self.base_dir.join("agents")
110 }
111
112 fn ensure_dirs(&self) {
113 let _ = fs::create_dir_all(self.messages_dir());
114 let _ = fs::create_dir_all(self.agents_dir());
115 }
116
117 fn now() -> u64 {
118 SystemTime::now()
119 .duration_since(UNIX_EPOCH)
120 .unwrap_or_default()
121 .as_secs()
122 }
123
124 fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
125 let tmp = path.with_extension("tmp");
126 fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
127 fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
128 Ok(())
129 }
130
131 pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
135 self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
136 }
137
138 pub fn register_with_metadata(
140 &self,
141 agent_id: &str,
142 session_id: &str,
143 pid: u32,
144 metadata: serde_json::Value,
145 ) -> AgentRegistration {
146 self.ensure_dirs();
147 let reg = AgentRegistration {
148 session_id: session_id.to_string(),
149 agent_id: agent_id.to_string(),
150 pid,
151 registered_at: Self::now(),
152 last_heartbeat: Self::now(),
153 metadata,
154 };
155 let path = self.agents_dir().join(format!("{}.json", session_id));
156 if let Ok(json) = serde_json::to_string_pretty(®) {
157 let _ = Self::atomic_write(&path, json.as_bytes());
158 }
159 reg
160 }
161
162 pub fn heartbeat(&self, session_id: &str) {
164 let path = self.agents_dir().join(format!("{}.json", session_id));
165 if let Ok(content) = fs::read_to_string(&path) {
166 if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
167 reg.last_heartbeat = Self::now();
168 if let Ok(json) = serde_json::to_string_pretty(®) {
169 let _ = Self::atomic_write(&path, json.as_bytes());
170 }
171 }
172 }
173 }
174
175 pub fn unregister(&self, session_id: &str) {
177 let path = self.agents_dir().join(format!("{}.json", session_id));
178 let _ = fs::remove_file(&path);
179 }
180
181 pub fn agents(&self) -> Vec<AgentRegistration> {
183 self.ensure_dirs();
184 let dir = self.agents_dir();
185 let mut agents = Vec::new();
186 if let Ok(entries) = fs::read_dir(&dir) {
187 for entry in entries.flatten() {
188 if entry.path().extension().is_some_and(|x| x == "json") {
189 if let Ok(content) = fs::read_to_string(entry.path()) {
190 if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
191 agents.push(reg);
192 }
193 }
194 }
195 }
196 }
197 agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
198 agents
199 }
200
201 pub fn cleanup_dead(&self) -> usize {
203 self.ensure_dirs();
204 let agents = self.agents();
205 let mut removed = 0;
206 for agent in &agents {
207 if !is_pid_alive(agent.pid) {
208 let path = self.agents_dir().join(format!("{}.json", agent.session_id));
209 let _ = fs::remove_file(&path);
210 removed += 1;
211 }
212 }
213 removed
214 }
215
216 pub fn send(
220 &self,
221 from_session: &str,
222 from_agent: &str,
223 to_session: Option<&str>,
224 content: &str,
225 ) -> Message {
226 self.ensure_dirs();
227 let msg = Message {
228 id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
229 from_session: from_session.to_string(),
230 from_agent: from_agent.to_string(),
231 to_session: to_session.map(|s| s.to_string()),
232 content: content.to_string(),
233 timestamp: Self::now(),
234 read_by: vec![from_session.to_string()],
235 };
236 let path = self.messages_dir().join(format!("{}.json", msg.id));
237 if let Ok(json) = serde_json::to_string_pretty(&msg) {
238 let _ = Self::atomic_write(&path, json.as_bytes());
239 }
240 msg
241 }
242
243 pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
246 self.ensure_dirs();
247 let dir = self.messages_dir();
248 let mut messages = Vec::new();
249
250 if let Ok(entries) = fs::read_dir(&dir) {
251 for entry in entries.flatten() {
252 if entry.path().extension().is_some_and(|x| x == "json") {
253 if let Ok(content) = fs::read_to_string(entry.path()) {
254 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
255 let dominated = msg.to_session.is_none()
256 || msg.to_session.as_deref() == Some(session_id)
257 || msg.from_session == session_id;
258 if dominated {
259 messages.push((entry.path(), msg));
260 }
261 }
262 }
263 }
264 }
265 }
266
267 messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
268
269 let mut result = Vec::new();
270 for entry in &mut messages {
271 let was_unread = !entry.1.read_by.contains(&session_id.to_string());
272 if was_unread {
273 entry.1.read_by.push(session_id.to_string());
274 if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
275 let _ = Self::atomic_write(&entry.0, json.as_bytes());
276 }
277 }
278 result.push((entry.1.clone(), was_unread));
279 }
280
281 result.into_iter().take(limit).collect()
282 }
283
284 pub fn unread(&self, session_id: &str) -> Vec<Message> {
286 let dir = self.messages_dir();
287 let mut unread = Vec::new();
288 if let Ok(entries) = fs::read_dir(dir) {
289 for entry in entries.flatten() {
290 if entry.path().extension().is_some_and(|x| x == "json") {
291 if let Ok(content) = fs::read_to_string(entry.path()) {
292 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
293 let dominated = msg.to_session.is_none()
294 || msg.to_session.as_deref() == Some(session_id);
295 if dominated
296 && msg.from_session != session_id
297 && !msg.read_by.contains(&session_id.to_string())
298 {
299 unread.push(msg);
300 }
301 }
302 }
303 }
304 }
305 }
306 unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
307 unread
308 }
309
310 pub fn unread_count(&self, session_id: &str) -> u64 {
312 self.unread(session_id).len() as u64
313 }
314
315 pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
317 let dir = self.messages_dir();
318 let now = Self::now();
319 let mut removed = 0;
320 if let Ok(entries) = fs::read_dir(dir) {
321 for entry in entries.flatten() {
322 if entry.path().extension().is_some_and(|x| x == "json") {
323 if let Ok(content) = fs::read_to_string(entry.path()) {
324 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
325 if now - msg.timestamp > max_age_secs {
326 let _ = fs::remove_file(entry.path());
327 removed += 1;
328 }
329 }
330 }
331 }
332 }
333 }
334 removed
335 }
336
337 pub fn poll(&self, session_id: &str) -> u64 {
343 self.unread_count(session_id)
344 }
345}
346
/// Reports whether a process with the given PID currently exists.
///
/// On Unix the process is probed with signal `0`, which performs the
/// existence and permission checks without delivering a signal:
///
/// * success means the process exists;
/// * failure with `EPERM` means the process exists but belongs to another
///   user, so it is still reported as alive;
/// * any other failure is reported as dead.
///
/// PIDs that do not fit into `i32` cannot name a Unix process and are reported
/// as dead. Cast blindly they would become negative and address process
/// groups instead. PID `0` addresses the caller's own process group, so it is
/// reported as alive.
///
/// On other platforms no probe is available and every PID is assumed alive.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // `EPERM` has the value 1 on Linux, macOS and the BSDs.
        const EPERM: i32 = 1;

        if pid > i32::MAX as u32 {
            return false;
        }
        // SAFETY: `kill` takes two plain integers and touches no Rust-managed
        // memory; signal 0 delivers nothing to the target.
        if unsafe { libc_kill(pid as i32, 0) } == 0 {
            return true;
        }
        std::io::Error::last_os_error().raw_os_error() == Some(EPERM)
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// POSIX `kill(2)`, declared directly instead of through the `libc` crate.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Thin wrapper over the raw `kill(2)` binding.
///
/// # Safety
///
/// The call has no memory-safety preconditions, but it can deliver a real
/// signal: the caller is responsible for the effect of `sig` on the target
/// designated by `pid`.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    // SAFETY: both arguments are plain integers passed by value.
    unsafe { kill(pid, sig) }
}
371
372#[cfg(test)]
373mod tests {
374 use super::*;
375
376 fn temp_dir() -> PathBuf {
377 let dir = std::env::temp_dir().join(format!("agent-relay-test-{}", uuid::Uuid::new_v4()));
378 let _ = fs::create_dir_all(&dir);
379 dir
380 }
381
382 #[test]
383 fn test_register_and_list_agents() {
384 let dir = temp_dir();
385 let relay = Relay::new(dir.clone());
386
387 relay.register("claude", "s1", 99999);
388 relay.register("gemini", "s2", 99998);
389
390 let agents = relay.agents();
391 assert_eq!(agents.len(), 2);
392
393 let ids: std::collections::HashSet<String> =
394 agents.iter().map(|a| a.agent_id.clone()).collect();
395 assert!(ids.contains("claude"));
396 assert!(ids.contains("gemini"));
397
398 let _ = fs::remove_dir_all(&dir);
399 }
400
401 #[test]
402 fn test_send_and_receive() {
403 let dir = temp_dir();
404 let relay = Relay::new(dir.clone());
405
406 relay.register("claude", "s1", 99999);
407 relay.register("gemini", "s2", 99998);
408
409 relay.send("s1", "claude", None, "hello from claude");
410
411 let count = relay.unread_count("s2");
412 assert_eq!(count, 1);
413
414 let inbox = relay.inbox("s2", 10);
415 assert_eq!(inbox.len(), 1);
416 assert!(inbox[0].1); assert_eq!(inbox[0].0.content, "hello from claude");
418 assert_eq!(inbox[0].0.from_agent, "claude");
419
420 let count_after = relay.unread_count("s2");
422 assert_eq!(count_after, 0);
423
424 let _ = fs::remove_dir_all(&dir);
425 }
426
427 #[test]
428 fn test_direct_message() {
429 let dir = temp_dir();
430 let relay = Relay::new(dir.clone());
431
432 relay.register("claude", "s1", 99999);
433 relay.register("gemini", "s2", 99998);
434 relay.register("gpt", "s3", 99997);
435
436 relay.send("s1", "claude", Some("s2"), "private to gemini");
438
439 assert_eq!(relay.unread_count("s2"), 1);
440 assert_eq!(relay.unread_count("s3"), 0); let _ = fs::remove_dir_all(&dir);
443 }
444
445 #[test]
446 fn test_broadcast() {
447 let dir = temp_dir();
448 let relay = Relay::new(dir.clone());
449
450 relay.register("claude", "s1", 99999);
451 relay.register("gemini", "s2", 99998);
452 relay.register("gpt", "s3", 99997);
453
454 relay.send("s1", "claude", None, "broadcast to all");
455
456 assert_eq!(relay.unread_count("s2"), 1);
457 assert_eq!(relay.unread_count("s3"), 1);
458 assert_eq!(relay.unread_count("s1"), 0); let _ = fs::remove_dir_all(&dir);
461 }
462
463 #[test]
464 fn test_unregister() {
465 let dir = temp_dir();
466 let relay = Relay::new(dir.clone());
467
468 relay.register("claude", "s1", 99999);
469 assert_eq!(relay.agents().len(), 1);
470
471 relay.unregister("s1");
472 assert_eq!(relay.agents().len(), 0);
473
474 let _ = fs::remove_dir_all(&dir);
475 }
476
477 #[test]
478 fn test_cleanup_old_messages() {
479 let dir = temp_dir();
480 let relay = Relay::new(dir.clone());
481
482 let mut msg = relay.send("s1", "claude", None, "old message");
484 msg.timestamp = Relay::now() - 7200; let path = relay.messages_dir().join(format!("{}.json", msg.id));
486 let json = serde_json::to_string_pretty(&msg).unwrap();
487 let _ = fs::write(&path, json);
488
489 relay.send("s1", "claude", None, "new message");
490
491 let removed = relay.cleanup_old(3600); assert_eq!(removed, 1);
493
494 let _ = fs::remove_dir_all(&dir);
495 }
496}