use axum::{
extract::{Path as AxumPath, Query, State},
http::{HeaderMap, StatusCode},
routing::{get, post},
Json, Router,
};
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::{Arc, Mutex};
use crate::hub;
use crate::{AgentRegistration, Message};
/// Handler state shared across all routes: one SQLite [`Connection`] behind an
/// `Arc<Mutex<..>>`. Every handler in this file locks the mutex before touching the database.
pub type SharedState = Arc<Mutex<Connection>>;
/// Creates the core relay tables (`messages`, `message_reads`, `agents`) when they are
/// missing, then hands the connection to [`hub::init_hub_tables`].
///
/// # Panics
/// Panics if the core schema batch cannot be executed.
pub fn init_db(conn: &Connection) {
    // Every statement uses `IF NOT EXISTS`, so running this on an existing database is safe.
    // NOTE(review): `messages` is created here without `team_id` / `channel`, yet
    // `send_message` and `inbox` reference those columns — presumably they are added by
    // `hub::init_hub_tables`; confirm.
    const CORE_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
from_session TEXT NOT NULL,
from_agent TEXT NOT NULL,
to_session TEXT,
content TEXT NOT NULL,
timestamp INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS message_reads (
message_id TEXT NOT NULL,
session_id TEXT NOT NULL,
PRIMARY KEY (message_id, session_id)
);
CREATE TABLE IF NOT EXISTS agents (
session_id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
pid INTEGER NOT NULL,
registered_at INTEGER NOT NULL,
last_heartbeat INTEGER NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}'
);";

    let schema_result = conn.execute_batch(CORE_SCHEMA);
    schema_result.expect("Failed to initialize database schema");

    hub::init_hub_tables(conn);
}
/// JSON body accepted by `POST /messages/send`.
#[derive(Deserialize)]
pub struct SendRequest {
/// Session id of the sender; the sender is also recorded as having read the message.
pub from_session: String,
/// Agent id of the sender, stored alongside the message.
pub from_agent: String,
/// Recipient session. When absent the row is stored with a NULL `to_session`, which the
/// inbox query returns to every session.
pub to_session: Option<String>,
/// Message text.
pub content: String,
/// Optional team the message belongs to (stored in the `team_id` column).
pub team_id: Option<String>,
/// Optional channel name; `send_message` stores `"general"` when this is omitted.
pub channel: Option<String>,
}
/// Query-string parameters shared by `GET /messages/inbox` and `GET /messages/unread`.
#[derive(Deserialize)]
pub struct InboxQuery {
/// Session whose messages are being requested.
pub session: String,
/// Maximum number of messages returned by the inbox; falls back to `default_limit()`.
#[serde(default = "default_limit")]
pub limit: usize,
/// Optional `team_id` filter (applied by `inbox`; `unread_count` does not read it).
pub team: Option<String>,
/// Optional `channel` filter (applied by `inbox`; `unread_count` does not read it).
pub channel: Option<String>,
}
/// Serde default for [`InboxQuery::limit`]: the page size used when the client sends none.
fn default_limit() -> usize {
    const DEFAULT_INBOX_LIMIT: usize = 50;
    DEFAULT_INBOX_LIMIT
}
/// JSON body accepted by `POST /agents/register`.
#[derive(Deserialize)]
pub struct RegisterRequest {
/// Session id; primary key of the `agents` table, so re-registering replaces the row.
pub session_id: String,
/// Agent identifier stored with the session.
pub agent_id: String,
/// Process id reported by the agent.
pub pid: u32,
/// Free-form JSON metadata; uses `serde_json::Value`'s `Default` when omitted. It is
/// stored as its serialized string form.
#[serde(default)]
pub metadata: serde_json::Value,
}
/// JSON response of `GET /messages/unread`.
#[derive(Serialize)]
pub struct CountResponse {
/// Number of unread messages counted for the requested session.
pub count: u64,
}
/// Opens the SQLite database at `db_path`, applies connection pragmas, initializes the
/// schema, and returns the fully wired axum [`Router`].
///
/// # Panics
/// Panics if the database cannot be opened, the pragmas cannot be applied, or schema
/// initialization fails.
pub fn build_router(db_path: &Path) -> Router {
    let db = Connection::open(db_path).expect("Failed to open SQLite database");

    // WAL journaling plus a 5000 ms busy timeout, applied once for the shared connection.
    let pragmas = "PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;";
    db.execute_batch(pragmas).expect("Failed to set pragmas");

    init_db(&db);
    let shared: SharedState = Arc::new(Mutex::new(db));

    let routes = Router::new()
        // Liveness probe.
        .route("/health", get(health))
        // Agent registry.
        .route("/agents", get(list_agents))
        .route("/agents/register", post(register_agent))
        .route("/agents/unregister", post(unregister_agent))
        // Messaging.
        .route("/messages/send", post(send_message))
        .route("/messages/inbox", get(inbox))
        .route("/messages/unread", get(unread_count))
        // Accounts.
        .route("/auth/signup", post(signup))
        .route("/auth/login", post(login))
        // Teams, invites, membership and channels.
        .route("/teams", post(create_team).get(list_teams))
        .route("/teams/join", post(join_team))
        .route("/teams/{team_id}/invite", post(create_invite))
        .route("/teams/{team_id}/members", get(list_members))
        .route("/teams/{team_id}/kick", post(kick_member))
        .route(
            "/teams/{team_id}/channels",
            post(create_channel).get(list_channels),
        );

    // NOTE(review): a permissive CORS layer presumably accepts any origin — confirm this is
    // intended for the deployment.
    let cors = tower_http::cors::CorsLayer::permissive();
    routes.layer(cors).with_state(shared)
}
/// `GET /health` — static liveness banner; touches neither state nor the database.
async fn health() -> &'static str {
    const HEALTH_BANNER: &str = "agent-relay server ok";
    HEALTH_BANNER
}
/// `POST /agents/register` — inserts or replaces the `agents` row keyed by `session_id`.
///
/// Both `registered_at` and `last_heartbeat` are set to the current relay time, so
/// re-registering an existing session resets both timestamps.
///
/// Returns `201 Created` with the registration. If the database write fails the handler
/// answers `500 Internal Server Error` (still echoing the attempted registration) instead
/// of panicking: a panic while the mutex guard is held would poison the shared lock and
/// make every later `lock().unwrap()` in this file panic too.
async fn register_agent(
    State(state): State<SharedState>,
    Json(req): Json<RegisterRequest>,
) -> (StatusCode, Json<AgentRegistration>) {
    let now = crate::Relay::now();
    let metadata_str = req.metadata.to_string();

    let write_result = {
        // Recover the guard even if another handler panicked while holding the lock.
        let conn = state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        conn.execute(
            "INSERT OR REPLACE INTO agents (session_id, agent_id, pid, registered_at, last_heartbeat, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            rusqlite::params![req.session_id, req.agent_id, req.pid, now, now, metadata_str],
        )
    };

    let status = match write_result {
        Ok(_) => StatusCode::CREATED,
        Err(err) => {
            eprintln!("register_agent: failed to insert agent: {}", err);
            StatusCode::INTERNAL_SERVER_ERROR
        }
    };

    let reg = AgentRegistration {
        session_id: req.session_id,
        agent_id: req.agent_id,
        pid: req.pid,
        registered_at: now,
        last_heartbeat: now,
        metadata: req.metadata,
    };
    (status, Json(reg))
}
/// `POST /agents/unregister` — deletes the agent row whose `session_id` matches the
/// `"session_id"` string field of the JSON body.
///
/// Returns:
/// * `200 OK` when the delete statement ran (whether or not a row existed),
/// * `400 Bad Request` when `session_id` is missing or not a string — previously this was
///   silently treated as the empty session id,
/// * `500 Internal Server Error` when the delete fails — previously the error was discarded.
async fn unregister_agent(
    State(state): State<SharedState>,
    Json(req): Json<serde_json::Value>,
) -> StatusCode {
    let Some(session) = req.get("session_id").and_then(|value| value.as_str()) else {
        return StatusCode::BAD_REQUEST;
    };

    let conn = state.lock().unwrap();
    match conn.execute("DELETE FROM agents WHERE session_id = ?1", [session]) {
        Ok(_) => StatusCode::OK,
        Err(err) => {
            eprintln!("unregister_agent: failed to delete agent: {}", err);
            StatusCode::INTERNAL_SERVER_ERROR
        }
    }
}
async fn list_agents(State(state): State<SharedState>) -> Json<Vec<AgentRegistration>> {
let conn = state.lock().unwrap();
let mut stmt = conn
.prepare("SELECT session_id, agent_id, pid, registered_at, last_heartbeat, metadata FROM agents ORDER BY last_heartbeat DESC")
.unwrap();
let agents: Vec<AgentRegistration> = stmt
.query_map([], |row| {
let metadata_str: String = row.get(5)?;
Ok(AgentRegistration {
session_id: row.get(0)?,
agent_id: row.get(1)?,
pid: row.get::<_, u32>(2)?,
registered_at: row.get(3)?,
last_heartbeat: row.get(4)?,
metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
})
})
.unwrap()
.filter_map(|r| r.ok())
.collect();
Json(agents)
}
/// `POST /messages/send` — stores a message and records the sender as having read it.
///
/// The id is `msg-` followed by the first 8 characters of a fresh v4 UUID string. The
/// channel defaults to `"general"` when the request omits it.
///
/// Returns `201 Created` with the message. If the insert fails (including a primary-key
/// collision on the short id) the handler answers `500 Internal Server Error`, still echoing
/// the attempted message, instead of panicking: a panic while the mutex guard is held would
/// poison the shared lock and make every later `lock().unwrap()` in this file panic too.
async fn send_message(
    State(state): State<SharedState>,
    Json(req): Json<SendRequest>,
) -> (StatusCode, Json<Message>) {
    let now = crate::Relay::now();
    let id = format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]);

    let insert_result = {
        // Recover the guard even if another handler panicked while holding the lock.
        let conn = state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let inserted = conn.execute(
            "INSERT INTO messages (id, from_session, from_agent, to_session, content, timestamp, team_id, channel)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
            rusqlite::params![
                id,
                req.from_session,
                req.from_agent,
                req.to_session,
                req.content,
                now,
                req.team_id,
                req.channel.as_deref().unwrap_or("general")
            ],
        );
        if inserted.is_ok() {
            // Best effort: the sender never sees their own message as unread.
            let _ = conn.execute(
                "INSERT OR IGNORE INTO message_reads (message_id, session_id) VALUES (?1, ?2)",
                rusqlite::params![id, req.from_session],
            );
        }
        inserted
    };

    let status = match insert_result {
        Ok(_) => StatusCode::CREATED,
        Err(err) => {
            eprintln!("send_message: failed to insert message: {}", err);
            StatusCode::INTERNAL_SERVER_ERROR
        }
    };

    let msg = Message {
        id,
        from_session: req.from_session,
        from_agent: req.from_agent,
        to_session: req.to_session,
        content: req.content,
        timestamp: now,
        read_by: vec![],
    };
    (status, Json(msg))
}
/// `GET /messages/inbox` — returns the newest messages visible to `session` and marks them
/// as read for that session.
///
/// A message is visible when it has no recipient, is addressed to the session, or was sent
/// by it. Optional `team` / `channel` parameters narrow the result; `limit` caps the row
/// count. Rows that fail to decode are skipped.
///
/// Read marks are best effort (failures are ignored), but are now written inside a single
/// transaction with one prepared statement instead of one autocommit insert per message.
///
/// # Panics
/// Panics if the lock is poisoned or the select statement cannot be prepared or started.
async fn inbox(
    State(state): State<SharedState>,
    Query(q): Query<InboxQuery>,
) -> Json<Vec<Message>> {
    let mut conn = state.lock().unwrap();

    let mut sql = String::from(
        "SELECT id, from_session, from_agent, to_session, content, timestamp
FROM messages
WHERE (to_session IS NULL OR to_session = ?1 OR from_session = ?1)",
    );
    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(q.session.clone())];

    // Each optional filter is appended with the positional index of the value just pushed.
    if let Some(ref team) = q.team {
        params.push(Box::new(team.clone()));
        sql.push_str(&format!(" AND team_id = ?{}", params.len()));
    }
    if let Some(ref channel) = q.channel {
        params.push(Box::new(channel.clone()));
        sql.push_str(&format!(" AND channel = ?{}", params.len()));
    }

    // A plain `as i64` cast wraps very large values to a negative number, which SQLite
    // treats as "no limit"; saturate explicitly instead.
    let limit = i64::try_from(q.limit).unwrap_or(i64::MAX);
    params.push(Box::new(limit));
    sql.push_str(&format!(" ORDER BY timestamp DESC LIMIT ?{}", params.len()));

    let messages: Vec<Message> = {
        let mut stmt = conn.prepare(&sql).unwrap();
        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            params.iter().map(|p| p.as_ref()).collect();
        let fetched: Vec<Message> = stmt
            .query_map(param_refs.as_slice(), |row| {
                Ok(Message {
                    id: row.get(0)?,
                    from_session: row.get(1)?,
                    from_agent: row.get(2)?,
                    to_session: row.get(3)?,
                    content: row.get(4)?,
                    timestamp: row.get(5)?,
                    read_by: vec![],
                })
            })
            .unwrap()
            .filter_map(|r| r.ok())
            .collect();
        fetched
    };

    if !messages.is_empty() {
        if let Ok(tx) = conn.transaction() {
            if let Ok(mut mark_read) = tx.prepare(
                "INSERT OR IGNORE INTO message_reads (message_id, session_id) VALUES (?1, ?2)",
            ) {
                for msg in &messages {
                    let _ = mark_read.execute(rusqlite::params![msg.id, q.session]);
                }
            }
            let _ = tx.commit();
        }
    }

    Json(messages)
}
async fn unread_count(
State(state): State<SharedState>,
Query(q): Query<InboxQuery>,
) -> Json<CountResponse> {
let conn = state.lock().unwrap();
let count: u64 = conn
.query_row(
"SELECT COUNT(*) FROM messages m
WHERE (m.to_session IS NULL OR m.to_session = ?1)
AND m.from_session != ?1
AND NOT EXISTS (
SELECT 1 FROM message_reads mr
WHERE mr.message_id = m.id AND mr.session_id = ?1
)",
[&q.session],
|row| row.get(0),
)
.unwrap_or(0);
Json(CountResponse { count })
}
/// Resolves the caller from an `Authorization: Bearer <api key>` header.
///
/// # Errors
/// Returns `401 Unauthorized` when the header is absent, is not valid text, lacks the
/// `Bearer ` prefix, or when [`hub::verify_api_key`] does not recognize the key.
///
/// # Panics
/// Panics if the lock is poisoned. The lock is taken only for the duration of this call.
fn extract_user(
    state: &SharedState,
    headers: &HeaderMap,
) -> Result<hub::User, (StatusCode, String)> {
    let header_text = headers
        .get("authorization")
        .and_then(|value| value.to_str().ok());
    let bearer = match header_text {
        Some(text) => text.strip_prefix("Bearer "),
        None => None,
    };
    let Some(api_key) = bearer else {
        return Err((
            StatusCode::UNAUTHORIZED,
            "Missing Authorization header".to_string(),
        ));
    };

    let db = state.lock().unwrap();
    match hub::verify_api_key(&db, api_key) {
        Some(user) => Ok(user),
        None => Err((StatusCode::UNAUTHORIZED, "Invalid API key".to_string())),
    }
}
/// JSON body accepted by `POST /auth/signup`.
#[derive(Deserialize)]
struct SignupRequest {
/// Account email; a UNIQUE violation on insert is reported as `409 Conflict`.
email: String,
/// Display name stored with the user.
name: String,
}
/// JSON response of a successful `POST /auth/signup`.
#[derive(Serialize)]
struct SignupResponse {
/// The newly created user.
user: hub::User,
/// The generated API key in plain form; `signup` stores only its hash.
api_key: String,
}
/// JSON body accepted by `POST /auth/login`.
#[derive(Deserialize)]
struct LoginRequest {
/// Email of the account to look up.
email: String,
}
/// JSON response of a successful `POST /auth/login`.
#[derive(Serialize)]
struct LoginResponse {
/// Freshly generated API key in plain form; `login` stores only its hash.
api_key: String,
}
async fn signup(
State(state): State<SharedState>,
Json(req): Json<SignupRequest>,
) -> Result<(StatusCode, Json<SignupResponse>), (StatusCode, String)> {
let conn = state.lock().unwrap();
let now = crate::Relay::now();
let user_id = uuid::Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO users (id, email, name, created_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![user_id, req.email, req.name, now],
)
.map_err(|e| {
if e.to_string().contains("UNIQUE") {
(
StatusCode::CONFLICT,
format!("Email '{}' already registered. Use /auth/login instead.", req.email),
)
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Database error: {}", e),
)
}
})?;
let api_key = hub::generate_api_key();
let key_hash = hub::hash_api_key(&api_key);
conn.execute(
"INSERT INTO api_keys (key_hash, user_id, label, created_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![key_hash, user_id, "default", now],
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create API key: {}", e),
)
})?;
let user = hub::User {
id: user_id,
email: req.email,
name: req.name,
};
Ok((
StatusCode::CREATED,
Json(SignupResponse {
user,
api_key,
}),
))
}
async fn login(
State(state): State<SharedState>,
Json(req): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, (StatusCode, String)> {
let conn = state.lock().unwrap();
let now = crate::Relay::now();
let user_id: String = conn
.query_row(
"SELECT id FROM users WHERE email = ?1",
[&req.email],
|row| row.get(0),
)
.map_err(|_| {
(
StatusCode::NOT_FOUND,
format!("No account found for '{}'", req.email),
)
})?;
let api_key = hub::generate_api_key();
let key_hash = hub::hash_api_key(&api_key);
conn.execute(
"INSERT INTO api_keys (key_hash, user_id, label, created_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![key_hash, user_id, "default", now],
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create API key: {}", e),
)
})?;
Ok(Json(LoginResponse { api_key }))
}
/// JSON body accepted by `POST /teams`.
#[derive(Deserialize)]
struct CreateTeamRequest {
/// Team name; a UNIQUE violation on insert is reported as `409 Conflict`.
name: String,
}
/// JSON body accepted by `POST /teams/{team_id}/invite`.
#[derive(Deserialize)]
struct InviteRequest {
/// Optional email the invite is restricted to; when set, `join_team` rejects callers
/// whose account email differs.
email: Option<String>,
}
/// JSON response of a successful `POST /teams/{team_id}/invite`.
#[derive(Serialize)]
struct InviteResponse {
/// Invite token to be presented to `POST /teams/join`.
token: String,
}
/// JSON body accepted by `POST /teams/join`.
#[derive(Deserialize)]
struct JoinTeamRequest {
/// Invite token; must match an invite whose `used_at` is still NULL.
token: String,
}
/// JSON body accepted by `POST /teams/{team_id}/kick`.
#[derive(Deserialize)]
struct KickRequest {
/// Id of the user to remove from the team.
user_id: String,
}
/// JSON body accepted by `POST /teams/{team_id}/channels`.
#[derive(Deserialize)]
struct CreateChannelRequest {
/// Channel name; a UNIQUE violation on insert is reported as `409 Conflict`.
name: String,
}
async fn create_team(
State(state): State<SharedState>,
headers: HeaderMap,
Json(req): Json<CreateTeamRequest>,
) -> Result<(StatusCode, Json<hub::Team>), (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
let now = crate::Relay::now();
let team_id = uuid::Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO teams (id, name, owner_id, created_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![team_id, req.name, user.id, now],
)
.map_err(|e| {
if e.to_string().contains("UNIQUE") {
(
StatusCode::CONFLICT,
format!("Team name '{}' already taken", req.name),
)
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Database error: {}", e),
)
}
})?;
conn.execute(
"INSERT INTO team_members (team_id, user_id, role, joined_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![team_id, user.id, "owner", now],
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to add owner: {}", e),
)
})?;
let channel_id = uuid::Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO channels (id, team_id, name, created_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![channel_id, team_id, "general", now],
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create #general channel: {}", e),
)
})?;
let team = hub::Team {
id: team_id,
name: req.name,
owner_id: user.id,
created_at: now,
};
Ok((StatusCode::CREATED, Json(team)))
}
async fn list_teams(
State(state): State<SharedState>,
headers: HeaderMap,
) -> Result<Json<Vec<hub::Team>>, (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
let mut stmt = conn
.prepare(
"SELECT t.id, t.name, t.owner_id, t.created_at
FROM teams t
JOIN team_members tm ON tm.team_id = t.id
WHERE tm.user_id = ?1
ORDER BY t.created_at DESC",
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
)
})?;
let teams: Vec<hub::Team> = stmt
.query_map([&user.id], |row| {
Ok(hub::Team {
id: row.get(0)?,
name: row.get(1)?,
owner_id: row.get(2)?,
created_at: row.get(3)?,
})
})
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
)
})?
.filter_map(|r| r.ok())
.collect();
Ok(Json(teams))
}
async fn create_invite(
State(state): State<SharedState>,
headers: HeaderMap,
AxumPath(team_id): AxumPath<String>,
Json(req): Json<InviteRequest>,
) -> Result<(StatusCode, Json<InviteResponse>), (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
let role: String = conn
.query_row(
"SELECT role FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, user.id],
|row| row.get(0),
)
.map_err(|_| {
(
StatusCode::FORBIDDEN,
"You are not a member of this team".to_string(),
)
})?;
if role != "owner" && role != "admin" {
return Err((
StatusCode::FORBIDDEN,
"Only owners and admins can create invites".to_string(),
));
}
let now = crate::Relay::now();
let token = format!(
"inv_{}",
uuid::Uuid::new_v4().to_string().replace('-', "")
);
conn.execute(
"INSERT INTO invites (token, team_id, inviter_id, email, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![token, team_id, user.id, req.email, now],
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create invite: {}", e),
)
})?;
Ok((StatusCode::CREATED, Json(InviteResponse { token })))
}
async fn join_team(
State(state): State<SharedState>,
headers: HeaderMap,
Json(req): Json<JoinTeamRequest>,
) -> Result<Json<hub::Team>, (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
let (team_id, invite_email): (String, Option<String>) = conn
.query_row(
"SELECT team_id, email FROM invites WHERE token = ?1 AND used_at IS NULL",
[&req.token],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.map_err(|_| {
(
StatusCode::NOT_FOUND,
"Invalid or already used invite token".to_string(),
)
})?;
if let Some(ref email) = invite_email {
if email != &user.email {
return Err((
StatusCode::FORBIDDEN,
format!("This invite is for {}", email),
));
}
}
let now = crate::Relay::now();
let already: bool = conn
.query_row(
"SELECT COUNT(*) FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, user.id],
|row| row.get::<_, i64>(0),
)
.map(|c| c > 0)
.unwrap_or(false);
if already {
return Err((
StatusCode::CONFLICT,
"You are already a member of this team".to_string(),
));
}
conn.execute(
"INSERT INTO team_members (team_id, user_id, role, joined_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![team_id, user.id, "member", now],
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to join team: {}", e),
)
})?;
let _ = conn.execute(
"UPDATE invites SET used_at = ?1 WHERE token = ?2",
rusqlite::params![now, req.token],
);
let team = conn
.query_row(
"SELECT id, name, owner_id, created_at FROM teams WHERE id = ?1",
[&team_id],
|row| {
Ok(hub::Team {
id: row.get(0)?,
name: row.get(1)?,
owner_id: row.get(2)?,
created_at: row.get(3)?,
})
},
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to fetch team: {}", e),
)
})?;
Ok(Json(team))
}
async fn list_members(
State(state): State<SharedState>,
headers: HeaderMap,
AxumPath(team_id): AxumPath<String>,
) -> Result<Json<Vec<hub::TeamMember>>, (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
conn.query_row(
"SELECT 1 FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, user.id],
|_| Ok(()),
)
.map_err(|_| {
(
StatusCode::FORBIDDEN,
"You are not a member of this team".to_string(),
)
})?;
let mut stmt = conn
.prepare(
"SELECT tm.user_id, tm.role, tm.joined_at, u.name, u.email
FROM team_members tm
JOIN users u ON u.id = tm.user_id
WHERE tm.team_id = ?1
ORDER BY tm.joined_at ASC",
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
)
})?;
let members: Vec<hub::TeamMember> = stmt
.query_map([&team_id], |row| {
Ok(hub::TeamMember {
user_id: row.get(0)?,
role: row.get(1)?,
joined_at: row.get(2)?,
name: row.get(3)?,
email: row.get(4)?,
})
})
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
)
})?
.filter_map(|r| r.ok())
.collect();
Ok(Json(members))
}
async fn kick_member(
State(state): State<SharedState>,
headers: HeaderMap,
AxumPath(team_id): AxumPath<String>,
Json(req): Json<KickRequest>,
) -> Result<StatusCode, (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
let role: String = conn
.query_row(
"SELECT role FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, user.id],
|row| row.get(0),
)
.map_err(|_| {
(
StatusCode::FORBIDDEN,
"You are not a member of this team".to_string(),
)
})?;
if role != "owner" && role != "admin" {
return Err((
StatusCode::FORBIDDEN,
"Only owners and admins can kick members".to_string(),
));
}
if req.user_id == user.id {
return Err((
StatusCode::BAD_REQUEST,
"Cannot kick yourself".to_string(),
));
}
let target_role: String = conn
.query_row(
"SELECT role FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, req.user_id],
|row| row.get(0),
)
.map_err(|_| {
(
StatusCode::NOT_FOUND,
"User is not a member of this team".to_string(),
)
})?;
if target_role == "owner" {
return Err((
StatusCode::FORBIDDEN,
"Cannot kick the team owner".to_string(),
));
}
conn.execute(
"DELETE FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, req.user_id],
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to kick member: {}", e),
)
})?;
Ok(StatusCode::OK)
}
async fn create_channel(
State(state): State<SharedState>,
headers: HeaderMap,
AxumPath(team_id): AxumPath<String>,
Json(req): Json<CreateChannelRequest>,
) -> Result<(StatusCode, Json<hub::Channel>), (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
conn.query_row(
"SELECT 1 FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, user.id],
|_| Ok(()),
)
.map_err(|_| {
(
StatusCode::FORBIDDEN,
"You are not a member of this team".to_string(),
)
})?;
let now = crate::Relay::now();
let channel_id = uuid::Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO channels (id, team_id, name, created_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![channel_id, team_id, req.name, now],
)
.map_err(|e| {
if e.to_string().contains("UNIQUE") {
(
StatusCode::CONFLICT,
format!("Channel '{}' already exists in this team", req.name),
)
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Database error: {}", e),
)
}
})?;
let channel = hub::Channel {
id: channel_id,
name: req.name,
team_id,
};
Ok((StatusCode::CREATED, Json(channel)))
}
async fn list_channels(
State(state): State<SharedState>,
headers: HeaderMap,
AxumPath(team_id): AxumPath<String>,
) -> Result<Json<Vec<hub::Channel>>, (StatusCode, String)> {
let user = extract_user(&state, &headers)?;
let conn = state.lock().unwrap();
conn.query_row(
"SELECT 1 FROM team_members WHERE team_id = ?1 AND user_id = ?2",
rusqlite::params![team_id, user.id],
|_| Ok(()),
)
.map_err(|_| {
(
StatusCode::FORBIDDEN,
"You are not a member of this team".to_string(),
)
})?;
let mut stmt = conn
.prepare(
"SELECT id, name, team_id FROM channels WHERE team_id = ?1 ORDER BY created_at ASC",
)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
)
})?;
let channels: Vec<hub::Channel> = stmt
.query_map([&team_id], |row| {
Ok(hub::Channel {
id: row.get(0)?,
name: row.get(1)?,
team_id: row.get(2)?,
})
})
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
)
})?
.filter_map(|r| r.ok())
.collect();
Ok(Json(channels))
}