Skip to main content

agent_relay/
lib.rs

1//! # agent-relay
2//!
3//! Agent-to-agent messaging for AI coding tools.
4//!
5//! When multiple AI agents (Claude, Gemini, GPT, Copilot) or human developers
6//! work on the same codebase, they need a way to coordinate. `agent-relay`
7//! provides instant, file-based messaging with zero setup — no server, no
8//! database, no config.
9//!
10//! ## Quick Start
11//!
12//! ```no_run
13//! use std::path::PathBuf;
14//! use agent_relay::Relay;
15//!
16//! let relay = Relay::new(PathBuf::from(".relay"));
17//!
18//! // Register this agent
19//! relay.register("claude-opus", "session-1", std::process::id());
20//!
21//! // Send a message
22//! relay.send("session-1", "claude-opus", None, "refactoring auth module — stay away from src/auth/");
23//!
24//! // Another agent checks inbox
25//! let msgs = relay.inbox("session-2", 10);
26//! for (msg, is_new) in &msgs {
27//!     if *is_new {
28//!         println!("[{}] {}: {}", msg.from_agent, msg.from_session, msg.content);
29//!     }
30//! }
31//! ```
32//!
33//! ## CLI
34//!
35//! ```bash
36//! # Register yourself
37//! agent-relay register --agent claude --session my-session
38//!
39//! # Send a broadcast
40//! agent-relay send "heads up: changing the auth module"
41//!
42//! # Send to specific agent
43//! agent-relay send --to session-2 "can you review src/auth.rs?"
44//!
45//! # Check inbox
46//! agent-relay inbox
47//!
48//! # List active agents
49//! agent-relay agents
50//! ```
51//!
52//! Originally extracted from [Aura](https://auravcs.com), the semantic
53//! version control engine.
54
55pub mod client;
56pub mod git;
57pub mod server;
58
59use serde::{Deserialize, Serialize};
60use std::fs;
61use std::path::PathBuf;
62use std::time::{SystemTime, UNIX_EPOCH};
63
64// ── Data Types ──
65
66#[derive(Serialize, Deserialize, Clone, Debug)]
67pub struct Message {
68    pub id: String,
69    pub from_session: String,
70    pub from_agent: String,
71    pub to_session: Option<String>,
72    pub content: String,
73    pub timestamp: u64,
74    pub read_by: Vec<String>,
75}
76
77#[derive(Serialize, Deserialize, Clone, Debug)]
78pub struct AgentRegistration {
79    pub session_id: String,
80    pub agent_id: String,
81    pub pid: u32,
82    pub registered_at: u64,
83    pub last_heartbeat: u64,
84    pub metadata: serde_json::Value,
85}
86
87// ── Relay ──
88
/// The main relay hub. This type only touches the local filesystem — no network.
///
/// Create one per repo/project with a shared `base_dir` path. Every agent
/// that uses the same path will see each other's messages.
///
/// The handle holds nothing but the path, so it is cheap to clone and can be
/// printed with `{:?}` for diagnostics.
#[derive(Debug, Clone)]
pub struct Relay {
    /// Root directory holding the `messages/` and `agents/` subdirectories.
    pub base_dir: PathBuf,
}
96
97impl Relay {
98    /// Create a new relay rooted at the given directory.
99    ///
100    /// Typical usage: `Relay::new(".relay".into())` at the repo root.
101    /// All agents must use the same path to see each other.
102    pub fn new(base_dir: PathBuf) -> Self {
103        Self { base_dir }
104    }
105
106    fn messages_dir(&self) -> PathBuf {
107        self.base_dir.join("messages")
108    }
109
110    fn agents_dir(&self) -> PathBuf {
111        self.base_dir.join("agents")
112    }
113
114    fn ensure_dirs(&self) {
115        let _ = fs::create_dir_all(self.messages_dir());
116        let _ = fs::create_dir_all(self.agents_dir());
117    }
118
119    fn now() -> u64 {
120        SystemTime::now()
121            .duration_since(UNIX_EPOCH)
122            .unwrap_or_default()
123            .as_secs()
124    }
125
126    fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
127        let tmp = path.with_extension("tmp");
128        fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
129        fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
130        Ok(())
131    }
132
133    // ── Agent Registration ──
134
135    /// Register an agent so others can discover it.
136    pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
137        self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
138    }
139
140    /// Register with custom metadata (model name, capabilities, etc).
141    pub fn register_with_metadata(
142        &self,
143        agent_id: &str,
144        session_id: &str,
145        pid: u32,
146        metadata: serde_json::Value,
147    ) -> AgentRegistration {
148        self.ensure_dirs();
149        let reg = AgentRegistration {
150            session_id: session_id.to_string(),
151            agent_id: agent_id.to_string(),
152            pid,
153            registered_at: Self::now(),
154            last_heartbeat: Self::now(),
155            metadata,
156        };
157        let path = self.agents_dir().join(format!("{}.json", session_id));
158        if let Ok(json) = serde_json::to_string_pretty(&reg) {
159            let _ = Self::atomic_write(&path, json.as_bytes());
160        }
161        reg
162    }
163
164    /// Update heartbeat to signal this agent is still alive.
165    pub fn heartbeat(&self, session_id: &str) {
166        let path = self.agents_dir().join(format!("{}.json", session_id));
167        if let Ok(content) = fs::read_to_string(&path) {
168            if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
169                reg.last_heartbeat = Self::now();
170                if let Ok(json) = serde_json::to_string_pretty(&reg) {
171                    let _ = Self::atomic_write(&path, json.as_bytes());
172                }
173            }
174        }
175    }
176
177    /// Unregister an agent (cleanup on exit).
178    pub fn unregister(&self, session_id: &str) {
179        let path = self.agents_dir().join(format!("{}.json", session_id));
180        let _ = fs::remove_file(&path);
181    }
182
183    /// List all registered agents.
184    pub fn agents(&self) -> Vec<AgentRegistration> {
185        self.ensure_dirs();
186        let dir = self.agents_dir();
187        let mut agents = Vec::new();
188        if let Ok(entries) = fs::read_dir(&dir) {
189            for entry in entries.flatten() {
190                if entry.path().extension().is_some_and(|x| x == "json") {
191                    if let Ok(content) = fs::read_to_string(entry.path()) {
192                        if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
193                            agents.push(reg);
194                        }
195                    }
196                }
197            }
198        }
199        agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
200        agents
201    }
202
203    /// Remove registrations for agents whose PID is no longer alive.
204    pub fn cleanup_dead(&self) -> usize {
205        self.ensure_dirs();
206        let agents = self.agents();
207        let mut removed = 0;
208        for agent in &agents {
209            if !is_pid_alive(agent.pid) {
210                let path = self.agents_dir().join(format!("{}.json", agent.session_id));
211                let _ = fs::remove_file(&path);
212                removed += 1;
213            }
214        }
215        removed
216    }
217
218    // ── Messaging ──
219
220    /// Send a message. `to_session: None` = broadcast to all agents.
221    pub fn send(
222        &self,
223        from_session: &str,
224        from_agent: &str,
225        to_session: Option<&str>,
226        content: &str,
227    ) -> Message {
228        self.ensure_dirs();
229        let msg = Message {
230            id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
231            from_session: from_session.to_string(),
232            from_agent: from_agent.to_string(),
233            to_session: to_session.map(|s| s.to_string()),
234            content: content.to_string(),
235            timestamp: Self::now(),
236            read_by: vec![from_session.to_string()],
237        };
238        let path = self.messages_dir().join(format!("{}.json", msg.id));
239        if let Ok(json) = serde_json::to_string_pretty(&msg) {
240            let _ = Self::atomic_write(&path, json.as_bytes());
241        }
242        msg
243    }
244
245    /// Read inbox: returns messages with a flag indicating if newly read.
246    /// Marks all returned messages as read by this session.
247    pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
248        self.ensure_dirs();
249        let dir = self.messages_dir();
250        let mut messages = Vec::new();
251
252        if let Ok(entries) = fs::read_dir(&dir) {
253            for entry in entries.flatten() {
254                if entry.path().extension().is_some_and(|x| x == "json") {
255                    if let Ok(content) = fs::read_to_string(entry.path()) {
256                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
257                            let dominated = msg.to_session.is_none()
258                                || msg.to_session.as_deref() == Some(session_id)
259                                || msg.from_session == session_id;
260                            if dominated {
261                                messages.push((entry.path(), msg));
262                            }
263                        }
264                    }
265                }
266            }
267        }
268
269        messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
270
271        let mut result = Vec::new();
272        for entry in &mut messages {
273            let was_unread = !entry.1.read_by.contains(&session_id.to_string());
274            if was_unread {
275                entry.1.read_by.push(session_id.to_string());
276                if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
277                    let _ = Self::atomic_write(&entry.0, json.as_bytes());
278                }
279            }
280            result.push((entry.1.clone(), was_unread));
281        }
282
283        result.into_iter().take(limit).collect()
284    }
285
286    /// Get unread messages without marking them as read.
287    pub fn unread(&self, session_id: &str) -> Vec<Message> {
288        let dir = self.messages_dir();
289        let mut unread = Vec::new();
290        if let Ok(entries) = fs::read_dir(dir) {
291            for entry in entries.flatten() {
292                if entry.path().extension().is_some_and(|x| x == "json") {
293                    if let Ok(content) = fs::read_to_string(entry.path()) {
294                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
295                            let dominated = msg.to_session.is_none()
296                                || msg.to_session.as_deref() == Some(session_id);
297                            if dominated
298                                && msg.from_session != session_id
299                                && !msg.read_by.contains(&session_id.to_string())
300                            {
301                                unread.push(msg);
302                            }
303                        }
304                    }
305                }
306            }
307        }
308        unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
309        unread
310    }
311
312    /// Count unread messages for a session.
313    pub fn unread_count(&self, session_id: &str) -> u64 {
314        self.unread(session_id).len() as u64
315    }
316
317    /// Delete messages older than `max_age_secs`.
318    pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
319        let dir = self.messages_dir();
320        let now = Self::now();
321        let mut removed = 0;
322        if let Ok(entries) = fs::read_dir(dir) {
323            for entry in entries.flatten() {
324                if entry.path().extension().is_some_and(|x| x == "json") {
325                    if let Ok(content) = fs::read_to_string(entry.path()) {
326                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
327                            if now - msg.timestamp > max_age_secs {
328                                let _ = fs::remove_file(entry.path());
329                                removed += 1;
330                            }
331                        }
332                    }
333                }
334            }
335        }
336        removed
337    }
338
339    // ── Watch ──
340
341    /// Poll for new messages. Returns unread count.
342    /// Useful for integration: call this in a loop and trigger actions
343    /// (like spawning a background AI session) when count > 0.
344    pub fn poll(&self, session_id: &str) -> u64 {
345        self.unread_count(session_id)
346    }
347}
348
/// Report whether a process with the given PID currently exists.
///
/// On Unix this probes the process with `kill(pid, 0)`, which performs the
/// existence and permission checks without delivering a signal:
///
/// * success means the process exists;
/// * failure with `EPERM` means the process exists but belongs to someone we
///   may not signal — it is still alive;
/// * any other failure (normally `ESRCH`) means there is no such process.
///
/// PIDs that cannot name a single process are handled without calling
/// `kill`: PID 0 (which `kill` interprets as "my process group") is reported
/// alive so placeholder registrations are never reaped, and values above
/// `i32::MAX` (which would wrap to a negative process-group id) are reported
/// dead.
///
/// On non-Unix targets there is no probe, so every PID is reported alive
/// (better safe than garbage-collecting live agents).
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // errno for "operation not permitted"; the value is 1 on Linux, macOS and the BSDs.
        const EPERM: i32 = 1;

        if pid == 0 {
            return true;
        }
        if pid > i32::MAX as u32 {
            return false;
        }

        // SAFETY: `kill` with signal 0 only performs error checking; it takes
        // plain integers and touches no memory owned by this program. The
        // range checks above make the cast lossless and strictly positive.
        let rc = unsafe { libc_kill(pid as i32, 0) };
        if rc == 0 {
            return true;
        }
        std::io::Error::last_os_error().raw_os_error() == Some(EPERM)
    }
    #[cfg(not(unix))]
    {
        // Fallback: assume alive (better safe than garbage-collecting live agents)
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// The C library's `kill(2)`.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Thin wrapper around the C library's `kill`.
///
/// # Safety
///
/// Calls straight into the C library. The caller is responsible for the
/// consequences of the signal it sends; signal 0 sends nothing.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    unsafe { kill(pid, sig) }
}
373
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a uniquely named scratch directory under the system temp dir.
    fn scratch_dir() -> PathBuf {
        let unique = format!("agent-relay-test-{}", uuid::Uuid::new_v4());
        let path = std::env::temp_dir().join(unique);
        let _ = fs::create_dir_all(&path);
        path
    }

    /// Build a relay over a fresh scratch directory; the path is returned for cleanup.
    fn fresh_relay() -> (Relay, PathBuf) {
        let root = scratch_dir();
        (Relay::new(root.clone()), root)
    }

    #[test]
    fn test_register_and_list_agents() {
        let (relay, root) = fresh_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        let listed = relay.agents();
        assert_eq!(listed.len(), 2);
        assert!(listed.iter().any(|a| a.agent_id == "claude"));
        assert!(listed.iter().any(|a| a.agent_id == "gemini"));

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_send_and_receive() {
        let (relay, root) = fresh_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);

        relay.send("s1", "claude", None, "hello from claude");

        assert_eq!(relay.unread_count("s2"), 1);

        let received = relay.inbox("s2", 10);
        assert_eq!(received.len(), 1);
        let (msg, is_new) = &received[0];
        assert!(*is_new); // first time s2 sees it
        assert_eq!(msg.content, "hello from claude");
        assert_eq!(msg.from_agent, "claude");

        // Reading the inbox marked it as read.
        assert_eq!(relay.unread_count("s2"), 0);

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_direct_message() {
        let (relay, root) = fresh_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        // Addressed to s2 only.
        relay.send("s1", "claude", Some("s2"), "private to gemini");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 0); // gpt must not see it

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_broadcast() {
        let (relay, root) = fresh_relay();

        relay.register("claude", "s1", 99999);
        relay.register("gemini", "s2", 99998);
        relay.register("gpt", "s3", 99997);

        relay.send("s1", "claude", None, "broadcast to all");

        assert_eq!(relay.unread_count("s2"), 1);
        assert_eq!(relay.unread_count("s3"), 1);
        assert_eq!(relay.unread_count("s1"), 0); // the sender's own message is never unread

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_unregister() {
        let (relay, root) = fresh_relay();

        relay.register("claude", "s1", 99999);
        assert_eq!(relay.agents().len(), 1);

        relay.unregister("s1");
        assert_eq!(relay.agents().len(), 0);

        let _ = fs::remove_dir_all(&root);
    }

    #[test]
    fn test_cleanup_old_messages() {
        let (relay, root) = fresh_relay();

        // Send a message, then rewrite its file with a timestamp two hours in the past.
        let mut stale = relay.send("s1", "claude", None, "old message");
        stale.timestamp = Relay::now() - 7200;
        let stale_path = relay.messages_dir().join(format!("{}.json", stale.id));
        let encoded = serde_json::to_string_pretty(&stale).unwrap();
        let _ = fs::write(&stale_path, encoded);

        relay.send("s1", "claude", None, "new message");

        // One-hour cutoff: only the backdated message goes.
        assert_eq!(relay.cleanup_old(3600), 1);

        let _ = fs::remove_dir_all(&root);
    }
}
498}