use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use ahash::AHashMap;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use super::cursor::Cursor;
use super::ids::{AgentId, RoomId};
use super::model::{
AgentCard, AgentKind, AgentRecord, MessageBody, MessageMeta, Room, RoomScope, SessionLineage,
Subscription, now_micros,
};
use super::protocol::{
CommsNotification, CommsOut, CommsRequest, CommsResponse, PROTO_VER, SeqMeta, StatusReport,
};
use super::scope::{self, ScopeChain};
use super::store::{self, CommsStore, CommsStoreError};
/// Page size used by `clamp_limit` when a request carries no explicit limit.
pub const DEFAULT_LIMIT: u32 = 100;
/// Largest page size `clamp_limit` will allow, whatever the client asked for.
pub const MAX_LIMIT: u32 = 1000;
/// Thirty minutes.
/// NOTE(review): presumably the idle threshold handed to `Broker::is_idle_for`
/// by the reaper; the caller is not in this file, so confirm.
pub const IDLE_REAP_AFTER: Duration = Duration::from_secs(30 * 60);
/// Sixty seconds.
/// NOTE(review): presumably the polling interval of the idle reaper; the
/// caller is not in this file, so confirm.
pub const IDLE_REAP_CHECK_EVERY: Duration = Duration::from_secs(60);
/// Coarse lifecycle of the broker, stored in the registry and read via `Broker::state`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LifecycleState {
/// Initial state assigned by `Broker::new`.
Starting,
/// Entered through `mark_active` or when a subscription is registered.
Active,
/// Entered by `maybe_idle` once an active broker has no subscribers left.
Idle,
/// Entered by `begin_drain`; `is_idle_for` never reports idle in this state.
Draining,
/// Never assigned in this file; `is_idle_for` treats it like `Draining`.
Stopped,
}
/// One live subscription: where to push notifications for a given room.
struct SubSink {
/// Room whose posts are forwarded to `tx` (matched in `fan_out`).
room: RoomId,
/// Subscribing agent; stored at subscribe time but not read anywhere in this file.
#[allow(dead_code)]
agent: AgentId,
/// Outbound channel of the link that created the subscription.
tx: mpsc::Sender<CommsOut>,
}
/// Mutable broker state guarded by a single async mutex.
struct Registry {
/// Live subscriptions keyed by the subscription id minted from `Broker::next_sub`.
sinks: AHashMap<u64, SubSink>,
/// Current lifecycle state of the broker.
state: LifecycleState,
}
/// Request broker: routes `CommsRequest`s to the store and pushes notifications
/// to live subscribers.
pub struct Broker {
/// Shared handle to the persistent store used by every handler.
store: Arc<CommsStore>,
/// Live subscriptions plus lifecycle state, behind a tokio mutex.
registry: Mutex<Registry>,
/// Number of entries in `registry.sinks`, mirrored in an atomic so it can be read without locking.
subscriber_count: AtomicUsize,
/// Number of currently connected links (see `link_connected` / `link_disconnected`).
link_count: AtomicUsize,
/// Milliseconds since `started` at the time of the last `touch`.
last_activity_ms: AtomicU64,
/// Next subscription id to hand out; starts at 1.
next_sub: AtomicU64,
/// Construction time; the reference point for uptime and activity timestamps.
started: Instant,
/// Crate version captured at compile time, reported in `Welcome` and `Status`.
version: String,
}
impl Broker {
/// Creates a broker over `store` with no subscribers, no links and the
/// lifecycle state set to `Starting`.
///
/// Subscription ids start at 1 and the activity clock starts at zero,
/// measured from the moment of construction.
pub fn new(store: Arc<CommsStore>) -> Self {
    let registry = Registry {
        sinks: AHashMap::new(),
        state: LifecycleState::Starting,
    };
    let version = String::from(env!("CARGO_PKG_VERSION"));
    Self {
        store,
        registry: Mutex::new(registry),
        subscriber_count: AtomicUsize::new(0),
        link_count: AtomicUsize::new(0),
        last_activity_ms: AtomicU64::new(0),
        next_sub: AtomicU64::new(1),
        started: Instant::now(),
        version,
    }
}
/// Promotes the broker to `Active`, but only from `Starting` or `Idle`.
///
/// Any other state (including `Draining` and `Stopped`) is left untouched.
pub async fn mark_active(&self) {
    let mut guard = self.registry.lock().await;
    if matches!(
        guard.state,
        LifecycleState::Starting | LifecycleState::Idle
    ) {
        guard.state = LifecycleState::Active;
    }
}
/// Returns the current number of live subscriptions (relaxed atomic read, no lock taken).
pub fn subscriber_count(&self) -> usize {
self.subscriber_count.load(Ordering::Relaxed)
}
/// Records that a new link has connected: bumps the link counter and
/// refreshes the last-activity timestamp.
pub fn link_connected(&self) {
self.link_count.fetch_add(1, Ordering::Relaxed);
self.touch();
}
/// Records that a link has gone away: decrements the link counter and
/// refreshes the last-activity timestamp.
///
/// The decrement saturates at zero. A plain `fetch_sub` on an already-zero
/// counter would wrap to `usize::MAX`, after which `is_idle_for` could never
/// report the broker as idle again.
pub fn link_disconnected(&self) {
    // `Err` only means the counter was already zero (an unmatched
    // disconnect); there is nothing to undo in that case.
    let _ = self
        .link_count
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |links| {
            links.checked_sub(1)
        });
    self.touch();
}
/// Stamps "now" as the time of last activity, expressed as milliseconds
/// elapsed since the broker was constructed.
pub fn touch(&self) {
    let elapsed_ms = self.started.elapsed().as_millis() as u64;
    self.last_activity_ms.store(elapsed_ms, Ordering::Relaxed);
}
/// Reports whether the broker has been inactive for at least `idle_for`.
///
/// Returns `false` whenever a link is connected or the broker is already
/// `Draining`/`Stopped`; otherwise compares the time since the last `touch`
/// against `idle_for`.
pub async fn is_idle_for(&self, idle_for: Duration) -> bool {
    let links = self.link_count.load(Ordering::Relaxed);
    if links != 0 {
        return false;
    }
    match self.state().await {
        LifecycleState::Draining | LifecycleState::Stopped => false,
        LifecycleState::Starting | LifecycleState::Active | LifecycleState::Idle => {
            let now_ms = self.started.elapsed().as_millis() as u64;
            let last_ms = self.last_activity_ms.load(Ordering::Relaxed);
            let threshold_ms = idle_for.as_millis() as u64;
            now_ms.saturating_sub(last_ms) >= threshold_ms
        }
    }
}
pub async fn handle(
&self,
req: CommsRequest,
session: &mut Session,
link_tx: &mpsc::Sender<CommsOut>,
) -> CommsResponse {
self.touch();
match self.dispatch(req, session, link_tx).await {
Ok(resp) => resp,
Err(e) => CommsResponse::Error {
code: "store_error".to_string(),
message: e.to_string(),
},
}
}
/// Routes a request to the handler for its variant and returns that
/// handler's result unchanged.
///
/// `session` carries per-link identity established by `Hello`; `link_tx` is
/// only used by `Subscribe`, which registers it as the notification sink.
/// Store failures propagate as `Err` and are turned into an error response
/// by `handle`.
async fn dispatch(
&self,
req: CommsRequest,
session: &mut Session,
link_tx: &mpsc::Sender<CommsOut>,
) -> Result<CommsResponse, CommsStoreError> {
match req {
// Identity handshake; populates `session`.
CommsRequest::Hello {
agent,
proto_ver,
remote,
cwd,
session_id,
parent_agent,
} => {
self.on_hello(
agent,
proto_ver,
remote,
cwd,
session_id,
parent_agent,
session,
)
.await
}
// Agent directory.
CommsRequest::Register { card } => self.on_register(session, card),
CommsRequest::ListAgents { room } => self.on_list_agents(room),
// Room management and membership.
CommsRequest::CreateRoom { room, scope, title } => {
self.on_create_room(room, scope, title)
}
CommsRequest::ListRooms { remote, cwd } => {
self.on_list_rooms(remote, cwd, session).await
}
CommsRequest::Join { room } => self.on_join(session, room),
CommsRequest::Leave { room } => self.on_leave(session, room),
// Posting and reading messages.
CommsRequest::Post {
room,
subject,
tags,
reply_to,
scope,
body,
} => {
self.on_post(session, room, subject, tags, reply_to, scope, body)
.await
}
CommsRequest::History {
room,
cursor,
limit,
since_micros,
} => self.on_history(room, cursor, limit, since_micros),
CommsRequest::GetBody { message_id } => self.on_get_body(message_id),
CommsRequest::Inbox {
remote,
cwd,
cursor,
limit,
mark_read,
since_micros,
} => {
self.on_inbox(session, remote, cwd, cursor, limit, mark_read, since_micros)
.await
}
CommsRequest::AckInbox {
message_ids,
room,
to_seq,
} => self.on_ack(session, message_ids, room, to_seq),
// Live push subscriptions.
CommsRequest::Subscribe { room } => self.on_subscribe(session, room, link_tx).await,
CommsRequest::Unsubscribe { sub } => self.on_unsubscribe(sub).await,
// Session lineage records.
CommsRequest::ListSessions {} => self.on_list_sessions(),
CommsRequest::DeleteSession { session_id } => self.on_delete_session(&session_id),
// Daemon control and health.
CommsRequest::Ping => Ok(CommsResponse::Pong),
CommsRequest::Status => Ok(self.on_status().await),
CommsRequest::Stop => {
// Only starts the drain; the response is still returned to the caller.
self.begin_drain().await;
Ok(CommsResponse::Ok)
}
}
}
#[allow(clippy::too_many_arguments)]
async fn on_hello(
&self,
agent: AgentId,
proto_ver: u32,
remote: Option<String>,
cwd: Option<std::path::PathBuf>,
session_id: Option<String>,
parent_agent: Option<String>,
session: &mut Session,
) -> Result<CommsResponse, CommsStoreError> {
if proto_ver != PROTO_VER {
return Ok(CommsResponse::Error {
code: "proto_skew".to_string(),
message: format!("daemon speaks proto {PROTO_VER}, client sent {proto_ver}"),
});
}
session.agent = Some(agent.clone());
let parent_agent = parent_agent.and_then(|p| AgentId::parse(p).ok());
session.session_id = session_id.clone();
session.parent_agent = parent_agent.clone();
let mut chain = build_chain(remote.clone(), cwd.clone());
chain.session_id = session_id;
chain.parent_agent = parent_agent;
session.chain = Some(chain);
let now = now_micros();
let record = match self.store.get_agent(&agent)? {
Some(mut existing) => {
existing.last_seen = now;
existing
}
None => AgentRecord {
agent_id: agent.clone(),
card: AgentCard::default(),
kind: AgentKind::Other,
first_seen: now,
last_seen: now,
},
};
self.store.put_agent(&record)?;
if let Some(ref chain) = session.chain {
let session_room = self.auto_join(&agent, chain)?;
if let Err(e) = self.record_session_lineage(&agent, chain, session_room) {
tracing::warn!(error = %e, "comms: failed to record session lineage");
}
}
Ok(CommsResponse::Welcome {
proto_ver: PROTO_VER,
daemon_version: self.version.clone(),
})
}
fn record_session_lineage(
&self,
agent: &AgentId,
chain: &ScopeChain,
session_room: Option<RoomId>,
) -> Result<(), CommsStoreError> {
let Some(sid) = chain.session_id.clone() else {
return Ok(());
};
let Some(room_id) = session_room else {
tracing::warn!(
session_id = %sid,
agent = %agent,
"comms: session id presented but no session room to anchor lineage; skipping"
);
return Ok(());
};
let created_at = match self.store.get_session(&sid)? {
Some(existing) => existing.created_at,
None => now_micros(),
};
self.store.put_session(&SessionLineage {
session_id: sid,
parent_agent: chain.parent_agent.clone(),
child_agent: agent.clone(),
room_id,
created_at,
})
}
/// Returns every session lineage record known to the store.
fn on_list_sessions(&self) -> Result<CommsResponse, CommsStoreError> {
    let sessions = self.store.list_sessions()?;
    Ok(CommsResponse::Sessions { sessions })
}
/// Deletes the lineage record for `session_id` from the store and replies `Ok`.
///
/// Whatever value the store returns on success is discarded, so the reply is
/// `Ok` for every successful store call.
fn on_delete_session(&self, session_id: &str) -> Result<CommsResponse, CommsStoreError> {
self.store.delete_session(session_id)?;
Ok(CommsResponse::Ok)
}
/// Stores `card` as the agent card of the session's agent.
///
/// Requires a prior `Hello`; otherwise replies with the `no_hello` error.
/// An existing record gets its card replaced and `last_seen` refreshed; a
/// missing record is created with kind `Other`.
fn on_register(
    &self,
    session: &Session,
    card: AgentCard,
) -> Result<CommsResponse, CommsStoreError> {
    let agent = match session.agent.as_ref() {
        Some(agent) => agent,
        None => return Ok(need_hello()),
    };
    let now = now_micros();
    let record = if let Some(mut known) = self.store.get_agent(agent)? {
        known.card = card;
        known.last_seen = now;
        known
    } else {
        AgentRecord {
            agent_id: agent.clone(),
            card,
            kind: AgentKind::Other,
            first_seen: now,
            last_seen: now,
        }
    };
    self.store.put_agent(&record)?;
    Ok(CommsResponse::Ok)
}
/// Lists agent records.
///
/// With no `room`, returns every agent in the store. With a `room`, returns
/// the records of that room's subscribers, skipping subscriber ids that have
/// no agent record. The first store error aborts the listing.
fn on_list_agents(&self, room: Option<RoomId>) -> Result<CommsResponse, CommsStoreError> {
    let agents = if let Some(room) = room {
        self.store
            .subscribers(&room)?
            .into_iter()
            .filter_map(|id| self.store.get_agent(&id).transpose())
            .collect::<Result<Vec<_>, _>>()?
    } else {
        self.store.list_agents()?
    };
    Ok(CommsResponse::Agents(agents))
}
/// Creates a room, or updates the scope and title of an existing one.
///
/// `title` defaults to the room id when absent. When the room already
/// exists, both its `created_at` and `last_activity` are carried over, so
/// re-issuing `CreateRoom` never rewrites the room's history timestamps
/// (the same rule `record_session_lineage` applies to session records). A
/// brand-new room gets `created_at = now` and `last_activity = 0`.
///
/// Returns the stored record in `CommsResponse::Room`.
fn on_create_room(
    &self,
    room: RoomId,
    scope: RoomScope,
    title: Option<String>,
) -> Result<CommsResponse, CommsStoreError> {
    let (created_at, last_activity) = match self.store.get_room(&room)? {
        Some(existing) => (existing.created_at, existing.last_activity),
        None => (now_micros(), 0),
    };
    let record = Room {
        room_id: room.clone(),
        scope,
        title: title.unwrap_or_else(|| room.as_str().to_string()),
        created_at,
        last_activity,
    };
    self.store.put_room(&record)?;
    Ok(CommsResponse::Room(record))
}
/// Lists the rooms whose scope matches the chain built from `remote`/`cwd`.
///
/// When the session already has an agent, that agent is first auto-joined
/// to every matching room (which may also create the default room).
async fn on_list_rooms(
    &self,
    remote: Option<String>,
    cwd: Option<std::path::PathBuf>,
    session: &mut Session,
) -> Result<CommsResponse, CommsStoreError> {
    let chain = build_chain(remote, cwd);
    if let Some(agent) = session.agent.as_ref() {
        self.auto_join(agent, &chain)?;
    }
    let mut matching: Vec<Room> = Vec::new();
    for room in self.store.list_rooms()? {
        if scope::room_matches(&room.scope, &chain) {
            matching.push(room);
        }
    }
    Ok(CommsResponse::Rooms(matching))
}
/// Subscribes the session's agent to `room` in the store.
///
/// Replies with the `no_hello` error when no `Hello` has been processed.
fn on_join(&self, session: &Session, room: RoomId) -> Result<CommsResponse, CommsStoreError> {
    let agent_id = match session.agent.as_ref() {
        Some(agent) => agent.clone(),
        None => return Ok(need_hello()),
    };
    let subscription = Subscription {
        agent_id,
        room,
        created_at: now_micros(),
    };
    self.store.subscribe(&subscription)?;
    Ok(CommsResponse::Ok)
}
/// Removes the session agent's subscription to `room` from the store.
///
/// Replies with the `no_hello` error when no `Hello` has been processed.
fn on_leave(&self, session: &Session, room: RoomId) -> Result<CommsResponse, CommsStoreError> {
    match session.agent.as_ref() {
        None => Ok(need_hello()),
        Some(agent) => {
            self.store.unsubscribe(&room, agent)?;
            Ok(CommsResponse::Ok)
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn on_post(
&self,
session: &Session,
room: RoomId,
subject: String,
tags: Vec<String>,
reply_to: Option<String>,
scope: Vec<String>,
body: Vec<u8>,
) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
let id = mint_message_id(&room, &agent);
let meta = store::build_meta(
id,
room.clone(),
agent,
subject,
tags,
reply_to,
scope,
&body,
);
let (_, stored) = self.store.post(&room, meta, MessageBody(body))?;
if let Some(mut record) = self.store.get_room(&room)? {
record.last_activity = stored.ts_micros;
self.store.put_room(&record)?;
}
self.fan_out(&room, &stored).await;
Ok(CommsResponse::Posted {
message_id: stored.id,
})
}
/// Returns one page of a room's history.
///
/// The page starts after the sequence encoded in `cursor` (0 when the
/// cursor is absent, undecodable, or for another room) and holds at most the
/// clamped `limit`. Messages older than `since_micros` are filtered out
/// after paging, so a page may contain fewer entries than were fetched.
/// `next_cursor` is set only when the store reports more rows.
fn on_history(
    &self,
    room: RoomId,
    cursor: Option<Cursor>,
    limit: Option<u32>,
    since_micros: Option<i64>,
) -> Result<CommsResponse, CommsStoreError> {
    let after = decode_after(cursor.as_ref(), room.as_str());
    let page = self.store.history(&room, after, clamp_limit(limit))?;
    let next_cursor = if page.more {
        Some(Cursor::encode(room.as_str(), page.last_seq))
    } else {
        None
    };
    let messages = page
        .messages
        .into_iter()
        .filter_map(|(seq, meta)| {
            keep_since(meta.ts_micros, since_micros).then(|| SeqMeta { seq, meta })
        })
        .collect();
    Ok(CommsResponse::History {
        messages,
        next_cursor,
    })
}
/// Fetches the stored body for `message_id` and wraps it in `CommsResponse::Body`.
fn on_get_body(&self, message_id: String) -> Result<CommsResponse, CommsStoreError> {
    Ok(CommsResponse::Body {
        body: self.store.get_body(&message_id)?,
    })
}
/// Collects unread messages for the session's agent across all rooms it is
/// subscribed to, up to the clamped `limit`.
///
/// Replies with the `no_hello` error when no `Hello` has been processed.
/// The agent is first auto-joined to rooms matching the chain built from
/// `remote`/`cwd`. Rooms are visited in ascending id order. The agent's own
/// messages and messages older than `since_micros` are never returned but
/// still count as delivered. When `mark_read` is set, each room's read cursor
/// is moved to the highest delivered sequence.
///
/// NOTE(review): the resume `cursor` only affects the room it names. Rooms
/// that sort before it are rescanned from their read cursor, so with
/// `mark_read = false` a resumed page may repeat earlier rooms' messages.
/// Confirm this is intended.
/// NOTE(review): `unread` counts only overflow rows that were actually
/// fetched, so it looks like a lower bound rather than a total. Confirm
/// against `history_with_seq`.
#[allow(clippy::too_many_arguments)]
async fn on_inbox(
&self,
session: &mut Session,
remote: Option<String>,
cwd: Option<std::path::PathBuf>,
cursor: Option<Cursor>,
limit: Option<u32>,
mark_read: bool,
since_micros: Option<i64>,
) -> Result<CommsResponse, CommsStoreError> {
let Some(agent) = session.agent.clone() else {
return Ok(need_hello());
};
let chain = build_chain(remote, cwd);
self.auto_join(&agent, &chain)?;
let limit = clamp_limit(limit);
// An undecodable cursor is treated the same as no cursor.
let resume = cursor.as_ref().and_then(|c| c.decode().ok());
let mut rooms = self.store.rooms_for_agent(&agent)?;
// Fixed visiting order so a cursor from one call is meaningful in the next.
rooms.sort_by(|a, b| a.as_str().cmp(b.as_str()));
let mut collected: Vec<SeqMeta> = Vec::new();
// Highest sequence per room that was either returned or deliberately skipped.
let mut delivered_high: Vec<(RoomId, u64)> = Vec::new();
let mut unread_remaining: u32 = 0;
let mut next_cursor: Option<Cursor> = None;
for room in &rooms {
let read_seq = self.store.read_cursor(&agent, room)?;
// The resume position applies only to its own room and never goes
// below what has already been marked read.
let after = match &resume {
Some(pos) if pos.room == room.as_str() => pos.seq.max(read_seq),
_ => read_seq,
};
let remaining = limit.saturating_sub(collected.len());
// Ask for one row beyond what still fits so overflow can be detected.
let want = remaining.saturating_add(1).max(1);
let rows = self.store.history_with_seq(room, after, want)?;
for (seq, meta) in rows {
// The agent's own posts are never "unread" for that agent.
if meta.from == agent {
upsert_high(&mut delivered_high, room, seq);
continue;
}
// Too old for this request: skip it but count it as delivered.
if !keep_since(meta.ts_micros, since_micros) {
upsert_high(&mut delivered_high, room, seq);
continue;
}
if collected.len() < limit {
collected.push(SeqMeta { seq, meta });
upsert_high(&mut delivered_high, room, seq);
} else {
// Page is full: count the overflow and remember where the first
// overflowing room should resume.
unread_remaining = unread_remaining.saturating_add(1);
if next_cursor.is_none() {
let resume_seq = highest_for(&delivered_high, room).unwrap_or(after);
next_cursor = Some(Cursor::encode(room.as_str(), resume_seq));
}
}
}
}
if mark_read {
for (room, seq) in &delivered_high {
self.store.set_read_cursor(&agent, room, *seq)?;
}
}
Ok(CommsResponse::Inbox {
messages: collected,
unread: unread_remaining,
next_cursor,
})
}
/// Advances the agent's read cursors to acknowledge messages.
///
/// Accepts explicit `message_ids`, a bulk `(room, to_seq)` pair, or both.
/// With neither, replies with the `empty_ack` error. Replies with the
/// `no_hello` error when no `Hello` has been processed. `acked` counts the
/// message ids the store could resolve; `cursors_advanced` lists only the
/// rooms whose stored cursor actually moved forward.
fn on_ack(
    &self,
    session: &Session,
    message_ids: Vec<String>,
    room: Option<RoomId>,
    to_seq: Option<u64>,
) -> Result<CommsResponse, CommsStoreError> {
    let Some(agent) = session.agent.clone() else {
        return Ok(need_hello());
    };
    // A bulk ack needs both halves of the pair; one alone is ignored.
    let bulk_target = room.zip(to_seq);
    if message_ids.is_empty() && bulk_target.is_none() {
        return Ok(CommsResponse::Error {
            code: "empty_ack".to_string(),
            message: "ack requires message_ids or a (room, to_seq) pair".to_string(),
        });
    }

    let mut targets: Vec<(RoomId, u64)> = Vec::new();
    let mut acked: u32 = 0;
    if !message_ids.is_empty() {
        let resolved = self.store.resolve_ids(&message_ids)?;
        for (_, resolved_room, seq) in resolved {
            acked = acked.saturating_add(1);
            upsert_high(&mut targets, &resolved_room, seq);
        }
    }
    if let Some((bulk_room, bulk_seq)) = bulk_target {
        upsert_high(&mut targets, &bulk_room, bulk_seq);
    }

    let mut cursors_advanced: Vec<(String, u64)> = Vec::new();
    for (target_room, target_seq) in &targets {
        // Read back before and after so only real forward movement is reported.
        let before = self.store.read_cursor(&agent, target_room)?;
        self.store.set_read_cursor(&agent, target_room, *target_seq)?;
        let after = self.store.read_cursor(&agent, target_room)?;
        if after > before {
            cursors_advanced.push((target_room.as_str().to_owned(), after));
        }
    }
    Ok(CommsResponse::Acked {
        acked,
        cursors_advanced,
    })
}
/// Registers a live push subscription for the session's agent on `room`.
///
/// Replies with the `no_hello` error when no `Hello` has been processed.
/// The subscription is persisted in the store, then `link_tx` is registered
/// as the notification sink under a freshly minted subscription id, which is
/// returned in `CommsResponse::Subscribed`.
///
/// Two invariants are maintained while the registry lock is held:
/// * the subscriber counter is incremented together with the sink insert,
///   so a concurrent `fan_out` that prunes this sink can never decrement the
///   counter before it was incremented (which would wrap it);
/// * the lifecycle state is only promoted from `Starting`/`Idle` to
///   `Active`, matching `mark_active`, so a subscribe that arrives after
///   `begin_drain` does not undo `Draining`.
async fn on_subscribe(
    &self,
    session: &Session,
    room: RoomId,
    link_tx: &mpsc::Sender<CommsOut>,
) -> Result<CommsResponse, CommsStoreError> {
    let Some(agent) = session.agent.clone() else {
        return Ok(need_hello());
    };
    self.store.subscribe(&Subscription {
        agent_id: agent.clone(),
        room: room.clone(),
        created_at: now_micros(),
    })?;
    let sub = self.next_sub.fetch_add(1, Ordering::Relaxed);
    {
        let mut reg = self.registry.lock().await;
        reg.sinks.insert(
            sub,
            SubSink {
                room,
                agent,
                tx: link_tx.clone(),
            },
        );
        self.subscriber_count.fetch_add(1, Ordering::Relaxed);
        if matches!(
            reg.state,
            LifecycleState::Starting | LifecycleState::Idle
        ) {
            reg.state = LifecycleState::Active;
        }
    }
    Ok(CommsResponse::Subscribed { sub })
}
/// Removes the live subscription `sub`, if it exists.
///
/// Always replies `Ok`. When a sink was actually removed, the subscriber
/// counter is decremented and the broker may transition to `Idle`.
async fn on_unsubscribe(&self, sub: u64) -> Result<CommsResponse, CommsStoreError> {
    // The guard is a temporary, so the lock is released at the end of this
    // statement and `maybe_idle` can take it again below.
    let removed = self.registry.lock().await.sinks.remove(&sub);
    if removed.is_none() {
        return Ok(CommsResponse::Ok);
    }
    self.subscriber_count.fetch_sub(1, Ordering::Relaxed);
    self.maybe_idle().await;
    Ok(CommsResponse::Ok)
}
/// Builds the daemon status report: pid, version, protocol version, uptime,
/// room count and live subscriber count.
///
/// A store failure while listing rooms is reported as zero rooms rather
/// than as an error; counts that do not fit in `u32` saturate.
async fn on_status(&self) -> CommsResponse {
    let room_count = match self.store.list_rooms() {
        Ok(rooms) => rooms.len(),
        Err(_) => 0,
    };
    let report = StatusReport {
        pid: std::process::id(),
        version: self.version.clone(),
        proto_ver: PROTO_VER,
        uptime_secs: self.started.elapsed().as_secs(),
        rooms: u32::try_from(room_count).unwrap_or(u32::MAX),
        subscribers: u32::try_from(self.subscriber_count()).unwrap_or(u32::MAX),
    };
    CommsResponse::Status(report)
}
/// Ensures `agent` is subscribed to every room whose scope matches `chain`.
///
/// The default room for the chain is created first if it does not exist.
/// Returns the id of a matching session-scoped room, if any (the last one
/// encountered when several match).
fn auto_join(
    &self,
    agent: &AgentId,
    chain: &ScopeChain,
) -> Result<Option<RoomId>, CommsStoreError> {
    let fallback = default_room_for(chain);
    let fallback_exists = self.store.get_room(&fallback.room_id)?.is_some();
    if !fallback_exists {
        tracing::info!(
            room = %fallback.room_id,
            "comms: auto-creating default room for scope"
        );
        self.store.put_room(&fallback)?;
    }

    let mut session_room = None;
    for room in self.store.list_rooms()? {
        if !scope::room_matches(&room.scope, chain) {
            continue;
        }
        if let RoomScope::Session(_) = &room.scope {
            session_room = Some(room.room_id.clone());
        }
        let members = self.store.subscribers(&room.room_id)?;
        if members.iter().any(|member| member == agent) {
            continue;
        }
        tracing::info!(
            agent = %agent,
            room = %room.room_id,
            "comms: auto-joining agent to scope-matching room"
        );
        self.store.subscribe(&Subscription {
            agent_id: agent.clone(),
            room: room.room_id.clone(),
            created_at: now_micros(),
        })?;
    }
    Ok(session_room)
}
/// Pushes a `Message` notification for `meta` to every live sink on `room`.
///
/// Delivery uses `try_send`, so it never waits on a subscriber. Any sink
/// whose `try_send` fails is removed from the registry afterwards and the
/// subscriber counter is decremented for each removal.
async fn fan_out(&self, room: &RoomId, meta: &MessageMeta) {
    let dead: Vec<u64> = {
        let reg = self.registry.lock().await;
        reg.sinks
            .iter()
            .filter(|(_, sink)| &sink.room == room)
            .filter_map(|(sub, sink)| {
                let note = CommsOut::Notification(CommsNotification::Message(meta.clone()));
                sink.tx.try_send(note).err().map(|_| *sub)
            })
            .collect()
    };
    if dead.is_empty() {
        return;
    }
    let mut reg = self.registry.lock().await;
    for sub in dead {
        // The sink may already be gone if it was unsubscribed in between.
        if reg.sinks.remove(&sub).is_some() {
            self.subscriber_count.fetch_sub(1, Ordering::Relaxed);
        }
    }
}
/// Moves an `Active` broker to `Idle` once no subscribers remain.
///
/// Does nothing while subscribers exist or when the broker is in any state
/// other than `Active`.
async fn maybe_idle(&self) {
    if self.subscriber_count() != 0 {
        return;
    }
    let mut reg = self.registry.lock().await;
    if reg.state != LifecycleState::Active {
        return;
    }
    reg.state = LifecycleState::Idle;
    tracing::debug!("comms: broker idle (no subscribers); socket + flock retained");
}
/// Switches the broker to `Draining` and sends a `Shutdown` notification to
/// every registered sink.
///
/// The senders are snapshotted under the lock and notified after it is
/// released; send failures are ignored.
pub async fn begin_drain(&self) {
    let mut reg = self.registry.lock().await;
    reg.state = LifecycleState::Draining;
    let senders: Vec<mpsc::Sender<CommsOut>> =
        reg.sinks.values().map(|sink| sink.tx.clone()).collect();
    // Release the registry before awaiting on subscriber channels.
    drop(reg);

    for sender in senders {
        let notice = CommsOut::Notification(CommsNotification::Shutdown);
        let _ = sender.send(notice).await;
    }
}
/// Returns the current lifecycle state, taking the registry lock for the read.
pub async fn state(&self) -> LifecycleState {
self.registry.lock().await.state
}
}
/// Per-link state filled in by the `Hello` handshake; every field is `None`
/// until a `Hello` has been accepted.
#[derive(Default)]
pub struct Session {
/// Identity of the agent on this link; handlers that need it reply `no_hello` while it is `None`.
pub agent: Option<AgentId>,
/// Scope chain built from the `remote`/`cwd` presented in `Hello`.
pub chain: Option<ScopeChain>,
/// Session id presented in `Hello`, if any.
pub session_id: Option<String>,
/// Parent agent presented in `Hello`, if it parsed as a valid `AgentId`.
pub parent_agent: Option<AgentId>,
}
fn need_hello() -> CommsResponse {
CommsResponse::Error {
code: "no_hello".to_string(),
message: "send Hello before any other request".to_string(),
}
}
/// Normalises a client-supplied page size.
///
/// A missing limit becomes `DEFAULT_LIMIT`; any value is then forced into
/// `1..=MAX_LIMIT`. Falls back to `DEFAULT_LIMIT` if the value cannot be
/// represented as `usize`.
fn clamp_limit(limit: Option<u32>) -> usize {
    let requested = limit.unwrap_or(DEFAULT_LIMIT);
    let bounded = requested.clamp(1, MAX_LIMIT);
    match usize::try_from(bounded) {
        Ok(n) => n,
        Err(_) => DEFAULT_LIMIT as usize,
    }
}
/// Extracts the "resume after" sequence from an optional cursor.
///
/// Returns the cursor's sequence when it decodes and either names `room` or
/// carries an empty room name. Returns 0 (start of history) when the cursor
/// is absent, fails to decode, or belongs to a different room.
fn decode_after(cursor: Option<&Cursor>, room: &str) -> u64 {
    let Some(pos) = cursor.and_then(|c| c.decode().ok()) else {
        return 0;
    };
    if pos.room.is_empty() || pos.room == room {
        pos.seq
    } else {
        0
    }
}
/// Decides whether a message timestamp passes the optional `since` filter.
///
/// With no cutoff everything passes; with a cutoff the timestamp must be at
/// or after it (the bound is inclusive).
fn keep_since(ts_micros: i64, since_micros: Option<i64>) -> bool {
    since_micros.map_or(true, |cutoff| ts_micros >= cutoff)
}
/// Records `seq` as the high-water mark for `room` in `acc`.
///
/// An existing entry for the room is only ever raised, never lowered; a room
/// seen for the first time is appended, so `acc` keeps first-seen order with
/// one entry per room.
fn upsert_high(acc: &mut Vec<(RoomId, u64)>, room: &RoomId, seq: u64) {
    match acc.iter().position(|(known, _)| known == room) {
        Some(idx) => {
            let slot = &mut acc[idx].1;
            *slot = (*slot).max(seq);
        }
        None => acc.push((room.clone(), seq)),
    }
}
/// Looks up the high-water mark recorded for `room`, if any.
///
/// Returns the sequence of the first entry in `acc` whose room equals `room`.
fn highest_for(acc: &[(RoomId, u64)], room: &RoomId) -> Option<u64> {
    for (known, seq) in acc {
        if known == room {
            return Some(*seq);
        }
    }
    None
}
// Room helpers live in a sibling file that is mounted here as the private
// `rooms` submodule.
#[path = "daemon_rooms.rs"]
mod rooms;
// Re-exported for use elsewhere in the crate.
pub(crate) use rooms::repo_room_for;
// Only needed by the test module below.
#[cfg(test)]
use rooms::sanitize_id;
// Helpers used by the handlers in this file.
use rooms::{build_chain, default_room_for, mint_message_id};
// Unit tests are kept in a separate file and compiled only for test builds.
#[cfg(test)]
#[path = "daemon_tests.rs"]
mod tests;