Skip to main content

agent_relay/
lib.rs

1//! # agent-relay
2//!
3//! Agent-to-agent messaging for AI coding tools.
4//!
5//! When multiple AI agents (Claude, Gemini, GPT, Copilot) or human developers
6//! work on the same codebase, they need a way to coordinate. `agent-relay`
7//! provides instant, file-based messaging with zero setup — no server, no
8//! database, no config.
9//!
10//! ## Quick Start
11//!
12//! ```no_run
13//! use std::path::PathBuf;
14//! use agent_relay::Relay;
15//!
16//! let relay = Relay::new(PathBuf::from(".relay"));
17//!
18//! // Register this agent
19//! relay.register("claude-opus", "session-1", std::process::id());
20//!
21//! // Send a message
22//! relay.send("session-1", "claude-opus", None, "refactoring auth module — stay away from src/auth/");
23//!
24//! // Another agent checks inbox
25//! let msgs = relay.inbox("session-2", 10);
26//! for (msg, is_new) in &msgs {
27//!     if *is_new {
28//!         println!("[{}] {}: {}", msg.from_agent, msg.from_session, msg.content);
29//!     }
30//! }
31//! ```
32//!
33//! ## CLI
34//!
35//! ```bash
36//! # Register yourself
37//! agent-relay register --agent claude --session my-session
38//!
39//! # Send a broadcast
40//! agent-relay send "heads up: changing the auth module"
41//!
42//! # Send to specific agent
43//! agent-relay send --to session-2 "can you review src/auth.rs?"
44//!
45//! # Check inbox
46//! agent-relay inbox
47//!
48//! # List active agents
49//! agent-relay agents
50//! ```
51//!
52//! Originally extracted from [Aura](https://auravcs.com), the semantic
53//! version control engine.
54
55pub mod client;
56pub mod daemon;
57pub mod git;
58pub mod server;
59
60use serde::{Deserialize, Serialize};
61use std::fs;
62use std::path::PathBuf;
63use std::time::{SystemTime, UNIX_EPOCH};
64
65// ── Data Types ──
66
67#[derive(Serialize, Deserialize, Clone, Debug)]
68pub struct Message {
69    pub id: String,
70    pub from_session: String,
71    pub from_agent: String,
72    pub to_session: Option<String>,
73    pub content: String,
74    pub timestamp: u64,
75    pub read_by: Vec<String>,
76}
77
78#[derive(Serialize, Deserialize, Clone, Debug)]
79pub struct AgentRegistration {
80    pub session_id: String,
81    pub agent_id: String,
82    pub pid: u32,
83    pub registered_at: u64,
84    pub last_heartbeat: u64,
85    pub metadata: serde_json::Value,
86}
87
88// ── Relay ──
89
/// The main relay hub. Every operation on this type goes through the local
/// filesystem.
///
/// Create one per repo/project with a shared `base_dir` path. Every agent
/// that uses the same path will see each other's messages.
///
/// The type is a plain path wrapper, so it is cheap to clone and can be
/// printed with `{:?}` for diagnostics.
#[derive(Clone, Debug)]
pub struct Relay {
    /// Root directory under which the `messages` and `agents` directories live.
    pub base_dir: PathBuf,
}
97
98impl Relay {
99    /// Create a new relay rooted at the given directory.
100    ///
101    /// Typical usage: `Relay::new(".relay".into())` at the repo root.
102    /// All agents must use the same path to see each other.
103    pub fn new(base_dir: PathBuf) -> Self {
104        Self { base_dir }
105    }
106
107    fn messages_dir(&self) -> PathBuf {
108        self.base_dir.join("messages")
109    }
110
111    fn agents_dir(&self) -> PathBuf {
112        self.base_dir.join("agents")
113    }
114
115    fn ensure_dirs(&self) {
116        let _ = fs::create_dir_all(self.messages_dir());
117        let _ = fs::create_dir_all(self.agents_dir());
118    }
119
120    fn now() -> u64 {
121        SystemTime::now()
122            .duration_since(UNIX_EPOCH)
123            .unwrap_or_default()
124            .as_secs()
125    }
126
127    fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
128        let tmp = path.with_extension("tmp");
129        fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
130        fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
131        Ok(())
132    }
133
134    // ── Agent Registration ──
135
136    /// Register an agent so others can discover it.
137    pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
138        self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
139    }
140
141    /// Register with custom metadata (model name, capabilities, etc).
142    pub fn register_with_metadata(
143        &self,
144        agent_id: &str,
145        session_id: &str,
146        pid: u32,
147        metadata: serde_json::Value,
148    ) -> AgentRegistration {
149        self.ensure_dirs();
150        let reg = AgentRegistration {
151            session_id: session_id.to_string(),
152            agent_id: agent_id.to_string(),
153            pid,
154            registered_at: Self::now(),
155            last_heartbeat: Self::now(),
156            metadata,
157        };
158        let path = self.agents_dir().join(format!("{}.json", session_id));
159        if let Ok(json) = serde_json::to_string_pretty(&reg) {
160            let _ = Self::atomic_write(&path, json.as_bytes());
161        }
162        reg
163    }
164
165    /// Update heartbeat to signal this agent is still alive.
166    pub fn heartbeat(&self, session_id: &str) {
167        let path = self.agents_dir().join(format!("{}.json", session_id));
168        if let Ok(content) = fs::read_to_string(&path) {
169            if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
170                reg.last_heartbeat = Self::now();
171                if let Ok(json) = serde_json::to_string_pretty(&reg) {
172                    let _ = Self::atomic_write(&path, json.as_bytes());
173                }
174            }
175        }
176    }
177
178    /// Unregister an agent (cleanup on exit).
179    pub fn unregister(&self, session_id: &str) {
180        let path = self.agents_dir().join(format!("{}.json", session_id));
181        let _ = fs::remove_file(&path);
182    }
183
184    /// List all registered agents.
185    pub fn agents(&self) -> Vec<AgentRegistration> {
186        self.ensure_dirs();
187        let dir = self.agents_dir();
188        let mut agents = Vec::new();
189        if let Ok(entries) = fs::read_dir(&dir) {
190            for entry in entries.flatten() {
191                if entry.path().extension().is_some_and(|x| x == "json") {
192                    if let Ok(content) = fs::read_to_string(entry.path()) {
193                        if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
194                            agents.push(reg);
195                        }
196                    }
197                }
198            }
199        }
200        agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
201        agents
202    }
203
204    /// Remove registrations for agents whose PID is no longer alive.
205    pub fn cleanup_dead(&self) -> usize {
206        self.ensure_dirs();
207        let agents = self.agents();
208        let mut removed = 0;
209        for agent in &agents {
210            if !is_pid_alive(agent.pid) {
211                let path = self.agents_dir().join(format!("{}.json", agent.session_id));
212                let _ = fs::remove_file(&path);
213                removed += 1;
214            }
215        }
216        removed
217    }
218
219    // ── Messaging ──
220
221    /// Send a message. `to_session: None` = broadcast to all agents.
222    pub fn send(
223        &self,
224        from_session: &str,
225        from_agent: &str,
226        to_session: Option<&str>,
227        content: &str,
228    ) -> Message {
229        self.ensure_dirs();
230        let msg = Message {
231            id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
232            from_session: from_session.to_string(),
233            from_agent: from_agent.to_string(),
234            to_session: to_session.map(|s| s.to_string()),
235            content: content.to_string(),
236            timestamp: Self::now(),
237            read_by: vec![from_session.to_string()],
238        };
239        let path = self.messages_dir().join(format!("{}.json", msg.id));
240        if let Ok(json) = serde_json::to_string_pretty(&msg) {
241            let _ = Self::atomic_write(&path, json.as_bytes());
242        }
243        msg
244    }
245
246    /// Read inbox: returns messages with a flag indicating if newly read.
247    /// Marks all returned messages as read by this session.
248    pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
249        self.ensure_dirs();
250        let dir = self.messages_dir();
251        let mut messages = Vec::new();
252
253        if let Ok(entries) = fs::read_dir(&dir) {
254            for entry in entries.flatten() {
255                if entry.path().extension().is_some_and(|x| x == "json") {
256                    if let Ok(content) = fs::read_to_string(entry.path()) {
257                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
258                            let dominated = msg.to_session.is_none()
259                                || msg.to_session.as_deref() == Some(session_id)
260                                || msg.from_session == session_id;
261                            if dominated {
262                                messages.push((entry.path(), msg));
263                            }
264                        }
265                    }
266                }
267            }
268        }
269
270        messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
271
272        let mut result = Vec::new();
273        for entry in &mut messages {
274            let was_unread = !entry.1.read_by.contains(&session_id.to_string());
275            if was_unread {
276                entry.1.read_by.push(session_id.to_string());
277                if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
278                    let _ = Self::atomic_write(&entry.0, json.as_bytes());
279                }
280            }
281            result.push((entry.1.clone(), was_unread));
282        }
283
284        result.into_iter().take(limit).collect()
285    }
286
287    /// Get unread messages without marking them as read.
288    pub fn unread(&self, session_id: &str) -> Vec<Message> {
289        let dir = self.messages_dir();
290        let mut unread = Vec::new();
291        if let Ok(entries) = fs::read_dir(dir) {
292            for entry in entries.flatten() {
293                if entry.path().extension().is_some_and(|x| x == "json") {
294                    if let Ok(content) = fs::read_to_string(entry.path()) {
295                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
296                            let dominated = msg.to_session.is_none()
297                                || msg.to_session.as_deref() == Some(session_id);
298                            if dominated
299                                && msg.from_session != session_id
300                                && !msg.read_by.contains(&session_id.to_string())
301                            {
302                                unread.push(msg);
303                            }
304                        }
305                    }
306                }
307            }
308        }
309        unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
310        unread
311    }
312
313    /// Count unread messages for a session.
314    pub fn unread_count(&self, session_id: &str) -> u64 {
315        self.unread(session_id).len() as u64
316    }
317
318    /// Delete messages older than `max_age_secs`.
319    pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
320        let dir = self.messages_dir();
321        let now = Self::now();
322        let mut removed = 0;
323        if let Ok(entries) = fs::read_dir(dir) {
324            for entry in entries.flatten() {
325                if entry.path().extension().is_some_and(|x| x == "json") {
326                    if let Ok(content) = fs::read_to_string(entry.path()) {
327                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
328                            if now - msg.timestamp > max_age_secs {
329                                let _ = fs::remove_file(entry.path());
330                                removed += 1;
331                            }
332                        }
333                    }
334                }
335            }
336        }
337        removed
338    }
339
340    // ── Watch ──
341
342    /// Poll for new messages. Returns unread count.
343    /// Useful for integration: call this in a loop and trigger actions
344    /// (like spawning a background AI session) when count > 0.
345    pub fn poll(&self, session_id: &str) -> u64 {
346        self.unread_count(session_id)
347    }
348}
349
/// Report whether a process with the given PID currently exists.
///
/// On Unix this probes with `kill(pid, 0)`: signal 0 delivers nothing and only
/// performs the existence and permission checks. The process counts as alive
/// when the call succeeds or when it is refused for lack of permission (the
/// process exists but belongs to another user).
///
/// PIDs that cannot name a single process are not probed:
/// * `0` would address the caller's own process group, so the answer would be
///   meaningless; it is reported as alive (conservative: keep the entry).
/// * values above `i32::MAX` would wrap to a negative number, which `kill`
///   interprets as a process group; they are reported as not alive.
///
/// On non-Unix platforms there is no probe and every PID is reported alive.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        if pid == 0 {
            return true;
        }
        if pid > i32::MAX as u32 {
            return false;
        }
        // SAFETY: `kill` takes two plain integers and, with signal 0, only
        // performs error checking; it reads or writes no memory of ours.
        if unsafe { libc_kill(pid as i32, 0) } == 0 {
            return true;
        }
        // Must be read immediately after the failed call, before anything
        // else can overwrite errno. EPERM maps to `PermissionDenied`.
        std::io::Error::last_os_error().kind() == std::io::ErrorKind::PermissionDenied
    }
    #[cfg(not(unix))]
    {
        // Fallback: assume alive (better safe than garbage-collecting live agents)
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// The C library's `kill(2)`; declared by hand to avoid a `libc` dependency.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Thin wrapper around the C `kill` function.
///
/// # Safety
///
/// The call is forwarded unchanged, so any signal other than 0 is really
/// delivered to the target process or process group. The caller is
/// responsible for passing a `pid`/`sig` pair whose effect it intends.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    // SAFETY: the caller upholds the contract documented above; the arguments
    // are plain integers passed by value.
    unsafe { kill(pid, sig) }
}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Make a uniquely named scratch directory under the system temp dir.
    fn temp_dir() -> PathBuf {
        let unique = format!("agent-relay-test-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(unique);
        let _ = fs::create_dir_all(&dir);
        dir
    }

    /// Both registered sessions show up in the listing.
    #[test]
    fn test_register_and_list_agents() {
        let root = temp_dir();
        let relay = Relay::new(root.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        let listed = relay.agents();
        assert_eq!(listed.len(), 2);

        // Listing order depends on heartbeat times, so check membership only.
        assert!(listed.iter().any(|a| a.agent_id == "claude"));
        assert!(listed.iter().any(|a| a.agent_id == "gemini"));

        let _ = fs::remove_dir_all(&root);
    }

    /// A broadcast is unread for the receiver until its inbox is read.
    #[test]
    fn test_send_and_receive() {
        let root = temp_dir();
        let relay = Relay::new(root.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        relay.send("s1", "claude", None, "hello from claude");
        assert_eq!(relay.unread_count("s2"), 1);

        let received = relay.inbox("s2", 10);
        assert_eq!(received.len(), 1);
        let (msg, is_new) = &received[0];
        assert!(*is_new); // newly read
        assert_eq!(msg.content, "hello from claude");
        assert_eq!(msg.from_agent, "claude");

        // Reading the inbox marked it as read.
        assert_eq!(relay.unread_count("s2"), 0);

        let _ = fs::remove_dir_all(&root);
    }

    /// A direct message is visible to its recipient only.
    #[test]
    fn test_direct_message() {
        let root = temp_dir();
        let relay = Relay::new(root.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        // DM to s2 only
        relay.send("s1", "claude", Some("s2"), "private to gemini");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 0); // gpt shouldn't see it

        let _ = fs::remove_dir_all(&root);
    }

    /// A broadcast reaches every session except the sender.
    #[test]
    fn test_broadcast() {
        let root = temp_dir();
        let relay = Relay::new(root.clone());

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        relay.send("s1", "claude", None, "broadcast to all");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 1);
        assert_eq!(relay.unread_count("s1"), 0); // sender doesn't see own msg as unread

        let _ = fs::remove_dir_all(&root);
    }

    /// Unregistering removes the session from the listing.
    #[test]
    fn test_unregister() {
        let root = temp_dir();
        let relay = Relay::new(root.clone());

        relay.register("claude", "s1", 99999);
        assert_eq!(relay.agents().len(), 1);

        relay.unregister("s1");
        assert_eq!(relay.agents().len(), 0);

        let _ = fs::remove_dir_all(&root);
    }

    /// Only the message older than the cutoff is deleted.
    #[test]
    fn test_cleanup_old_messages() {
        let root = temp_dir();
        let relay = Relay::new(root.clone());

        // Create a message and manually backdate it
        let mut stale = relay.send("s1", "claude", None, "old message");
        stale.timestamp = Relay::now() - 7200; // 2 hours ago
        let stale_path = relay.messages_dir().join(format!("{}.json", stale.id));
        let stale_json = serde_json::to_string_pretty(&stale).unwrap();
        let _ = fs::write(&stale_path, stale_json);

        relay.send("s1", "claude", None, "new message");

        let removed = relay.cleanup_old(3600); // 1 hour
        assert_eq!(removed, 1);

        let _ = fs::remove_dir_all(&root);
    }
}
499}