1pub mod client;
56pub mod daemon;
57pub mod git;
58pub mod server;
59pub mod server_service;
60
61use serde::{Deserialize, Serialize};
62use std::fs;
63use std::path::PathBuf;
64use std::time::{SystemTime, UNIX_EPOCH};
65
/// A single relay message, persisted as one JSON file in the messages directory.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Message {
    /// Message identifier; `Relay::send` builds it as `msg-` plus the first 8 characters
    /// of a v4 UUID and uses it as the file stem.
    pub id: String,
    /// Session id of the sender.
    pub from_session: String,
    /// Agent id of the sender.
    pub from_agent: String,
    /// Recipient session; `None` is treated as a broadcast by `Relay::inbox` and
    /// `Relay::unread`.
    pub to_session: Option<String>,
    /// Message payload.
    pub content: String,
    /// Send time in whole seconds since the Unix epoch (see `Relay::now`).
    pub timestamp: u64,
    /// Sessions that have already seen the message; `Relay::send` seeds it with the sender.
    pub read_by: Vec<String>,
}
78
/// Registration record of one agent session, persisted as `<session_id>.json` in the
/// agents directory.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AgentRegistration {
    /// Session identifier; also the stem of the registration file.
    pub session_id: String,
    /// Name of the agent that owns the session.
    pub agent_id: String,
    /// Process id supplied at registration; `Relay::cleanup_dead` probes it.
    pub pid: u32,
    /// Registration time in seconds since the Unix epoch.
    pub registered_at: u64,
    /// Time of the latest `Relay::heartbeat`, in seconds since the Unix epoch.
    pub last_heartbeat: u64,
    /// Free-form JSON attached by the caller; `Relay::register` stores `{}`.
    pub metadata: serde_json::Value,
}
88
/// File-backed relay: all state lives as JSON files under `base_dir`.
pub struct Relay {
    /// Root directory; messages are stored in its `messages` subdirectory and agent
    /// registrations in its `agents` subdirectory.
    pub base_dir: PathBuf,
}
98
99impl Relay {
100 pub fn new(base_dir: PathBuf) -> Self {
105 Self { base_dir }
106 }
107
108 fn messages_dir(&self) -> PathBuf {
109 self.base_dir.join("messages")
110 }
111
112 fn agents_dir(&self) -> PathBuf {
113 self.base_dir.join("agents")
114 }
115
116 fn ensure_dirs(&self) {
117 let _ = fs::create_dir_all(self.messages_dir());
118 let _ = fs::create_dir_all(self.agents_dir());
119 }
120
121 pub fn now() -> u64 {
122 SystemTime::now()
123 .duration_since(UNIX_EPOCH)
124 .unwrap_or_default()
125 .as_secs()
126 }
127
128 fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
129 let tmp = path.with_extension("tmp");
130 fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
131 fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
132 Ok(())
133 }
134
135 pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
139 self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
140 }
141
142 pub fn register_with_metadata(
144 &self,
145 agent_id: &str,
146 session_id: &str,
147 pid: u32,
148 metadata: serde_json::Value,
149 ) -> AgentRegistration {
150 self.ensure_dirs();
151 let reg = AgentRegistration {
152 session_id: session_id.to_string(),
153 agent_id: agent_id.to_string(),
154 pid,
155 registered_at: Self::now(),
156 last_heartbeat: Self::now(),
157 metadata,
158 };
159 let path = self.agents_dir().join(format!("{}.json", session_id));
160 if let Ok(json) = serde_json::to_string_pretty(®) {
161 let _ = Self::atomic_write(&path, json.as_bytes());
162 }
163 reg
164 }
165
166 pub fn heartbeat(&self, session_id: &str) {
168 let path = self.agents_dir().join(format!("{}.json", session_id));
169 if let Ok(content) = fs::read_to_string(&path) {
170 if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
171 reg.last_heartbeat = Self::now();
172 if let Ok(json) = serde_json::to_string_pretty(®) {
173 let _ = Self::atomic_write(&path, json.as_bytes());
174 }
175 }
176 }
177 }
178
179 pub fn unregister(&self, session_id: &str) {
181 let path = self.agents_dir().join(format!("{}.json", session_id));
182 let _ = fs::remove_file(&path);
183 }
184
185 pub fn agents(&self) -> Vec<AgentRegistration> {
187 self.ensure_dirs();
188 let dir = self.agents_dir();
189 let mut agents = Vec::new();
190 if let Ok(entries) = fs::read_dir(&dir) {
191 for entry in entries.flatten() {
192 if entry.path().extension().is_some_and(|x| x == "json") {
193 if let Ok(content) = fs::read_to_string(entry.path()) {
194 if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
195 agents.push(reg);
196 }
197 }
198 }
199 }
200 }
201 agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
202 agents
203 }
204
205 pub fn cleanup_dead(&self) -> usize {
207 self.ensure_dirs();
208 let agents = self.agents();
209 let mut removed = 0;
210 for agent in &agents {
211 if !is_pid_alive(agent.pid) {
212 let path = self.agents_dir().join(format!("{}.json", agent.session_id));
213 let _ = fs::remove_file(&path);
214 removed += 1;
215 }
216 }
217 removed
218 }
219
220 pub fn send(
224 &self,
225 from_session: &str,
226 from_agent: &str,
227 to_session: Option<&str>,
228 content: &str,
229 ) -> Message {
230 self.ensure_dirs();
231 let msg = Message {
232 id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
233 from_session: from_session.to_string(),
234 from_agent: from_agent.to_string(),
235 to_session: to_session.map(|s| s.to_string()),
236 content: content.to_string(),
237 timestamp: Self::now(),
238 read_by: vec![from_session.to_string()],
239 };
240 let path = self.messages_dir().join(format!("{}.json", msg.id));
241 if let Ok(json) = serde_json::to_string_pretty(&msg) {
242 let _ = Self::atomic_write(&path, json.as_bytes());
243 }
244 msg
245 }
246
247 pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
250 self.ensure_dirs();
251 let dir = self.messages_dir();
252 let mut messages = Vec::new();
253
254 if let Ok(entries) = fs::read_dir(&dir) {
255 for entry in entries.flatten() {
256 if entry.path().extension().is_some_and(|x| x == "json") {
257 if let Ok(content) = fs::read_to_string(entry.path()) {
258 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
259 let dominated = msg.to_session.is_none()
260 || msg.to_session.as_deref() == Some(session_id)
261 || msg.from_session == session_id;
262 if dominated {
263 messages.push((entry.path(), msg));
264 }
265 }
266 }
267 }
268 }
269 }
270
271 messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
272
273 let mut result = Vec::new();
274 for entry in &mut messages {
275 let was_unread = !entry.1.read_by.contains(&session_id.to_string());
276 if was_unread {
277 entry.1.read_by.push(session_id.to_string());
278 if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
279 let _ = Self::atomic_write(&entry.0, json.as_bytes());
280 }
281 }
282 result.push((entry.1.clone(), was_unread));
283 }
284
285 result.into_iter().take(limit).collect()
286 }
287
288 pub fn unread(&self, session_id: &str) -> Vec<Message> {
290 let dir = self.messages_dir();
291 let mut unread = Vec::new();
292 if let Ok(entries) = fs::read_dir(dir) {
293 for entry in entries.flatten() {
294 if entry.path().extension().is_some_and(|x| x == "json") {
295 if let Ok(content) = fs::read_to_string(entry.path()) {
296 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
297 let dominated = msg.to_session.is_none()
298 || msg.to_session.as_deref() == Some(session_id);
299 if dominated
300 && msg.from_session != session_id
301 && !msg.read_by.contains(&session_id.to_string())
302 {
303 unread.push(msg);
304 }
305 }
306 }
307 }
308 }
309 }
310 unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
311 unread
312 }
313
314 pub fn unread_count(&self, session_id: &str) -> u64 {
316 self.unread(session_id).len() as u64
317 }
318
319 pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
321 let dir = self.messages_dir();
322 let now = Self::now();
323 let mut removed = 0;
324 if let Ok(entries) = fs::read_dir(dir) {
325 for entry in entries.flatten() {
326 if entry.path().extension().is_some_and(|x| x == "json") {
327 if let Ok(content) = fs::read_to_string(entry.path()) {
328 if let Ok(msg) = serde_json::from_str::<Message>(&content) {
329 if now - msg.timestamp > max_age_secs {
330 let _ = fs::remove_file(entry.path());
331 removed += 1;
332 }
333 }
334 }
335 }
336 }
337 }
338 removed
339 }
340
341 pub fn poll(&self, session_id: &str) -> u64 {
347 self.unread_count(session_id)
348 }
349}
350
/// Reports whether a process with id `pid` currently exists.
///
/// On Unix this sends signal 0 with `kill`, which performs only the existence and
/// permission checks:
/// * success means the process exists;
/// * failure with `EPERM` also means it exists, but belongs to a user we may not
///   signal, so it must not be treated as dead;
/// * any other failure means there is no such process.
///
/// A `pid` that does not fit in `i32` cannot name a process and yields `false`;
/// casting it would produce a negative value, which `kill` interprets as a process
/// group. `pid == 0` keeps its historical answer of `true`: `kill(0, …)` targets the
/// caller's own process group and always succeeds, so nothing can be proven dead.
///
/// On non-Unix targets no probe is available and every pid is assumed alive.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // errno for "operation not permitted"; 1 on Linux, macOS and the BSDs.
        const EPERM: i32 = 1;

        if pid == 0 {
            return true;
        }
        let Ok(pid) = i32::try_from(pid) else {
            return false;
        };
        // SAFETY: `kill` takes plain integers and, with signal 0, delivers nothing;
        // it only reports whether the target could be signalled.
        if unsafe { libc_kill(pid, 0) } == 0 {
            return true;
        }
        std::io::Error::last_os_error().raw_os_error() == Some(EPERM)
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// POSIX `kill(2)`, declared directly so that no `libc` crate dependency is needed.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Thin wrapper around the C `kill` function.
///
/// # Safety
/// Calls straight into the C library; the caller chooses the target and the signal
/// and is responsible for the effect of delivering it.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    unsafe { kill(pid, sig) }
}
375
376#[cfg(test)]
377mod tests {
378 use super::*;
379
380 fn temp_dir() -> PathBuf {
381 let dir = std::env::temp_dir().join(format!("agent-relay-test-{}", uuid::Uuid::new_v4()));
382 let _ = fs::create_dir_all(&dir);
383 dir
384 }
385
386 #[test]
387 fn test_register_and_list_agents() {
388 let dir = temp_dir();
389 let relay = Relay::new(dir.clone());
390
391 relay.register("claude", "s1", 99999);
392 relay.register("gemini", "s2", 99998);
393
394 let agents = relay.agents();
395 assert_eq!(agents.len(), 2);
396
397 let ids: std::collections::HashSet<String> =
398 agents.iter().map(|a| a.agent_id.clone()).collect();
399 assert!(ids.contains("claude"));
400 assert!(ids.contains("gemini"));
401
402 let _ = fs::remove_dir_all(&dir);
403 }
404
405 #[test]
406 fn test_send_and_receive() {
407 let dir = temp_dir();
408 let relay = Relay::new(dir.clone());
409
410 relay.register("claude", "s1", 99999);
411 relay.register("gemini", "s2", 99998);
412
413 relay.send("s1", "claude", None, "hello from claude");
414
415 let count = relay.unread_count("s2");
416 assert_eq!(count, 1);
417
418 let inbox = relay.inbox("s2", 10);
419 assert_eq!(inbox.len(), 1);
420 assert!(inbox[0].1); assert_eq!(inbox[0].0.content, "hello from claude");
422 assert_eq!(inbox[0].0.from_agent, "claude");
423
424 let count_after = relay.unread_count("s2");
426 assert_eq!(count_after, 0);
427
428 let _ = fs::remove_dir_all(&dir);
429 }
430
431 #[test]
432 fn test_direct_message() {
433 let dir = temp_dir();
434 let relay = Relay::new(dir.clone());
435
436 relay.register("claude", "s1", 99999);
437 relay.register("gemini", "s2", 99998);
438 relay.register("gpt", "s3", 99997);
439
440 relay.send("s1", "claude", Some("s2"), "private to gemini");
442
443 assert_eq!(relay.unread_count("s2"), 1);
444 assert_eq!(relay.unread_count("s3"), 0); let _ = fs::remove_dir_all(&dir);
447 }
448
449 #[test]
450 fn test_broadcast() {
451 let dir = temp_dir();
452 let relay = Relay::new(dir.clone());
453
454 relay.register("claude", "s1", 99999);
455 relay.register("gemini", "s2", 99998);
456 relay.register("gpt", "s3", 99997);
457
458 relay.send("s1", "claude", None, "broadcast to all");
459
460 assert_eq!(relay.unread_count("s2"), 1);
461 assert_eq!(relay.unread_count("s3"), 1);
462 assert_eq!(relay.unread_count("s1"), 0); let _ = fs::remove_dir_all(&dir);
465 }
466
467 #[test]
468 fn test_unregister() {
469 let dir = temp_dir();
470 let relay = Relay::new(dir.clone());
471
472 relay.register("claude", "s1", 99999);
473 assert_eq!(relay.agents().len(), 1);
474
475 relay.unregister("s1");
476 assert_eq!(relay.agents().len(), 0);
477
478 let _ = fs::remove_dir_all(&dir);
479 }
480
481 #[test]
482 fn test_cleanup_old_messages() {
483 let dir = temp_dir();
484 let relay = Relay::new(dir.clone());
485
486 let mut msg = relay.send("s1", "claude", None, "old message");
488 msg.timestamp = Relay::now() - 7200; let path = relay.messages_dir().join(format!("{}.json", msg.id));
490 let json = serde_json::to_string_pretty(&msg).unwrap();
491 let _ = fs::write(&path, json);
492
493 relay.send("s1", "claude", None, "new message");
494
495 let removed = relay.cleanup_old(3600); assert_eq!(removed, 1);
497
498 let _ = fs::remove_dir_all(&dir);
499 }
500}