Skip to main content

agent_relay/
lib.rs

1//! # agent-relay
2//!
3//! Agent-to-agent messaging for AI coding tools.
4//!
5//! When multiple AI agents (Claude, Gemini, GPT, Copilot) or human developers
6//! work on the same codebase, they need a way to coordinate. `agent-relay`
7//! provides instant, file-based messaging with zero setup — no server, no
8//! database, no config.
9//!
10//! ## Quick Start
11//!
12//! ```no_run
13//! use std::path::PathBuf;
14//! use agent_relay::Relay;
15//!
16//! let relay = Relay::new(PathBuf::from(".relay"));
17//!
18//! // Register this agent
19//! relay.register("claude-opus", "session-1", std::process::id());
20//!
21//! // Send a message
22//! relay.send("session-1", "claude-opus", None, "refactoring auth module — stay away from src/auth/");
23//!
24//! // Another agent checks inbox
25//! let msgs = relay.inbox("session-2", 10);
26//! for (msg, is_new) in &msgs {
27//!     if *is_new {
28//!         println!("[{}] {}: {}", msg.from_agent, msg.from_session, msg.content);
29//!     }
30//! }
31//! ```
32//!
33//! ## CLI
34//!
35//! ```bash
36//! # Register yourself
37//! agent-relay register --agent claude --session my-session
38//!
39//! # Send a broadcast
40//! agent-relay send "heads up: changing the auth module"
41//!
42//! # Send to specific agent
43//! agent-relay send --to session-2 "can you review src/auth.rs?"
44//!
45//! # Check inbox
46//! agent-relay inbox
47//!
48//! # List active agents
49//! agent-relay agents
50//! ```
51//!
52//! Originally extracted from [Aura](https://auravcs.com), the semantic
53//! version control engine.
54
55pub mod git;
56
57use serde::{Deserialize, Serialize};
58use std::fs;
59use std::path::PathBuf;
60use std::time::{SystemTime, UNIX_EPOCH};
61
62// ── Data Types ──
63
/// A single relay message, stored as one pretty-printed JSON file under the
/// relay's `messages/` directory (the file is named `<id>.json`).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Message {
    /// Message identifier; `Relay::send` generates it as `msg-` followed by
    /// the first 8 characters of a v4 UUID.
    pub id: String,
    /// Session id of the sender.
    pub from_session: String,
    /// Agent id of the sender.
    pub from_agent: String,
    /// Recipient session id; `None` means the message is a broadcast.
    pub to_session: Option<String>,
    /// Message body.
    pub content: String,
    /// Send time in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Session ids that have already read this message. `Relay::send`
    /// pre-populates it with the sender's own session id.
    pub read_by: Vec<String>,
}
74
/// An agent's registration record, stored as one pretty-printed JSON file
/// under the relay's `agents/` directory (the file is named
/// `<session_id>.json`).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AgentRegistration {
    /// Session id of the registered agent; also the registration file stem.
    pub session_id: String,
    /// Agent id supplied at registration.
    pub agent_id: String,
    /// Process id supplied at registration; `Relay::cleanup_dead` uses it to
    /// decide whether the registration is stale.
    pub pid: u32,
    /// Registration time in whole seconds since the Unix epoch.
    pub registered_at: u64,
    /// Time of the last heartbeat in whole seconds since the Unix epoch;
    /// refreshed by `Relay::heartbeat`.
    pub last_heartbeat: u64,
    /// Free-form JSON metadata; `Relay::register` stores an empty object.
    pub metadata: serde_json::Value,
}
84
85// ── Relay ──
86
/// The main relay hub. All operations are local filesystem — no network.
///
/// Create one per repo/project with a shared `base_dir` path. Every agent
/// that uses the same path will see each other's messages.
pub struct Relay {
    /// Root directory of the relay. Messages are kept in its `messages/`
    /// subdirectory and agent registrations in its `agents/` subdirectory.
    pub base_dir: PathBuf,
}
94
95impl Relay {
96    /// Create a new relay rooted at the given directory.
97    ///
98    /// Typical usage: `Relay::new(".relay".into())` at the repo root.
99    /// All agents must use the same path to see each other.
100    pub fn new(base_dir: PathBuf) -> Self {
101        Self { base_dir }
102    }
103
104    fn messages_dir(&self) -> PathBuf {
105        self.base_dir.join("messages")
106    }
107
108    fn agents_dir(&self) -> PathBuf {
109        self.base_dir.join("agents")
110    }
111
112    fn ensure_dirs(&self) {
113        let _ = fs::create_dir_all(self.messages_dir());
114        let _ = fs::create_dir_all(self.agents_dir());
115    }
116
117    fn now() -> u64 {
118        SystemTime::now()
119            .duration_since(UNIX_EPOCH)
120            .unwrap_or_default()
121            .as_secs()
122    }
123
124    fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
125        let tmp = path.with_extension("tmp");
126        fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
127        fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
128        Ok(())
129    }
130
131    // ── Agent Registration ──
132
133    /// Register an agent so others can discover it.
134    pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
135        self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
136    }
137
138    /// Register with custom metadata (model name, capabilities, etc).
139    pub fn register_with_metadata(
140        &self,
141        agent_id: &str,
142        session_id: &str,
143        pid: u32,
144        metadata: serde_json::Value,
145    ) -> AgentRegistration {
146        self.ensure_dirs();
147        let reg = AgentRegistration {
148            session_id: session_id.to_string(),
149            agent_id: agent_id.to_string(),
150            pid,
151            registered_at: Self::now(),
152            last_heartbeat: Self::now(),
153            metadata,
154        };
155        let path = self.agents_dir().join(format!("{}.json", session_id));
156        if let Ok(json) = serde_json::to_string_pretty(&reg) {
157            let _ = Self::atomic_write(&path, json.as_bytes());
158        }
159        reg
160    }
161
162    /// Update heartbeat to signal this agent is still alive.
163    pub fn heartbeat(&self, session_id: &str) {
164        let path = self.agents_dir().join(format!("{}.json", session_id));
165        if let Ok(content) = fs::read_to_string(&path) {
166            if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
167                reg.last_heartbeat = Self::now();
168                if let Ok(json) = serde_json::to_string_pretty(&reg) {
169                    let _ = Self::atomic_write(&path, json.as_bytes());
170                }
171            }
172        }
173    }
174
175    /// Unregister an agent (cleanup on exit).
176    pub fn unregister(&self, session_id: &str) {
177        let path = self.agents_dir().join(format!("{}.json", session_id));
178        let _ = fs::remove_file(&path);
179    }
180
181    /// List all registered agents.
182    pub fn agents(&self) -> Vec<AgentRegistration> {
183        self.ensure_dirs();
184        let dir = self.agents_dir();
185        let mut agents = Vec::new();
186        if let Ok(entries) = fs::read_dir(&dir) {
187            for entry in entries.flatten() {
188                if entry.path().extension().is_some_and(|x| x == "json") {
189                    if let Ok(content) = fs::read_to_string(entry.path()) {
190                        if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
191                            agents.push(reg);
192                        }
193                    }
194                }
195            }
196        }
197        agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
198        agents
199    }
200
201    /// Remove registrations for agents whose PID is no longer alive.
202    pub fn cleanup_dead(&self) -> usize {
203        self.ensure_dirs();
204        let agents = self.agents();
205        let mut removed = 0;
206        for agent in &agents {
207            if !is_pid_alive(agent.pid) {
208                let path = self.agents_dir().join(format!("{}.json", agent.session_id));
209                let _ = fs::remove_file(&path);
210                removed += 1;
211            }
212        }
213        removed
214    }
215
216    // ── Messaging ──
217
218    /// Send a message. `to_session: None` = broadcast to all agents.
219    pub fn send(
220        &self,
221        from_session: &str,
222        from_agent: &str,
223        to_session: Option<&str>,
224        content: &str,
225    ) -> Message {
226        self.ensure_dirs();
227        let msg = Message {
228            id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
229            from_session: from_session.to_string(),
230            from_agent: from_agent.to_string(),
231            to_session: to_session.map(|s| s.to_string()),
232            content: content.to_string(),
233            timestamp: Self::now(),
234            read_by: vec![from_session.to_string()],
235        };
236        let path = self.messages_dir().join(format!("{}.json", msg.id));
237        if let Ok(json) = serde_json::to_string_pretty(&msg) {
238            let _ = Self::atomic_write(&path, json.as_bytes());
239        }
240        msg
241    }
242
243    /// Read inbox: returns messages with a flag indicating if newly read.
244    /// Marks all returned messages as read by this session.
245    pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
246        self.ensure_dirs();
247        let dir = self.messages_dir();
248        let mut messages = Vec::new();
249
250        if let Ok(entries) = fs::read_dir(&dir) {
251            for entry in entries.flatten() {
252                if entry.path().extension().is_some_and(|x| x == "json") {
253                    if let Ok(content) = fs::read_to_string(entry.path()) {
254                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
255                            let dominated = msg.to_session.is_none()
256                                || msg.to_session.as_deref() == Some(session_id)
257                                || msg.from_session == session_id;
258                            if dominated {
259                                messages.push((entry.path(), msg));
260                            }
261                        }
262                    }
263                }
264            }
265        }
266
267        messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
268
269        let mut result = Vec::new();
270        for entry in &mut messages {
271            let was_unread = !entry.1.read_by.contains(&session_id.to_string());
272            if was_unread {
273                entry.1.read_by.push(session_id.to_string());
274                if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
275                    let _ = Self::atomic_write(&entry.0, json.as_bytes());
276                }
277            }
278            result.push((entry.1.clone(), was_unread));
279        }
280
281        result.into_iter().take(limit).collect()
282    }
283
284    /// Get unread messages without marking them as read.
285    pub fn unread(&self, session_id: &str) -> Vec<Message> {
286        let dir = self.messages_dir();
287        let mut unread = Vec::new();
288        if let Ok(entries) = fs::read_dir(dir) {
289            for entry in entries.flatten() {
290                if entry.path().extension().is_some_and(|x| x == "json") {
291                    if let Ok(content) = fs::read_to_string(entry.path()) {
292                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
293                            let dominated = msg.to_session.is_none()
294                                || msg.to_session.as_deref() == Some(session_id);
295                            if dominated
296                                && msg.from_session != session_id
297                                && !msg.read_by.contains(&session_id.to_string())
298                            {
299                                unread.push(msg);
300                            }
301                        }
302                    }
303                }
304            }
305        }
306        unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
307        unread
308    }
309
310    /// Count unread messages for a session.
311    pub fn unread_count(&self, session_id: &str) -> u64 {
312        self.unread(session_id).len() as u64
313    }
314
315    /// Delete messages older than `max_age_secs`.
316    pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
317        let dir = self.messages_dir();
318        let now = Self::now();
319        let mut removed = 0;
320        if let Ok(entries) = fs::read_dir(dir) {
321            for entry in entries.flatten() {
322                if entry.path().extension().is_some_and(|x| x == "json") {
323                    if let Ok(content) = fs::read_to_string(entry.path()) {
324                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
325                            if now - msg.timestamp > max_age_secs {
326                                let _ = fs::remove_file(entry.path());
327                                removed += 1;
328                            }
329                        }
330                    }
331                }
332            }
333        }
334        removed
335    }
336
337    // ── Watch ──
338
339    /// Poll for new messages. Returns unread count.
340    /// Useful for integration: call this in a loop and trigger actions
341    /// (like spawning a background AI session) when count > 0.
342    pub fn poll(&self, session_id: &str) -> u64 {
343        self.unread_count(session_id)
344    }
345}
346
/// Check whether a process with the given PID currently exists.
///
/// On Unix this probes the process with `kill(pid, 0)`, which performs the
/// existence and permission checks without delivering a signal:
///
/// * success means the process exists;
/// * a permission error means the process exists but belongs to another
///   user, so it is still reported as alive;
/// * any other failure means there is no such process.
///
/// PIDs that cannot name a single process (0, or values that do not fit in a
/// positive `i32`) are reported as not alive without calling `kill`, because
/// `kill` interprets zero and negative arguments as process-group or
/// "every process" targets.
///
/// On non-Unix platforms there is no probe and every PID is assumed alive.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        if pid == 0 || pid > i32::MAX as u32 {
            return false;
        }
        // SAFETY: `kill` takes two plain integers and signal 0 delivers
        // nothing; `pid` was checked above to be a positive value that fits
        // in `i32`, so the cast is lossless and targets a single process.
        if unsafe { libc_kill(pid as i32, 0) } == 0 {
            return true;
        }
        // Read errno immediately after the failed call. A permission error
        // proves the process exists; it is merely owned by someone else.
        std::io::Error::last_os_error().kind() == std::io::ErrorKind::PermissionDenied
    }
    #[cfg(not(unix))]
    {
        // Fallback: assume alive (better safe than garbage-collecting live agents)
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// The C library's `kill(2)`, declared directly to avoid a `libc`
    /// dependency.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Thin wrapper around the C `kill` function.
///
/// # Safety
///
/// The arguments are forwarded unchanged, so the call can deliver a real
/// signal to a process, a process group, or every process the caller may
/// signal. The caller must pass only a `pid`/`sig` pair whose effect it
/// intends.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    // SAFETY: the declaration above matches the C prototype
    // `int kill(pid_t, int)` on Unix targets, where `pid_t` is a 32-bit
    // signed integer; the caller upholds the contract documented above.
    unsafe { kill(pid, sig) }
}
371
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a uniquely named scratch directory under the system temp dir.
    fn temp_dir() -> PathBuf {
        let unique = format!("agent-relay-test-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(unique);
        let _ = fs::create_dir_all(&dir);
        dir
    }

    /// Build a relay rooted in a fresh scratch directory. The directory is
    /// returned too so each test can delete it when it is done.
    fn scratch_relay() -> (Relay, PathBuf) {
        let root = temp_dir();
        (Relay::new(root.clone()), root)
    }

    #[test]
    fn test_register_and_list_agents() {
        let (relay, root) = scratch_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        let listed = relay.agents();
        assert_eq!(listed.len(), 2);

        let names: Vec<&str> = listed.iter().map(|a| a.agent_id.as_str()).collect();
        assert!(names.contains(&"claude"));
        assert!(names.contains(&"gemini"));

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_send_and_receive() {
        let (relay, root) = scratch_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        relay.send("s1", "claude", None, "hello from claude");

        // The broadcast is waiting for the other session.
        assert_eq!(relay.unread_count("s2"), 1);

        let received = relay.inbox("s2", 10);
        assert_eq!(received.len(), 1);
        let (message, is_new) = &received[0];
        assert!(*is_new); // newly read
        assert_eq!(message.content, "hello from claude");
        assert_eq!(message.from_agent, "claude");

        // Reading the inbox marked it as read.
        assert_eq!(relay.unread_count("s2"), 0);

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_direct_message() {
        let (relay, root) = scratch_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        // Addressed to s2 only.
        relay.send("s1", "claude", Some("s2"), "private to gemini");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 0); // gpt shouldn't see it

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_broadcast() {
        let (relay, root) = scratch_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        relay.send("s1", "claude", None, "broadcast to all");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 1);
        assert_eq!(relay.unread_count("s1"), 0); // sender doesn't see own msg as unread

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_unregister() {
        let (relay, root) = scratch_relay();

        relay.register("claude", "s1", 99999);
        assert_eq!(relay.agents().len(), 1);

        relay.unregister("s1");
        assert_eq!(relay.agents().len(), 0);

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_cleanup_old_messages() {
        let (relay, root) = scratch_relay();

        // Send a message, then rewrite its file with a timestamp two hours
        // in the past.
        let mut stale = relay.send("s1", "claude", None, "old message");
        stale.timestamp = Relay::now() - 7200;
        let stale_file = relay.messages_dir().join(format!("{}.json", stale.id));
        let encoded = serde_json::to_string_pretty(&stale).unwrap();
        let _ = fs::write(&stale_file, encoded);

        relay.send("s1", "claude", None, "new message");

        // With a one-hour cutoff only the backdated message goes away.
        assert_eq!(relay.cleanup_old(3600), 1);

        let _ = fs::remove_dir_all(&root);
    }
}