mod client;
mod mcp;
mod policy;
mod sink;
use std::{
collections::{HashMap, HashSet, VecDeque},
sync::{Arc, Mutex},
time::Duration,
};
use serde_json::{Value, json};
use tokio::sync::{Notify, mpsc};
use crate::{
base::{PermissionLevel, Res, SessionPath, Visibility, Void},
identity::{Config, Identity, PermissionOverride, Scope, ServerRegistration},
protocol::{AdminOp, Payload, ProtocolError, ProtocolMessage},
};
use mcp::{FromMcp, McpSink, Tool};
use policy::Delivery;
use sink::{Injection, NotificationSink};
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(20);
const RAPID_DROP_DIAGNOSIS_THRESHOLD: u32 = 3;
pub struct BridgeSetup {
pub identity: Identity,
pub config: Config,
pub session: String,
pub servers: Vec<String>,
}
pub async fn run(setup: BridgeSetup) -> Void {
let registrations = resolve_registrations(&setup.config, &setup.servers)?;
let (from_mcp_tx, from_mcp_rx) = mpsc::unbounded_channel();
let (to_mcp_tx, to_mcp_rx) = mpsc::unbounded_channel();
tokio::spawn(mcp::read_loop(tokio::io::stdin(), from_mcp_tx));
tokio::spawn(mcp::write_loop(tokio::io::stdout(), to_mcp_rx));
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
let shutdown = Arc::new(Notify::new());
let identity = Arc::new(setup.identity);
let sink = Box::new(McpSink::new(to_mcp_tx.clone()));
let mut core = BridgeCore::new(setup.config.clone(), setup.session.clone(), to_mcp_tx, sink);
let claims = client::ServerClaims::default();
for registration in registrations {
let joined = Arc::new(Mutex::new(HashSet::new()));
let (out_tx, out_rx) = mpsc::unbounded_channel();
core.register_server(registration.clone(), out_tx.clone(), Arc::clone(&joined));
let identity = Arc::clone(&identity);
let url = registration.url.clone();
let session = setup.session.clone();
let claims = Arc::clone(&claims);
let connect = move || {
let identity = Arc::clone(&identity);
let url = url.clone();
let session = session.clone();
let claims = Arc::clone(&claims);
async move { client::connect_ws(&url, &identity, &session, &claims).await }
};
tokio::spawn(client::run_link(registration.url.clone(), connect, inbound_tx.clone(), out_rx, Arc::clone(&shutdown)));
spawn_keepalive(out_tx, Arc::clone(&shutdown));
}
core.run(from_mcp_rx, inbound_rx, shutdown).await
}
fn resolve_registrations(config: &Config, requested: &[String]) -> Res<Vec<ServerRegistration>> {
let selected: Vec<ServerRegistration> = if requested.is_empty() {
config.servers.clone()
} else {
config.servers.iter().filter(|r| requested.iter().any(|u| u == &r.url)).cloned().collect()
};
anyhow::ensure!(!selected.is_empty(), "no known server to connect to (register one first, or pass --server)");
Ok(selected)
}
fn spawn_keepalive(to_server: mpsc::UnboundedSender<ProtocolMessage>, shutdown: Arc<Notify>) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(KEEPALIVE_INTERVAL);
loop {
tokio::select! {
() = shutdown.notified() => break,
_ = ticker.tick() => {
if to_server.send(ProtocolMessage::Ping).is_err() {
break;
}
}
}
}
});
}
struct ServerHandle {
registration: ServerRegistration,
to_server: mpsc::UnboundedSender<ProtocolMessage>,
joined: Arc<Mutex<HashSet<String>>>,
}
enum Pending {
Tool { id: Value, ok: Option<String> },
Resubscribe,
}
struct BridgeCore {
config: Config,
session: String,
to_mcp: mpsc::UnboundedSender<Value>,
sink: Box<dyn NotificationSink>,
servers: HashMap<String, ServerHandle>,
pending: HashMap<String, VecDeque<Pending>>,
link_down_notified: HashSet<String>,
admin_servers: HashSet<String>,
link_up_at: HashMap<String, tokio::time::Instant>,
rapid_drops: HashMap<String, u32>,
}
impl BridgeCore {
fn new(config: Config, session: String, to_mcp: mpsc::UnboundedSender<Value>, sink: Box<dyn NotificationSink>) -> Self {
Self {
config,
session,
to_mcp,
sink,
servers: HashMap::new(),
pending: HashMap::new(),
link_down_notified: HashSet::new(),
admin_servers: HashSet::new(),
link_up_at: HashMap::new(),
rapid_drops: HashMap::new(),
}
}
fn register_server(&mut self, registration: ServerRegistration, to_server: mpsc::UnboundedSender<ProtocolMessage>, joined: Arc<Mutex<HashSet<String>>>) {
self.servers.insert(registration.url.clone(), ServerHandle { registration, to_server, joined });
}
async fn run(mut self, mut from_mcp: mpsc::UnboundedReceiver<FromMcp>, mut inbound: mpsc::UnboundedReceiver<(String, client::LinkEvent)>, shutdown: Arc<Notify>) -> Void {
loop {
tokio::select! {
() = shutdown.notified() => break,
_ = tokio::signal::ctrl_c() => break,
event = from_mcp.recv() => match event {
Some(event) => self.handle_mcp(event),
None => break,
},
event = inbound.recv() => match event {
Some((server, event)) => self.handle_link_event(&server, event),
None => break,
},
}
}
shutdown.notify_waiters();
Ok(())
}
fn handle_link_event(&mut self, server: &str, event: client::LinkEvent) {
let before = self.tool_signature();
match event {
client::LinkEvent::Up => self.link_up(server),
client::LinkEvent::Down => self.link_down(server),
client::LinkEvent::Duplicate { canonical } => self.link_duplicate(server, &canonical),
client::LinkEvent::Frame(frame) => self.handle_inbound(server, frame),
}
self.notify_tools_changed(before);
}
fn tool_signature(&self) -> (bool, bool) {
(self.any_emit_allowed(), self.admin_servers.is_empty())
}
fn notify_tools_changed(&mut self, before: (bool, bool)) {
if self.tool_signature() != before {
self.send_mcp(mcp::tools_list_changed());
}
}
fn handle_mcp(&mut self, event: FromMcp) {
let before = self.tool_signature();
self.dispatch_mcp(event);
self.notify_tools_changed(before);
}
fn dispatch_mcp(&mut self, event: FromMcp) {
match event {
FromMcp::Initialize { id, protocol_version } => self.send_mcp(mcp::initialize_result(&id, &protocol_version)),
FromMcp::ListTools { id } => {
let tools = self.tools();
self.send_mcp(mcp::tools_list_result(&id, &tools));
}
FromMcp::Ping { id } => self.send_mcp(mcp::ping_result(&id)),
FromMcp::CallTool { id, name, args } => self.dispatch_tool(&id, &name, &args),
FromMcp::PermissionRequest { request_id, tool_name, description, .. } => self.relay_permission(&request_id, &tool_name, &description),
FromMcp::Initialized => {}
FromMcp::UnknownRequest { id } => self.send_mcp(mcp::method_not_found(&id)),
}
}
fn dispatch_tool(&mut self, id: &Value, name: &str, args: &Value) {
match name {
"join_channel" => self.tool_join(id, args),
"leave_channel" => self.tool_leave(id, args),
"send_channel" => self.tool_send(id, args),
"whisper" => self.tool_whisper(id, args),
"list_channels" => self.tool_list(id, args),
"who" => self.tool_who(id, args),
"submit_permission" => self.tool_submit_permission(id, args),
"set_perm" => self.tool_set_perm(id, args),
"create_channel" => self.tool_create_channel(id, args),
"delete_channel" => self.tool_delete_channel(id, args),
"set_visibility" => self.tool_set_visibility(id, args),
"acl_add" => self.tool_acl(id, args, true),
"acl_remove" => self.tool_acl(id, args, false),
"invite_create" => self.tool_invite_create(id, args),
"invite_revoke" => self.tool_invite_revoke(id, args),
"kick" => self.tool_kick(id, args),
"ban" => self.tool_ban(id, args),
other => self.send_mcp(mcp::tool_error_result(id, &format!("unknown tool `{other}`"))),
}
}
fn defer(&mut self, id: &Value, server: &str, ok: Option<String>) {
self.pending.entry(server.to_owned()).or_default().push_back(Pending::Tool { id: id.clone(), ok });
}
fn defer_admin(&mut self, id: &Value, server: &str, op: AdminOp) {
if !self.admin_servers.contains(server) {
return self.send_mcp(mcp::tool_error_result(id, &format!("not an admin on `{server}`")));
}
self.defer(id, server, None);
self.send_to_server(server, ProtocolMessage::Admin(op));
}
fn tool_create_channel(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let Some(name) = arg_str(args, "name") else {
return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
};
let visibility = arg_str(args, "visibility").and_then(|v| v.parse().ok()).unwrap_or(Visibility::Public);
self.defer_admin(id, &server, AdminOp::CreateChannel { name: name.to_owned(), visibility });
}
fn tool_delete_channel(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let Some(name) = arg_str(args, "name") else {
return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
};
self.defer_admin(id, &server, AdminOp::DeleteChannel { name: name.to_owned() });
}
fn tool_set_visibility(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let (Some(name), Some(visibility)) = (arg_str(args, "name"), arg_str(args, "visibility").and_then(|v| v.parse::<Visibility>().ok())) else {
return self.send_mcp(mcp::tool_error_result(id, "`name` and a valid `visibility` are required"));
};
self.defer_admin(id, &server, AdminOp::SetVisibility { name: name.to_owned(), visibility });
}
fn tool_acl(&mut self, id: &Value, args: &Value, add: bool) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
};
let op = if add {
AdminOp::AclAdd {
channel: channel.to_owned(),
user: user.to_owned(),
}
} else {
AdminOp::AclRemove {
channel: channel.to_owned(),
user: user.to_owned(),
}
};
self.defer_admin(id, &server, op);
}
fn tool_invite_create(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let Some(channel) = arg_str(args, "channel") else {
return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
};
let uses = args.get("uses").and_then(Value::as_u64).and_then(|u| u32::try_from(u).ok());
let expires_in_secs = args.get("expires_in_secs").and_then(Value::as_u64);
self.defer_admin(
id,
&server,
AdminOp::InviteCreate {
channel: channel.to_owned(),
uses,
expires_in_secs,
},
);
}
fn tool_invite_revoke(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let Some(token) = arg_str(args, "token") else {
return self.send_mcp(mcp::tool_error_result(id, "`token` is required"));
};
self.defer_admin(id, &server, AdminOp::InviteRevoke { token: token.to_owned() });
}
fn tool_kick(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let (Some(channel), Some(target)) = (arg_str(args, "channel"), arg_str(args, "target")) else {
return self.send_mcp(mcp::tool_error_result(id, "`channel` and `target` are required"));
};
self.defer_admin(
id,
&server,
AdminOp::Kick {
channel: channel.to_owned(),
target: target.to_owned(),
},
);
}
fn tool_ban(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
};
self.defer_admin(
id,
&server,
AdminOp::Ban {
channel: channel.to_owned(),
user: user.to_owned(),
},
);
}
fn tool_join(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let Some(channel) = arg_str(args, "channel") else {
return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
};
let token = arg_str(args, "token").map(str::to_owned);
if let Some(perm) = arg_str(args, "perm") {
match perm.parse::<PermissionLevel>() {
Ok(level) => self.set_scope_override(&server, Some(channel.to_owned()), level),
Err(err) => return self.send_mcp(mcp::tool_error_result(id, &err.to_string())),
}
}
self.defer(id, &server, None);
self.send_to_server(&server, ProtocolMessage::Join { channel: channel.to_owned(), token });
}
fn tool_leave(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let Some(channel) = arg_str(args, "channel") else {
return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
};
if let Some(handle) = self.servers.get(&server) {
handle.joined.lock().expect("joined mutex poisoned").remove(channel);
}
self.defer(id, &server, Some(format!("left {channel}")));
self.send_to_server(&server, ProtocolMessage::Leave { channel: channel.to_owned() });
}
fn tool_send(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let (Some(channel), Some(text)) = (arg_str(args, "channel"), arg_str(args, "text")) else {
return self.send_mcp(mcp::tool_error_result(id, "`channel` and `text` are required"));
};
if !policy::emit_allowed(&self.config, &server, &Scope::Channel(channel.to_owned())) {
return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: `{channel}` on `{server}` is below `converse`")));
}
let from = self.our_path(&server);
self.defer(id, &server, Some(format!("sent to {channel}")));
self.send_to_server(
&server,
ProtocolMessage::ChannelMsg {
channel: channel.to_owned(),
from,
payload: Payload::Plain(text.to_owned()),
},
);
}
fn tool_whisper(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let (Some(target), Some(text)) = (arg_str(args, "target"), arg_str(args, "text")) else {
return self.send_mcp(mcp::tool_error_result(id, "`target` and `text` are required"));
};
let Ok(target) = target.parse::<SessionPath>() else {
return self.send_mcp(mcp::tool_error_result(id, "`target` must be a `user/machine/session` path"));
};
if !policy::emit_allowed(&self.config, &server, &Scope::Whisper) {
return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: whispers on `{server}` are below `converse`")));
}
let from = self.our_path(&server);
self.defer(id, &server, Some("whisper sent".to_owned()));
self.send_to_server(
&server,
ProtocolMessage::Whisper {
from,
target,
payload: Payload::Plain(text.to_owned()),
},
);
}
fn tool_list(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
self.defer(id, &server, None);
self.send_to_server(&server, ProtocolMessage::ListChannels);
}
fn tool_who(&mut self, id: &Value, args: &Value) {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let channel = arg_str(args, "channel").map(str::to_owned);
self.defer(id, &server, None);
self.send_to_server(&server, ProtocolMessage::Who { channel });
}
fn tool_submit_permission(&mut self, id: &Value, args: &Value) {
let Some(request_id) = arg_str(args, "request_id") else {
return self.send_mcp(mcp::tool_error_result(id, "`request_id` is required"));
};
let behavior = if arg_str(args, "decision") == Some("allow") { "allow" } else { "deny" };
self.send_mcp(mcp::permission_verdict(request_id, behavior));
self.send_mcp(mcp::tool_text_result(id, &format!("permission verdict `{behavior}` sent")));
}
fn tool_set_perm(&mut self, id: &Value, args: &Value) {
let Some(level) = arg_str(args, "level").and_then(|level| level.parse::<PermissionLevel>().ok()) else {
return self.send_mcp(mcp::tool_error_result(id, "`level` must be mute, notify, converse, or act"));
};
let whisper = args.get("whisper").and_then(Value::as_bool).unwrap_or(false);
let channel = arg_str(args, "channel");
if channel.is_some() || whisper {
let server = match self.resolve_server(id, args) {
Ok(server) => server,
Err(error) => return self.send_mcp(error),
};
let scope = if whisper { None } else { channel.map(str::to_owned) };
self.set_scope_override(&server, scope, level);
} else {
self.config.default_permission = level;
}
self.send_mcp(mcp::tool_text_result(id, "permission updated"));
}
fn relay_permission(&self, request_id: &str, tool_name: &str, description: &str) {
let mut meta = std::collections::BTreeMap::new();
meta.insert("kind".to_owned(), "permission_request".to_owned());
meta.insert("request_id".to_owned(), request_id.to_owned());
let content = format!(
"Claude Code requests approval to run `{tool_name}`: {description}\nAnswer with the submit_permission tool: {{\"request_id\": \"{request_id}\", \"decision\": \"allow\"|\"deny\"}}."
);
self.send_mcp(mcp::channel_notification(&content, &meta));
}
fn handle_inbound(&mut self, server: &str, frame: ProtocolMessage) {
match frame {
ProtocolMessage::ChannelMsg { channel, from, payload } => self.inject(server, Some(channel), from, payload),
ProtocolMessage::Whisper { from, payload, .. } => self.inject(server, None, from, payload),
ProtocolMessage::Joined { channel } => {
if let Some(handle) = self.servers.get(server) {
handle.joined.lock().expect("joined mutex poisoned").insert(channel.clone());
}
self.resolve_pending(server, &format!("joined {channel}"));
}
ProtocolMessage::ChannelList { channels } => self.resolve_pending(server, &format_channels(&channels)),
ProtocolMessage::Presence { channel, sessions } => self.resolve_pending(server, &format_presence(channel.as_deref(), &sessions)),
ProtocolMessage::Ack { detail } => self.resolve_pending(server, detail.as_deref().unwrap_or("ok")),
ProtocolMessage::InviteToken { token } => self.resolve_pending(server, &format!("invite token: {token}")),
ProtocolMessage::Error(error) => self.resolve_error(server, &error),
ProtocolMessage::ServerInfo { admin } => {
if admin {
self.admin_servers.insert(server.to_owned());
} else {
self.admin_servers.remove(server);
}
}
_ => {}
}
}
fn inject(&self, server: &str, channel: Option<String>, from: SessionPath, payload: Payload) {
let body = match payload {
Payload::Plain(text) => text,
Payload::Encrypted(_) => "<end-to-end-encrypted payload — not supported in v1>".to_owned(),
};
let scope = channel.as_ref().map_or(Scope::Whisper, |c| Scope::Channel(c.clone()));
match policy::inbound_delivery(&self.config, server, &scope) {
Delivery::Drop => {}
Delivery::Inject(level) => self.sink.deliver(&Injection {
server: server.to_owned(),
channel,
from,
level,
body,
}),
}
}
fn resolve_pending(&mut self, server: &str, text: &str) {
match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
Some(Pending::Tool { id, ok }) => self.send_mcp(mcp::tool_text_result(&id, ok.as_deref().unwrap_or(text))),
Some(Pending::Resubscribe) | None => {}
}
}
fn resolve_error(&mut self, server: &str, error: &ProtocolError) {
match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
Some(Pending::Tool { id, .. }) => self.send_mcp(mcp::tool_error_result(&id, &error.to_string())),
Some(Pending::Resubscribe) => {}
None => self.notify(server, "error", &format!("Server `{server}` error: {error}")),
}
}
fn notify(&self, server: &str, kind: &str, text: &str) {
let mut meta = std::collections::BTreeMap::new();
meta.insert("server".to_owned(), server.to_owned());
meta.insert("kind".to_owned(), kind.to_owned());
self.send_mcp(mcp::channel_notification(text, &meta));
}
fn link_up(&mut self, server: &str) {
self.link_up_at.insert(server.to_owned(), tokio::time::Instant::now());
if self.link_down_notified.remove(server) {
self.notify(server, "link", &format!("Reconnected to `{server}`."));
}
let Some(handle) = self.servers.get(server) else { return };
let channels: Vec<String> = handle.joined.lock().expect("joined mutex poisoned").iter().cloned().collect();
for channel in channels {
self.pending.entry(server.to_owned()).or_default().push_back(Pending::Resubscribe);
self.send_to_server(server, ProtocolMessage::Join { channel, token: None });
}
}
fn link_down(&mut self, server: &str) {
if let Some(queue) = self.pending.remove(server) {
for entry in queue {
if let Pending::Tool { id, .. } = entry {
self.send_mcp(mcp::tool_error_result(&id, &format!("connection to `{server}` lost; retry")));
}
}
}
let stable = self.link_up_at.remove(server).is_none_or(|up| up.elapsed() >= client::STABLE_UPTIME);
if stable {
self.rapid_drops.remove(server);
} else {
let drops = {
let count = self.rapid_drops.entry(server.to_owned()).or_insert(0);
*count += 1;
*count
};
if drops == RAPID_DROP_DIAGNOSIS_THRESHOLD {
self.notify(
server,
"link",
&format!(
"The link to `{server}` keeps dropping right after connecting — if another live session is using the handle `{session}`, the two supersede each other; start one with a distinct `--as`. Going quiet until the link stabilizes.",
session = self.session
),
);
return;
}
if drops > RAPID_DROP_DIAGNOSIS_THRESHOLD {
return;
}
}
if self.link_down_notified.insert(server.to_owned()) {
self.notify(server, "link", &format!("Disconnected from `{server}` — reconnecting."));
}
}
fn link_duplicate(&mut self, server: &str, canonical: &str) {
if let Some(queue) = self.pending.remove(server) {
for entry in queue {
if let Pending::Tool { id, .. } = entry {
self.send_mcp(mcp::tool_error_result(&id, &format!("`{server}` is the same server as `{canonical}`; target `{canonical}` instead")));
}
}
}
self.servers.remove(server);
self.admin_servers.remove(server);
self.link_down_notified.remove(server);
self.notify(
server,
"link",
&format!("`{server}` is the same server as `{canonical}` — this duplicate link is disabled; target `{canonical}` instead."),
);
}
fn tools(&self) -> Vec<Tool> {
let mut tools = vec![join_channel_tool(), leave_channel_tool(), list_channels_tool(), who_tool(), submit_permission_tool(), set_perm_tool()];
if self.any_emit_allowed() {
tools.push(send_channel_tool());
tools.push(whisper_tool());
}
if !self.admin_servers.is_empty() {
tools.extend(admin_tools());
}
tools
}
fn any_emit_allowed(&self) -> bool {
let joined: Vec<(String, String)> = self
.servers
.iter()
.flat_map(|(server, handle)| {
handle
.joined
.lock()
.expect("joined mutex poisoned")
.iter()
.map(|channel| (server.clone(), channel.clone()))
.collect::<Vec<_>>()
})
.collect();
policy::any_emit_allowed(&self.config, joined.iter().map(|(server, channel)| (server.as_str(), channel.as_str())))
}
fn resolve_server(&self, id: &Value, args: &Value) -> Result<String, Value> {
if let Some(server) = arg_str(args, "server") {
if self.servers.contains_key(server) {
return Ok(server.to_owned());
}
return Err(mcp::tool_error_result(id, &format!("not connected to server `{server}`")));
}
match self.servers.keys().next() {
Some(only) if self.servers.len() == 1 => Ok(only.clone()),
_ => Err(mcp::tool_error_result(id, "multiple servers connected; pass `server`")),
}
}
fn our_path(&self, server: &str) -> SessionPath {
self.servers.get(server).map_or_else(
|| SessionPath::new("unknown", "unknown", self.session.clone()),
|handle| SessionPath::new(handle.registration.username.clone(), handle.registration.machine.clone(), self.session.clone()),
)
}
fn set_scope_override(&mut self, server: &str, channel: Option<String>, level: PermissionLevel) {
self.config.overrides.retain(|o| !(o.server == server && o.channel == channel));
self.config.overrides.push(PermissionOverride {
server: server.to_owned(),
channel,
level,
});
}
fn send_mcp(&self, message: Value) {
let _ = self.to_mcp.send(message);
}
fn send_to_server(&self, server: &str, frame: ProtocolMessage) {
if let Some(handle) = self.servers.get(server) {
let _ = handle.to_server.send(frame);
}
}
}
fn arg_str<'a>(args: &'a Value, key: &str) -> Option<&'a str> {
args.get(key).and_then(Value::as_str)
}
fn format_channels(channels: &[crate::protocol::ChannelInfo]) -> String {
if channels.is_empty() {
return "no channels visible".to_owned();
}
channels
.iter()
.map(|c| format!("{} ({}{})", c.name, c.visibility.as_str(), if c.member { ", member" } else { "" }))
.collect::<Vec<_>>()
.join("\n")
}
fn format_presence(channel: Option<&str>, sessions: &[SessionPath]) -> String {
let scope = channel.map_or_else(|| "server-wide".to_owned(), |c| format!("#{c}"));
if sessions.is_empty() {
return format!("{scope}: nobody online");
}
let who = sessions.iter().map(SessionPath::to_string).collect::<Vec<_>>().join(", ");
format!("{scope}: {who}")
}
fn join_channel_tool() -> Tool {
Tool {
name: "join_channel",
description: "Join a channel on a server and subscribe this session to it.",
input_schema: json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
"channel": { "type": "string", "description": "Channel name to join." },
"token": { "type": "string", "description": "Invite token, if the channel requires one." },
"perm": { "type": "string", "enum": ["mute", "notify", "converse", "act"], "description": "Autonomy level for this channel." }
},
"required": ["channel"]
}),
}
}
fn leave_channel_tool() -> Tool {
Tool {
name: "leave_channel",
description: "Unsubscribe this session from a channel (stays connected to the server).",
input_schema: json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
"channel": { "type": "string", "description": "Channel name to leave." }
},
"required": ["channel"]
}),
}
}
fn list_channels_tool() -> Tool {
Tool {
name: "list_channels",
description: "List the channels visible to you on a server.",
input_schema: json!({
"type": "object",
"properties": { "server": { "type": "string", "description": "Server URL (optional if only one is connected)." } }
}),
}
}
fn who_tool() -> Tool {
Tool {
name: "who",
description: "List who is present on a server, optionally scoped to a channel.",
input_schema: json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
"channel": { "type": "string", "description": "Restrict presence to this channel." }
}
}),
}
}
fn submit_permission_tool() -> Tool {
Tool {
name: "submit_permission",
description: "Answer a relayed Claude Code permission request (allow or deny).",
input_schema: json!({
"type": "object",
"properties": {
"request_id": { "type": "string", "description": "The request_id from the permission prompt." },
"decision": { "type": "string", "enum": ["allow", "deny"], "description": "The verdict." }
},
"required": ["request_id", "decision"]
}),
}
}
fn send_channel_tool() -> Tool {
Tool {
name: "send_channel",
description: "Send a message to a channel (allowed only at converse/act).",
input_schema: json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
"channel": { "type": "string", "description": "Channel to send to." },
"text": { "type": "string", "description": "The message text." }
},
"required": ["channel", "text"]
}),
}
}
fn whisper_tool() -> Tool {
Tool {
name: "whisper",
description: "Send a direct message to exactly one session path (allowed only at converse/act).",
input_schema: json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
"target": { "type": "string", "description": "The recipient's full user/machine/session path." },
"text": { "type": "string", "description": "The message text." }
},
"required": ["target", "text"]
}),
}
}
fn set_perm_tool() -> Tool {
Tool {
name: "set_perm",
description: "Set your autonomy level live (mute/notify/converse/act) for a channel, the whisper scope, or the machine default. Takes effect on the next inbound message — no reconnect.",
input_schema: json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
"channel": { "type": "string", "description": "Channel to scope to (omit with `whisper` for the whisper scope, or omit both for the machine default)." },
"whisper": { "type": "boolean", "description": "Apply to the whisper scope instead of a channel." },
"level": { "type": "string", "enum": ["mute", "notify", "converse", "act"] }
},
"required": ["level"]
}),
}
}
fn admin_tools() -> Vec<Tool> {
let server = json!({ "type": "string", "description": "Server URL (optional if only one is connected)." });
vec![
Tool {
name: "create_channel",
description: "Admin: create a channel (visibility public/unlisted/private; default public).",
input_schema: json!({
"type": "object",
"properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
"required": ["name"]
}),
},
Tool {
name: "delete_channel",
description: "Admin: delete a channel.",
input_schema: json!({ "type": "object", "properties": { "server": server, "name": { "type": "string" } }, "required": ["name"] }),
},
Tool {
name: "set_visibility",
description: "Admin: change a channel's visibility (public/unlisted/private).",
input_schema: json!({
"type": "object",
"properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
"required": ["name", "visibility"]
}),
},
Tool {
name: "acl_add",
description: "Admin: add a user to a channel's access-control list.",
input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
},
Tool {
name: "acl_remove",
description: "Admin: remove a user from a channel's access-control list.",
input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
},
Tool {
name: "invite_create",
description: "Admin: mint an invite token for a channel (optional uses / expires_in_secs).",
input_schema: json!({
"type": "object",
"properties": { "server": server, "channel": { "type": "string" }, "uses": { "type": "integer" }, "expires_in_secs": { "type": "integer" } },
"required": ["channel"]
}),
},
Tool {
name: "invite_revoke",
description: "Admin: revoke an invite token.",
input_schema: json!({ "type": "object", "properties": { "server": server, "token": { "type": "string" } }, "required": ["token"] }),
},
Tool {
name: "kick",
description: "Admin: remove a session path or user from a channel.",
input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "target": { "type": "string" } }, "required": ["channel", "target"] }),
},
Tool {
name: "ban",
description: "Admin: ban a user from a channel (drops them and blocks rejoin).",
input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
},
]
}
#[cfg(test)]
mod tests;