use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Instant;
use ahash::AHashMap;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use super::cursor::Cursor;
use super::ids::{AgentId, RoomId};
use super::model::{
AgentCard, AgentKind, AgentRecord, MessageBody, MessageMeta, Room, RoomScope, Subscription,
now_micros,
};
use super::protocol::{
CommsNotification, CommsOut, CommsRequest, CommsResponse, PROTO_VER, SeqMeta, StatusReport,
};
use super::scope::{self, ScopeChain};
use super::store::{self, CommsStore, CommsStoreError};
pub const DEFAULT_LIMIT: u32 = 100;
pub const MAX_LIMIT: u32 = 1000;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LifecycleState {
Starting,
Active,
Idle,
Draining,
Stopped,
}
struct SubSink {
room: RoomId,
#[allow(dead_code)]
agent: AgentId,
tx: mpsc::Sender<CommsOut>,
}
struct Registry {
sinks: AHashMap<u64, SubSink>,
state: LifecycleState,
}
pub struct Broker {
store: Arc<CommsStore>,
registry: Mutex<Registry>,
subscriber_count: AtomicUsize,
next_sub: AtomicU64,
started: Instant,
version: String,
}
impl Broker {
pub fn new(store: Arc<CommsStore>) -> Self {
Self {
store,
registry: Mutex::new(Registry {
sinks: AHashMap::new(),
state: LifecycleState::Starting,
}),
subscriber_count: AtomicUsize::new(0),
next_sub: AtomicU64::new(1),
started: Instant::now(),
version: env!("CARGO_PKG_VERSION").to_string(),
}
}
pub async fn mark_active(&self) {
let mut reg = self.registry.lock().await;
if reg.state == LifecycleState::Starting || reg.state == LifecycleState::Idle {
reg.state = LifecycleState::Active;
}
}
pub fn subscriber_count(&self) -> usize {
self.subscriber_count.load(Ordering::Relaxed)
}
pub async fn handle(
&self,
req: CommsRequest,
session: &mut Session,
link_tx: &mpsc::Sender<CommsOut>,
) -> CommsResponse {
match self.dispatch(req, session, link_tx).await {
Ok(resp) => resp,
Err(e) => CommsResponse::Error {
code: "store_error".to_string(),
message: e.to_string(),
},
}
}
async fn dispatch(
&self,
req: CommsRequest,
session: &mut Session,
link_tx: &mpsc::Sender<CommsOut>,
) -> Result<CommsResponse, CommsStoreError> {
match req {
CommsRequest::Hello {
agent,
proto_ver,
remote,
cwd,
} => self.on_hello(agent, proto_ver, remote, cwd, session).await,
CommsRequest::Register { card } => self.on_register(session, card),
CommsRequest::ListAgents { room } => self.on_list_agents(room),
CommsRequest::CreateRoom { room, scope, title } => {
self.on_create_room(room, scope, title)
}
CommsRequest::ListRooms { remote, cwd } => {
self.on_list_rooms(remote, cwd, session).await
}
CommsRequest::Join { room } => self.on_join(session, room),
CommsRequest::Leave { room } => self.on_leave(session, room),
CommsRequest::Post {
room,
subject,
tags,
reply_to,
scope,
body,
} => {
self.on_post(session, room, subject, tags, reply_to, scope, body)
.await
}
CommsRequest::History {
room,
cursor,
limit,
} => self.on_history(room, cursor, limit),
CommsRequest::GetBody { message_id } => self.on_get_body(message_id),
CommsRequest::Inbox {
remote,
cwd,
cursor,
limit,
mark_read,
} => {
self.on_inbox(session, remote, cwd, cursor, limit, mark_read)
.await
}
CommsRequest::AckInbox {
message_ids,
room,
to_seq,
} => self.on_ack(session, message_ids, room, to_seq),
CommsRequest::Subscribe { room } => self.on_subscribe(session, room, link_tx).await,
CommsRequest::Unsubscribe { sub } => self.on_unsubscribe(sub).await,
CommsRequest::Ping => Ok(CommsResponse::Pong),
CommsRequest::Status => Ok(self.on_status().await),
CommsRequest::Stop => {
self.begin_drain().await;
Ok(CommsResponse::Ok)
}
}
}
async fn on_hello(
&self,
agent: AgentId,
proto_ver: u32,
remote: Option<String>,
cwd: Option<std::path::PathBuf>,
session: &mut Session,
) -> Result<CommsResponse, CommsStoreError> {
if proto_ver != PROTO_VER {
return Ok(CommsResponse::Error {
code: "proto_skew".to_string(),
message: format!("daemon speaks proto {PROTO_VER}, client sent {proto_ver}"),
});
}
session.agent = Some(agent.clone());
session.chain = Some(build_chain(remote.clone(), cwd.clone()));
let now = now_micros();
let record = match self.store.get_agent(&agent)? {
Some(mut existing) => {
existing.last_seen = now;
existing
}
None => AgentRecord {
agent_id: agent.clone(),
card: AgentCard::default(),
kind: AgentKind::Other,
first_seen: now,
last_seen: now,
},
};
self.store.put_agent(&record)?;
if let Some(chain) = session.chain.clone() {
self.auto_join(&agent, &chain)?;
}
Ok(CommsResponse::Welcome {
proto_ver: PROTO_VER,
daemon_version: self.version.clone(),
})
}
fn on_register(
&self,
session: &Session,
card: AgentCard,
) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
let now = now_micros();
let record = match self.store.get_agent(&agent)? {
Some(mut existing) => {
existing.card = card;
existing.last_seen = now;
existing
}
None => AgentRecord {
agent_id: agent,
card,
kind: AgentKind::Other,
first_seen: now,
last_seen: now,
},
};
self.store.put_agent(&record)?;
Ok(CommsResponse::Ok)
}
fn on_list_agents(&self, room: Option<RoomId>) -> Result<CommsResponse, CommsStoreError> {
let agents = match room {
None => self.store.list_agents()?,
Some(room) => {
let subs = self.store.subscribers(&room)?;
let mut out = Vec::new();
for id in subs {
if let Some(rec) = self.store.get_agent(&id)? {
out.push(rec);
}
}
out
}
};
Ok(CommsResponse::Agents(agents))
}
fn on_create_room(
&self,
room: RoomId,
scope: RoomScope,
title: Option<String>,
) -> Result<CommsResponse, CommsStoreError> {
let record = Room {
room_id: room.clone(),
scope,
title: title.unwrap_or_else(|| room.as_str().to_string()),
created_at: now_micros(),
};
self.store.put_room(&record)?;
Ok(CommsResponse::Room(record))
}
async fn on_list_rooms(
&self,
remote: Option<String>,
cwd: Option<std::path::PathBuf>,
session: &mut Session,
) -> Result<CommsResponse, CommsStoreError> {
let chain = build_chain(remote, cwd);
if let Some(agent) = session.agent.clone() {
self.auto_join(&agent, &chain)?;
}
let matching: Vec<Room> = self
.store
.list_rooms()?
.into_iter()
.filter(|r| scope::room_matches(&r.scope, &chain))
.collect();
Ok(CommsResponse::Rooms(matching))
}
fn on_join(&self, session: &Session, room: RoomId) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
self.store.subscribe(&Subscription {
agent_id: agent,
room,
created_at: now_micros(),
})?;
Ok(CommsResponse::Ok)
}
fn on_leave(&self, session: &Session, room: RoomId) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
self.store.unsubscribe(&room, &agent)?;
Ok(CommsResponse::Ok)
}
#[allow(clippy::too_many_arguments)]
async fn on_post(
&self,
session: &Session,
room: RoomId,
subject: String,
tags: Vec<String>,
reply_to: Option<String>,
scope: Vec<String>,
body: Vec<u8>,
) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
let id = mint_message_id(&room, &agent);
let meta = store::build_meta(
id,
room.clone(),
agent,
subject,
tags,
reply_to,
scope,
&body,
);
let (_, stored) = self.store.post(&room, meta, MessageBody(body))?;
self.fan_out(&room, &stored).await;
Ok(CommsResponse::Posted {
message_id: stored.id,
})
}
fn on_history(
&self,
room: RoomId,
cursor: Option<Cursor>,
limit: Option<u32>,
) -> Result<CommsResponse, CommsStoreError> {
let after = decode_after(cursor.as_ref(), room.as_str());
let limit = clamp_limit(limit);
let page = self.store.history(&room, after, limit)?;
let next = page
.more
.then(|| Cursor::encode(room.as_str(), page.last_seq));
let messages = page
.messages
.into_iter()
.map(|(seq, meta)| SeqMeta { seq, meta })
.collect();
Ok(CommsResponse::History {
messages,
next_cursor: next,
})
}
fn on_get_body(&self, message_id: String) -> Result<CommsResponse, CommsStoreError> {
let body = self.store.get_body(&message_id)?;
Ok(CommsResponse::Body { body })
}
async fn on_inbox(
&self,
session: &mut Session,
remote: Option<String>,
cwd: Option<std::path::PathBuf>,
cursor: Option<Cursor>,
limit: Option<u32>,
mark_read: bool,
) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
let chain = build_chain(remote, cwd);
self.auto_join(&agent, &chain)?;
let limit = clamp_limit(limit);
let resume = cursor.as_ref().and_then(|c| c.decode().ok());
let mut rooms = self.store.rooms_for_agent(&agent)?;
rooms.sort_by(|a, b| a.as_str().cmp(b.as_str()));
let mut collected: Vec<SeqMeta> = Vec::new();
let mut delivered_high: Vec<(RoomId, u64)> = Vec::new();
let mut unread_remaining: u32 = 0;
let mut next_cursor: Option<Cursor> = None;
for room in &rooms {
let read_seq = self.store.read_cursor(&agent, room)?;
let after = match &resume {
Some(pos) if pos.room == room.as_str() => pos.seq.max(read_seq),
_ => read_seq,
};
let remaining = limit.saturating_sub(collected.len());
let want = remaining.saturating_add(1).max(1);
let rows = self.store.history_with_seq(room, after, want)?;
for (seq, meta) in rows {
if meta.from == agent {
upsert_high(&mut delivered_high, room, seq);
continue;
}
if collected.len() < limit {
collected.push(SeqMeta { seq, meta });
upsert_high(&mut delivered_high, room, seq);
} else {
unread_remaining = unread_remaining.saturating_add(1);
if next_cursor.is_none() {
let resume_seq = highest_for(&delivered_high, room).unwrap_or(after);
next_cursor = Some(Cursor::encode(room.as_str(), resume_seq));
}
}
}
}
if mark_read {
for (room, seq) in &delivered_high {
self.store.set_read_cursor(&agent, room, *seq)?;
}
}
Ok(CommsResponse::Inbox {
messages: collected,
unread: unread_remaining,
next_cursor,
})
}
fn on_ack(
&self,
session: &Session,
message_ids: Vec<String>,
room: Option<RoomId>,
to_seq: Option<u64>,
) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
let bulk = matches!((&room, to_seq), (Some(_), Some(_)));
if message_ids.is_empty() && !bulk {
return Ok(CommsResponse::Error {
code: "empty_ack".to_string(),
message: "ack requires message_ids or a (room, to_seq) pair".to_string(),
});
}
let mut targets: Vec<(RoomId, u64)> = Vec::new();
let mut acked: u32 = 0;
if !message_ids.is_empty() {
for (_, room, seq) in self.store.resolve_ids(&message_ids)? {
acked = acked.saturating_add(1);
upsert_high(&mut targets, &room, seq);
}
}
if let (Some(room), Some(seq)) = (room, to_seq) {
upsert_high(&mut targets, &room, seq);
}
let mut cursors_advanced: Vec<(String, u64)> = Vec::new();
for (room, seq) in &targets {
let before = self.store.read_cursor(&agent, room)?;
self.store.set_read_cursor(&agent, room, *seq)?;
let after = self.store.read_cursor(&agent, room)?;
if after > before {
cursors_advanced.push((room.as_str().to_string(), after));
}
}
Ok(CommsResponse::Acked {
acked,
cursors_advanced,
})
}
async fn on_subscribe(
&self,
session: &Session,
room: RoomId,
link_tx: &mpsc::Sender<CommsOut>,
) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
self.store.subscribe(&Subscription {
agent_id: agent.clone(),
room: room.clone(),
created_at: now_micros(),
})?;
let sub = self.next_sub.fetch_add(1, Ordering::Relaxed);
{
let mut reg = self.registry.lock().await;
reg.sinks.insert(
sub,
SubSink {
room,
agent,
tx: link_tx.clone(),
},
);
reg.state = LifecycleState::Active;
}
self.subscriber_count.fetch_add(1, Ordering::Relaxed);
Ok(CommsResponse::Subscribed { sub })
}
async fn on_unsubscribe(&self, sub: u64) -> Result<CommsResponse, CommsStoreError> {
let removed = {
let mut reg = self.registry.lock().await;
reg.sinks.remove(&sub)
};
if removed.is_some() {
self.subscriber_count.fetch_sub(1, Ordering::Relaxed);
self.maybe_idle().await;
}
Ok(CommsResponse::Ok)
}
async fn on_status(&self) -> CommsResponse {
let rooms = self.store.list_rooms().map(|r| r.len()).unwrap_or(0);
CommsResponse::Status(StatusReport {
pid: std::process::id(),
version: self.version.clone(),
proto_ver: PROTO_VER,
uptime_secs: self.started.elapsed().as_secs(),
rooms: u32::try_from(rooms).unwrap_or(u32::MAX),
subscribers: u32::try_from(self.subscriber_count()).unwrap_or(u32::MAX),
})
}
fn auto_join(&self, agent: &AgentId, chain: &ScopeChain) -> Result<(), CommsStoreError> {
let default = default_room_for(chain);
if self.store.get_room(&default.room_id)?.is_none() {
tracing::info!(
room = %default.room_id,
"comms: auto-creating default room for scope"
);
self.store.put_room(&default)?;
}
for room in self.store.list_rooms()? {
if scope::room_matches(&room.scope, chain) {
let already = self
.store
.subscribers(&room.room_id)?
.iter()
.any(|a| a == agent);
if !already {
tracing::info!(
agent = %agent,
room = %room.room_id,
"comms: auto-joining agent to scope-matching room"
);
self.store.subscribe(&Subscription {
agent_id: agent.clone(),
room: room.room_id.clone(),
created_at: now_micros(),
})?;
}
}
}
Ok(())
}
async fn fan_out(&self, room: &RoomId, meta: &MessageMeta) {
let mut dead: Vec<u64> = Vec::new();
{
let reg = self.registry.lock().await;
for (sub, sink) in reg.sinks.iter() {
if &sink.room == room {
let note = CommsOut::Notification(CommsNotification::Message(meta.clone()));
if sink.tx.try_send(note).is_err() {
dead.push(*sub);
}
}
}
}
if !dead.is_empty() {
let mut reg = self.registry.lock().await;
for sub in dead {
if reg.sinks.remove(&sub).is_some() {
self.subscriber_count.fetch_sub(1, Ordering::Relaxed);
}
}
}
}
async fn maybe_idle(&self) {
if self.subscriber_count() == 0 {
let mut reg = self.registry.lock().await;
if reg.state == LifecycleState::Active {
reg.state = LifecycleState::Idle;
tracing::debug!("comms: broker idle (no subscribers); socket + flock retained");
}
}
}
pub async fn begin_drain(&self) {
let sinks: Vec<mpsc::Sender<CommsOut>> = {
let mut reg = self.registry.lock().await;
reg.state = LifecycleState::Draining;
reg.sinks.values().map(|s| s.tx.clone()).collect()
};
for tx in sinks {
let _ = tx
.send(CommsOut::Notification(CommsNotification::Shutdown))
.await;
}
}
pub async fn state(&self) -> LifecycleState {
self.registry.lock().await.state
}
}
#[derive(Default)]
pub struct Session {
pub agent: Option<AgentId>,
pub chain: Option<ScopeChain>,
}
fn need_hello() -> CommsResponse {
CommsResponse::Error {
code: "no_hello".to_string(),
message: "send Hello before any other request".to_string(),
}
}
fn clamp_limit(limit: Option<u32>) -> usize {
usize::try_from(limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT))
.unwrap_or(DEFAULT_LIMIT as usize)
}
fn decode_after(cursor: Option<&Cursor>, room: &str) -> u64 {
match cursor.and_then(|c| c.decode().ok()) {
Some(pos) if pos.room == room || pos.room.is_empty() => pos.seq,
_ => 0,
}
}
fn upsert_high(acc: &mut Vec<(RoomId, u64)>, room: &RoomId, seq: u64) {
if let Some(entry) = acc.iter_mut().find(|(r, _)| r == room) {
if seq > entry.1 {
entry.1 = seq;
}
} else {
acc.push((room.clone(), seq));
}
}
fn highest_for(acc: &[(RoomId, u64)], room: &RoomId) -> Option<u64> {
acc.iter().find(|(r, _)| r == room).map(|(_, s)| *s)
}
fn build_chain(remote: Option<String>, cwd: Option<std::path::PathBuf>) -> ScopeChain {
match cwd {
Some(cwd) => {
let repo = crate::git::Repo::discover(&cwd).ok();
let mut chain = scope::scope_chain(&cwd, repo.as_ref());
if chain.remote.is_none() {
chain.remote = remote;
}
chain
}
None => ScopeChain {
remote,
cwd: std::path::PathBuf::new(),
ancestors: Vec::new(),
},
}
}
fn default_room_for(chain: &ScopeChain) -> Room {
let (room_id, scope, title) = match (&chain.remote, chain.cwd.as_os_str().is_empty()) {
(Some(remote), _) => (
RoomId::parse(sanitize_id(remote)).unwrap_or_else(|_| fallback_room()),
RoomScope::Remote(remote.clone()),
format!("workspace: {remote}"),
),
(None, false) => {
let path = chain.cwd.clone();
(
RoomId::parse(sanitize_id(&path.to_string_lossy()))
.unwrap_or_else(|_| fallback_room()),
RoomScope::PathPrefix(path.clone()),
format!("workspace: {}", path.display()),
)
}
(None, true) => (fallback_room(), RoomScope::Global, "global".to_string()),
};
Room {
room_id,
scope,
title,
created_at: now_micros(),
}
}
fn fallback_room() -> RoomId {
RoomId::parse("global").expect("`global` is a valid room id")
}
fn sanitize_id(s: &str) -> String {
let mut out: String = s
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | ':' | '-') {
c
} else {
'-'
}
})
.collect();
if out.len() > super::ids::MAX_ID_LEN {
out.truncate(super::ids::MAX_ID_LEN);
}
if out.is_empty() {
out.push('x');
}
out
}
fn mint_message_id(room: &RoomId, agent: &AgentId) -> String {
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
format!(
"{}:{}:{}:{}",
room.as_str(),
agent.as_str(),
now_micros(),
n
)
}
#[cfg(test)]
#[path = "daemon_tests.rs"]
mod tests;