pub mod client;
pub mod daemon;
pub mod git;
pub mod hub;
pub mod server;
pub mod server_service;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Message {
pub id: String,
pub from_session: String,
pub from_agent: String,
pub to_session: Option<String>,
pub content: String,
pub timestamp: u64,
pub read_by: Vec<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AgentRegistration {
pub session_id: String,
pub agent_id: String,
pub pid: u32,
pub registered_at: u64,
pub last_heartbeat: u64,
pub metadata: serde_json::Value,
}
pub struct Relay {
pub base_dir: PathBuf,
}
impl Relay {
pub fn new(base_dir: PathBuf) -> Self {
Self { base_dir }
}
fn messages_dir(&self) -> PathBuf {
self.base_dir.join("messages")
}
fn agents_dir(&self) -> PathBuf {
self.base_dir.join("agents")
}
fn ensure_dirs(&self) {
let _ = fs::create_dir_all(self.messages_dir());
let _ = fs::create_dir_all(self.agents_dir());
}
pub fn now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
let tmp = path.with_extension("tmp");
fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
Ok(())
}
pub fn register(&self, agent_id: &str, session_id: &str, pid: u32) -> AgentRegistration {
self.register_with_metadata(agent_id, session_id, pid, serde_json::json!({}))
}
pub fn register_with_metadata(
&self,
agent_id: &str,
session_id: &str,
pid: u32,
metadata: serde_json::Value,
) -> AgentRegistration {
self.ensure_dirs();
let reg = AgentRegistration {
session_id: session_id.to_string(),
agent_id: agent_id.to_string(),
pid,
registered_at: Self::now(),
last_heartbeat: Self::now(),
metadata,
};
let path = self.agents_dir().join(format!("{}.json", session_id));
if let Ok(json) = serde_json::to_string_pretty(®) {
let _ = Self::atomic_write(&path, json.as_bytes());
}
reg
}
pub fn heartbeat(&self, session_id: &str) {
let path = self.agents_dir().join(format!("{}.json", session_id));
if let Ok(content) = fs::read_to_string(&path) {
if let Ok(mut reg) = serde_json::from_str::<AgentRegistration>(&content) {
reg.last_heartbeat = Self::now();
if let Ok(json) = serde_json::to_string_pretty(®) {
let _ = Self::atomic_write(&path, json.as_bytes());
}
}
}
}
pub fn unregister(&self, session_id: &str) {
let path = self.agents_dir().join(format!("{}.json", session_id));
let _ = fs::remove_file(&path);
}
pub fn agents(&self) -> Vec<AgentRegistration> {
self.ensure_dirs();
let dir = self.agents_dir();
let mut agents = Vec::new();
if let Ok(entries) = fs::read_dir(&dir) {
for entry in entries.flatten() {
if entry.path().extension().is_some_and(|x| x == "json") {
if let Ok(content) = fs::read_to_string(entry.path()) {
if let Ok(reg) = serde_json::from_str::<AgentRegistration>(&content) {
agents.push(reg);
}
}
}
}
}
agents.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
agents
}
pub fn cleanup_dead(&self) -> usize {
self.ensure_dirs();
let agents = self.agents();
let mut removed = 0;
for agent in &agents {
if !is_pid_alive(agent.pid) {
let path = self.agents_dir().join(format!("{}.json", agent.session_id));
let _ = fs::remove_file(&path);
removed += 1;
}
}
removed
}
pub fn send(
&self,
from_session: &str,
from_agent: &str,
to_session: Option<&str>,
content: &str,
) -> Message {
self.ensure_dirs();
let msg = Message {
id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
from_session: from_session.to_string(),
from_agent: from_agent.to_string(),
to_session: to_session.map(|s| s.to_string()),
content: content.to_string(),
timestamp: Self::now(),
read_by: vec![from_session.to_string()],
};
let path = self.messages_dir().join(format!("{}.json", msg.id));
if let Ok(json) = serde_json::to_string_pretty(&msg) {
let _ = Self::atomic_write(&path, json.as_bytes());
}
msg
}
pub fn inbox(&self, session_id: &str, limit: usize) -> Vec<(Message, bool)> {
self.ensure_dirs();
let dir = self.messages_dir();
let mut messages = Vec::new();
if let Ok(entries) = fs::read_dir(&dir) {
for entry in entries.flatten() {
if entry.path().extension().is_some_and(|x| x == "json") {
if let Ok(content) = fs::read_to_string(entry.path()) {
if let Ok(msg) = serde_json::from_str::<Message>(&content) {
let dominated = msg.to_session.is_none()
|| msg.to_session.as_deref() == Some(session_id)
|| msg.from_session == session_id;
if dominated {
messages.push((entry.path(), msg));
}
}
}
}
}
}
messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
let mut result = Vec::new();
for entry in &mut messages {
let was_unread = !entry.1.read_by.contains(&session_id.to_string());
if was_unread {
entry.1.read_by.push(session_id.to_string());
if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
let _ = Self::atomic_write(&entry.0, json.as_bytes());
}
}
result.push((entry.1.clone(), was_unread));
}
result.into_iter().take(limit).collect()
}
pub fn unread(&self, session_id: &str) -> Vec<Message> {
let dir = self.messages_dir();
let mut unread = Vec::new();
if let Ok(entries) = fs::read_dir(dir) {
for entry in entries.flatten() {
if entry.path().extension().is_some_and(|x| x == "json") {
if let Ok(content) = fs::read_to_string(entry.path()) {
if let Ok(msg) = serde_json::from_str::<Message>(&content) {
let dominated = msg.to_session.is_none()
|| msg.to_session.as_deref() == Some(session_id);
if dominated
&& msg.from_session != session_id
&& !msg.read_by.contains(&session_id.to_string())
{
unread.push(msg);
}
}
}
}
}
}
unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
unread
}
pub fn unread_count(&self, session_id: &str) -> u64 {
self.unread(session_id).len() as u64
}
pub fn cleanup_old(&self, max_age_secs: u64) -> usize {
let dir = self.messages_dir();
let now = Self::now();
let mut removed = 0;
if let Ok(entries) = fs::read_dir(dir) {
for entry in entries.flatten() {
if entry.path().extension().is_some_and(|x| x == "json") {
if let Ok(content) = fs::read_to_string(entry.path()) {
if let Ok(msg) = serde_json::from_str::<Message>(&content) {
if now - msg.timestamp > max_age_secs {
let _ = fs::remove_file(entry.path());
removed += 1;
}
}
}
}
}
}
removed
}
pub fn poll(&self, session_id: &str) -> u64 {
self.unread_count(session_id)
}
}
fn is_pid_alive(pid: u32) -> bool {
#[cfg(unix)]
{
unsafe { libc_kill(pid as i32, 0) == 0 }
}
#[cfg(not(unix))]
{
let _ = pid;
true
}
}
#[cfg(unix)]
extern "C" {
fn kill(pid: i32, sig: i32) -> i32;
}
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
unsafe { kill(pid, sig) }
}
#[cfg(test)]
mod tests {
use super::*;
fn temp_dir() -> PathBuf {
let dir = std::env::temp_dir().join(format!("agent-relay-test-{}", uuid::Uuid::new_v4()));
let _ = fs::create_dir_all(&dir);
dir
}
#[test]
fn test_register_and_list_agents() {
let dir = temp_dir();
let relay = Relay::new(dir.clone());
relay.register("claude", "s1", 99999);
relay.register("gemini", "s2", 99998);
let agents = relay.agents();
assert_eq!(agents.len(), 2);
let ids: std::collections::HashSet<String> =
agents.iter().map(|a| a.agent_id.clone()).collect();
assert!(ids.contains("claude"));
assert!(ids.contains("gemini"));
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_send_and_receive() {
let dir = temp_dir();
let relay = Relay::new(dir.clone());
relay.register("claude", "s1", 99999);
relay.register("gemini", "s2", 99998);
relay.send("s1", "claude", None, "hello from claude");
let count = relay.unread_count("s2");
assert_eq!(count, 1);
let inbox = relay.inbox("s2", 10);
assert_eq!(inbox.len(), 1);
assert!(inbox[0].1); assert_eq!(inbox[0].0.content, "hello from claude");
assert_eq!(inbox[0].0.from_agent, "claude");
let count_after = relay.unread_count("s2");
assert_eq!(count_after, 0);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_direct_message() {
let dir = temp_dir();
let relay = Relay::new(dir.clone());
relay.register("claude", "s1", 99999);
relay.register("gemini", "s2", 99998);
relay.register("gpt", "s3", 99997);
relay.send("s1", "claude", Some("s2"), "private to gemini");
assert_eq!(relay.unread_count("s2"), 1);
assert_eq!(relay.unread_count("s3"), 0);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_broadcast() {
let dir = temp_dir();
let relay = Relay::new(dir.clone());
relay.register("claude", "s1", 99999);
relay.register("gemini", "s2", 99998);
relay.register("gpt", "s3", 99997);
relay.send("s1", "claude", None, "broadcast to all");
assert_eq!(relay.unread_count("s2"), 1);
assert_eq!(relay.unread_count("s3"), 1);
assert_eq!(relay.unread_count("s1"), 0);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_unregister() {
let dir = temp_dir();
let relay = Relay::new(dir.clone());
relay.register("claude", "s1", 99999);
assert_eq!(relay.agents().len(), 1);
relay.unregister("s1");
assert_eq!(relay.agents().len(), 0);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_cleanup_old_messages() {
let dir = temp_dir();
let relay = Relay::new(dir.clone());
let mut msg = relay.send("s1", "claude", None, "old message");
msg.timestamp = Relay::now() - 7200; let path = relay.messages_dir().join(format!("{}.json", msg.id));
let json = serde_json::to_string_pretty(&msg).unwrap();
let _ = fs::write(&path, json);
relay.send("s1", "claude", None, "new message");
let removed = relay.cleanup_old(3600); assert_eq!(removed, 1);
let _ = fs::remove_dir_all(&dir);
}
}