1pub mod client;
56pub mod git;
57pub mod server;
58
59use serde::{Deserialize, Serialize};
60use std::fs;
61use std::path::PathBuf;
62use std::time::{SystemTime, UNIX_EPOCH};
63
64#[derive(Serialize, Deserialize, Clone, Debug)]
67pub struct Message {
68 pub id: String,
69 pub from_session: String,
70 pub from_agent: String,
71 pub to_session: Option<String>,
72 pub content: String,
73 pub timestamp: u64,
74 pub read_by: Vec<String>,
75}
76
77#[derive(Serialize, Deserialize, Clone, Debug)]
78pub struct AgentRegistration {
79 pub session_id: String,
80 pub agent_id: String,
81 pub pid: u32,
82 pub registered_at: u64,
83 pub last_heartbeat: u64,
84 pub metadata: serde_json::Value,
85}
86
87pub struct Relay {
94 pub base_dir: PathBuf,
95}
96
97impl Relay {
98 pub fn new(base_dir: PathBuf) -> Self {
103 Self { base_dir }
104 }
105
106 fn messages_dir(&self) -> PathBuf {
107 self.base_dir.join("messages")
108 }
109
110 fn agents_dir(&self) -> PathBuf {
111 self.base_dir.join("agents")
112 }
113
114 fn ensure_dirs(&self) {
115 let _ = fs::create_dir_all(self.messages_dir());
116 let _ = fs::create_dir_all(self.agents_dir());
117 }
118
119 fn now() -> u64 {
120 SystemTime::now()
121 .duration_since(UNIX_EPOCH)
122 .unwrap_or_default()
123 .as_secs()
124 }
125
126 fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
127 let tmp = path.with_extension("tmp");
128 fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
129 fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
130 Ok(())
131 }
132
133 pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
137 self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
138 }
139
140 pub fn register_with_metadata(
142 &self,
143 agent_id: &str,
144 session_id: &str,
145 pid: u32,
146 metadata: serde_json::Value,
147 ) -> AgentRegistration {
148 self.ensure_dirs();
149 let reg = AgentRegistration {
150 session_id: session_id.to_string(),
151 agent_id: agent_id.to_string(),
152 pid,
153 registered_at: Self::now(),
154 last_heartbeat: Self::now(),
155 metadata,
156 };
157 let path = self.agents_dir().join(format!("{}.json", session_id));
158 if let Ok(json) = serde_json::to_string_pretty(®) {
159 let _ = Self::atomic_write(&path, json.as_bytes());
160 }
161 reg
162 }
163
164 pub fn heartbeat(&self, session_id: &str) {
166 let path = self.agents_dir().join(format!("{}.json", session_id));
167 if let Ok(content) = fs::read_to_string(&path) {
168 if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
169 reg.last_heartbeat = Self::now();
170 if let Ok(json) = serde_json::to_string_pretty(®) {
171 let _ = Self::atomic_write(&path, json.as_bytes());
172 }
173 }
174 }
175 }
176
177 pub fn unregister(&self, session_id: &str) {
179 let path = self.agents_dir().join(format!("{}.json", session_id));
180 let _ = fs::remove_file(&path);
181 }
182
183 pub fn agents(&self) -> Vec<AgentRegistration> {
185 self.ensure_dirs();
186 let dir = self.agents_dir();
187 let mut agents = Vec::new();
188 if let Ok(entries) = fs::read_dir(&dir) {
189 for entry in entries.flatten() {
190 if entry.path().extension().is_some_and(|x| x == "json") {
191 if let Ok(content) = fs::read_to_string(entry.path()) {
192 if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
193 agents.push(reg);
194 }
195 }
196 }
197 }
198 }
199 agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
200 agents
201 }
202
203 pub fn cleanup_dead(&self) -> usize {
205 self.ensure_dirs();
206 let agents = self.agents();
207 let mut removed = 0;
208 for agent in &agents {
209 if !is_pid_alive(agent.pid) {
210 let path = self.agents_dir().join(format!("{}.json", agent.session_id));
211 let _ = fs::remove_file(&path);
212 removed += 1;
213 }
214 }
215 removed
216 }
217
218 pub fn send(
222 &self,
223 from_session: &str,
224 from_agent: &str,
225 to_session: Option<&str>,
226 content: &str,
227 ) -> Message {
228 self.ensure_dirs();
229 let msg = Message {
230 id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
231 from_session: from_session.to_string(),
232 from_agent: from_agent.to_string(),
233 to_session: to_session.map(|s| s.to_string()),
234 content: content.to_string(),
235 timestamp: Self::now(),
236 read_by: vec![from_session.to_string()],
237 };
238 let path = self.messages_dir().join(format!("{}.json", msg.id));
239 if let Ok(json) = serde_json::to_string_pretty(&msg) {
240 let _ = Self::atomic_write(&path, json.as_bytes());
241 }
242 msg
243 }
244
245 pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
248 self.ensure_dirs();
249 let dir = self.messages_dir();
250 let mut messages = Vec::new();
251
252 if let Ok(entries) = fs::read_dir(&dir) {
253 for entry in entries.flatten() {
254 if entry.path().extension().is_some_and(|x| x == "json") {
255 if let Ok(content) = fs::read_to_string(entry.path()) {
256 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
257 let dominated = msg.to_session.is_none()
258 || msg.to_session.as_deref() == Some(session_id)
259 || msg.from_session == session_id;
260 if dominated {
261 messages.push((entry.path(), msg));
262 }
263 }
264 }
265 }
266 }
267 }
268
269 messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
270
271 let mut result = Vec::new();
272 for entry in &mut messages {
273 let was_unread = !entry.1.read_by.contains(&session_id.to_string());
274 if was_unread {
275 entry.1.read_by.push(session_id.to_string());
276 if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
277 let _ = Self::atomic_write(&entry.0, json.as_bytes());
278 }
279 }
280 result.push((entry.1.clone(), was_unread));
281 }
282
283 result.into_iter().take(limit).collect()
284 }
285
286 pub fn unread(&self, session_id: &str) -> Vec<Message> {
288 let dir = self.messages_dir();
289 let mut unread = Vec::new();
290 if let Ok(entries) = fs::read_dir(dir) {
291 for entry in entries.flatten() {
292 if entry.path().extension().is_some_and(|x| x == "json") {
293 if let Ok(content) = fs::read_to_string(entry.path()) {
294 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
295 let dominated = msg.to_session.is_none()
296 || msg.to_session.as_deref() == Some(session_id);
297 if dominated
298 && msg.from_session != session_id
299 && !msg.read_by.contains(&session_id.to_string())
300 {
301 unread.push(msg);
302 }
303 }
304 }
305 }
306 }
307 }
308 unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
309 unread
310 }
311
312 pub fn unread_count(&self, session_id: &str) -> u64 {
314 self.unread(session_id).len() as u64
315 }
316
317 pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
319 let dir = self.messages_dir();
320 let now = Self::now();
321 let mut removed = 0;
322 if let Ok(entries) = fs::read_dir(dir) {
323 for entry in entries.flatten() {
324 if entry.path().extension().is_some_and(|x| x == "json") {
325 if let Ok(content) = fs::read_to_string(entry.path()) {
326 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
327 if now - msg.timestamp > max_age_secs {
328 let _ = fs::remove_file(entry.path());
329 removed += 1;
330 }
331 }
332 }
333 }
334 }
335 }
336 removed
337 }
338
339 pub fn poll(&self, session_id: &str) -> u64 {
345 self.unread_count(session_id)
346 }
347}
348
349fn is_pid_alive(pid: u32) -> bool {
351 #[cfg(unix)]
353 {
354 unsafe { libc_kill(pid as i32, 0) == 0 }
355 }
356 #[cfg(not(unix))]
357 {
358 let _ = pid;
360 true
361 }
362}
363
364#[cfg(unix)]
365extern "C" {
366 fn kill(pid: i32, sig: i32) -> i32;
367}
368
369#[cfg(unix)]
370unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
371 unsafe { kill(pid, sig) }
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 fn temp_dir() -> PathBuf {
379 let dir = std::env::temp_dir().join(format!("agent-relay-test-{}", uuid::Uuid::new_v4()));
380 let _ = fs::create_dir_all(&dir);
381 dir
382 }
383
384 #[test]
385 fn test_register_and_list_agents() {
386 let dir = temp_dir();
387 let relay = Relay::new(dir.clone());
388
389 relay.register("claude", "s1", 99999);
390 relay.register("gemini", "s2", 99998);
391
392 let agents = relay.agents();
393 assert_eq!(agents.len(), 2);
394
395 let ids: std::collections::HashSet<String> =
396 agents.iter().map(|a| a.agent_id.clone()).collect();
397 assert!(ids.contains("claude"));
398 assert!(ids.contains("gemini"));
399
400 let _ = fs::remove_dir_all(&dir);
401 }
402
403 #[test]
404 fn test_send_and_receive() {
405 let dir = temp_dir();
406 let relay = Relay::new(dir.clone());
407
408 relay.register("claude", "s1", 99999);
409 relay.register("gemini", "s2", 99998);
410
411 relay.send("s1", "claude", None, "hello from claude");
412
413 let count = relay.unread_count("s2");
414 assert_eq!(count, 1);
415
416 let inbox = relay.inbox("s2", 10);
417 assert_eq!(inbox.len(), 1);
418 assert!(inbox[0].1); assert_eq!(inbox[0].0.content, "hello from claude");
420 assert_eq!(inbox[0].0.from_agent, "claude");
421
422 let count_after = relay.unread_count("s2");
424 assert_eq!(count_after, 0);
425
426 let _ = fs::remove_dir_all(&dir);
427 }
428
429 #[test]
430 fn test_direct_message() {
431 let dir = temp_dir();
432 let relay = Relay::new(dir.clone());
433
434 relay.register("claude", "s1", 99999);
435 relay.register("gemini", "s2", 99998);
436 relay.register("gpt", "s3", 99997);
437
438 relay.send("s1", "claude", Some("s2"), "private to gemini");
440
441 assert_eq!(relay.unread_count("s2"), 1);
442 assert_eq!(relay.unread_count("s3"), 0); let _ = fs::remove_dir_all(&dir);
445 }
446
447 #[test]
448 fn test_broadcast() {
449 let dir = temp_dir();
450 let relay = Relay::new(dir.clone());
451
452 relay.register("claude", "s1", 99999);
453 relay.register("gemini", "s2", 99998);
454 relay.register("gpt", "s3", 99997);
455
456 relay.send("s1", "claude", None, "broadcast to all");
457
458 assert_eq!(relay.unread_count("s2"), 1);
459 assert_eq!(relay.unread_count("s3"), 1);
460 assert_eq!(relay.unread_count("s1"), 0); let _ = fs::remove_dir_all(&dir);
463 }
464
465 #[test]
466 fn test_unregister() {
467 let dir = temp_dir();
468 let relay = Relay::new(dir.clone());
469
470 relay.register("claude", "s1", 99999);
471 assert_eq!(relay.agents().len(), 1);
472
473 relay.unregister("s1");
474 assert_eq!(relay.agents().len(), 0);
475
476 let _ = fs::remove_dir_all(&dir);
477 }
478
479 #[test]
480 fn test_cleanup_old_messages() {
481 let dir = temp_dir();
482 let relay = Relay::new(dir.clone());
483
484 let mut msg = relay.send("s1", "claude", None, "old message");
486 msg.timestamp = Relay::now() - 7200; let path = relay.messages_dir().join(format!("{}.json", msg.id));
488 let json = serde_json::to_string_pretty(&msg).unwrap();
489 let _ = fs::write(&path, json);
490
491 relay.send("s1", "claude", None, "new message");
492
493 let removed = relay.cleanup_old(3600); assert_eq!(removed, 1);
495
496 let _ = fs::remove_dir_all(&dir);
497 }
498}