1pub mod client;
56pub mod daemon;
57pub mod git;
58pub mod server;
59
60use serde::{Deserialize, Serialize};
61use std::fs;
62use std::path::PathBuf;
63use std::time::{SystemTime, UNIX_EPOCH};
64
65#[derive(Serialize, Deserialize, Clone, Debug)]
68pub struct Message {
69 pub id: String,
70 pub from_session: String,
71 pub from_agent: String,
72 pub to_session: Option<String>,
73 pub content: String,
74 pub timestamp: u64,
75 pub read_by: Vec<String>,
76}
77
78#[derive(Serialize, Deserialize, Clone, Debug)]
79pub struct AgentRegistration {
80 pub session_id: String,
81 pub agent_id: String,
82 pub pid: u32,
83 pub registered_at: u64,
84 pub last_heartbeat: u64,
85 pub metadata: serde_json::Value,
86}
87
/// Handle to a relay whose state lives entirely in files under `base_dir`.
///
/// The handle only holds the directory path, so it is cheap to clone.
#[derive(Clone, Debug)]
pub struct Relay {
    /// Root directory; messages and agent registrations are kept in
    /// sub-directories of it.
    pub base_dir: PathBuf,
}
97
98impl Relay {
99 pub fn new(base_dir: PathBuf) -> Self {
104 Self { base_dir }
105 }
106
107 fn messages_dir(&self) -> PathBuf {
108 self.base_dir.join("messages")
109 }
110
111 fn agents_dir(&self) -> PathBuf {
112 self.base_dir.join("agents")
113 }
114
115 fn ensure_dirs(&self) {
116 let _ = fs::create_dir_all(self.messages_dir());
117 let _ = fs::create_dir_all(self.agents_dir());
118 }
119
120 fn now() -> u64 {
121 SystemTime::now()
122 .duration_since(UNIX_EPOCH)
123 .unwrap_or_default()
124 .as_secs()
125 }
126
127 fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
128 let tmp = path.with_extension("tmp");
129 fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
130 fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
131 Ok(())
132 }
133
134 pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
138 self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
139 }
140
141 pub fn register_with_metadata(
143 &self,
144 agent_id: &str,
145 session_id: &str,
146 pid: u32,
147 metadata: serde_json::Value,
148 ) -> AgentRegistration {
149 self.ensure_dirs();
150 let reg = AgentRegistration {
151 session_id: session_id.to_string(),
152 agent_id: agent_id.to_string(),
153 pid,
154 registered_at: Self::now(),
155 last_heartbeat: Self::now(),
156 metadata,
157 };
158 let path = self.agents_dir().join(format!("{}.json", session_id));
159 if let Ok(json) = serde_json::to_string_pretty(®) {
160 let _ = Self::atomic_write(&path, json.as_bytes());
161 }
162 reg
163 }
164
165 pub fn heartbeat(&self, session_id: &str) {
167 let path = self.agents_dir().join(format!("{}.json", session_id));
168 if let Ok(content) = fs::read_to_string(&path) {
169 if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
170 reg.last_heartbeat = Self::now();
171 if let Ok(json) = serde_json::to_string_pretty(®) {
172 let _ = Self::atomic_write(&path, json.as_bytes());
173 }
174 }
175 }
176 }
177
178 pub fn unregister(&self, session_id: &str) {
180 let path = self.agents_dir().join(format!("{}.json", session_id));
181 let _ = fs::remove_file(&path);
182 }
183
184 pub fn agents(&self) -> Vec<AgentRegistration> {
186 self.ensure_dirs();
187 let dir = self.agents_dir();
188 let mut agents = Vec::new();
189 if let Ok(entries) = fs::read_dir(&dir) {
190 for entry in entries.flatten() {
191 if entry.path().extension().is_some_and(|x| x == "json") {
192 if let Ok(content) = fs::read_to_string(entry.path()) {
193 if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
194 agents.push(reg);
195 }
196 }
197 }
198 }
199 }
200 agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
201 agents
202 }
203
204 pub fn cleanup_dead(&self) -> usize {
206 self.ensure_dirs();
207 let agents = self.agents();
208 let mut removed = 0;
209 for agent in &agents {
210 if !is_pid_alive(agent.pid) {
211 let path = self.agents_dir().join(format!("{}.json", agent.session_id));
212 let _ = fs::remove_file(&path);
213 removed += 1;
214 }
215 }
216 removed
217 }
218
219 pub fn send(
223 &self,
224 from_session: &str,
225 from_agent: &str,
226 to_session: Option<&str>,
227 content: &str,
228 ) -> Message {
229 self.ensure_dirs();
230 let msg = Message {
231 id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
232 from_session: from_session.to_string(),
233 from_agent: from_agent.to_string(),
234 to_session: to_session.map(|s| s.to_string()),
235 content: content.to_string(),
236 timestamp: Self::now(),
237 read_by: vec![from_session.to_string()],
238 };
239 let path = self.messages_dir().join(format!("{}.json", msg.id));
240 if let Ok(json) = serde_json::to_string_pretty(&msg) {
241 let _ = Self::atomic_write(&path, json.as_bytes());
242 }
243 msg
244 }
245
246 pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
249 self.ensure_dirs();
250 let dir = self.messages_dir();
251 let mut messages = Vec::new();
252
253 if let Ok(entries) = fs::read_dir(&dir) {
254 for entry in entries.flatten() {
255 if entry.path().extension().is_some_and(|x| x == "json") {
256 if let Ok(content) = fs::read_to_string(entry.path()) {
257 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
258 let dominated = msg.to_session.is_none()
259 || msg.to_session.as_deref() == Some(session_id)
260 || msg.from_session == session_id;
261 if dominated {
262 messages.push((entry.path(), msg));
263 }
264 }
265 }
266 }
267 }
268 }
269
270 messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
271
272 let mut result = Vec::new();
273 for entry in &mut messages {
274 let was_unread = !entry.1.read_by.contains(&session_id.to_string());
275 if was_unread {
276 entry.1.read_by.push(session_id.to_string());
277 if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
278 let _ = Self::atomic_write(&entry.0, json.as_bytes());
279 }
280 }
281 result.push((entry.1.clone(), was_unread));
282 }
283
284 result.into_iter().take(limit).collect()
285 }
286
287 pub fn unread(&self, session_id: &str) -> Vec<Message> {
289 let dir = self.messages_dir();
290 let mut unread = Vec::new();
291 if let Ok(entries) = fs::read_dir(dir) {
292 for entry in entries.flatten() {
293 if entry.path().extension().is_some_and(|x| x == "json") {
294 if let Ok(content) = fs::read_to_string(entry.path()) {
295 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
296 let dominated = msg.to_session.is_none()
297 || msg.to_session.as_deref() == Some(session_id);
298 if dominated
299 && msg.from_session != session_id
300 && !msg.read_by.contains(&session_id.to_string())
301 {
302 unread.push(msg);
303 }
304 }
305 }
306 }
307 }
308 }
309 unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
310 unread
311 }
312
313 pub fn unread_count(&self, session_id: &str) -> u64 {
315 self.unread(session_id).len() as u64
316 }
317
318 pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
320 let dir = self.messages_dir();
321 let now = Self::now();
322 let mut removed = 0;
323 if let Ok(entries) = fs::read_dir(dir) {
324 for entry in entries.flatten() {
325 if entry.path().extension().is_some_and(|x| x == "json") {
326 if let Ok(content) = fs::read_to_string(entry.path()) {
327 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
328 if now - msg.timestamp > max_age_secs {
329 let _ = fs::remove_file(entry.path());
330 removed += 1;
331 }
332 }
333 }
334 }
335 }
336 }
337 removed
338 }
339
340 pub fn poll(&self, session_id: &str) -> u64 {
346 self.unread_count(session_id)
347 }
348}
349
/// Reports whether a process with id `pid` currently exists.
///
/// On Unix this probes the process with signal 0, which performs the
/// existence and permission checks without delivering a signal. A probe that
/// fails with `EPERM` still means the process exists (it belongs to another
/// user), so it counts as alive.
///
/// `0` and values that do not fit a positive `i32` are never valid single
/// process ids; passing them to `kill` would address process groups or every
/// process instead, so they are reported as not alive.
///
/// On non-Unix targets no check is available and every pid is reported as
/// alive.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // "Operation not permitted"; this errno value is 1 on Unix platforms.
        const EPERM: i32 = 1;

        match i32::try_from(pid) {
            Ok(target) if target > 0 => {
                // SAFETY: `kill` takes plain integers and signal 0 delivers
                // nothing; `target` is a positive pid, so exactly one process
                // is addressed.
                let rc = unsafe { libc_kill(target, 0) };
                rc == 0 || std::io::Error::last_os_error().raw_os_error() == Some(EPERM)
            }
            _ => false,
        }
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// The C library's `kill(2)`.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Thin wrapper around the C library's `kill`.
///
/// # Safety
///
/// The caller must pass a `pid`/`sig` combination whose effect it intends:
/// non-positive pids address process groups or all processes, and a non-zero
/// `sig` actually delivers a signal.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    // SAFETY: forwarded verbatim; the caller upholds the contract above.
    unsafe { kill(pid, sig) }
}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a uniquely named scratch directory under the system temp dir.
    fn temp_dir() -> PathBuf {
        let unique_name = format!("agent-relay-test-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(unique_name);
        let _ = fs::create_dir_all(&dir);
        dir
    }

    /// Builds a relay over a fresh scratch directory and registers the given
    /// `(agent_id, session_id, pid)` triples in order.
    fn relay_with(agents: &[(&str, &str, u32)]) -> (PathBuf, Relay) {
        let dir = temp_dir();
        let relay = Relay::new(dir.clone());
        for &(agent_id, session_id, pid) in agents {
            relay.register(agent_id, session_id, pid);
        }
        (dir, relay)
    }

    #[test]
    fn test_register_and_list_agents() {
        let (dir, relay) = relay_with(&[("claude", "s1", 99999), ("gemini", "s2", 99998)]);

        let agents = relay.agents();
        assert_eq!(agents.len(), 2);
        assert!(agents.iter().any(|a| a.agent_id == "claude"));
        assert!(agents.iter().any(|a| a.agent_id == "gemini"));

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_send_and_receive() {
        let (dir, relay) = relay_with(&[("claude", "s1", 99999), ("gemini", "s2", 99998)]);

        relay.send("s1", "claude", None, "hello from claude");
        assert_eq!(relay.unread_count("s2"), 1);

        let inbox = relay.inbox("s2", 10);
        assert_eq!(inbox.len(), 1);
        let (message, was_unread) = &inbox[0];
        assert!(*was_unread);
        assert_eq!(message.content, "hello from claude");
        assert_eq!(message.from_agent, "claude");

        // Reading the inbox marks the message as read.
        assert_eq!(relay.unread_count("s2"), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_direct_message() {
        let (dir, relay) = relay_with(&[
            ("claude", "s1", 99999),
            ("gemini", "s2", 99998),
            ("gpt", "s3", 99997),
        ]);

        relay.send("s1", "claude", Some("s2"), "private to gemini");

        assert_eq!(relay.unread_count("s2"), 1);
        // A direct message is not visible to other sessions.
        assert_eq!(relay.unread_count("s3"), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_broadcast() {
        let (dir, relay) = relay_with(&[
            ("claude", "s1", 99999),
            ("gemini", "s2", 99998),
            ("gpt", "s3", 99997),
        ]);

        relay.send("s1", "claude", None, "broadcast to all");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 1);
        // The sender never sees its own message as unread.
        assert_eq!(relay.unread_count("s1"), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_unregister() {
        let (dir, relay) = relay_with(&[("claude", "s1", 99999)]);
        assert_eq!(relay.agents().len(), 1);

        relay.unregister("s1");
        assert_eq!(relay.agents().len(), 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_cleanup_old_messages() {
        let (dir, relay) = relay_with(&[]);

        // Back-date the first message by two hours by rewriting its file.
        let mut stale = relay.send("s1", "claude", None, "old message");
        stale.timestamp = Relay::now() - 7200;
        let stale_path = relay.messages_dir().join(format!("{}.json", stale.id));
        let stale_json = serde_json::to_string_pretty(&stale).unwrap();
        let _ = fs::write(&stale_path, stale_json);

        relay.send("s1", "claude", None, "new message");

        // Only the back-dated message exceeds the one-hour limit.
        let removed = relay.cleanup_old(3600);
        assert_eq!(removed, 1);

        let _ = fs::remove_dir_all(&dir);
    }
}