Skip to main content

conclavelib/
bridge.rs

1//! The bridge (`conclave bridge`): a dual peer between Claude Code and central servers.
2//!
3//! One process that is simultaneously a stdio **MCP server** to Claude Code and a **WS
4//! client** to one or more central servers (DESIGN.md §13). It translates inbound central
5//! events into injected `<channel>` / `<whisper>` notifications and outbound MCP tool calls
6//! into central messages, owning the session identity, its connections, and the local
7//! **permission policy** (DESIGN.md §9): per inbound message it resolves the
8//! `(server, channel)` level, drops on `mute`, otherwise injects through a pluggable
9//! notification sink; and it rejects outbound emit calls whose target channel is below
10//! `converse`.
11//!
12//! Split by responsibility: `policy` resolves the local autonomy level and gates emit;
13//! `sink` frames a delivered message and pushes it to the session; `mcp` is the JSON-RPC
14//! stdio peer toward Claude Code; `client` holds the outbound WS connections to central with
15//! reconnect + re-subscribe. `BridgeCore` is the transport-free dispatcher those feed.
16
17mod client;
18mod mcp;
19mod policy;
20mod sink;
21
22use std::{
23    collections::{HashMap, HashSet, VecDeque},
24    sync::{Arc, Mutex},
25    time::Duration,
26};
27
28use serde_json::{Value, json};
29use tokio::sync::{Notify, mpsc};
30
31use crate::{
32    base::{PermissionLevel, Res, SessionPath, Visibility, Void},
33    identity::{Config, Identity, PermissionOverride, Scope, ServerRegistration},
34    protocol::{AdminOp, Payload, ProtocolError, ProtocolMessage},
35};
36
37use mcp::{FromMcp, McpSink, Tool};
38use policy::Delivery;
39use sink::{Injection, NotificationSink};
40
41/// How often the bridge sends a keepalive `Ping` to each server (well under the server's 60s idle
42/// reaper, DESIGN.md §10).
43const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(20);
44
45/// Everything the running bridge needs: this machine's key, the local config, the session handle,
46/// and (optionally) a subset of configured servers to connect to.
47pub struct BridgeSetup {
48    /// This machine's identity (signs the challenge).
49    pub identity: Identity,
50    /// The local config: permission policy + known-server registrations (M1).
51    pub config: Config,
52    /// The live-session handle (`--as`, default = repo/dir name).
53    pub session: String,
54    /// A subset of `config.servers` URLs to connect to; empty means all of them.
55    pub servers: Vec<String>,
56}
57
58/// Runs the bridge: the MCP stdio peer, one reconnecting WS link per server, and the dispatcher.
59///
60/// # Errors
61///
62/// Returns an error if no known server is configured to connect to.
63pub async fn run(setup: BridgeSetup) -> Void {
64    let registrations = resolve_registrations(&setup.config, &setup.servers)?;
65
66    let (from_mcp_tx, from_mcp_rx) = mpsc::unbounded_channel();
67    let (to_mcp_tx, to_mcp_rx) = mpsc::unbounded_channel();
68    tokio::spawn(mcp::read_loop(tokio::io::stdin(), from_mcp_tx));
69    tokio::spawn(mcp::write_loop(tokio::io::stdout(), to_mcp_rx));
70
71    let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
72    let shutdown = Arc::new(Notify::new());
73    let identity = Arc::new(setup.identity);
74
75    let sink = Box::new(McpSink::new(to_mcp_tx.clone()));
76    let mut core = BridgeCore::new(setup.config.clone(), setup.session.clone(), to_mcp_tx, sink);
77
78    for registration in registrations {
79        let joined = Arc::new(Mutex::new(HashSet::new()));
80        let (out_tx, out_rx) = mpsc::unbounded_channel();
81        core.register_server(registration.clone(), out_tx.clone(), Arc::clone(&joined));
82
83        let identity = Arc::clone(&identity);
84        let url = registration.url.clone();
85        let session = setup.session.clone();
86        let connect = move || {
87            let identity = Arc::clone(&identity);
88            let url = url.clone();
89            let session = session.clone();
90            async move { client::connect_ws(&url, &identity, &session).await }
91        };
92        tokio::spawn(client::run_link(registration.url.clone(), connect, joined, inbound_tx.clone(), out_rx, Arc::clone(&shutdown)));
93        spawn_keepalive(out_tx, Arc::clone(&shutdown));
94    }
95
96    core.run(from_mcp_rx, inbound_rx, shutdown).await
97}
98
99/// Selects the server registrations to connect to: the requested subset, or all if none named.
100fn resolve_registrations(config: &Config, requested: &[String]) -> Res<Vec<ServerRegistration>> {
101    let selected: Vec<ServerRegistration> = if requested.is_empty() {
102        config.servers.clone()
103    } else {
104        config.servers.iter().filter(|r| requested.iter().any(|u| u == &r.url)).cloned().collect()
105    };
106    anyhow::ensure!(!selected.is_empty(), "no known server to connect to (register one first, or pass --server)");
107    Ok(selected)
108}
109
110/// Sends a periodic keepalive `Ping` so the server's heartbeat reaper keeps the session present.
111fn spawn_keepalive(to_server: mpsc::UnboundedSender<ProtocolMessage>, shutdown: Arc<Notify>) {
112    tokio::spawn(async move {
113        let mut ticker = tokio::time::interval(KEEPALIVE_INTERVAL);
114        loop {
115            tokio::select! {
116                () = shutdown.notified() => break,
117                _ = ticker.tick() => {
118                    if to_server.send(ProtocolMessage::Ping).is_err() {
119                        break;
120                    }
121                }
122            }
123        }
124    });
125}
126
127/// A connected server: its registration (for the local path), outbound sink, and joined channels.
128struct ServerHandle {
129    registration: ServerRegistration,
130    to_server: mpsc::UnboundedSender<ProtocolMessage>,
131    joined: Arc<Mutex<HashSet<String>>>,
132}
133
134/// The transport-free bridge dispatcher: MCP events and inbound server frames in, MCP responses /
135/// injections / outbound frames out. Everything I/O lives in [`run`]; this is unit-testable.
136struct BridgeCore {
137    config: Config,
138    session: String,
139    to_mcp: mpsc::UnboundedSender<Value>,
140    sink: Box<dyn NotificationSink>,
141    servers: HashMap<String, ServerHandle>,
142    /// Per-server FIFO of MCP request ids awaiting a control response (join / list / who / admin).
143    pending: HashMap<String, VecDeque<Value>>,
144    /// Servers on which the authenticated user is an admin (from `ServerInfo`) — gates admin tools.
145    admin_servers: HashSet<String>,
146}
147
148impl BridgeCore {
149    fn new(config: Config, session: String, to_mcp: mpsc::UnboundedSender<Value>, sink: Box<dyn NotificationSink>) -> Self {
150        Self {
151            config,
152            session,
153            to_mcp,
154            sink,
155            servers: HashMap::new(),
156            pending: HashMap::new(),
157            admin_servers: HashSet::new(),
158        }
159    }
160
161    fn register_server(&mut self, registration: ServerRegistration, to_server: mpsc::UnboundedSender<ProtocolMessage>, joined: Arc<Mutex<HashSet<String>>>) {
162        self.servers.insert(registration.url.clone(), ServerHandle { registration, to_server, joined });
163    }
164
165    /// The dispatcher loop: MCP events, inbound server frames, and Ctrl-C / stdin-EOF shutdown.
166    async fn run(mut self, mut from_mcp: mpsc::UnboundedReceiver<FromMcp>, mut inbound: mpsc::UnboundedReceiver<(String, ProtocolMessage)>, shutdown: Arc<Notify>) -> Void {
167        loop {
168            tokio::select! {
169                () = shutdown.notified() => break,
170                _ = tokio::signal::ctrl_c() => break,
171                event = from_mcp.recv() => match event {
172                    Some(event) => self.handle_mcp(event),
173                    None => break,
174                },
175                frame = inbound.recv() => match frame {
176                    Some((server, frame)) => self.handle_inbound(&server, frame),
177                    None => break,
178                },
179            }
180        }
181        shutdown.notify_waiters();
182        Ok(())
183    }
184
185    // -----------------------------------------------------------------------
186    // MCP → bridge.
187    // -----------------------------------------------------------------------
188
189    fn handle_mcp(&mut self, event: FromMcp) {
190        match event {
191            FromMcp::Initialize { id, protocol_version } => self.send_mcp(mcp::initialize_result(&id, &protocol_version)),
192            FromMcp::ListTools { id } => {
193                let tools = self.tools();
194                self.send_mcp(mcp::tools_list_result(&id, &tools));
195            }
196            FromMcp::Ping { id } => self.send_mcp(mcp::ping_result(&id)),
197            FromMcp::CallTool { id, name, args } => self.dispatch_tool(&id, &name, &args),
198            FromMcp::PermissionRequest { request_id, tool_name, description, .. } => self.relay_permission(&request_id, &tool_name, &description),
199            FromMcp::Initialized => {}
200            FromMcp::UnknownRequest { id } => self.send_mcp(mcp::method_not_found(&id)),
201        }
202    }
203
204    /// Dispatches a tool call. Each tool sends its own MCP reply; the `join`/`list`/`who` tools
205    /// **defer** (send nothing now) and are resolved when the server's response arrives.
206    fn dispatch_tool(&mut self, id: &Value, name: &str, args: &Value) {
207        match name {
208            "join_channel" => self.tool_join(id, args),
209            "send_channel" => self.tool_send(id, args),
210            "whisper" => self.tool_whisper(id, args),
211            "list_channels" => self.tool_list(id, args),
212            "who" => self.tool_who(id, args),
213            "submit_permission" => self.tool_submit_permission(id, args),
214            "set_perm" => self.tool_set_perm(id, args),
215            "create_channel" => self.tool_create_channel(id, args),
216            "delete_channel" => self.tool_delete_channel(id, args),
217            "set_visibility" => self.tool_set_visibility(id, args),
218            "acl_add" => self.tool_acl(id, args, true),
219            "acl_remove" => self.tool_acl(id, args, false),
220            "invite_create" => self.tool_invite_create(id, args),
221            "invite_revoke" => self.tool_invite_revoke(id, args),
222            "kick" => self.tool_kick(id, args),
223            "ban" => self.tool_ban(id, args),
224            other => self.send_mcp(mcp::tool_error_result(id, &format!("unknown tool `{other}`"))),
225        }
226    }
227
228    /// Pushes an admin op to the server, deferring its MCP result to the `Ack` / `InviteToken` /
229    /// `Error` response. The server authorizes by role, so a non-admin call is refused server-side.
230    fn defer_admin(&mut self, id: &Value, server: &str, op: AdminOp) {
231        self.pending.entry(server.to_owned()).or_default().push_back(id.clone());
232        self.send_to_server(server, ProtocolMessage::Admin(op));
233    }
234
235    fn tool_create_channel(&mut self, id: &Value, args: &Value) {
236        let server = match self.resolve_server(id, args) {
237            Ok(server) => server,
238            Err(error) => return self.send_mcp(error),
239        };
240        let Some(name) = arg_str(args, "name") else {
241            return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
242        };
243        let visibility = arg_str(args, "visibility").and_then(|v| v.parse().ok()).unwrap_or(Visibility::Public);
244        self.defer_admin(id, &server, AdminOp::CreateChannel { name: name.to_owned(), visibility });
245    }
246
247    fn tool_delete_channel(&mut self, id: &Value, args: &Value) {
248        let server = match self.resolve_server(id, args) {
249            Ok(server) => server,
250            Err(error) => return self.send_mcp(error),
251        };
252        let Some(name) = arg_str(args, "name") else {
253            return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
254        };
255        self.defer_admin(id, &server, AdminOp::DeleteChannel { name: name.to_owned() });
256    }
257
258    fn tool_set_visibility(&mut self, id: &Value, args: &Value) {
259        let server = match self.resolve_server(id, args) {
260            Ok(server) => server,
261            Err(error) => return self.send_mcp(error),
262        };
263        let (Some(name), Some(visibility)) = (arg_str(args, "name"), arg_str(args, "visibility").and_then(|v| v.parse::<Visibility>().ok())) else {
264            return self.send_mcp(mcp::tool_error_result(id, "`name` and a valid `visibility` are required"));
265        };
266        self.defer_admin(id, &server, AdminOp::SetVisibility { name: name.to_owned(), visibility });
267    }
268
269    fn tool_acl(&mut self, id: &Value, args: &Value, add: bool) {
270        let server = match self.resolve_server(id, args) {
271            Ok(server) => server,
272            Err(error) => return self.send_mcp(error),
273        };
274        let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
275            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
276        };
277        let op = if add {
278            AdminOp::AclAdd {
279                channel: channel.to_owned(),
280                user: user.to_owned(),
281            }
282        } else {
283            AdminOp::AclRemove {
284                channel: channel.to_owned(),
285                user: user.to_owned(),
286            }
287        };
288        self.defer_admin(id, &server, op);
289    }
290
291    fn tool_invite_create(&mut self, id: &Value, args: &Value) {
292        let server = match self.resolve_server(id, args) {
293            Ok(server) => server,
294            Err(error) => return self.send_mcp(error),
295        };
296        let Some(channel) = arg_str(args, "channel") else {
297            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
298        };
299        let uses = args.get("uses").and_then(Value::as_u64).and_then(|u| u32::try_from(u).ok());
300        let expires_in_secs = args.get("expires_in_secs").and_then(Value::as_u64);
301        self.defer_admin(
302            id,
303            &server,
304            AdminOp::InviteCreate {
305                channel: channel.to_owned(),
306                uses,
307                expires_in_secs,
308            },
309        );
310    }
311
312    fn tool_invite_revoke(&mut self, id: &Value, args: &Value) {
313        let server = match self.resolve_server(id, args) {
314            Ok(server) => server,
315            Err(error) => return self.send_mcp(error),
316        };
317        let Some(token) = arg_str(args, "token") else {
318            return self.send_mcp(mcp::tool_error_result(id, "`token` is required"));
319        };
320        self.defer_admin(id, &server, AdminOp::InviteRevoke { token: token.to_owned() });
321    }
322
323    fn tool_kick(&mut self, id: &Value, args: &Value) {
324        let server = match self.resolve_server(id, args) {
325            Ok(server) => server,
326            Err(error) => return self.send_mcp(error),
327        };
328        let (Some(channel), Some(target)) = (arg_str(args, "channel"), arg_str(args, "target")) else {
329            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `target` are required"));
330        };
331        self.defer_admin(
332            id,
333            &server,
334            AdminOp::Kick {
335                channel: channel.to_owned(),
336                target: target.to_owned(),
337            },
338        );
339    }
340
341    fn tool_ban(&mut self, id: &Value, args: &Value) {
342        let server = match self.resolve_server(id, args) {
343            Ok(server) => server,
344            Err(error) => return self.send_mcp(error),
345        };
346        let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
347            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
348        };
349        self.defer_admin(
350            id,
351            &server,
352            AdminOp::Ban {
353                channel: channel.to_owned(),
354                user: user.to_owned(),
355            },
356        );
357    }
358
359    fn tool_join(&mut self, id: &Value, args: &Value) {
360        let server = match self.resolve_server(id, args) {
361            Ok(server) => server,
362            Err(error) => return self.send_mcp(error),
363        };
364        let Some(channel) = arg_str(args, "channel") else {
365            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
366        };
367        let token = arg_str(args, "token").map(str::to_owned);
368
369        if let Some(perm) = arg_str(args, "perm") {
370            match perm.parse::<PermissionLevel>() {
371                Ok(level) => self.set_scope_override(&server, Some(channel.to_owned()), level),
372                Err(err) => return self.send_mcp(mcp::tool_error_result(id, &err.to_string())),
373            }
374        }
375
376        if let Some(handle) = self.servers.get(&server) {
377            handle.joined.lock().expect("joined mutex poisoned").insert(channel.to_owned());
378        }
379        // Deferred: the result is sent when the server's `Joined` / `Error` arrives.
380        self.pending.entry(server.clone()).or_default().push_back(id.clone());
381        self.send_to_server(&server, ProtocolMessage::Join { channel: channel.to_owned(), token });
382    }
383
384    fn tool_send(&mut self, id: &Value, args: &Value) {
385        let server = match self.resolve_server(id, args) {
386            Ok(server) => server,
387            Err(error) => return self.send_mcp(error),
388        };
389        let (Some(channel), Some(text)) = (arg_str(args, "channel"), arg_str(args, "text")) else {
390            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `text` are required"));
391        };
392
393        // Call-time per-channel rejection (DESIGN.md §9): below `converse` cannot emit.
394        if !policy::emit_allowed(&self.config, &server, &Scope::Channel(channel.to_owned())) {
395            return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: `{channel}` on `{server}` is below `converse`")));
396        }
397
398        let from = self.our_path(&server);
399        self.send_to_server(
400            &server,
401            ProtocolMessage::ChannelMsg {
402                channel: channel.to_owned(),
403                from,
404                payload: Payload::Plain(text.to_owned()),
405            },
406        );
407        self.send_mcp(mcp::tool_text_result(id, &format!("sent to {channel}")));
408    }
409
410    fn tool_whisper(&mut self, id: &Value, args: &Value) {
411        let server = match self.resolve_server(id, args) {
412            Ok(server) => server,
413            Err(error) => return self.send_mcp(error),
414        };
415        let (Some(target), Some(text)) = (arg_str(args, "target"), arg_str(args, "text")) else {
416            return self.send_mcp(mcp::tool_error_result(id, "`target` and `text` are required"));
417        };
418        let Ok(target) = target.parse::<SessionPath>() else {
419            return self.send_mcp(mcp::tool_error_result(id, "`target` must be a `user/machine/session` path"));
420        };
421
422        if !policy::emit_allowed(&self.config, &server, &Scope::Whisper) {
423            return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: whispers on `{server}` are below `converse`")));
424        }
425
426        let from = self.our_path(&server);
427        self.send_to_server(
428            &server,
429            ProtocolMessage::Whisper {
430                from,
431                target,
432                payload: Payload::Plain(text.to_owned()),
433            },
434        );
435        self.send_mcp(mcp::tool_text_result(id, "whisper sent"));
436    }
437
438    fn tool_list(&mut self, id: &Value, args: &Value) {
439        let server = match self.resolve_server(id, args) {
440            Ok(server) => server,
441            Err(error) => return self.send_mcp(error),
442        };
443        self.pending.entry(server.clone()).or_default().push_back(id.clone());
444        self.send_to_server(&server, ProtocolMessage::ListChannels);
445    }
446
447    fn tool_who(&mut self, id: &Value, args: &Value) {
448        let server = match self.resolve_server(id, args) {
449            Ok(server) => server,
450            Err(error) => return self.send_mcp(error),
451        };
452        let channel = arg_str(args, "channel").map(str::to_owned);
453        self.pending.entry(server.clone()).or_default().push_back(id.clone());
454        self.send_to_server(&server, ProtocolMessage::Who { channel });
455    }
456
457    fn tool_submit_permission(&mut self, id: &Value, args: &Value) {
458        let Some(request_id) = arg_str(args, "request_id") else {
459            return self.send_mcp(mcp::tool_error_result(id, "`request_id` is required"));
460        };
461        let behavior = if arg_str(args, "decision") == Some("allow") { "allow" } else { "deny" };
462        self.send_mcp(mcp::permission_verdict(request_id, behavior));
463        self.send_mcp(mcp::tool_text_result(id, &format!("permission verdict `{behavior}` sent")));
464    }
465
466    /// Changes the local autonomy level live (no reconnect); it applies to the next inbound message.
467    fn tool_set_perm(&mut self, id: &Value, args: &Value) {
468        let Some(level) = arg_str(args, "level").and_then(|level| level.parse::<PermissionLevel>().ok()) else {
469            return self.send_mcp(mcp::tool_error_result(id, "`level` must be mute, notify, converse, or act"));
470        };
471        let whisper = args.get("whisper").and_then(Value::as_bool).unwrap_or(false);
472        let channel = arg_str(args, "channel");
473
474        // Channel / whisper scopes are per-server; with neither, set the machine-wide default.
475        if channel.is_some() || whisper {
476            let server = match self.resolve_server(id, args) {
477                Ok(server) => server,
478                Err(error) => return self.send_mcp(error),
479            };
480            let scope = if whisper { None } else { channel.map(str::to_owned) };
481            self.set_scope_override(&server, scope, level);
482        } else {
483            self.config.default_permission = level;
484        }
485        self.send_mcp(mcp::tool_text_result(id, "permission updated"));
486    }
487
488    /// Relays a Claude Code permission request to the local session (DESIGN.md §12/§14). The verdict
489    /// is returned by the `submit_permission` tool.
490    fn relay_permission(&self, request_id: &str, tool_name: &str, description: &str) {
491        let mut meta = std::collections::BTreeMap::new();
492        meta.insert("kind".to_owned(), "permission_request".to_owned());
493        meta.insert("request_id".to_owned(), request_id.to_owned());
494        let content = format!(
495            "Claude Code requests approval to run `{tool_name}`: {description}\nAnswer with the submit_permission tool: {{\"request_id\": \"{request_id}\", \"decision\": \"allow\"|\"deny\"}}."
496        );
497        self.send_mcp(mcp::channel_notification(&content, &meta));
498    }
499
500    // -----------------------------------------------------------------------
501    // Server → bridge.
502    // -----------------------------------------------------------------------
503
504    fn handle_inbound(&mut self, server: &str, frame: ProtocolMessage) {
505        match frame {
506            ProtocolMessage::ChannelMsg { channel, from, payload } => self.inject(server, Some(channel), from, payload),
507            ProtocolMessage::Whisper { from, payload, .. } => self.inject(server, None, from, payload),
508            ProtocolMessage::Joined { channel } => self.resolve_pending(server, &format!("joined {channel}")),
509            ProtocolMessage::ChannelList { channels } => self.resolve_pending(server, &format_channels(&channels)),
510            ProtocolMessage::Presence { channel, sessions } => self.resolve_pending(server, &format_presence(channel.as_deref(), &sessions)),
511            ProtocolMessage::Ack { detail } => self.resolve_pending(server, detail.as_deref().unwrap_or("ok")),
512            ProtocolMessage::InviteToken { token } => self.resolve_pending(server, &format!("invite token: {token}")),
513            ProtocolMessage::Error(error) => self.resolve_error(server, &error),
514            // The post-auth role signal gates the admin tools (DESIGN.md §7).
515            ProtocolMessage::ServerInfo { admin } => {
516                if admin {
517                    self.admin_servers.insert(server.to_owned());
518                } else {
519                    self.admin_servers.remove(server);
520                }
521            }
522            // Keepalive acks and any handshake frames (consumed by the client) are ignored here.
523            _ => {}
524        }
525    }
526
527    fn inject(&self, server: &str, channel: Option<String>, from: SessionPath, payload: Payload) {
528        let body = match payload {
529            Payload::Plain(text) => text,
530            Payload::Encrypted(_) => "<end-to-end-encrypted payload — not supported in v1>".to_owned(),
531        };
532        let scope = channel.as_ref().map_or(Scope::Whisper, |c| Scope::Channel(c.clone()));
533
534        match policy::inbound_delivery(&self.config, server, &scope) {
535            Delivery::Drop => {}
536            Delivery::Inject(level) => self.sink.deliver(&Injection {
537                server: server.to_owned(),
538                channel,
539                from,
540                level,
541                body,
542            }),
543        }
544    }
545
546    fn resolve_pending(&mut self, server: &str, text: &str) {
547        if let Some(id) = self.pending.get_mut(server).and_then(VecDeque::pop_front) {
548            self.send_mcp(mcp::tool_text_result(&id, text));
549        }
550    }
551
552    fn resolve_error(&mut self, server: &str, error: &ProtocolError) {
553        if let Some(id) = self.pending.get_mut(server).and_then(VecDeque::pop_front) {
554            self.send_mcp(mcp::tool_error_result(&id, &error.to_string()));
555        } else {
556            // A stray error (e.g. a whisper to an offline target) — surface it as a notice.
557            let mut meta = std::collections::BTreeMap::new();
558            meta.insert("server".to_owned(), server.to_owned());
559            meta.insert("kind".to_owned(), "error".to_owned());
560            self.send_mcp(mcp::channel_notification(&format!("Server `{server}` error: {error}"), &meta));
561        }
562    }
563
564    // -----------------------------------------------------------------------
565    // Tool set (emit tools gated on `>= converse`, DESIGN.md §9) + helpers.
566    // -----------------------------------------------------------------------
567
568    fn tools(&self) -> Vec<Tool> {
569        let mut tools = vec![join_channel_tool(), list_channels_tool(), who_tool(), submit_permission_tool(), set_perm_tool()];
570        if self.any_emit_allowed() {
571            tools.push(send_channel_tool());
572            tools.push(whisper_tool());
573        }
574        // Admin tools are offered only when the user is an admin on some connected server (§7).
575        if !self.admin_servers.is_empty() {
576            tools.extend(admin_tools());
577        }
578        tools
579    }
580
581    fn any_emit_allowed(&self) -> bool {
582        let joined: Vec<(String, String)> = self
583            .servers
584            .iter()
585            .flat_map(|(server, handle)| {
586                handle
587                    .joined
588                    .lock()
589                    .expect("joined mutex poisoned")
590                    .iter()
591                    .map(|channel| (server.clone(), channel.clone()))
592                    .collect::<Vec<_>>()
593            })
594            .collect();
595        policy::any_emit_allowed(&self.config, joined.iter().map(|(server, channel)| (server.as_str(), channel.as_str())))
596    }
597
598    /// Resolves the target server: the `server` argument, or the sole connection if unambiguous.
599    fn resolve_server(&self, id: &Value, args: &Value) -> Result<String, Value> {
600        if let Some(server) = arg_str(args, "server") {
601            if self.servers.contains_key(server) {
602                return Ok(server.to_owned());
603            }
604            return Err(mcp::tool_error_result(id, &format!("not connected to server `{server}`")));
605        }
606        match self.servers.keys().next() {
607            Some(only) if self.servers.len() == 1 => Ok(only.clone()),
608            _ => Err(mcp::tool_error_result(id, "multiple servers connected; pass `server`")),
609        }
610    }
611
612    fn our_path(&self, server: &str) -> SessionPath {
613        self.servers.get(server).map_or_else(
614            || SessionPath::new("unknown", "unknown", self.session.clone()),
615            |handle| SessionPath::new(handle.registration.username.clone(), handle.registration.machine.clone(), self.session.clone()),
616        )
617    }
618
619    /// Sets a local permission override for a `(server, scope)`, replacing any prior one. `channel`
620    /// is `Some(name)` for a channel scope or `None` for the whisper scope.
621    fn set_scope_override(&mut self, server: &str, channel: Option<String>, level: PermissionLevel) {
622        self.config.overrides.retain(|o| !(o.server == server && o.channel == channel));
623        self.config.overrides.push(PermissionOverride {
624            server: server.to_owned(),
625            channel,
626            level,
627        });
628    }
629
630    fn send_mcp(&self, message: Value) {
631        let _ = self.to_mcp.send(message);
632    }
633
634    fn send_to_server(&self, server: &str, frame: ProtocolMessage) {
635        if let Some(handle) = self.servers.get(server) {
636            let _ = handle.to_server.send(frame);
637        }
638    }
639}
640
641fn arg_str<'a>(args: &'a Value, key: &str) -> Option<&'a str> {
642    args.get(key).and_then(Value::as_str)
643}
644
645fn format_channels(channels: &[crate::protocol::ChannelInfo]) -> String {
646    if channels.is_empty() {
647        return "no channels visible".to_owned();
648    }
649    channels
650        .iter()
651        .map(|c| format!("{} ({}{})", c.name, c.visibility.as_str(), if c.member { ", member" } else { "" }))
652        .collect::<Vec<_>>()
653        .join("\n")
654}
655
656fn format_presence(channel: Option<&str>, sessions: &[SessionPath]) -> String {
657    let scope = channel.map_or_else(|| "server-wide".to_owned(), |c| format!("#{c}"));
658    if sessions.is_empty() {
659        return format!("{scope}: nobody online");
660    }
661    let who = sessions.iter().map(SessionPath::to_string).collect::<Vec<_>>().join(", ");
662    format!("{scope}: {who}")
663}
664
665// --- Tool definitions ---------------------------------------------------------
666
667fn join_channel_tool() -> Tool {
668    Tool {
669        name: "join_channel",
670        description: "Join a channel on a server and subscribe this session to it.",
671        input_schema: json!({
672            "type": "object",
673            "properties": {
674                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
675                "channel": { "type": "string", "description": "Channel name to join." },
676                "token": { "type": "string", "description": "Invite token, if the channel requires one." },
677                "perm": { "type": "string", "enum": ["mute", "notify", "converse", "act"], "description": "Autonomy level for this channel." }
678            },
679            "required": ["channel"]
680        }),
681    }
682}
683
684fn list_channels_tool() -> Tool {
685    Tool {
686        name: "list_channels",
687        description: "List the channels visible to you on a server.",
688        input_schema: json!({
689            "type": "object",
690            "properties": { "server": { "type": "string", "description": "Server URL (optional if only one is connected)." } }
691        }),
692    }
693}
694
695fn who_tool() -> Tool {
696    Tool {
697        name: "who",
698        description: "List who is present on a server, optionally scoped to a channel.",
699        input_schema: json!({
700            "type": "object",
701            "properties": {
702                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
703                "channel": { "type": "string", "description": "Restrict presence to this channel." }
704            }
705        }),
706    }
707}
708
709fn submit_permission_tool() -> Tool {
710    Tool {
711        name: "submit_permission",
712        description: "Answer a relayed Claude Code permission request (allow or deny).",
713        input_schema: json!({
714            "type": "object",
715            "properties": {
716                "request_id": { "type": "string", "description": "The request_id from the permission prompt." },
717                "decision": { "type": "string", "enum": ["allow", "deny"], "description": "The verdict." }
718            },
719            "required": ["request_id", "decision"]
720        }),
721    }
722}
723
724fn send_channel_tool() -> Tool {
725    Tool {
726        name: "send_channel",
727        description: "Send a message to a channel (allowed only at converse/act).",
728        input_schema: json!({
729            "type": "object",
730            "properties": {
731                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
732                "channel": { "type": "string", "description": "Channel to send to." },
733                "text": { "type": "string", "description": "The message text." }
734            },
735            "required": ["channel", "text"]
736        }),
737    }
738}
739
740fn whisper_tool() -> Tool {
741    Tool {
742        name: "whisper",
743        description: "Send a direct message to exactly one session path (allowed only at converse/act).",
744        input_schema: json!({
745            "type": "object",
746            "properties": {
747                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
748                "target": { "type": "string", "description": "The recipient's full user/machine/session path." },
749                "text": { "type": "string", "description": "The message text." }
750            },
751            "required": ["target", "text"]
752        }),
753    }
754}
755
756fn set_perm_tool() -> Tool {
757    Tool {
758        name: "set_perm",
759        description: "Set your autonomy level live (mute/notify/converse/act) for a channel, the whisper scope, or the machine default. Takes effect on the next inbound message — no reconnect.",
760        input_schema: json!({
761            "type": "object",
762            "properties": {
763                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
764                "channel": { "type": "string", "description": "Channel to scope to (omit with `whisper` for the whisper scope, or omit both for the machine default)." },
765                "whisper": { "type": "boolean", "description": "Apply to the whisper scope instead of a channel." },
766                "level": { "type": "string", "enum": ["mute", "notify", "converse", "act"] }
767            },
768            "required": ["level"]
769        }),
770    }
771}
772
773/// Admin / moderation tools — offered only to admin users, authorized again by role server-side (§7).
774fn admin_tools() -> Vec<Tool> {
775    let server = json!({ "type": "string", "description": "Server URL (optional if only one is connected)." });
776    vec![
777        Tool {
778            name: "create_channel",
779            description: "Admin: create a channel (visibility public/unlisted/private; default public).",
780            input_schema: json!({
781                "type": "object",
782                "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
783                "required": ["name"]
784            }),
785        },
786        Tool {
787            name: "delete_channel",
788            description: "Admin: delete a channel.",
789            input_schema: json!({ "type": "object", "properties": { "server": server, "name": { "type": "string" } }, "required": ["name"] }),
790        },
791        Tool {
792            name: "set_visibility",
793            description: "Admin: change a channel's visibility (public/unlisted/private).",
794            input_schema: json!({
795                "type": "object",
796                "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
797                "required": ["name", "visibility"]
798            }),
799        },
800        Tool {
801            name: "acl_add",
802            description: "Admin: add a user to a channel's access-control list.",
803            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
804        },
805        Tool {
806            name: "acl_remove",
807            description: "Admin: remove a user from a channel's access-control list.",
808            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
809        },
810        Tool {
811            name: "invite_create",
812            description: "Admin: mint an invite token for a channel (optional uses / expires_in_secs).",
813            input_schema: json!({
814                "type": "object",
815                "properties": { "server": server, "channel": { "type": "string" }, "uses": { "type": "integer" }, "expires_in_secs": { "type": "integer" } },
816                "required": ["channel"]
817            }),
818        },
819        Tool {
820            name: "invite_revoke",
821            description: "Admin: revoke an invite token.",
822            input_schema: json!({ "type": "object", "properties": { "server": server, "token": { "type": "string" } }, "required": ["token"] }),
823        },
824        Tool {
825            name: "kick",
826            description: "Admin: remove a session path or user from a channel.",
827            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "target": { "type": "string" } }, "required": ["channel", "target"] }),
828        },
829        Tool {
830            name: "ban",
831            description: "Admin: ban a user from a channel (drops them and blocks rejoin).",
832            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
833        },
834    ]
835}
836
837#[cfg(test)]
838mod tests;