#![cfg(all(feature = "comms", any(unix, windows)))]
use std::sync::Arc;
use rmcp::ErrorData as McpError;
use rmcp::model::CallToolResult;
use tokio::sync::Mutex;
use super::ServerState;
use super::helpers::json_result;
use super::types_comms::{
AgentListParams, AgentListResponse, AgentRegisterParams, AgentRegisterResponse, AgentSummary,
CursorAdvance, DmSendParams, DmSendResponse, GetOrCreateRoomForPathParams,
GetOrCreateRoomForPathResponse, InboxAckParams, InboxAckResponse, InboxReadParams,
InboxReadResponse, MessageFrontMatter, MessageGetParams, MessageGetResponse, RoomCreateParams,
RoomCreateResponse, RoomHistoryParams, RoomHistoryResponse, RoomJoinParams, RoomLeaveParams,
RoomListParams, RoomListResponse, RoomMembershipResponse, RoomPostParams, RoomPostResponse,
RoomSummary,
};
use crate::comms::client::{CommsClient, SessionContext, scope_context_for};
use crate::comms::cursor::Cursor;
use crate::comms::ids::{AgentId, RoomId};
use crate::comms::model::{RoomScope, now_micros};
const DEFAULT_LIMIT: u32 = 100;
const DEFAULT_SINCE_HOURS: u32 = 24;
const MICROS_PER_HOUR: i64 = 3_600_000_000;
fn since_cutoff(since_hours: Option<u32>) -> Option<i64> {
let hours = since_hours.unwrap_or(DEFAULT_SINCE_HOURS);
if hours == 0 {
None
} else {
Some(now_micros() - i64::from(hours) * MICROS_PER_HOUR)
}
}
pub(super) fn comms_err(error: impl std::fmt::Display) -> McpError {
McpError::internal_error(format!("comms: {error}"), None)
}
pub(super) async fn resolve_comms_client(
state: &ServerState,
as_agent: Option<String>,
) -> Result<Arc<Mutex<CommsClient>>, McpError> {
let target = match as_agent {
Some(raw) => AgentId::parse(raw.clone())
.map_err(|e| comms_err(format!("invalid as_agent {raw:?}: {e}")))?,
None => AgentId::parse(state.agent_id.clone())
.map_err(|e| comms_err(format!("invalid agent id {:?}: {e}", state.agent_id)))?,
};
let mut map = state.comms_clients.lock().await;
if let Some(handle) = map.get(&target) {
return Ok(handle.clone());
}
let (remote, cwd) = scope_context_for(&state.root);
let is_self = target.as_str() == state.agent_id;
let client = if is_self {
CommsClient::ensure_and_connect(target.clone(), remote, cwd)
.await
.map_err(comms_err)?
} else {
let session = SessionContext {
session_id: Some(state.orchestration_session.clone()),
parent_agent: Some(state.agent_id.clone()),
};
CommsClient::ensure_and_connect_with_session(target.clone(), remote, cwd, session)
.await
.map_err(comms_err)?
};
let handle = Arc::new(Mutex::new(client));
map.insert(target, handle.clone());
Ok(handle)
}
fn clamp_limit(limit: Option<u32>) -> u32 {
limit
.unwrap_or(DEFAULT_LIMIT)
.clamp(1, crate::comms::daemon::MAX_LIMIT)
}
pub(super) async fn run_agent_register(
state: &ServerState,
params: AgentRegisterParams,
) -> Result<CallToolResult, McpError> {
let card = crate::comms::model::AgentCard {
name: params.name,
description: params.description,
version: params.version,
skills: params.skills,
};
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let agent_id = client.agent().as_str().to_string();
client.register_agent(card).await.map_err(comms_err)?;
json_result(&AgentRegisterResponse {
agent_id,
registered: true,
})
}
pub(super) async fn run_agent_list(
state: &ServerState,
params: AgentListParams,
) -> Result<CallToolResult, McpError> {
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let records = client.list_agents(params.room).await.map_err(comms_err)?;
let agents: Vec<AgentSummary> = records
.iter()
.map(|r| AgentSummary {
agent_id: r.agent_id.as_str().to_string(),
name: r.card.name.clone(),
description: r.card.description.clone(),
version: r.card.version.clone(),
skills: r.card.skills.clone(),
first_seen: r.first_seen,
last_seen: r.last_seen,
})
.collect();
json_result(&AgentListResponse {
total: agents.len(),
agents,
})
}
pub(super) async fn run_room_create(
state: &ServerState,
params: RoomCreateParams,
) -> Result<CallToolResult, McpError> {
let scope = params.scope.into();
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let room = client
.create_room(params.room, scope, params.title)
.await
.map_err(comms_err)?;
json_result(&RoomCreateResponse {
room: RoomSummary::from_room(&room, now_micros()),
})
}
pub(super) async fn run_room_list(
state: &ServerState,
_params: RoomListParams,
) -> Result<CallToolResult, McpError> {
let (remote, cwd) = scope_context_for(&state.root);
let handle = resolve_comms_client(state, None).await?;
let mut client = handle.lock().await;
let rooms = client.list_rooms(remote, cwd).await.map_err(comms_err)?;
let now = now_micros();
let summaries: Vec<RoomSummary> = rooms
.iter()
.map(|room| RoomSummary::from_room(room, now))
.collect();
json_result(&RoomListResponse {
total: summaries.len(),
rooms: summaries,
})
}
pub(super) async fn run_room_join(
state: &ServerState,
params: RoomJoinParams,
) -> Result<CallToolResult, McpError> {
let room_label = params.room.as_str().to_string();
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
client.join_room(params.room).await.map_err(comms_err)?;
json_result(&RoomMembershipResponse {
room: room_label,
joined: true,
left: false,
})
}
pub(super) async fn run_room_leave(
state: &ServerState,
params: RoomLeaveParams,
) -> Result<CallToolResult, McpError> {
let room_label = params.room.as_str().to_string();
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
client.leave_room(params.room).await.map_err(comms_err)?;
json_result(&RoomMembershipResponse {
room: room_label,
joined: false,
left: true,
})
}
pub(super) async fn run_room_post(
state: &ServerState,
params: RoomPostParams,
) -> Result<CallToolResult, McpError> {
let body = params.body.unwrap_or_default().into_bytes();
let tags = params.tags.unwrap_or_default();
let scope = params.scope.unwrap_or_default();
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let message_id = client
.post_message(
params.room,
params.subject,
body,
tags,
params.reply_to,
scope,
)
.await
.map_err(comms_err)?;
json_result(&RoomPostResponse { message_id })
}
pub(super) async fn run_room_history(
state: &ServerState,
params: RoomHistoryParams,
) -> Result<CallToolResult, McpError> {
let limit = clamp_limit(params.limit);
let cursor = params.cursor.map(Cursor);
let since = since_cutoff(params.since_hours);
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let (metas, next_cursor) = client
.read_history(params.room, cursor, limit, since)
.await
.map_err(comms_err)?;
let now = now_micros();
let messages: Vec<MessageFrontMatter> = metas
.iter()
.map(|sm| MessageFrontMatter::from_seq_meta(sm, now))
.collect();
json_result(&RoomHistoryResponse {
total: messages.len(),
messages,
next_cursor,
})
}
pub(super) async fn run_message_get(
state: &ServerState,
params: MessageGetParams,
) -> Result<CallToolResult, McpError> {
let message_id = params.message_id.clone();
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let body = client
.get_body(params.message_id)
.await
.map_err(comms_err)?;
let found = body.is_some();
let body = body.map(|b| String::from_utf8_lossy(&b).into_owned());
json_result(&MessageGetResponse {
message_id,
found,
body,
})
}
pub(super) async fn run_inbox_read(
state: &ServerState,
params: InboxReadParams,
) -> Result<CallToolResult, McpError> {
let limit = clamp_limit(params.limit);
let cursor = params.cursor.map(Cursor);
let since = since_cutoff(params.since_hours);
let (remote, cwd) = scope_context_for(&state.root);
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let (metas, unread, next_cursor) = client
.read_inbox(remote, cwd, cursor, limit, params.mark_read, since)
.await
.map_err(comms_err)?;
let now = now_micros();
let messages: Vec<MessageFrontMatter> = metas
.iter()
.map(|sm| MessageFrontMatter::from_seq_meta(sm, now))
.collect();
json_result(&InboxReadResponse {
total: messages.len(),
unread,
messages,
next_cursor,
})
}
pub(super) async fn run_inbox_ack(
state: &ServerState,
params: InboxAckParams,
) -> Result<CallToolResult, McpError> {
let has_bulk = params.room.is_some() && params.to_seq.is_some();
if params.message_ids.is_empty() && !has_bulk {
return Err(comms_err(
"inbox_ack requires message_ids or a (room, to_seq) pair",
));
}
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let (acked, cursors) = client
.ack_inbox(params.message_ids, params.room, params.to_seq)
.await
.map_err(comms_err)?;
let cursors_advanced: Vec<CursorAdvance> = cursors
.into_iter()
.map(|(room, seq)| CursorAdvance { room, seq })
.collect();
json_result(&InboxAckResponse {
acked: acked as usize,
cursors_advanced,
})
}
pub(super) async fn run_get_or_create_chat_room_for_path(
state: &ServerState,
params: GetOrCreateRoomForPathParams,
) -> Result<CallToolResult, McpError> {
let base = crate::git::Repo::discover(std::path::Path::new(¶ms.path))
.ok()
.map(|r| r.workdir().to_path_buf())
.unwrap_or_else(|| std::path::PathBuf::from(¶ms.path));
let (remote, cwd) = scope_context_for(&base);
let room = crate::comms::daemon::repo_room_for(remote.clone(), cwd.clone());
let scope_label = match &room.scope {
RoomScope::Remote(_) => "remote",
RoomScope::PathPrefix(_) => "path",
RoomScope::Session(_) => "session",
RoomScope::Global => "global",
};
let handle = resolve_comms_client(state, params.as_agent).await?;
let mut client = handle.lock().await;
let existed = client
.list_rooms(remote, cwd)
.await
.map_err(comms_err)?
.iter()
.any(|r| r.room_id == room.room_id);
client
.create_room(
room.room_id.clone(),
room.scope.clone(),
Some(room.title.clone()),
)
.await
.map_err(comms_err)?;
client
.join_room(room.room_id.clone())
.await
.map_err(comms_err)?;
json_result(&GetOrCreateRoomForPathResponse {
room: room.room_id.as_str().to_string(),
scope: scope_label.to_string(),
title: room.title,
created: !existed,
})
}
pub(super) async fn run_dm_send(
state: &ServerState,
params: DmSendParams,
) -> Result<CallToolResult, McpError> {
let from_agent = match ¶ms.as_agent {
Some(raw) => AgentId::parse(raw.clone())
.map_err(|e| comms_err(format!("invalid as_agent {raw:?}: {e}")))?,
None => AgentId::parse(state.agent_id.clone())
.map_err(|e| comms_err(format!("invalid agent id {:?}: {e}", state.agent_id)))?,
};
let to_agent = AgentId::parse(params.to_agent.clone())
.map_err(|e| comms_err(format!("invalid to_agent {:?}: {e}", params.to_agent)))?;
if from_agent == to_agent {
return Err(comms_err("cannot dm yourself"));
}
let (lo, hi) = if from_agent.as_str() <= to_agent.as_str() {
(from_agent.as_str(), to_agent.as_str())
} else {
(to_agent.as_str(), from_agent.as_str())
};
let room = RoomId::parse(format!("dm:{lo}:{hi}"))
.map_err(|e| comms_err(format!("derive dm room id: {e}")))?;
let dm_scope = RoomScope::Session(format!("dm:{lo}:{hi}"));
let sender = resolve_comms_client(state, params.as_agent.clone()).await?;
{
let mut client = sender.lock().await;
client
.create_room(room.clone(), dm_scope, Some(format!("dm {lo} <-> {hi}")))
.await
.map_err(comms_err)?;
client.join_room(room.clone()).await.map_err(comms_err)?;
}
{
let recipient = resolve_comms_client(state, Some(to_agent.as_str().to_string())).await?;
let mut client = recipient.lock().await;
client.join_room(room.clone()).await.map_err(comms_err)?;
}
let body = params.body.unwrap_or_default().into_bytes();
let message_id = {
let mut client = sender.lock().await;
client
.post_message(
room.clone(),
params.subject,
body,
Vec::new(),
params.reply_to,
Vec::new(),
)
.await
.map_err(comms_err)?
};
json_result(&DmSendResponse {
message_id,
room: room.into_string(),
})
}