Skip to main content

conclavelib/
bridge.rs

1//! The bridge (`conclave bridge`): a dual peer between Claude Code and central servers.
2//!
3//! One process that is simultaneously a stdio **MCP server** to Claude Code and a **WS
4//! client** to one or more central servers (DESIGN.md §13). It translates inbound central
5//! events into injected `<channel>` / `<whisper>` notifications and outbound MCP tool calls
6//! into central messages, owning the session identity, its connections, and the local
7//! **permission policy** (DESIGN.md §9): per inbound message it resolves the
8//! `(server, channel)` level, drops on `mute`, otherwise injects through a pluggable
9//! notification sink; and it rejects outbound emit calls whose target channel is below
10//! `converse`.
11//!
12//! Split by responsibility: `policy` resolves the local autonomy level and gates emit;
13//! `sink` frames a delivered message and pushes it to the session; `mcp` is the JSON-RPC
14//! stdio peer toward Claude Code; `client` holds the outbound WS connections to central with
15//! reconnect + re-subscribe. `BridgeCore` is the transport-free dispatcher those feed.
16
17mod client;
18mod mcp;
19mod policy;
20mod sink;
21
22use std::{
23    collections::{HashMap, HashSet, VecDeque},
24    sync::{Arc, Mutex},
25    time::Duration,
26};
27
28use serde_json::{Value, json};
29use tokio::sync::{Notify, mpsc};
30
31use crate::{
32    base::{PermissionLevel, Res, SessionPath, Visibility, Void},
33    identity::{Config, Identity, PermissionOverride, Scope, ServerRegistration},
34    protocol::{AdminOp, Payload, ProtocolError, ProtocolMessage},
35};
36
37use mcp::{FromMcp, McpSink, Tool};
38use policy::Delivery;
39use sink::{Injection, NotificationSink};
40
/// How often the bridge sends a keepalive `Ping` to each server (well under the server's 60s idle
/// reaper, DESIGN.md §10).
///
/// Used as the period of the `tokio::time::interval` ticker driven by [`spawn_keepalive`].
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(20);
44
/// Everything the running bridge needs: this machine's key, the local config, the session handle,
/// and (optionally) a subset of configured servers to connect to.
///
/// Consumed by value by [`run`].
pub struct BridgeSetup {
    /// This machine's identity (signs the challenge). [`run`] shares it across every server link
    /// behind an `Arc`.
    pub identity: Identity,
    /// The local config: permission policy + known-server registrations (M1).
    pub config: Config,
    /// The live-session handle (`--as`, default = repo/dir name).
    pub session: String,
    /// A subset of `config.servers` URLs to connect to; empty means all of them. Entries are
    /// compared for exact equality against each registration's `url`.
    pub servers: Vec<String>,
}
57
58/// Runs the bridge: the MCP stdio peer, one reconnecting WS link per server, and the dispatcher.
59///
60/// # Errors
61///
62/// Returns an error if no known server is configured to connect to.
63pub async fn run(setup: BridgeSetup) -> Void {
64    let registrations = resolve_registrations(&setup.config, &setup.servers)?;
65
66    let (from_mcp_tx, from_mcp_rx) = mpsc::unbounded_channel();
67    let (to_mcp_tx, to_mcp_rx) = mpsc::unbounded_channel();
68    tokio::spawn(mcp::read_loop(tokio::io::stdin(), from_mcp_tx));
69    tokio::spawn(mcp::write_loop(tokio::io::stdout(), to_mcp_rx));
70
71    let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
72    let shutdown = Arc::new(Notify::new());
73    let identity = Arc::new(setup.identity);
74
75    let sink = Box::new(McpSink::new(to_mcp_tx.clone()));
76    let mut core = BridgeCore::new(setup.config.clone(), setup.session.clone(), to_mcp_tx, sink);
77
78    for registration in registrations {
79        let joined = Arc::new(Mutex::new(HashSet::new()));
80        let (out_tx, out_rx) = mpsc::unbounded_channel();
81        core.register_server(registration.clone(), out_tx.clone(), Arc::clone(&joined));
82
83        let identity = Arc::clone(&identity);
84        let url = registration.url.clone();
85        let session = setup.session.clone();
86        let connect = move || {
87            let identity = Arc::clone(&identity);
88            let url = url.clone();
89            let session = session.clone();
90            async move { client::connect_ws(&url, &identity, &session).await }
91        };
92        tokio::spawn(client::run_link(registration.url.clone(), connect, inbound_tx.clone(), out_rx, Arc::clone(&shutdown)));
93        spawn_keepalive(out_tx, Arc::clone(&shutdown));
94    }
95
96    core.run(from_mcp_rx, inbound_rx, shutdown).await
97}
98
99/// Selects the server registrations to connect to: the requested subset, or all if none named.
100fn resolve_registrations(config: &Config, requested: &[String]) -> Res<Vec<ServerRegistration>> {
101    let selected: Vec<ServerRegistration> = if requested.is_empty() {
102        config.servers.clone()
103    } else {
104        config.servers.iter().filter(|r| requested.iter().any(|u| u == &r.url)).cloned().collect()
105    };
106    anyhow::ensure!(!selected.is_empty(), "no known server to connect to (register one first, or pass --server)");
107    Ok(selected)
108}
109
110/// Sends a periodic keepalive `Ping` so the server's heartbeat reaper keeps the session present.
111fn spawn_keepalive(to_server: mpsc::UnboundedSender<ProtocolMessage>, shutdown: Arc<Notify>) {
112    tokio::spawn(async move {
113        let mut ticker = tokio::time::interval(KEEPALIVE_INTERVAL);
114        loop {
115            tokio::select! {
116                () = shutdown.notified() => break,
117                _ = ticker.tick() => {
118                    if to_server.send(ProtocolMessage::Ping).is_err() {
119                        break;
120                    }
121                }
122            }
123        }
124    });
125}
126
/// A connected server: its registration (for the local path), outbound sink, and joined channels.
struct ServerHandle {
    /// The registration this handle was created from; its `url` is the key in `BridgeCore::servers`.
    registration: ServerRegistration,
    /// Sender for frames bound for this server; [`run`] hands the receiving end to the link task.
    to_server: mpsc::UnboundedSender<ProtocolMessage>,
    /// Channels the server has confirmed as joined: inserted on a `Joined` frame, removed by the
    /// leave tool, and replayed as `Join` requests when the link comes back up.
    joined: Arc<Mutex<HashSet<String>>>,
}
133
/// One entry in a server's in-order response queue. Every server-bound request that expects a
/// response enqueues one, so the FIFO stays correctly correlated (PRD-0008 T-001).
///
/// Entries are popped from the front as response frames arrive, and the whole queue for a server
/// is discarded (failing any `Tool` entries) when its link drops.
enum Pending {
    /// A deferred MCP tool call awaiting the server's response. `ok` overrides the success text
    /// (used by send/whisper, whose `Ack` carries no useful detail); `None` uses the frame's own
    /// rendered content (join / list / who / invite / admin). `id` is the MCP request id that the
    /// eventual result or error is addressed to.
    Tool { id: Value, ok: Option<String> },
    /// An internal re-subscribe `Join` issued on reconnect — its `Joined` ack is consumed silently.
    Resubscribe,
}
144
/// The transport-free bridge dispatcher: MCP events and inbound server frames in, MCP responses /
/// injections / outbound frames out. Everything I/O lives in [`run`]; this is unit-testable.
struct BridgeCore {
    /// Local configuration consulted by the `policy` checks; the permission tools mutate it live.
    config: Config,
    /// The live-session handle supplied at construction.
    session: String,
    /// Sender for JSON messages toward the MCP peer ([`run`] connects it to `mcp::write_loop`).
    to_mcp: mpsc::UnboundedSender<Value>,
    /// Where delivered channel messages and whispers are pushed once the policy admits them.
    sink: Box<dyn NotificationSink>,
    /// Registered servers, keyed by registration URL.
    servers: HashMap<String, ServerHandle>,
    /// Per-server in-order queue of responses awaited from that server (tool calls + re-subscribes).
    pending: HashMap<String, VecDeque<Pending>>,
    /// Servers the session has been told are disconnected — so link state surfaces once per drop and
    /// a reconnect is announced (PRD-0008 T-003, #21).
    link_down_notified: HashSet<String>,
    /// Servers on which the authenticated user is an admin (from `ServerInfo`) — gates admin tools.
    admin_servers: HashSet<String>,
}
161
162impl BridgeCore {
163    fn new(config: Config, session: String, to_mcp: mpsc::UnboundedSender<Value>, sink: Box<dyn NotificationSink>) -> Self {
164        Self {
165            config,
166            session,
167            to_mcp,
168            sink,
169            servers: HashMap::new(),
170            pending: HashMap::new(),
171            link_down_notified: HashSet::new(),
172            admin_servers: HashSet::new(),
173        }
174    }
175
176    fn register_server(&mut self, registration: ServerRegistration, to_server: mpsc::UnboundedSender<ProtocolMessage>, joined: Arc<Mutex<HashSet<String>>>) {
177        self.servers.insert(registration.url.clone(), ServerHandle { registration, to_server, joined });
178    }
179
180    /// The dispatcher loop: MCP events, inbound server frames, and Ctrl-C / stdin-EOF shutdown.
181    async fn run(mut self, mut from_mcp: mpsc::UnboundedReceiver<FromMcp>, mut inbound: mpsc::UnboundedReceiver<(String, client::LinkEvent)>, shutdown: Arc<Notify>) -> Void {
182        loop {
183            tokio::select! {
184                () = shutdown.notified() => break,
185                _ = tokio::signal::ctrl_c() => break,
186                event = from_mcp.recv() => match event {
187                    Some(event) => self.handle_mcp(event),
188                    None => break,
189                },
190                event = inbound.recv() => match event {
191                    Some((server, event)) => self.handle_link_event(&server, event),
192                    None => break,
193                },
194            }
195        }
196        shutdown.notify_waiters();
197        Ok(())
198    }
199
200    /// Routes a link event: lifecycle (re-subscribe on `Up`, fail pending on `Down`) or a frame.
201    fn handle_link_event(&mut self, server: &str, event: client::LinkEvent) {
202        match event {
203            client::LinkEvent::Up => self.link_up(server),
204            client::LinkEvent::Down => self.link_down(server),
205            client::LinkEvent::Frame(frame) => self.handle_inbound(server, frame),
206        }
207    }
208
209    // -----------------------------------------------------------------------
210    // MCP → bridge.
211    // -----------------------------------------------------------------------
212
213    fn handle_mcp(&mut self, event: FromMcp) {
214        match event {
215            FromMcp::Initialize { id, protocol_version } => self.send_mcp(mcp::initialize_result(&id, &protocol_version)),
216            FromMcp::ListTools { id } => {
217                let tools = self.tools();
218                self.send_mcp(mcp::tools_list_result(&id, &tools));
219            }
220            FromMcp::Ping { id } => self.send_mcp(mcp::ping_result(&id)),
221            FromMcp::CallTool { id, name, args } => self.dispatch_tool(&id, &name, &args),
222            FromMcp::PermissionRequest { request_id, tool_name, description, .. } => self.relay_permission(&request_id, &tool_name, &description),
223            FromMcp::Initialized => {}
224            FromMcp::UnknownRequest { id } => self.send_mcp(mcp::method_not_found(&id)),
225        }
226    }
227
228    /// Dispatches a tool call. Each tool sends its own MCP reply; the `join`/`list`/`who` tools
229    /// **defer** (send nothing now) and are resolved when the server's response arrives.
230    fn dispatch_tool(&mut self, id: &Value, name: &str, args: &Value) {
231        match name {
232            "join_channel" => self.tool_join(id, args),
233            "leave_channel" => self.tool_leave(id, args),
234            "send_channel" => self.tool_send(id, args),
235            "whisper" => self.tool_whisper(id, args),
236            "list_channels" => self.tool_list(id, args),
237            "who" => self.tool_who(id, args),
238            "submit_permission" => self.tool_submit_permission(id, args),
239            "set_perm" => self.tool_set_perm(id, args),
240            "create_channel" => self.tool_create_channel(id, args),
241            "delete_channel" => self.tool_delete_channel(id, args),
242            "set_visibility" => self.tool_set_visibility(id, args),
243            "acl_add" => self.tool_acl(id, args, true),
244            "acl_remove" => self.tool_acl(id, args, false),
245            "invite_create" => self.tool_invite_create(id, args),
246            "invite_revoke" => self.tool_invite_revoke(id, args),
247            "kick" => self.tool_kick(id, args),
248            "ban" => self.tool_ban(id, args),
249            other => self.send_mcp(mcp::tool_error_result(id, &format!("unknown tool `{other}`"))),
250        }
251    }
252
253    /// Pushes an admin op to the server, deferring its MCP result to the `Ack` / `InviteToken` /
254    /// `Error` response. The server authorizes by role, so a non-admin call is refused server-side.
255    /// Enqueues a deferred tool call awaiting `server`'s response. `ok` overrides the success text
256    /// (`None` uses the response frame's own content).
257    fn defer(&mut self, id: &Value, server: &str, ok: Option<String>) {
258        self.pending.entry(server.to_owned()).or_default().push_back(Pending::Tool { id: id.clone(), ok });
259    }
260
261    fn defer_admin(&mut self, id: &Value, server: &str, op: AdminOp) {
262        // Scope admin ops to servers that actually asserted admin for this user, so one server
263        // claiming admin cannot confer admin power against another home (PRD-0008 T-007, #31).
264        if !self.admin_servers.contains(server) {
265            return self.send_mcp(mcp::tool_error_result(id, &format!("not an admin on `{server}`")));
266        }
267        self.defer(id, server, None);
268        self.send_to_server(server, ProtocolMessage::Admin(op));
269    }
270
271    fn tool_create_channel(&mut self, id: &Value, args: &Value) {
272        let server = match self.resolve_server(id, args) {
273            Ok(server) => server,
274            Err(error) => return self.send_mcp(error),
275        };
276        let Some(name) = arg_str(args, "name") else {
277            return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
278        };
279        let visibility = arg_str(args, "visibility").and_then(|v| v.parse().ok()).unwrap_or(Visibility::Public);
280        self.defer_admin(id, &server, AdminOp::CreateChannel { name: name.to_owned(), visibility });
281    }
282
283    fn tool_delete_channel(&mut self, id: &Value, args: &Value) {
284        let server = match self.resolve_server(id, args) {
285            Ok(server) => server,
286            Err(error) => return self.send_mcp(error),
287        };
288        let Some(name) = arg_str(args, "name") else {
289            return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
290        };
291        self.defer_admin(id, &server, AdminOp::DeleteChannel { name: name.to_owned() });
292    }
293
294    fn tool_set_visibility(&mut self, id: &Value, args: &Value) {
295        let server = match self.resolve_server(id, args) {
296            Ok(server) => server,
297            Err(error) => return self.send_mcp(error),
298        };
299        let (Some(name), Some(visibility)) = (arg_str(args, "name"), arg_str(args, "visibility").and_then(|v| v.parse::<Visibility>().ok())) else {
300            return self.send_mcp(mcp::tool_error_result(id, "`name` and a valid `visibility` are required"));
301        };
302        self.defer_admin(id, &server, AdminOp::SetVisibility { name: name.to_owned(), visibility });
303    }
304
305    fn tool_acl(&mut self, id: &Value, args: &Value, add: bool) {
306        let server = match self.resolve_server(id, args) {
307            Ok(server) => server,
308            Err(error) => return self.send_mcp(error),
309        };
310        let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
311            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
312        };
313        let op = if add {
314            AdminOp::AclAdd {
315                channel: channel.to_owned(),
316                user: user.to_owned(),
317            }
318        } else {
319            AdminOp::AclRemove {
320                channel: channel.to_owned(),
321                user: user.to_owned(),
322            }
323        };
324        self.defer_admin(id, &server, op);
325    }
326
327    fn tool_invite_create(&mut self, id: &Value, args: &Value) {
328        let server = match self.resolve_server(id, args) {
329            Ok(server) => server,
330            Err(error) => return self.send_mcp(error),
331        };
332        let Some(channel) = arg_str(args, "channel") else {
333            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
334        };
335        let uses = args.get("uses").and_then(Value::as_u64).and_then(|u| u32::try_from(u).ok());
336        let expires_in_secs = args.get("expires_in_secs").and_then(Value::as_u64);
337        self.defer_admin(
338            id,
339            &server,
340            AdminOp::InviteCreate {
341                channel: channel.to_owned(),
342                uses,
343                expires_in_secs,
344            },
345        );
346    }
347
348    fn tool_invite_revoke(&mut self, id: &Value, args: &Value) {
349        let server = match self.resolve_server(id, args) {
350            Ok(server) => server,
351            Err(error) => return self.send_mcp(error),
352        };
353        let Some(token) = arg_str(args, "token") else {
354            return self.send_mcp(mcp::tool_error_result(id, "`token` is required"));
355        };
356        self.defer_admin(id, &server, AdminOp::InviteRevoke { token: token.to_owned() });
357    }
358
359    fn tool_kick(&mut self, id: &Value, args: &Value) {
360        let server = match self.resolve_server(id, args) {
361            Ok(server) => server,
362            Err(error) => return self.send_mcp(error),
363        };
364        let (Some(channel), Some(target)) = (arg_str(args, "channel"), arg_str(args, "target")) else {
365            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `target` are required"));
366        };
367        self.defer_admin(
368            id,
369            &server,
370            AdminOp::Kick {
371                channel: channel.to_owned(),
372                target: target.to_owned(),
373            },
374        );
375    }
376
377    fn tool_ban(&mut self, id: &Value, args: &Value) {
378        let server = match self.resolve_server(id, args) {
379            Ok(server) => server,
380            Err(error) => return self.send_mcp(error),
381        };
382        let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
383            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
384        };
385        self.defer_admin(
386            id,
387            &server,
388            AdminOp::Ban {
389                channel: channel.to_owned(),
390                user: user.to_owned(),
391            },
392        );
393    }
394
395    fn tool_join(&mut self, id: &Value, args: &Value) {
396        let server = match self.resolve_server(id, args) {
397            Ok(server) => server,
398            Err(error) => return self.send_mcp(error),
399        };
400        let Some(channel) = arg_str(args, "channel") else {
401            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
402        };
403        let token = arg_str(args, "token").map(str::to_owned);
404
405        if let Some(perm) = arg_str(args, "perm") {
406            match perm.parse::<PermissionLevel>() {
407                Ok(level) => self.set_scope_override(&server, Some(channel.to_owned()), level),
408                Err(err) => return self.send_mcp(mcp::tool_error_result(id, &err.to_string())),
409            }
410        }
411
412        // Deferred: the result — and recording the channel as joined (done on the `Joined` ack, so a
413        // rejected join isn't pre-recorded, PRD-0008 T-003 #20) — waits for the server to confirm.
414        self.defer(id, &server, None);
415        self.send_to_server(&server, ProtocolMessage::Join { channel: channel.to_owned(), token });
416    }
417
418    /// Unsubscribes this session from a channel without disconnecting (PRD-0011 T-005). The local
419    /// record drops immediately — a Leave cannot fail server-side (it always acks) and forgetting it
420    /// up front also stops any reconnect from resubscribing.
421    fn tool_leave(&mut self, id: &Value, args: &Value) {
422        let server = match self.resolve_server(id, args) {
423            Ok(server) => server,
424            Err(error) => return self.send_mcp(error),
425        };
426        let Some(channel) = arg_str(args, "channel") else {
427            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
428        };
429
430        if let Some(handle) = self.servers.get(&server) {
431            handle.joined.lock().expect("joined mutex poisoned").remove(channel);
432        }
433        self.defer(id, &server, Some(format!("left {channel}")));
434        self.send_to_server(&server, ProtocolMessage::Leave { channel: channel.to_owned() });
435    }
436
437    fn tool_send(&mut self, id: &Value, args: &Value) {
438        let server = match self.resolve_server(id, args) {
439            Ok(server) => server,
440            Err(error) => return self.send_mcp(error),
441        };
442        let (Some(channel), Some(text)) = (arg_str(args, "channel"), arg_str(args, "text")) else {
443            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `text` are required"));
444        };
445
446        // Call-time per-channel rejection (DESIGN.md §9): below `converse` cannot emit.
447        if !policy::emit_allowed(&self.config, &server, &Scope::Channel(channel.to_owned())) {
448            return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: `{channel}` on `{server}` is below `converse`")));
449        }
450
451        let from = self.our_path(&server);
452        // Deferred until the server confirms (Ack) or rejects (Error), so the result reflects real
453        // delivery and its Error correlates instead of stealing another call's slot (T-001).
454        self.defer(id, &server, Some(format!("sent to {channel}")));
455        self.send_to_server(
456            &server,
457            ProtocolMessage::ChannelMsg {
458                channel: channel.to_owned(),
459                from,
460                payload: Payload::Plain(text.to_owned()),
461            },
462        );
463    }
464
465    fn tool_whisper(&mut self, id: &Value, args: &Value) {
466        let server = match self.resolve_server(id, args) {
467            Ok(server) => server,
468            Err(error) => return self.send_mcp(error),
469        };
470        let (Some(target), Some(text)) = (arg_str(args, "target"), arg_str(args, "text")) else {
471            return self.send_mcp(mcp::tool_error_result(id, "`target` and `text` are required"));
472        };
473        let Ok(target) = target.parse::<SessionPath>() else {
474            return self.send_mcp(mcp::tool_error_result(id, "`target` must be a `user/machine/session` path"));
475        };
476
477        if !policy::emit_allowed(&self.config, &server, &Scope::Whisper) {
478            return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: whispers on `{server}` are below `converse`")));
479        }
480
481        let from = self.our_path(&server);
482        // Deferred until the server confirms/rejects — a whisper to an offline target now returns
483        // that error to the caller instead of misrouting it to another pending call (T-001).
484        self.defer(id, &server, Some("whisper sent".to_owned()));
485        self.send_to_server(
486            &server,
487            ProtocolMessage::Whisper {
488                from,
489                target,
490                payload: Payload::Plain(text.to_owned()),
491            },
492        );
493    }
494
495    fn tool_list(&mut self, id: &Value, args: &Value) {
496        let server = match self.resolve_server(id, args) {
497            Ok(server) => server,
498            Err(error) => return self.send_mcp(error),
499        };
500        self.defer(id, &server, None);
501        self.send_to_server(&server, ProtocolMessage::ListChannels);
502    }
503
504    fn tool_who(&mut self, id: &Value, args: &Value) {
505        let server = match self.resolve_server(id, args) {
506            Ok(server) => server,
507            Err(error) => return self.send_mcp(error),
508        };
509        let channel = arg_str(args, "channel").map(str::to_owned);
510        self.defer(id, &server, None);
511        self.send_to_server(&server, ProtocolMessage::Who { channel });
512    }
513
514    fn tool_submit_permission(&mut self, id: &Value, args: &Value) {
515        let Some(request_id) = arg_str(args, "request_id") else {
516            return self.send_mcp(mcp::tool_error_result(id, "`request_id` is required"));
517        };
518        let behavior = if arg_str(args, "decision") == Some("allow") { "allow" } else { "deny" };
519        self.send_mcp(mcp::permission_verdict(request_id, behavior));
520        self.send_mcp(mcp::tool_text_result(id, &format!("permission verdict `{behavior}` sent")));
521    }
522
523    /// Changes the local autonomy level live (no reconnect); it applies to the next inbound message.
524    fn tool_set_perm(&mut self, id: &Value, args: &Value) {
525        let Some(level) = arg_str(args, "level").and_then(|level| level.parse::<PermissionLevel>().ok()) else {
526            return self.send_mcp(mcp::tool_error_result(id, "`level` must be mute, notify, converse, or act"));
527        };
528        let whisper = args.get("whisper").and_then(Value::as_bool).unwrap_or(false);
529        let channel = arg_str(args, "channel");
530
531        // Channel / whisper scopes are per-server; with neither, set the machine-wide default.
532        if channel.is_some() || whisper {
533            let server = match self.resolve_server(id, args) {
534                Ok(server) => server,
535                Err(error) => return self.send_mcp(error),
536            };
537            let scope = if whisper { None } else { channel.map(str::to_owned) };
538            self.set_scope_override(&server, scope, level);
539        } else {
540            self.config.default_permission = level;
541        }
542        self.send_mcp(mcp::tool_text_result(id, "permission updated"));
543    }
544
545    /// Relays a Claude Code permission request to the local session (DESIGN.md §12/§14). The verdict
546    /// is returned by the `submit_permission` tool.
547    fn relay_permission(&self, request_id: &str, tool_name: &str, description: &str) {
548        let mut meta = std::collections::BTreeMap::new();
549        meta.insert("kind".to_owned(), "permission_request".to_owned());
550        meta.insert("request_id".to_owned(), request_id.to_owned());
551        let content = format!(
552            "Claude Code requests approval to run `{tool_name}`: {description}\nAnswer with the submit_permission tool: {{\"request_id\": \"{request_id}\", \"decision\": \"allow\"|\"deny\"}}."
553        );
554        self.send_mcp(mcp::channel_notification(&content, &meta));
555    }
556
557    // -----------------------------------------------------------------------
558    // Server → bridge.
559    // -----------------------------------------------------------------------
560
561    fn handle_inbound(&mut self, server: &str, frame: ProtocolMessage) {
562        match frame {
563            ProtocolMessage::ChannelMsg { channel, from, payload } => self.inject(server, Some(channel), from, payload),
564            ProtocolMessage::Whisper { from, payload, .. } => self.inject(server, None, from, payload),
565            ProtocolMessage::Joined { channel } => {
566                // Record the subscription only now that the server has confirmed it (#20).
567                if let Some(handle) = self.servers.get(server) {
568                    handle.joined.lock().expect("joined mutex poisoned").insert(channel.clone());
569                }
570                self.resolve_pending(server, &format!("joined {channel}"));
571            }
572            ProtocolMessage::ChannelList { channels } => self.resolve_pending(server, &format_channels(&channels)),
573            ProtocolMessage::Presence { channel, sessions } => self.resolve_pending(server, &format_presence(channel.as_deref(), &sessions)),
574            ProtocolMessage::Ack { detail } => self.resolve_pending(server, detail.as_deref().unwrap_or("ok")),
575            ProtocolMessage::InviteToken { token } => self.resolve_pending(server, &format!("invite token: {token}")),
576            ProtocolMessage::Error(error) => self.resolve_error(server, &error),
577            // The post-auth role signal gates the admin tools (DESIGN.md §7).
578            ProtocolMessage::ServerInfo { admin } => {
579                if admin {
580                    self.admin_servers.insert(server.to_owned());
581                } else {
582                    self.admin_servers.remove(server);
583                }
584            }
585            // Keepalive acks and any handshake frames (consumed by the client) are ignored here.
586            _ => {}
587        }
588    }
589
590    fn inject(&self, server: &str, channel: Option<String>, from: SessionPath, payload: Payload) {
591        let body = match payload {
592            Payload::Plain(text) => text,
593            Payload::Encrypted(_) => "<end-to-end-encrypted payload — not supported in v1>".to_owned(),
594        };
595        let scope = channel.as_ref().map_or(Scope::Whisper, |c| Scope::Channel(c.clone()));
596
597        match policy::inbound_delivery(&self.config, server, &scope) {
598            Delivery::Drop => {}
599            Delivery::Inject(level) => self.sink.deliver(&Injection {
600                server: server.to_owned(),
601                channel,
602                from,
603                level,
604                body,
605            }),
606        }
607    }
608
609    fn resolve_pending(&mut self, server: &str, text: &str) {
610        match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
611            Some(Pending::Tool { id, ok }) => self.send_mcp(mcp::tool_text_result(&id, ok.as_deref().unwrap_or(text))),
612            // A re-subscribe `Joined` ack (or an orphan success) — consume it, don't answer a tool.
613            Some(Pending::Resubscribe) | None => {}
614        }
615    }
616
617    fn resolve_error(&mut self, server: &str, error: &ProtocolError) {
618        match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
619            Some(Pending::Tool { id, .. }) => self.send_mcp(mcp::tool_error_result(&id, &error.to_string())),
620            // A re-subscribe `Join` that failed (e.g. the channel was deleted) — consume silently.
621            Some(Pending::Resubscribe) => {}
622            // A stray error with nothing pending — surface it as a notice.
623            None => self.notify(server, "error", &format!("Server `{server}` error: {error}")),
624        }
625    }
626
627    /// Surfaces a system notice (link state, stray errors) into the session as a channel notification.
628    fn notify(&self, server: &str, kind: &str, text: &str) {
629        let mut meta = std::collections::BTreeMap::new();
630        meta.insert("server".to_owned(), server.to_owned());
631        meta.insert("kind".to_owned(), kind.to_owned());
632        self.send_mcp(mcp::channel_notification(text, &meta));
633    }
634
635    /// On a fresh connection, announce a reconnect (if the session was told we dropped) and
636    /// re-subscribe every joined channel, enqueuing a silent `Resubscribe` per `Join` so its
637    /// `Joined` ack never resolves an unrelated tool call (PRD-0008 T-001/T-003).
638    fn link_up(&mut self, server: &str) {
639        if self.link_down_notified.remove(server) {
640            self.notify(server, "link", &format!("Reconnected to `{server}`."));
641        }
642        let Some(handle) = self.servers.get(server) else { return };
643        let channels: Vec<String> = handle.joined.lock().expect("joined mutex poisoned").iter().cloned().collect();
644        for channel in channels {
645            self.pending.entry(server.to_owned()).or_default().push_back(Pending::Resubscribe);
646            self.send_to_server(server, ProtocolMessage::Join { channel, token: None });
647        }
648    }
649
650    /// On a link drop, fail every pending tool call for `server` (so a deferred call never hangs),
651    /// then surface the disconnect to the session once (PRD-0008 T-002/T-003). Re-subscribe entries
652    /// are simply dropped.
653    fn link_down(&mut self, server: &str) {
654        if let Some(queue) = self.pending.remove(server) {
655            for entry in queue {
656                if let Pending::Tool { id, .. } = entry {
657                    self.send_mcp(mcp::tool_error_result(&id, &format!("connection to `{server}` lost; retry")));
658                }
659            }
660        }
661        if self.link_down_notified.insert(server.to_owned()) {
662            self.notify(server, "link", &format!("Disconnected from `{server}` — reconnecting."));
663        }
664    }
665
666    // -----------------------------------------------------------------------
667    // Tool set (emit tools gated on `>= converse`, DESIGN.md §9) + helpers.
668    // -----------------------------------------------------------------------
669
670    fn tools(&self) -> Vec<Tool> {
671        let mut tools = vec![join_channel_tool(), leave_channel_tool(), list_channels_tool(), who_tool(), submit_permission_tool(), set_perm_tool()];
672        if self.any_emit_allowed() {
673            tools.push(send_channel_tool());
674            tools.push(whisper_tool());
675        }
676        // Admin tools are offered only when the user is an admin on some connected server (§7).
677        if !self.admin_servers.is_empty() {
678            tools.extend(admin_tools());
679        }
680        tools
681    }
682
683    fn any_emit_allowed(&self) -> bool {
684        let joined: Vec<(String, String)> = self
685            .servers
686            .iter()
687            .flat_map(|(server, handle)| {
688                handle
689                    .joined
690                    .lock()
691                    .expect("joined mutex poisoned")
692                    .iter()
693                    .map(|channel| (server.clone(), channel.clone()))
694                    .collect::<Vec<_>>()
695            })
696            .collect();
697        policy::any_emit_allowed(&self.config, joined.iter().map(|(server, channel)| (server.as_str(), channel.as_str())))
698    }
699
700    /// Resolves the target server: the `server` argument, or the sole connection if unambiguous.
701    fn resolve_server(&self, id: &Value, args: &Value) -> Result<String, Value> {
702        if let Some(server) = arg_str(args, "server") {
703            if self.servers.contains_key(server) {
704                return Ok(server.to_owned());
705            }
706            return Err(mcp::tool_error_result(id, &format!("not connected to server `{server}`")));
707        }
708        match self.servers.keys().next() {
709            Some(only) if self.servers.len() == 1 => Ok(only.clone()),
710            _ => Err(mcp::tool_error_result(id, "multiple servers connected; pass `server`")),
711        }
712    }
713
714    fn our_path(&self, server: &str) -> SessionPath {
715        self.servers.get(server).map_or_else(
716            || SessionPath::new("unknown", "unknown", self.session.clone()),
717            |handle| SessionPath::new(handle.registration.username.clone(), handle.registration.machine.clone(), self.session.clone()),
718        )
719    }
720
721    /// Sets a local permission override for a `(server, scope)`, replacing any prior one. `channel`
722    /// is `Some(name)` for a channel scope or `None` for the whisper scope.
723    fn set_scope_override(&mut self, server: &str, channel: Option<String>, level: PermissionLevel) {
724        self.config.overrides.retain(|o| !(o.server == server && o.channel == channel));
725        self.config.overrides.push(PermissionOverride {
726            server: server.to_owned(),
727            channel,
728            level,
729        });
730    }
731
732    fn send_mcp(&self, message: Value) {
733        let _ = self.to_mcp.send(message);
734    }
735
736    fn send_to_server(&self, server: &str, frame: ProtocolMessage) {
737        if let Some(handle) = self.servers.get(server) {
738            let _ = handle.to_server.send(frame);
739        }
740    }
741}
742
743fn arg_str<'a>(args: &'a Value, key: &str) -> Option<&'a str> {
744    args.get(key).and_then(Value::as_str)
745}
746
747fn format_channels(channels: &[crate::protocol::ChannelInfo]) -> String {
748    if channels.is_empty() {
749        return "no channels visible".to_owned();
750    }
751    channels
752        .iter()
753        .map(|c| format!("{} ({}{})", c.name, c.visibility.as_str(), if c.member { ", member" } else { "" }))
754        .collect::<Vec<_>>()
755        .join("\n")
756}
757
758fn format_presence(channel: Option<&str>, sessions: &[SessionPath]) -> String {
759    let scope = channel.map_or_else(|| "server-wide".to_owned(), |c| format!("#{c}"));
760    if sessions.is_empty() {
761        return format!("{scope}: nobody online");
762    }
763    let who = sessions.iter().map(SessionPath::to_string).collect::<Vec<_>>().join(", ");
764    format!("{scope}: {who}")
765}
766
767// --- Tool definitions ---------------------------------------------------------
768
769fn join_channel_tool() -> Tool {
770    Tool {
771        name: "join_channel",
772        description: "Join a channel on a server and subscribe this session to it.",
773        input_schema: json!({
774            "type": "object",
775            "properties": {
776                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
777                "channel": { "type": "string", "description": "Channel name to join." },
778                "token": { "type": "string", "description": "Invite token, if the channel requires one." },
779                "perm": { "type": "string", "enum": ["mute", "notify", "converse", "act"], "description": "Autonomy level for this channel." }
780            },
781            "required": ["channel"]
782        }),
783    }
784}
785
786fn leave_channel_tool() -> Tool {
787    Tool {
788        name: "leave_channel",
789        description: "Unsubscribe this session from a channel (stays connected to the server).",
790        input_schema: json!({
791            "type": "object",
792            "properties": {
793                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
794                "channel": { "type": "string", "description": "Channel name to leave." }
795            },
796            "required": ["channel"]
797        }),
798    }
799}
800
801fn list_channels_tool() -> Tool {
802    Tool {
803        name: "list_channels",
804        description: "List the channels visible to you on a server.",
805        input_schema: json!({
806            "type": "object",
807            "properties": { "server": { "type": "string", "description": "Server URL (optional if only one is connected)." } }
808        }),
809    }
810}
811
812fn who_tool() -> Tool {
813    Tool {
814        name: "who",
815        description: "List who is present on a server, optionally scoped to a channel.",
816        input_schema: json!({
817            "type": "object",
818            "properties": {
819                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
820                "channel": { "type": "string", "description": "Restrict presence to this channel." }
821            }
822        }),
823    }
824}
825
826fn submit_permission_tool() -> Tool {
827    Tool {
828        name: "submit_permission",
829        description: "Answer a relayed Claude Code permission request (allow or deny).",
830        input_schema: json!({
831            "type": "object",
832            "properties": {
833                "request_id": { "type": "string", "description": "The request_id from the permission prompt." },
834                "decision": { "type": "string", "enum": ["allow", "deny"], "description": "The verdict." }
835            },
836            "required": ["request_id", "decision"]
837        }),
838    }
839}
840
841fn send_channel_tool() -> Tool {
842    Tool {
843        name: "send_channel",
844        description: "Send a message to a channel (allowed only at converse/act).",
845        input_schema: json!({
846            "type": "object",
847            "properties": {
848                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
849                "channel": { "type": "string", "description": "Channel to send to." },
850                "text": { "type": "string", "description": "The message text." }
851            },
852            "required": ["channel", "text"]
853        }),
854    }
855}
856
857fn whisper_tool() -> Tool {
858    Tool {
859        name: "whisper",
860        description: "Send a direct message to exactly one session path (allowed only at converse/act).",
861        input_schema: json!({
862            "type": "object",
863            "properties": {
864                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
865                "target": { "type": "string", "description": "The recipient's full user/machine/session path." },
866                "text": { "type": "string", "description": "The message text." }
867            },
868            "required": ["target", "text"]
869        }),
870    }
871}
872
873fn set_perm_tool() -> Tool {
874    Tool {
875        name: "set_perm",
876        description: "Set your autonomy level live (mute/notify/converse/act) for a channel, the whisper scope, or the machine default. Takes effect on the next inbound message — no reconnect.",
877        input_schema: json!({
878            "type": "object",
879            "properties": {
880                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
881                "channel": { "type": "string", "description": "Channel to scope to (omit with `whisper` for the whisper scope, or omit both for the machine default)." },
882                "whisper": { "type": "boolean", "description": "Apply to the whisper scope instead of a channel." },
883                "level": { "type": "string", "enum": ["mute", "notify", "converse", "act"] }
884            },
885            "required": ["level"]
886        }),
887    }
888}
889
890/// Admin / moderation tools — offered only to admin users, authorized again by role server-side (§7).
891fn admin_tools() -> Vec<Tool> {
892    let server = json!({ "type": "string", "description": "Server URL (optional if only one is connected)." });
893    vec![
894        Tool {
895            name: "create_channel",
896            description: "Admin: create a channel (visibility public/unlisted/private; default public).",
897            input_schema: json!({
898                "type": "object",
899                "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
900                "required": ["name"]
901            }),
902        },
903        Tool {
904            name: "delete_channel",
905            description: "Admin: delete a channel.",
906            input_schema: json!({ "type": "object", "properties": { "server": server, "name": { "type": "string" } }, "required": ["name"] }),
907        },
908        Tool {
909            name: "set_visibility",
910            description: "Admin: change a channel's visibility (public/unlisted/private).",
911            input_schema: json!({
912                "type": "object",
913                "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
914                "required": ["name", "visibility"]
915            }),
916        },
917        Tool {
918            name: "acl_add",
919            description: "Admin: add a user to a channel's access-control list.",
920            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
921        },
922        Tool {
923            name: "acl_remove",
924            description: "Admin: remove a user from a channel's access-control list.",
925            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
926        },
927        Tool {
928            name: "invite_create",
929            description: "Admin: mint an invite token for a channel (optional uses / expires_in_secs).",
930            input_schema: json!({
931                "type": "object",
932                "properties": { "server": server, "channel": { "type": "string" }, "uses": { "type": "integer" }, "expires_in_secs": { "type": "integer" } },
933                "required": ["channel"]
934            }),
935        },
936        Tool {
937            name: "invite_revoke",
938            description: "Admin: revoke an invite token.",
939            input_schema: json!({ "type": "object", "properties": { "server": server, "token": { "type": "string" } }, "required": ["token"] }),
940        },
941        Tool {
942            name: "kick",
943            description: "Admin: remove a session path or user from a channel.",
944            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "target": { "type": "string" } }, "required": ["channel", "target"] }),
945        },
946        Tool {
947            name: "ban",
948            description: "Admin: ban a user from a channel (drops them and blocks rejoin).",
949            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
950        },
951    ]
952}
953
954#[cfg(test)]
955mod tests;