Skip to main content

agent_relay/
lib.rs

1//! # agent-relay
2//!
3//! Agent-to-agent messaging for AI coding tools.
4//!
5//! When multiple AI agents (Claude, Gemini, GPT, Copilot) or human developers
6//! work on the same codebase, they need a way to coordinate. `agent-relay`
7//! provides instant, file-based messaging with zero setup — no server, no
8//! database, no config.
9//!
10//! ## Quick Start
11//!
12//! ```no_run
13//! use std::path::PathBuf;
14//! use agent_relay::Relay;
15//!
16//! let relay = Relay::new(PathBuf::from(".relay"));
17//!
18//! // Register this agent
19//! relay.register("claude-opus", "session-1", std::process::id());
20//!
21//! // Send a message
22//! relay.send("session-1", "claude-opus", None, "refactoring auth module — stay away from src/auth/");
23//!
24//! // Another agent checks inbox
25//! let msgs = relay.inbox("session-2", 10);
26//! for (msg, is_new) in &msgs {
27//!     if *is_new {
28//!         println!("[{}] {}: {}", msg.from_agent, msg.from_session, msg.content);
29//!     }
30//! }
31//! ```
32//!
33//! ## CLI
34//!
35//! ```bash
36//! # Register yourself
37//! agent-relay register --agent claude --session my-session
38//!
39//! # Send a broadcast
40//! agent-relay send "heads up: changing the auth module"
41//!
42//! # Send to specific agent
43//! agent-relay send --to session-2 "can you review src/auth.rs?"
44//!
45//! # Check inbox
46//! agent-relay inbox
47//!
48//! # List active agents
49//! agent-relay agents
50//! ```
51//!
52//! Originally extracted from [Aura](https://auravcs.com), the semantic
53//! version control engine.
54
55pub mod client;
56pub mod daemon;
57pub mod git;
58pub mod server;
59pub mod server_service;
60
61use serde::{Deserialize, Serialize};
62use std::fs;
63use std::path::PathBuf;
64use std::time::{SystemTime, UNIX_EPOCH};
65
66// ── Data Types ──
67
68#[derive(Serialize, Deserialize, Clone, Debug)]
69pub struct Message {
70    pub id: String,
71    pub from_session: String,
72    pub from_agent: String,
73    pub to_session: Option<String>,
74    pub content: String,
75    pub timestamp: u64,
76    pub read_by: Vec<String>,
77}
78
79#[derive(Serialize, Deserialize, Clone, Debug)]
80pub struct AgentRegistration {
81    pub session_id: String,
82    pub agent_id: String,
83    pub pid: u32,
84    pub registered_at: u64,
85    pub last_heartbeat: u64,
86    pub metadata: serde_json::Value,
87}
88
89// ── Relay ──
90
91/// The main relay hub. All operations are local filesystem — no network.
92///
93/// Create one per repo/project with a shared `base_dir` path. Every agent
94/// that uses the same path will see each other's messages.
95pub struct Relay {
96    pub base_dir: PathBuf,
97}
98
99impl Relay {
100    /// Create a new relay rooted at the given directory.
101    ///
102    /// Typical usage: `Relay::new(".relay".into())` at the repo root.
103    /// All agents must use the same path to see each other.
104    pub fn new(base_dir: PathBuf) -> Self {
105        Self { base_dir }
106    }
107
108    fn messages_dir(&self) -> PathBuf {
109        self.base_dir.join("messages")
110    }
111
112    fn agents_dir(&self) -> PathBuf {
113        self.base_dir.join("agents")
114    }
115
116    fn ensure_dirs(&self) {
117        let _ = fs::create_dir_all(self.messages_dir());
118        let _ = fs::create_dir_all(self.agents_dir());
119    }
120
121    pub fn now() -> u64 {
122        SystemTime::now()
123            .duration_since(UNIX_EPOCH)
124            .unwrap_or_default()
125            .as_secs()
126    }
127
128    fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
129        let tmp = path.with_extension("tmp");
130        fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
131        fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
132        Ok(())
133    }
134
135    // ── Agent Registration ──
136
137    /// Register an agent so others can discover it.
138    pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
139        self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
140    }
141
142    /// Register with custom metadata (model name, capabilities, etc).
143    pub fn register_with_metadata(
144        &self,
145        agent_id: &str,
146        session_id: &str,
147        pid: u32,
148        metadata: serde_json::Value,
149    ) -> AgentRegistration {
150        self.ensure_dirs();
151        let reg = AgentRegistration {
152            session_id: session_id.to_string(),
153            agent_id: agent_id.to_string(),
154            pid,
155            registered_at: Self::now(),
156            last_heartbeat: Self::now(),
157            metadata,
158        };
159        let path = self.agents_dir().join(format!("{}.json", session_id));
160        if let Ok(json) = serde_json::to_string_pretty(&reg) {
161            let _ = Self::atomic_write(&path, json.as_bytes());
162        }
163        reg
164    }
165
166    /// Update heartbeat to signal this agent is still alive.
167    pub fn heartbeat(&self, session_id: &str) {
168        let path = self.agents_dir().join(format!("{}.json", session_id));
169        if let Ok(content) = fs::read_to_string(&path) {
170            if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
171                reg.last_heartbeat = Self::now();
172                if let Ok(json) = serde_json::to_string_pretty(&reg) {
173                    let _ = Self::atomic_write(&path, json.as_bytes());
174                }
175            }
176        }
177    }
178
179    /// Unregister an agent (cleanup on exit).
180    pub fn unregister(&self, session_id: &str) {
181        let path = self.agents_dir().join(format!("{}.json", session_id));
182        let _ = fs::remove_file(&path);
183    }
184
185    /// List all registered agents.
186    pub fn agents(&self) -> Vec<AgentRegistration> {
187        self.ensure_dirs();
188        let dir = self.agents_dir();
189        let mut agents = Vec::new();
190        if let Ok(entries) = fs::read_dir(&dir) {
191            for entry in entries.flatten() {
192                if entry.path().extension().is_some_and(|x| x == "json") {
193                    if let Ok(content) = fs::read_to_string(entry.path()) {
194                        if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
195                            agents.push(reg);
196                        }
197                    }
198                }
199            }
200        }
201        agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
202        agents
203    }
204
205    /// Remove registrations for agents whose PID is no longer alive.
206    pub fn cleanup_dead(&self) -> usize {
207        self.ensure_dirs();
208        let agents = self.agents();
209        let mut removed = 0;
210        for agent in &agents {
211            if !is_pid_alive(agent.pid) {
212                let path = self.agents_dir().join(format!("{}.json", agent.session_id));
213                let _ = fs::remove_file(&path);
214                removed += 1;
215            }
216        }
217        removed
218    }
219
220    // ── Messaging ──
221
222    /// Send a message. `to_session: None` = broadcast to all agents.
223    pub fn send(
224        &self,
225        from_session: &str,
226        from_agent: &str,
227        to_session: Option<&str>,
228        content: &str,
229    ) -> Message {
230        self.ensure_dirs();
231        let msg = Message {
232            id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
233            from_session: from_session.to_string(),
234            from_agent: from_agent.to_string(),
235            to_session: to_session.map(|s| s.to_string()),
236            content: content.to_string(),
237            timestamp: Self::now(),
238            read_by: vec![from_session.to_string()],
239        };
240        let path = self.messages_dir().join(format!("{}.json", msg.id));
241        if let Ok(json) = serde_json::to_string_pretty(&msg) {
242            let _ = Self::atomic_write(&path, json.as_bytes());
243        }
244        msg
245    }
246
247    /// Read inbox: returns messages with a flag indicating if newly read.
248    /// Marks all returned messages as read by this session.
249    pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
250        self.ensure_dirs();
251        let dir = self.messages_dir();
252        let mut messages = Vec::new();
253
254        if let Ok(entries) = fs::read_dir(&dir) {
255            for entry in entries.flatten() {
256                if entry.path().extension().is_some_and(|x| x == "json") {
257                    if let Ok(content) = fs::read_to_string(entry.path()) {
258                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
259                            let dominated = msg.to_session.is_none()
260                                || msg.to_session.as_deref() == Some(session_id)
261                                || msg.from_session == session_id;
262                            if dominated {
263                                messages.push((entry.path(), msg));
264                            }
265                        }
266                    }
267                }
268            }
269        }
270
271        messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
272
273        let mut result = Vec::new();
274        for entry in &mut messages {
275            let was_unread = !entry.1.read_by.contains(&session_id.to_string());
276            if was_unread {
277                entry.1.read_by.push(session_id.to_string());
278                if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
279                    let _ = Self::atomic_write(&entry.0, json.as_bytes());
280                }
281            }
282            result.push((entry.1.clone(), was_unread));
283        }
284
285        result.into_iter().take(limit).collect()
286    }
287
288    /// Get unread messages without marking them as read.
289    pub fn unread(&self, session_id: &str) -> Vec<Message> {
290        let dir = self.messages_dir();
291        let mut unread = Vec::new();
292        if let Ok(entries) = fs::read_dir(dir) {
293            for entry in entries.flatten() {
294                if entry.path().extension().is_some_and(|x| x == "json") {
295                    if let Ok(content) = fs::read_to_string(entry.path()) {
296                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
297                            let dominated = msg.to_session.is_none()
298                                || msg.to_session.as_deref() == Some(session_id);
299                            if dominated
300                                && msg.from_session != session_id
301                                && !msg.read_by.contains(&session_id.to_string())
302                            {
303                                unread.push(msg);
304                            }
305                        }
306                    }
307                }
308            }
309        }
310        unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
311        unread
312    }
313
314    /// Count unread messages for a session.
315    pub fn unread_count(&self, session_id: &str) -> u64 {
316        self.unread(session_id).len() as u64
317    }
318
319    /// Delete messages older than `max_age_secs`.
320    pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
321        let dir = self.messages_dir();
322        let now = Self::now();
323        let mut removed = 0;
324        if let Ok(entries) = fs::read_dir(dir) {
325            for entry in entries.flatten() {
326                if entry.path().extension().is_some_and(|x| x == "json") {
327                    if let Ok(content) = fs::read_to_string(entry.path()) {
328                        if let Ok(msg) = serde_json::from_str::<Message>(&content) {
329                            if now - msg.timestamp > max_age_secs {
330                                let _ = fs::remove_file(entry.path());
331                                removed += 1;
332                            }
333                        }
334                    }
335                }
336            }
337        }
338        removed
339    }
340
341    // ── Watch ──
342
343    /// Poll for new messages. Returns unread count.
344    /// Useful for integration: call this in a loop and trigger actions
345    /// (like spawning a background AI session) when count > 0.
346    pub fn poll(&self, session_id: &str) -> u64 {
347        self.unread_count(session_id)
348    }
349}
350
351/// Check if a PID is still running (cross-platform via sysinfo-less approach).
352fn is_pid_alive(pid: u32) -> bool {
353    // Use kill(pid, 0) on Unix — returns 0 if process exists
354    #[cfg(unix)]
355    {
356        unsafe { libc_kill(pid as i32, 0) == 0 }
357    }
358    #[cfg(not(unix))]
359    {
360        // Fallback: assume alive (better safe than garbage-collecting live agents)
361        let _ = pid;
362        true
363    }
364}
365
366#[cfg(unix)]
367extern "C" {
368    fn kill(pid: i32, sig: i32) -> i32;
369}
370
371#[cfg(unix)]
372unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
373    unsafe { kill(pid, sig) }
374}
375
376#[cfg(test)]
377mod tests {
378    use super::*;
379
380    fn temp_dir() -> PathBuf {
381        let dir = std::env::temp_dir().join(format!("agent-relay-test-{}", uuid::Uuid::new_v4()));
382        let _ = fs::create_dir_all(&dir);
383        dir
384    }
385
386    #[test]
387    fn test_register_and_list_agents() {
388        let dir = temp_dir();
389        let relay = Relay::new(dir.clone());
390
391        relay.register("claude", "s1", 99999);
392        relay.register("gemini", "s2", 99998);
393
394        let agents = relay.agents();
395        assert_eq!(agents.len(), 2);
396
397        let ids: std::collections::HashSet<String> =
398            agents.iter().map(|a| a.agent_id.clone()).collect();
399        assert!(ids.contains("claude"));
400        assert!(ids.contains("gemini"));
401
402        let _ = fs::remove_dir_all(&dir);
403    }
404
405    #[test]
406    fn test_send_and_receive() {
407        let dir = temp_dir();
408        let relay = Relay::new(dir.clone());
409
410        relay.register("claude", "s1", 99999);
411        relay.register("gemini", "s2", 99998);
412
413        relay.send("s1", "claude", None, "hello from claude");
414
415        let count = relay.unread_count("s2");
416        assert_eq!(count, 1);
417
418        let inbox = relay.inbox("s2", 10);
419        assert_eq!(inbox.len(), 1);
420        assert!(inbox[0].1); // newly read
421        assert_eq!(inbox[0].0.content, "hello from claude");
422        assert_eq!(inbox[0].0.from_agent, "claude");
423
424        // Should be read now
425        let count_after = relay.unread_count("s2");
426        assert_eq!(count_after, 0);
427
428        let _ = fs::remove_dir_all(&dir);
429    }
430
431    #[test]
432    fn test_direct_message() {
433        let dir = temp_dir();
434        let relay = Relay::new(dir.clone());
435
436        relay.register("claude", "s1", 99999);
437        relay.register("gemini", "s2", 99998);
438        relay.register("gpt", "s3", 99997);
439
440        // DM to s2 only
441        relay.send("s1", "claude", Some("s2"), "private to gemini");
442
443        assert_eq!(relay.unread_count("s2"), 1);
444        assert_eq!(relay.unread_count("s3"), 0); // gpt shouldn't see it
445
446        let _ = fs::remove_dir_all(&dir);
447    }
448
449    #[test]
450    fn test_broadcast() {
451        let dir = temp_dir();
452        let relay = Relay::new(dir.clone());
453
454        relay.register("claude", "s1", 99999);
455        relay.register("gemini", "s2", 99998);
456        relay.register("gpt", "s3", 99997);
457
458        relay.send("s1", "claude", None, "broadcast to all");
459
460        assert_eq!(relay.unread_count("s2"), 1);
461        assert_eq!(relay.unread_count("s3"), 1);
462        assert_eq!(relay.unread_count("s1"), 0); // sender doesn't see own msg as unread
463
464        let _ = fs::remove_dir_all(&dir);
465    }
466
467    #[test]
468    fn test_unregister() {
469        let dir = temp_dir();
470        let relay = Relay::new(dir.clone());
471
472        relay.register("claude", "s1", 99999);
473        assert_eq!(relay.agents().len(), 1);
474
475        relay.unregister("s1");
476        assert_eq!(relay.agents().len(), 0);
477
478        let _ = fs::remove_dir_all(&dir);
479    }
480
481    #[test]
482    fn test_cleanup_old_messages() {
483        let dir = temp_dir();
484        let relay = Relay::new(dir.clone());
485
486        // Create a message and manually backdate it
487        let mut msg = relay.send("s1", "claude", None, "old message");
488        msg.timestamp = Relay::now() - 7200; // 2 hours ago
489        let path = relay.messages_dir().join(format!("{}.json", msg.id));
490        let json = serde_json::to_string_pretty(&msg).unwrap();
491        let _ = fs::write(&path, json);
492
493        relay.send("s1", "claude", None, "new message");
494
495        let removed = relay.cleanup_old(3600); // 1 hour
496        assert_eq!(removed, 1);
497
498        let _ = fs::remove_dir_all(&dir);
499    }
500}