Skip to main content

agent_relay/
lib.rs

1//! # agent-relay
2//!
3//! Agent-to-agent messaging for AI coding tools.
4//!
5//! When multiple AI agents (Claude, Gemini, GPT, Copilot) or human developers
6//! work on the same codebase, they need a way to coordinate. `agent-relay`
7//! provides instant, file-based messaging with zero setup — no server, no
8//! database, no config.
9//!
10//! ## Quick Start
11//!
12//! ```no_run
13//! use std::path::PathBuf;
14//! use agent_relay::Relay;
15//!
16//! let relay = Relay::new(PathBuf::from(".relay"));
17//!
18//! // Register this agent
19//! relay.register("claude-opus", "session-1", std::process::id());
20//!
21//! // Send a message
22//! relay.send("session-1", "claude-opus", None, "refactoring auth module — stay away from src/auth/");
23//!
24//! // Another agent checks inbox
25//! let msgs = relay.inbox("session-2", 10);
26//! for (msg, is_new) in &msgs {
27//!     if *is_new {
28//!         println!("[{}] {}: {}", msg.from_agent, msg.from_session, msg.content);
29//!     }
30//! }
31//! ```
32//!
33//! ## CLI
34//!
35//! ```bash
36//! # Register yourself
37//! agent-relay register --agent claude --session my-session
38//!
39//! # Send a broadcast
40//! agent-relay send "heads up: changing the auth module"
41//!
42//! # Send to specific agent
43//! agent-relay send --to session-2 "can you review src/auth.rs?"
44//!
45//! # Check inbox
46//! agent-relay inbox
47//!
48//! # List active agents
49//! agent-relay agents
50//! ```
51//!
52//! Originally extracted from [Aura](https://auravcs.com), the semantic
53//! version control engine.
54
55pub mod client;
56pub mod daemon;
57pub mod git;
58pub mod hub;
59pub mod server;
60pub mod server_service;
61
62use serde::{Deserialize, Serialize};
63use std::fs;
64use std::path::PathBuf;
65use std::time::{SystemTime, UNIX_EPOCH};
66
67// ── Data Types ──
68
69#[derive(Serialize, Deserialize, Clone, Debug)]
70pub struct Message {
71    pub id: String,
72    pub from_session: String,
73    pub from_agent: String,
74    pub to_session: Option<String>,
75    pub content: String,
76    pub timestamp: u64,
77    pub read_by: Vec<String>,
78}
79
80#[derive(Serialize, Deserialize, Clone, Debug)]
81pub struct AgentRegistration {
82    pub session_id: String,
83    pub agent_id: String,
84    pub pid: u32,
85    pub registered_at: u64,
86    pub last_heartbeat: u64,
87    pub metadata: serde_json::Value,
88}
89
90// ── Relay ──
91
92/// The main relay hub. All operations are local filesystem — no network.
93///
94/// Create one per repo/project with a shared `base_dir` path. Every agent
95/// that uses the same path will see each other's messages.
96pub struct Relay {
97    pub base_dir: PathBuf,
98}
99
100impl Relay {
101    /// Create a new relay rooted at the given directory.
102    ///
103    /// Typical usage: `Relay::new(".relay".into())` at the repo root.
104    /// All agents must use the same path to see each other.
105    pub fn new(base_dir: PathBuf) -> Self {
106        Self { base_dir }
107    }
108
109    fn messages_dir(&self) -> PathBuf {
110        self.base_dir.join("messages")
111    }
112
113    fn agents_dir(&self) -> PathBuf {
114        self.base_dir.join("agents")
115    }
116
117    fn ensure_dirs(&self) {
118        let _ = fs::create_dir_all(self.messages_dir());
119        let _ = fs::create_dir_all(self.agents_dir());
120    }
121
122    pub fn now() -> u64 {
123        SystemTime::now()
124            .duration_since(UNIX_EPOCH)
125            .unwrap_or_default()
126            .as_secs()
127    }
128
129    fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
130        let tmp = path.with_extension("tmp");
131        fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
132        fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
133        Ok(())
134    }
135
136    // ── Agent Registration ──
137
138    /// Register an agent so others can discover it.
139    pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
140        self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
141    }
142
143    /// Register with custom metadata (model name, capabilities, etc).
144    pub fn register_with_metadata(
145        &self,
146        agent_id: &str,
147        session_id: &str,
148        pid: u32,
149        metadata: serde_json::Value,
150    ) -> AgentRegistration {
151        self.ensure_dirs();
152        let reg = AgentRegistration {
153            session_id: session_id.to_string(),
154            agent_id: agent_id.to_string(),
155            pid,
156            registered_at: Self::now(),
157            last_heartbeat: Self::now(),
158            metadata,
159        };
160        let path = self.agents_dir().join(format!("{}.json", session_id));
161        if let Ok(json) = serde_json::to_string_pretty(&reg) {
162            let _ = Self::atomic_write(&path, json.as_bytes());
163        }
164        reg
165    }
166
167    /// Update heartbeat to signal this agent is still alive.
168    pub fn heartbeat(&self, session_id: &str) {
169        let path = self.agents_dir().join(format!("{}.json", session_id));
170        if let Ok(content) = fs::read_to_string(&path) {
171            if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
172                reg.last_heartbeat = Self::now();
173                if let Ok(json) = serde_json::to_string_pretty(&reg) {
174                    let _ = Self::atomic_write(&path, json.as_bytes());
175                }
176            }
177        }
178    }
179
180    /// Unregister an agent (cleanup on exit).
181    pub fn unregister(&self, session_id: &str) {
182        let path = self.agents_dir().join(format!("{}.json", session_id));
183        let _ = fs::remove_file(&path);
184    }
185
186    /// List all registered agents.
187    pub fn agents(&self) -> Vec<AgentRegistration> {
188        self.ensure_dirs();
189        let dir = self.agents_dir();
190        let mut agents = Vec::new();
191        if let Ok(entries) = fs::read_dir(&dir) {
192            for entry in entries.flatten() {
193                if entry.path().extension().is_some_and(|x| x == "json") {
194                    if let Ok(content) = fs::read_to_string(entry.path()) {
195                        if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
196                            agents.push(reg);
197                        }
198                    }
199                }
200            }
201        }
202        agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
203        agents
204    }
205
206    /// Remove registrations for agents whose PID is no longer alive.
207    pub fn cleanup_dead(&self) -> usize {
208        self.ensure_dirs();
209        let agents = self.agents();
210        let mut removed = 0;
211        for agent in &agents {
212            if !is_pid_alive(agent.pid) {
213                let path = self.agents_dir().join(format!("{}.json", agent.session_id));
214                let _ = fs::remove_file(&path);
215                removed += 1;
216            }
217        }
218        removed
219    }
220
221    // ── Messaging ──
222
223    /// Send a message. `to_session: None` = broadcast to all agents.
224    pub fn send(
225        &self,
226        from_session: &str,
227        from_agent: &str,
228        to_session: Option<&str>,
229        content: &str,
230    ) -> Message {
231        self.ensure_dirs();
232        let msg = Message {
233            id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
234            from_session: from_session.to_string(),
235            from_agent: from_agent.to_string(),
236            to_session: to_session.map(|s| s.to_string()),
237            content: content.to_string(),
238            timestamp: Self::now(),
239            read_by: vec![from_session.to_string()],
240        };
241        let path = self.messages_dir().join(format!("{}.json", msg.id));
242        if let Ok(json) = serde_json::to_string_pretty(&msg) {
243            let _ = Self::atomic_write(&path, json.as_bytes());
244        }
245        msg
246    }
247
248    /// Read inbox: returns messages with a flag indicating if newly read.
249    /// Marks all returned messages as read by this session.
250    pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
251        self.ensure_dirs();
252        let dir = self.messages_dir();
253        let mut messages = Vec::new();
254
255        if let Ok(entries) = fs::read_dir(&dir) {
256            for entry in entries.flatten() {
257                if entry.path().extension().is_some_and(|x| x == "json") {
258                    if let Ok(content) = fs::read_to_string(entry.path()) {
259                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
260                            let dominated = msg.to_session.is_none()
261                                || msg.to_session.as_deref() == Some(session_id)
262                                || msg.from_session == session_id;
263                            if dominated {
264                                messages.push((entry.path(), msg));
265                            }
266                        }
267                    }
268                }
269            }
270        }
271
272        messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
273
274        let mut result = Vec::new();
275        for entry in &mut messages {
276            let was_unread = !entry.1.read_by.contains(&session_id.to_string());
277            if was_unread {
278                entry.1.read_by.push(session_id.to_string());
279                if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
280                    let _ = Self::atomic_write(&entry.0, json.as_bytes());
281                }
282            }
283            result.push((entry.1.clone(), was_unread));
284        }
285
286        result.into_iter().take(limit).collect()
287    }
288
289    /// Get unread messages without marking them as read.
290    pub fn unread(&self, session_id: &str) -> Vec<Message> {
291        let dir = self.messages_dir();
292        let mut unread = Vec::new();
293        if let Ok(entries) = fs::read_dir(dir) {
294            for entry in entries.flatten() {
295                if entry.path().extension().is_some_and(|x| x == "json") {
296                    if let Ok(content) = fs::read_to_string(entry.path()) {
297                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
298                            let dominated = msg.to_session.is_none()
299                                || msg.to_session.as_deref() == Some(session_id);
300                            if dominated
301                                && msg.from_session != session_id
302                                && !msg.read_by.contains(&session_id.to_string())
303                            {
304                                unread.push(msg);
305                            }
306                        }
307                    }
308                }
309            }
310        }
311        unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
312        unread
313    }
314
315    /// Count unread messages for a session.
316    pub fn unread_count(&self, session_id: &str) -> u64 {
317        self.unread(session_id).len() as u64
318    }
319
320    /// Delete messages older than `max_age_secs`.
321    pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
322        let dir = self.messages_dir();
323        let now = Self::now();
324        let mut removed = 0;
325        if let Ok(entries) = fs::read_dir(dir) {
326            for entry in entries.flatten() {
327                if entry.path().extension().is_some_and(|x| x == "json") {
328                    if let Ok(content) = fs::read_to_string(entry.path()) {
329                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
330                            if now - msg.timestamp > max_age_secs {
331                                let _ = fs::remove_file(entry.path());
332                                removed += 1;
333                            }
334                        }
335                    }
336                }
337            }
338        }
339        removed
340    }
341
342    // ── Watch ──
343
344    /// Poll for new messages. Returns unread count.
345    /// Useful for integration: call this in a loop and trigger actions
346    /// (like spawning a background AI session) when count > 0.
347    pub fn poll(&self, session_id: &str) -> u64 {
348        self.unread_count(session_id)
349    }
350}
351
352/// Check if a PID is still running (cross-platform via sysinfo-less approach).
353fn is_pid_alive(pid: u32) -> bool {
354    // Use kill(pid, 0) on Unix — returns 0 if process exists
355    #[cfg(unix)]
356    {
357        unsafe { libc_kill(pid as i32, 0) == 0 }
358    }
359    #[cfg(not(unix))]
360    {
361        // Fallback: assume alive (better safe than garbage-collecting live agents)
362        let _ = pid;
363        true
364    }
365}
366
367#[cfg(unix)]
368extern "C" {
369    fn kill(pid: i32, sig: i32) -> i32;
370}
371
372#[cfg(unix)]
373unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
374    unsafe { kill(pid, sig) }
375}
376
377#[cfg(test)]
378mod tests {
379    use super::*;
380
381    fn temp_dir() -> PathBuf {
382        let dir = std::env::temp_dir().join(format!("agent-relay-test-{}", uuid::Uuid::new_v4()));
383        let _ = fs::create_dir_all(&dir);
384        dir
385    }
386
387    #[test]
388    fn test_register_and_list_agents() {
389        let dir = temp_dir();
390        let relay = Relay::new(dir.clone());
391
392        relay.register("claude", "s1", 99999);
393        relay.register("gemini", "s2", 99998);
394
395        let agents = relay.agents();
396        assert_eq!(agents.len(), 2);
397
398        let ids: std::collections::HashSet<String> =
399            agents.iter().map(|a| a.agent_id.clone()).collect();
400        assert!(ids.contains("claude"));
401        assert!(ids.contains("gemini"));
402
403        let _ = fs::remove_dir_all(&dir);
404    }
405
406    #[test]
407    fn test_send_and_receive() {
408        let dir = temp_dir();
409        let relay = Relay::new(dir.clone());
410
411        relay.register("claude", "s1", 99999);
412        relay.register("gemini", "s2", 99998);
413
414        relay.send("s1", "claude", None, "hello from claude");
415
416        let count = relay.unread_count("s2");
417        assert_eq!(count, 1);
418
419        let inbox = relay.inbox("s2", 10);
420        assert_eq!(inbox.len(), 1);
421        assert!(inbox[0].1); // newly read
422        assert_eq!(inbox[0].0.content, "hello from claude");
423        assert_eq!(inbox[0].0.from_agent, "claude");
424
425        // Should be read now
426        let count_after = relay.unread_count("s2");
427        assert_eq!(count_after, 0);
428
429        let _ = fs::remove_dir_all(&dir);
430    }
431
432    #[test]
433    fn test_direct_message() {
434        let dir = temp_dir();
435        let relay = Relay::new(dir.clone());
436
437        relay.register("claude", "s1", 99999);
438        relay.register("gemini", "s2", 99998);
439        relay.register("gpt", "s3", 99997);
440
441        // DM to s2 only
442        relay.send("s1", "claude", Some("s2"), "private to gemini");
443
444        assert_eq!(relay.unread_count("s2"), 1);
445        assert_eq!(relay.unread_count("s3"), 0); // gpt shouldn't see it
446
447        let _ = fs::remove_dir_all(&dir);
448    }
449
450    #[test]
451    fn test_broadcast() {
452        let dir = temp_dir();
453        let relay = Relay::new(dir.clone());
454
455        relay.register("claude", "s1", 99999);
456        relay.register("gemini", "s2", 99998);
457        relay.register("gpt", "s3", 99997);
458
459        relay.send("s1", "claude", None, "broadcast to all");
460
461        assert_eq!(relay.unread_count("s2"), 1);
462        assert_eq!(relay.unread_count("s3"), 1);
463        assert_eq!(relay.unread_count("s1"), 0); // sender doesn't see own msg as unread
464
465        let _ = fs::remove_dir_all(&dir);
466    }
467
468    #[test]
469    fn test_unregister() {
470        let dir = temp_dir();
471        let relay = Relay::new(dir.clone());
472
473        relay.register("claude", "s1", 99999);
474        assert_eq!(relay.agents().len(), 1);
475
476        relay.unregister("s1");
477        assert_eq!(relay.agents().len(), 0);
478
479        let _ = fs::remove_dir_all(&dir);
480    }
481
482    #[test]
483    fn test_cleanup_old_messages() {
484        let dir = temp_dir();
485        let relay = Relay::new(dir.clone());
486
487        // Create a message and manually backdate it
488        let mut msg = relay.send("s1", "claude", None, "old message");
489        msg.timestamp = Relay::now() - 7200; // 2 hours ago
490        let path = relay.messages_dir().join(format!("{}.json", msg.id));
491        let json = serde_json::to_string_pretty(&msg).unwrap();
492        let _ = fs::write(&path, json);
493
494        relay.send("s1", "claude", None, "new message");
495
496        let removed = relay.cleanup_old(3600); // 1 hour
497        assert_eq!(removed, 1);
498
499        let _ = fs::remove_dir_all(&dir);
500    }
501}