1pub mod client;
56pub mod daemon;
57pub mod git;
58pub mod hub;
59pub mod server;
60pub mod server_service;
61
62use serde::{Deserialize, Serialize};
63use std::fs;
64use std::path::PathBuf;
65use std::time::{SystemTime, UNIX_EPOCH};
66
67#[derive(Serialize, Deserialize, Clone, Debug)]
70pub struct Message {
71 pub id: String,
72 pub from_session: String,
73 pub from_agent: String,
74 pub to_session: Option<String>,
75 pub content: String,
76 pub timestamp: u64,
77 pub read_by: Vec<String>,
78}
79
80#[derive(Serialize, Deserialize, Clone, Debug)]
81pub struct AgentRegistration {
82 pub session_id: String,
83 pub agent_id: String,
84 pub pid: u32,
85 pub registered_at: u64,
86 pub last_heartbeat: u64,
87 pub metadata: serde_json::Value,
88}
89
/// File-backed relay rooted at `base_dir`.
///
/// Messages are kept under `<base_dir>/messages` and agent registrations
/// under `<base_dir>/agents`, one JSON file each.
#[derive(Debug, Clone)]
pub struct Relay {
    /// Root directory that holds the `messages` and `agents` subdirectories.
    pub base_dir: PathBuf,
}
99
100impl Relay {
101 pub fn new(base_dir: PathBuf) -> Self {
106 Self { base_dir }
107 }
108
109 fn messages_dir(&self) -> PathBuf {
110 self.base_dir.join("messages")
111 }
112
113 fn agents_dir(&self) -> PathBuf {
114 self.base_dir.join("agents")
115 }
116
117 fn ensure_dirs(&self) {
118 let _ = fs::create_dir_all(self.messages_dir());
119 let _ = fs::create_dir_all(self.agents_dir());
120 }
121
122 pub fn now() -> u64 {
123 SystemTime::now()
124 .duration_since(UNIX_EPOCH)
125 .unwrap_or_default()
126 .as_secs()
127 }
128
129 fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
130 let tmp = path.with_extension("tmp");
131 fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
132 fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
133 Ok(())
134 }
135
136 pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
140 self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
141 }
142
143 pub fn register_with_metadata(
145 &self,
146 agent_id: &str,
147 session_id: &str,
148 pid: u32,
149 metadata: serde_json::Value,
150 ) -> AgentRegistration {
151 self.ensure_dirs();
152 let reg = AgentRegistration {
153 session_id: session_id.to_string(),
154 agent_id: agent_id.to_string(),
155 pid,
156 registered_at: Self::now(),
157 last_heartbeat: Self::now(),
158 metadata,
159 };
160 let path = self.agents_dir().join(format!("{}.json", session_id));
161 if let Ok(json) = serde_json::to_string_pretty(®) {
162 let _ = Self::atomic_write(&path, json.as_bytes());
163 }
164 reg
165 }
166
167 pub fn heartbeat(&self, session_id: &str) {
169 let path = self.agents_dir().join(format!("{}.json", session_id));
170 if let Ok(content) = fs::read_to_string(&path) {
171 if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
172 reg.last_heartbeat = Self::now();
173 if let Ok(json) = serde_json::to_string_pretty(®) {
174 let _ = Self::atomic_write(&path, json.as_bytes());
175 }
176 }
177 }
178 }
179
180 pub fn unregister(&self, session_id: &str) {
182 let path = self.agents_dir().join(format!("{}.json", session_id));
183 let _ = fs::remove_file(&path);
184 }
185
186 pub fn agents(&self) -> Vec<AgentRegistration> {
188 self.ensure_dirs();
189 let dir = self.agents_dir();
190 let mut agents = Vec::new();
191 if let Ok(entries) = fs::read_dir(&dir) {
192 for entry in entries.flatten() {
193 if entry.path().extension().is_some_and(|x| x == "json") {
194 if let Ok(content) = fs::read_to_string(entry.path()) {
195 if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
196 agents.push(reg);
197 }
198 }
199 }
200 }
201 }
202 agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
203 agents
204 }
205
206 pub fn cleanup_dead(&self) -> usize {
208 self.ensure_dirs();
209 let agents = self.agents();
210 let mut removed = 0;
211 for agent in &agents {
212 if !is_pid_alive(agent.pid) {
213 let path = self.agents_dir().join(format!("{}.json", agent.session_id));
214 let _ = fs::remove_file(&path);
215 removed += 1;
216 }
217 }
218 removed
219 }
220
221 pub fn send(
225 &self,
226 from_session: &str,
227 from_agent: &str,
228 to_session: Option<&str>,
229 content: &str,
230 ) -> Message {
231 self.ensure_dirs();
232 let msg = Message {
233 id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
234 from_session: from_session.to_string(),
235 from_agent: from_agent.to_string(),
236 to_session: to_session.map(|s| s.to_string()),
237 content: content.to_string(),
238 timestamp: Self::now(),
239 read_by: vec![from_session.to_string()],
240 };
241 let path = self.messages_dir().join(format!("{}.json", msg.id));
242 if let Ok(json) = serde_json::to_string_pretty(&msg) {
243 let _ = Self::atomic_write(&path, json.as_bytes());
244 }
245 msg
246 }
247
248 pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
251 self.ensure_dirs();
252 let dir = self.messages_dir();
253 let mut messages = Vec::new();
254
255 if let Ok(entries) = fs::read_dir(&dir) {
256 for entry in entries.flatten() {
257 if entry.path().extension().is_some_and(|x| x == "json") {
258 if let Ok(content) = fs::read_to_string(entry.path()) {
259 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
260 let dominated = msg.to_session.is_none()
261 || msg.to_session.as_deref() == Some(session_id)
262 || msg.from_session == session_id;
263 if dominated {
264 messages.push((entry.path(), msg));
265 }
266 }
267 }
268 }
269 }
270 }
271
272 messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
273
274 let mut result = Vec::new();
275 for entry in &mut messages {
276 let was_unread = !entry.1.read_by.contains(&session_id.to_string());
277 if was_unread {
278 entry.1.read_by.push(session_id.to_string());
279 if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
280 let _ = Self::atomic_write(&entry.0, json.as_bytes());
281 }
282 }
283 result.push((entry.1.clone(), was_unread));
284 }
285
286 result.into_iter().take(limit).collect()
287 }
288
289 pub fn unread(&self, session_id: &str) -> Vec<Message> {
291 let dir = self.messages_dir();
292 let mut unread = Vec::new();
293 if let Ok(entries) = fs::read_dir(dir) {
294 for entry in entries.flatten() {
295 if entry.path().extension().is_some_and(|x| x == "json") {
296 if let Ok(content) = fs::read_to_string(entry.path()) {
297 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
298 let dominated = msg.to_session.is_none()
299 || msg.to_session.as_deref() == Some(session_id);
300 if dominated
301 && msg.from_session != session_id
302 && !msg.read_by.contains(&session_id.to_string())
303 {
304 unread.push(msg);
305 }
306 }
307 }
308 }
309 }
310 }
311 unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
312 unread
313 }
314
315 pub fn unread_count(&self, session_id: &str) -> u64 {
317 self.unread(session_id).len() as u64
318 }
319
320 pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
322 let dir = self.messages_dir();
323 let now = Self::now();
324 let mut removed = 0;
325 if let Ok(entries) = fs::read_dir(dir) {
326 for entry in entries.flatten() {
327 if entry.path().extension().is_some_and(|x| x == "json") {
328 if let Ok(content) = fs::read_to_string(entry.path()) {
329 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
330 if now - msg.timestamp > max_age_secs {
331 let _ = fs::remove_file(entry.path());
332 removed += 1;
333 }
334 }
335 }
336 }
337 }
338 }
339 removed
340 }
341
342 pub fn poll(&self, session_id: &str) -> u64 {
348 self.unread_count(session_id)
349 }
350}
351
/// Reports whether a process with the given id currently exists.
///
/// On Unix this probes the process with signal `0`, which performs the
/// existence and permission checks without delivering a signal. A probe that
/// fails with a permission error still means the process exists (it belongs
/// to another user), so it is reported as alive. Pids that do not fit in a
/// positive `i32` are reported as dead rather than being passed to `kill` as
/// a negative value, which would address a process group instead.
///
/// NOTE(review): pid `0` makes `kill` target the caller's own process group,
/// so such a registration is reported as alive.
///
/// On non-Unix platforms no check is available and every pid is reported as
/// alive.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        if pid > i32::MAX as u32 {
            return false;
        }
        // SAFETY: `kill` with signal 0 only performs error checking; it takes
        // plain integers and touches no memory owned by this program.
        if unsafe { libc_kill(pid as i32, 0) } == 0 {
            return true;
        }
        // EPERM: the process exists but we may not signal it.
        std::io::Error::last_os_error().kind() == std::io::ErrorKind::PermissionDenied
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// POSIX `kill(2)`, declared directly to avoid a dependency on a libc crate.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Thin wrapper around the C `kill` function.
///
/// # Safety
///
/// Calls straight into libc. The caller is responsible for the effect of the
/// signal it sends; signal `0` is always harmless.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    // SAFETY: forwarded verbatim; the caller upholds the contract above.
    unsafe { kill(pid, sig) }
}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a uniquely named scratch directory under the system temp dir.
    fn temp_dir() -> PathBuf {
        let name = format!("agent-relay-test-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(name);
        let _ = fs::create_dir_all(&dir);
        dir
    }

    #[test]
    fn test_register_and_list_agents() {
        let dir = temp_dir();
        let relay = Relay::new(dir.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        let agents = relay.agents();
        assert_eq!(agents.len(), 2);

        // Listing order depends on heartbeat time, so check membership only.
        assert!(agents.iter().any(|a| a.agent_id == "claude"));
        assert!(agents.iter().any(|a| a.agent_id == "gemini"));

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_send_and_receive() {
        let dir = temp_dir();
        let relay = Relay::new(dir.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        relay.send("s1", "claude", None, "hello from claude");

        assert_eq!(relay.unread_count("s2"), 1);

        let inbox = relay.inbox("s2", 10);
        assert_eq!(inbox.len(), 1);
        let (received, was_unread) = &inbox[0];
        assert!(*was_unread);
        assert_eq!(received.content, "hello from claude");
        assert_eq!(received.from_agent, "claude");

        // Reading the inbox marks the message as read.
        assert_eq!(relay.unread_count("s2"), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_direct_message() {
        let dir = temp_dir();
        let relay = Relay::new(dir.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        relay.send("s1", "claude", Some("s2"), "private to gemini");

        assert_eq!(relay.unread_count("s2"), 1);
        // A direct message is invisible to third parties.
        assert_eq!(relay.unread_count("s3"), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_broadcast() {
        let dir = temp_dir();
        let relay = Relay::new(dir.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        relay.send("s1", "claude", None, "broadcast to all");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 1);
        // The sender never sees its own message as unread.
        assert_eq!(relay.unread_count("s1"), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_unregister() {
        let dir = temp_dir();
        let relay = Relay::new(dir.clone());

        relay.register("claude", "s1", 99999);
        assert_eq!(relay.agents().len(), 1);

        relay.unregister("s1");
        assert_eq!(relay.agents().len(), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_cleanup_old_messages() {
        let dir = temp_dir();
        let relay = Relay::new(dir.clone());

        // Backdate one message by two hours by rewriting its file directly.
        let mut msg = relay.send("s1", "claude", None, "old message");
        msg.timestamp = Relay::now() - 7200;
        let path = relay.messages_dir().join(format!("{}.json", msg.id));
        let json = serde_json::to_string_pretty(&msg).unwrap();
        let _ = fs::write(&path, json);

        relay.send("s1", "claude", None, "new message");

        // Only the backdated message exceeds the one-hour limit.
        let removed = relay.cleanup_old(3600);
        assert_eq!(removed, 1);

        let _ = fs::remove_dir_all(&dir);
    }
}