Skip to main content

conclavelib/
bridge.rs

1//! The bridge (`conclave bridge`): a dual peer between Claude Code and central servers.
2//!
3//! One process that is simultaneously a stdio **MCP server** to Claude Code and a **WS
4//! client** to one or more central servers (DESIGN.md §13). It translates inbound central
5//! events into injected `<channel>` / `<whisper>` notifications and outbound MCP tool calls
6//! into central messages, owning the session identity, its connections, and the local
7//! **permission policy** (DESIGN.md §9): per inbound message it resolves the
8//! `(server, channel)` level, drops on `mute`, otherwise injects through a pluggable
9//! notification sink; and it rejects outbound emit calls whose target channel is below
10//! `converse`.
11//!
12//! Split by responsibility: `policy` resolves the local autonomy level and gates emit;
13//! `sink` frames a delivered message and pushes it to the session; `mcp` is the JSON-RPC
14//! stdio peer toward Claude Code; `client` holds the outbound WS connections to central with
15//! reconnect + re-subscribe. `BridgeCore` is the transport-free dispatcher those feed.
16
17mod client;
18mod mcp;
19mod policy;
20mod sink;
21
22use std::{
23    collections::{HashMap, HashSet, VecDeque},
24    sync::{Arc, Mutex},
25    time::Duration,
26};
27
28use serde_json::{Value, json};
29use tokio::sync::{Notify, mpsc};
30
31use crate::{
32    base::{PermissionLevel, Res, SessionPath, Visibility, Void},
33    identity::{Config, Identity, PermissionOverride, Scope, ServerRegistration},
34    protocol::{AdminOp, Payload, ProtocolError, ProtocolMessage},
35};
36
37use mcp::{FromMcp, McpSink, Tool};
38use policy::Delivery;
39use sink::{Injection, NotificationSink};
40
41/// How often the bridge sends a keepalive `Ping` to each server (well under the server's 60s idle
42/// reaper, DESIGN.md §10).
43const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(20);
44
45/// Consecutive sub-[`client::STABLE_UPTIME`] links before the orchestrator diagnoses a probable
46/// handle conflict (another live session superseding this one) and stops streaming link notices
47/// (PRD-0015 T-002).
48const RAPID_DROP_DIAGNOSIS_THRESHOLD: u32 = 3;
49
50/// Everything the running bridge needs: this machine's key, the local config, the session handle,
51/// and (optionally) a subset of configured servers to connect to.
52pub struct BridgeSetup {
53    /// This machine's identity (signs the challenge).
54    pub identity: Identity,
55    /// The local config: permission policy + known-server registrations (M1).
56    pub config: Config,
57    /// The live-session handle (`--as`, default = repo/dir name).
58    pub session: String,
59    /// A subset of `config.servers` URLs to connect to; empty means all of them.
60    pub servers: Vec<String>,
61}
62
63/// Runs the bridge: the MCP stdio peer, one reconnecting WS link per server, and the dispatcher.
64///
65/// # Errors
66///
67/// Returns an error if no known server is configured to connect to.
68pub async fn run(setup: BridgeSetup) -> Void {
69    let registrations = resolve_registrations(&setup.config, &setup.servers)?;
70
71    let (from_mcp_tx, from_mcp_rx) = mpsc::unbounded_channel();
72    let (to_mcp_tx, to_mcp_rx) = mpsc::unbounded_channel();
73    tokio::spawn(mcp::read_loop(tokio::io::stdin(), from_mcp_tx));
74    tokio::spawn(mcp::write_loop(tokio::io::stdout(), to_mcp_rx));
75
76    let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
77    let shutdown = Arc::new(Notify::new());
78    let identity = Arc::new(setup.identity);
79
80    let sink = Box::new(McpSink::new(to_mcp_tx.clone()));
81    let mut core = BridgeCore::new(setup.config.clone(), setup.session.clone(), to_mcp_tx, sink);
82
83    // Shared across links so two URLs reaching the same server dedupe at connect (PRD-0012 T-003).
84    let claims = client::ServerClaims::default();
85    for registration in registrations {
86        let joined = Arc::new(Mutex::new(HashSet::new()));
87        let (out_tx, out_rx) = mpsc::unbounded_channel();
88        core.register_server(registration.clone(), out_tx.clone(), Arc::clone(&joined));
89
90        let identity = Arc::clone(&identity);
91        let url = registration.url.clone();
92        let session = setup.session.clone();
93        let claims = Arc::clone(&claims);
94        let connect = move || {
95            let identity = Arc::clone(&identity);
96            let url = url.clone();
97            let session = session.clone();
98            let claims = Arc::clone(&claims);
99            async move { client::connect_ws(&url, &identity, &session, &claims).await }
100        };
101        tokio::spawn(client::run_link(registration.url.clone(), connect, inbound_tx.clone(), out_rx, Arc::clone(&shutdown)));
102        spawn_keepalive(out_tx, Arc::clone(&shutdown));
103    }
104
105    core.run(from_mcp_rx, inbound_rx, shutdown).await
106}
107
108/// Selects the server registrations to connect to: the requested subset, or all if none named.
109fn resolve_registrations(config: &Config, requested: &[String]) -> Res<Vec<ServerRegistration>> {
110    let selected: Vec<ServerRegistration> = if requested.is_empty() {
111        config.servers.clone()
112    } else {
113        config.servers.iter().filter(|r| requested.iter().any(|u| u == &r.url)).cloned().collect()
114    };
115    anyhow::ensure!(!selected.is_empty(), "no known server to connect to (register one first, or pass --server)");
116    Ok(selected)
117}
118
119/// Sends a periodic keepalive `Ping` so the server's heartbeat reaper keeps the session present.
120fn spawn_keepalive(to_server: mpsc::UnboundedSender<ProtocolMessage>, shutdown: Arc<Notify>) {
121    tokio::spawn(async move {
122        let mut ticker = tokio::time::interval(KEEPALIVE_INTERVAL);
123        loop {
124            tokio::select! {
125                () = shutdown.notified() => break,
126                _ = ticker.tick() => {
127                    if to_server.send(ProtocolMessage::Ping).is_err() {
128                        break;
129                    }
130                }
131            }
132        }
133    });
134}
135
136/// A connected server: its registration (for the local path), outbound sink, and joined channels.
137struct ServerHandle {
138    registration: ServerRegistration,
139    to_server: mpsc::UnboundedSender<ProtocolMessage>,
140    joined: Arc<Mutex<HashSet<String>>>,
141}
142
143/// One entry in a server's in-order response queue. Every server-bound request that expects a
144/// response enqueues one, so the FIFO stays correctly correlated (PRD-0008 T-001).
145enum Pending {
146    /// A deferred MCP tool call awaiting the server's response. `ok` overrides the success text
147    /// (used by send/whisper, whose `Ack` carries no useful detail); `None` uses the frame's own
148    /// rendered content (join / list / who / invite / admin).
149    Tool { id: Value, ok: Option<String> },
150    /// An internal re-subscribe `Join` issued on reconnect — its `Joined` ack is consumed silently.
151    Resubscribe,
152}
153
154/// The transport-free bridge dispatcher: MCP events and inbound server frames in, MCP responses /
155/// injections / outbound frames out. Everything I/O lives in [`run`]; this is unit-testable.
156struct BridgeCore {
157    config: Config,
158    session: String,
159    to_mcp: mpsc::UnboundedSender<Value>,
160    sink: Box<dyn NotificationSink>,
161    servers: HashMap<String, ServerHandle>,
162    /// Per-server in-order queue of responses awaited from that server (tool calls + re-subscribes).
163    pending: HashMap<String, VecDeque<Pending>>,
164    /// Servers the session has been told are disconnected — so link state surfaces once per drop and
165    /// a reconnect is announced (PRD-0008 T-003, #21).
166    link_down_notified: HashSet<String>,
167    /// Servers on which the authenticated user is an admin (from `ServerInfo`) — gates admin tools.
168    admin_servers: HashSet<String>,
169    /// When each server's link last came up, for the flapping diagnosis (PRD-0015 T-002).
170    link_up_at: HashMap<String, tokio::time::Instant>,
171    /// Consecutive short-lived links per server; at the threshold the conflict diagnostic fires
172    /// and link notices go quiet until a link stabilizes.
173    rapid_drops: HashMap<String, u32>,
174}
175
176impl BridgeCore {
177    fn new(config: Config, session: String, to_mcp: mpsc::UnboundedSender<Value>, sink: Box<dyn NotificationSink>) -> Self {
178        Self {
179            config,
180            session,
181            to_mcp,
182            sink,
183            servers: HashMap::new(),
184            pending: HashMap::new(),
185            link_down_notified: HashSet::new(),
186            admin_servers: HashSet::new(),
187            link_up_at: HashMap::new(),
188            rapid_drops: HashMap::new(),
189        }
190    }
191
192    fn register_server(&mut self, registration: ServerRegistration, to_server: mpsc::UnboundedSender<ProtocolMessage>, joined: Arc<Mutex<HashSet<String>>>) {
193        self.servers.insert(registration.url.clone(), ServerHandle { registration, to_server, joined });
194    }
195
196    /// The dispatcher loop: MCP events, inbound server frames, and Ctrl-C / stdin-EOF shutdown.
197    async fn run(mut self, mut from_mcp: mpsc::UnboundedReceiver<FromMcp>, mut inbound: mpsc::UnboundedReceiver<(String, client::LinkEvent)>, shutdown: Arc<Notify>) -> Void {
198        loop {
199            tokio::select! {
200                () = shutdown.notified() => break,
201                _ = tokio::signal::ctrl_c() => break,
202                event = from_mcp.recv() => match event {
203                    Some(event) => self.handle_mcp(event),
204                    None => break,
205                },
206                event = inbound.recv() => match event {
207                    Some((server, event)) => self.handle_link_event(&server, event),
208                    None => break,
209                },
210            }
211        }
212        shutdown.notify_waiters();
213        Ok(())
214    }
215
216    /// Routes a link event, then re-lists tools if the event changed the gating (PRD-0015 T-001).
217    fn handle_link_event(&mut self, server: &str, event: client::LinkEvent) {
218        let before = self.tool_signature();
219        match event {
220            client::LinkEvent::Up => self.link_up(server),
221            client::LinkEvent::Down => self.link_down(server),
222            client::LinkEvent::Duplicate { canonical } => self.link_duplicate(server, &canonical),
223            client::LinkEvent::Frame(frame) => self.handle_inbound(server, frame),
224        }
225        self.notify_tools_changed(before);
226    }
227
228    /// The gating inputs that decide which tools [`Self::tools`] offers. Checked around each
229    /// dispatched event: any change means the client's cached `tools/list` is stale.
230    fn tool_signature(&self) -> (bool, bool) {
231        (self.any_emit_allowed(), self.admin_servers.is_empty())
232    }
233
234    /// Emits `tools/list_changed` if the toolset gating moved across a dispatched event — Claude
235    /// Code caches the tool list, so without this a newly-allowed `send_channel` (or the admin
236    /// tools arriving with `ServerInfo`) would never surface mid-session (PRD-0015 T-001).
237    fn notify_tools_changed(&mut self, before: (bool, bool)) {
238        if self.tool_signature() != before {
239            self.send_mcp(mcp::tools_list_changed());
240        }
241    }
242
243    // -----------------------------------------------------------------------
244    // MCP → bridge.
245    // -----------------------------------------------------------------------
246
247    /// Routes an MCP event, then re-lists tools if the event changed the gating (PRD-0015 T-001).
248    fn handle_mcp(&mut self, event: FromMcp) {
249        let before = self.tool_signature();
250        self.dispatch_mcp(event);
251        self.notify_tools_changed(before);
252    }
253
254    fn dispatch_mcp(&mut self, event: FromMcp) {
255        match event {
256            FromMcp::Initialize { id, protocol_version } => self.send_mcp(mcp::initialize_result(&id, &protocol_version)),
257            FromMcp::ListTools { id } => {
258                let tools = self.tools();
259                self.send_mcp(mcp::tools_list_result(&id, &tools));
260            }
261            FromMcp::Ping { id } => self.send_mcp(mcp::ping_result(&id)),
262            FromMcp::CallTool { id, name, args } => self.dispatch_tool(&id, &name, &args),
263            FromMcp::PermissionRequest { request_id, tool_name, description, .. } => self.relay_permission(&request_id, &tool_name, &description),
264            FromMcp::Initialized => {}
265            FromMcp::UnknownRequest { id } => self.send_mcp(mcp::method_not_found(&id)),
266        }
267    }
268
269    /// Dispatches a tool call. Each tool sends its own MCP reply; the `join`/`list`/`who` tools
270    /// **defer** (send nothing now) and are resolved when the server's response arrives.
271    fn dispatch_tool(&mut self, id: &Value, name: &str, args: &Value) {
272        match name {
273            "join_channel" => self.tool_join(id, args),
274            "leave_channel" => self.tool_leave(id, args),
275            "send_channel" => self.tool_send(id, args),
276            "whisper" => self.tool_whisper(id, args),
277            "list_channels" => self.tool_list(id, args),
278            "who" => self.tool_who(id, args),
279            "submit_permission" => self.tool_submit_permission(id, args),
280            "set_perm" => self.tool_set_perm(id, args),
281            "create_channel" => self.tool_create_channel(id, args),
282            "delete_channel" => self.tool_delete_channel(id, args),
283            "set_visibility" => self.tool_set_visibility(id, args),
284            "acl_add" => self.tool_acl(id, args, true),
285            "acl_remove" => self.tool_acl(id, args, false),
286            "invite_create" => self.tool_invite_create(id, args),
287            "invite_revoke" => self.tool_invite_revoke(id, args),
288            "kick" => self.tool_kick(id, args),
289            "ban" => self.tool_ban(id, args),
290            other => self.send_mcp(mcp::tool_error_result(id, &format!("unknown tool `{other}`"))),
291        }
292    }
293
294    /// Pushes an admin op to the server, deferring its MCP result to the `Ack` / `InviteToken` /
295    /// `Error` response. The server authorizes by role, so a non-admin call is refused server-side.
296    /// Enqueues a deferred tool call awaiting `server`'s response. `ok` overrides the success text
297    /// (`None` uses the response frame's own content).
298    fn defer(&mut self, id: &Value, server: &str, ok: Option<String>) {
299        self.pending.entry(server.to_owned()).or_default().push_back(Pending::Tool { id: id.clone(), ok });
300    }
301
302    fn defer_admin(&mut self, id: &Value, server: &str, op: AdminOp) {
303        // Scope admin ops to servers that actually asserted admin for this user, so one server
304        // claiming admin cannot confer admin power against another home (PRD-0008 T-007, #31).
305        if !self.admin_servers.contains(server) {
306            return self.send_mcp(mcp::tool_error_result(id, &format!("not an admin on `{server}`")));
307        }
308        self.defer(id, server, None);
309        self.send_to_server(server, ProtocolMessage::Admin(op));
310    }
311
312    fn tool_create_channel(&mut self, id: &Value, args: &Value) {
313        let server = match self.resolve_server(id, args) {
314            Ok(server) => server,
315            Err(error) => return self.send_mcp(error),
316        };
317        let Some(name) = arg_str(args, "name") else {
318            return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
319        };
320        let visibility = arg_str(args, "visibility").and_then(|v| v.parse().ok()).unwrap_or(Visibility::Public);
321        self.defer_admin(id, &server, AdminOp::CreateChannel { name: name.to_owned(), visibility });
322    }
323
324    fn tool_delete_channel(&mut self, id: &Value, args: &Value) {
325        let server = match self.resolve_server(id, args) {
326            Ok(server) => server,
327            Err(error) => return self.send_mcp(error),
328        };
329        let Some(name) = arg_str(args, "name") else {
330            return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
331        };
332        self.defer_admin(id, &server, AdminOp::DeleteChannel { name: name.to_owned() });
333    }
334
335    fn tool_set_visibility(&mut self, id: &Value, args: &Value) {
336        let server = match self.resolve_server(id, args) {
337            Ok(server) => server,
338            Err(error) => return self.send_mcp(error),
339        };
340        let (Some(name), Some(visibility)) = (arg_str(args, "name"), arg_str(args, "visibility").and_then(|v| v.parse::<Visibility>().ok())) else {
341            return self.send_mcp(mcp::tool_error_result(id, "`name` and a valid `visibility` are required"));
342        };
343        self.defer_admin(id, &server, AdminOp::SetVisibility { name: name.to_owned(), visibility });
344    }
345
346    fn tool_acl(&mut self, id: &Value, args: &Value, add: bool) {
347        let server = match self.resolve_server(id, args) {
348            Ok(server) => server,
349            Err(error) => return self.send_mcp(error),
350        };
351        let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
352            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
353        };
354        let op = if add {
355            AdminOp::AclAdd {
356                channel: channel.to_owned(),
357                user: user.to_owned(),
358            }
359        } else {
360            AdminOp::AclRemove {
361                channel: channel.to_owned(),
362                user: user.to_owned(),
363            }
364        };
365        self.defer_admin(id, &server, op);
366    }
367
368    fn tool_invite_create(&mut self, id: &Value, args: &Value) {
369        let server = match self.resolve_server(id, args) {
370            Ok(server) => server,
371            Err(error) => return self.send_mcp(error),
372        };
373        let Some(channel) = arg_str(args, "channel") else {
374            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
375        };
376        let uses = args.get("uses").and_then(Value::as_u64).and_then(|u| u32::try_from(u).ok());
377        let expires_in_secs = args.get("expires_in_secs").and_then(Value::as_u64);
378        self.defer_admin(
379            id,
380            &server,
381            AdminOp::InviteCreate {
382                channel: channel.to_owned(),
383                uses,
384                expires_in_secs,
385            },
386        );
387    }
388
389    fn tool_invite_revoke(&mut self, id: &Value, args: &Value) {
390        let server = match self.resolve_server(id, args) {
391            Ok(server) => server,
392            Err(error) => return self.send_mcp(error),
393        };
394        let Some(token) = arg_str(args, "token") else {
395            return self.send_mcp(mcp::tool_error_result(id, "`token` is required"));
396        };
397        self.defer_admin(id, &server, AdminOp::InviteRevoke { token: token.to_owned() });
398    }
399
400    fn tool_kick(&mut self, id: &Value, args: &Value) {
401        let server = match self.resolve_server(id, args) {
402            Ok(server) => server,
403            Err(error) => return self.send_mcp(error),
404        };
405        let (Some(channel), Some(target)) = (arg_str(args, "channel"), arg_str(args, "target")) else {
406            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `target` are required"));
407        };
408        self.defer_admin(
409            id,
410            &server,
411            AdminOp::Kick {
412                channel: channel.to_owned(),
413                target: target.to_owned(),
414            },
415        );
416    }
417
418    fn tool_ban(&mut self, id: &Value, args: &Value) {
419        let server = match self.resolve_server(id, args) {
420            Ok(server) => server,
421            Err(error) => return self.send_mcp(error),
422        };
423        let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
424            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
425        };
426        self.defer_admin(
427            id,
428            &server,
429            AdminOp::Ban {
430                channel: channel.to_owned(),
431                user: user.to_owned(),
432            },
433        );
434    }
435
436    fn tool_join(&mut self, id: &Value, args: &Value) {
437        let server = match self.resolve_server(id, args) {
438            Ok(server) => server,
439            Err(error) => return self.send_mcp(error),
440        };
441        let Some(channel) = arg_str(args, "channel") else {
442            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
443        };
444        let token = arg_str(args, "token").map(str::to_owned);
445
446        if let Some(perm) = arg_str(args, "perm") {
447            match perm.parse::<PermissionLevel>() {
448                Ok(level) => self.set_scope_override(&server, Some(channel.to_owned()), level),
449                Err(err) => return self.send_mcp(mcp::tool_error_result(id, &err.to_string())),
450            }
451        }
452
453        // Deferred: the result — and recording the channel as joined (done on the `Joined` ack, so a
454        // rejected join isn't pre-recorded, PRD-0008 T-003 #20) — waits for the server to confirm.
455        self.defer(id, &server, None);
456        self.send_to_server(&server, ProtocolMessage::Join { channel: channel.to_owned(), token });
457    }
458
459    /// Unsubscribes this session from a channel without disconnecting (PRD-0011 T-005). The local
460    /// record drops immediately — a Leave cannot fail server-side (it always acks) and forgetting it
461    /// up front also stops any reconnect from resubscribing.
462    fn tool_leave(&mut self, id: &Value, args: &Value) {
463        let server = match self.resolve_server(id, args) {
464            Ok(server) => server,
465            Err(error) => return self.send_mcp(error),
466        };
467        let Some(channel) = arg_str(args, "channel") else {
468            return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
469        };
470
471        if let Some(handle) = self.servers.get(&server) {
472            handle.joined.lock().expect("joined mutex poisoned").remove(channel);
473        }
474        self.defer(id, &server, Some(format!("left {channel}")));
475        self.send_to_server(&server, ProtocolMessage::Leave { channel: channel.to_owned() });
476    }
477
478    fn tool_send(&mut self, id: &Value, args: &Value) {
479        let server = match self.resolve_server(id, args) {
480            Ok(server) => server,
481            Err(error) => return self.send_mcp(error),
482        };
483        let (Some(channel), Some(text)) = (arg_str(args, "channel"), arg_str(args, "text")) else {
484            return self.send_mcp(mcp::tool_error_result(id, "`channel` and `text` are required"));
485        };
486
487        // Call-time per-channel rejection (DESIGN.md §9): below `converse` cannot emit.
488        if !policy::emit_allowed(&self.config, &server, &Scope::Channel(channel.to_owned())) {
489            return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: `{channel}` on `{server}` is below `converse`")));
490        }
491
492        let from = self.our_path(&server);
493        // Deferred until the server confirms (Ack) or rejects (Error), so the result reflects real
494        // delivery and its Error correlates instead of stealing another call's slot (T-001).
495        self.defer(id, &server, Some(format!("sent to {channel}")));
496        self.send_to_server(
497            &server,
498            ProtocolMessage::ChannelMsg {
499                channel: channel.to_owned(),
500                from,
501                payload: Payload::Plain(text.to_owned()),
502            },
503        );
504    }
505
506    fn tool_whisper(&mut self, id: &Value, args: &Value) {
507        let server = match self.resolve_server(id, args) {
508            Ok(server) => server,
509            Err(error) => return self.send_mcp(error),
510        };
511        let (Some(target), Some(text)) = (arg_str(args, "target"), arg_str(args, "text")) else {
512            return self.send_mcp(mcp::tool_error_result(id, "`target` and `text` are required"));
513        };
514        let Ok(target) = target.parse::<SessionPath>() else {
515            return self.send_mcp(mcp::tool_error_result(id, "`target` must be a `user/machine/session` path"));
516        };
517
518        if !policy::emit_allowed(&self.config, &server, &Scope::Whisper) {
519            return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: whispers on `{server}` are below `converse`")));
520        }
521
522        let from = self.our_path(&server);
523        // Deferred until the server confirms/rejects — a whisper to an offline target now returns
524        // that error to the caller instead of misrouting it to another pending call (T-001).
525        self.defer(id, &server, Some("whisper sent".to_owned()));
526        self.send_to_server(
527            &server,
528            ProtocolMessage::Whisper {
529                from,
530                target,
531                payload: Payload::Plain(text.to_owned()),
532            },
533        );
534    }
535
536    fn tool_list(&mut self, id: &Value, args: &Value) {
537        let server = match self.resolve_server(id, args) {
538            Ok(server) => server,
539            Err(error) => return self.send_mcp(error),
540        };
541        self.defer(id, &server, None);
542        self.send_to_server(&server, ProtocolMessage::ListChannels);
543    }
544
545    fn tool_who(&mut self, id: &Value, args: &Value) {
546        let server = match self.resolve_server(id, args) {
547            Ok(server) => server,
548            Err(error) => return self.send_mcp(error),
549        };
550        let channel = arg_str(args, "channel").map(str::to_owned);
551        self.defer(id, &server, None);
552        self.send_to_server(&server, ProtocolMessage::Who { channel });
553    }
554
555    fn tool_submit_permission(&mut self, id: &Value, args: &Value) {
556        let Some(request_id) = arg_str(args, "request_id") else {
557            return self.send_mcp(mcp::tool_error_result(id, "`request_id` is required"));
558        };
559        let behavior = if arg_str(args, "decision") == Some("allow") { "allow" } else { "deny" };
560        self.send_mcp(mcp::permission_verdict(request_id, behavior));
561        self.send_mcp(mcp::tool_text_result(id, &format!("permission verdict `{behavior}` sent")));
562    }
563
564    /// Changes the local autonomy level live (no reconnect); it applies to the next inbound message.
565    fn tool_set_perm(&mut self, id: &Value, args: &Value) {
566        let Some(level) = arg_str(args, "level").and_then(|level| level.parse::<PermissionLevel>().ok()) else {
567            return self.send_mcp(mcp::tool_error_result(id, "`level` must be mute, notify, converse, or act"));
568        };
569        let whisper = args.get("whisper").and_then(Value::as_bool).unwrap_or(false);
570        let channel = arg_str(args, "channel");
571
572        // Channel / whisper scopes are per-server; with neither, set the machine-wide default.
573        if channel.is_some() || whisper {
574            let server = match self.resolve_server(id, args) {
575                Ok(server) => server,
576                Err(error) => return self.send_mcp(error),
577            };
578            let scope = if whisper { None } else { channel.map(str::to_owned) };
579            self.set_scope_override(&server, scope, level);
580        } else {
581            self.config.default_permission = level;
582        }
583        self.send_mcp(mcp::tool_text_result(id, "permission updated"));
584    }
585
586    /// Relays a Claude Code permission request to the local session (DESIGN.md §12/§14). The verdict
587    /// is returned by the `submit_permission` tool.
588    fn relay_permission(&self, request_id: &str, tool_name: &str, description: &str) {
589        let mut meta = std::collections::BTreeMap::new();
590        meta.insert("kind".to_owned(), "permission_request".to_owned());
591        meta.insert("request_id".to_owned(), request_id.to_owned());
592        let content = format!(
593            "Claude Code requests approval to run `{tool_name}`: {description}\nAnswer with the submit_permission tool: {{\"request_id\": \"{request_id}\", \"decision\": \"allow\"|\"deny\"}}."
594        );
595        self.send_mcp(mcp::channel_notification(&content, &meta));
596    }
597
598    // -----------------------------------------------------------------------
599    // Server → bridge.
600    // -----------------------------------------------------------------------
601
602    fn handle_inbound(&mut self, server: &str, frame: ProtocolMessage) {
603        match frame {
604            ProtocolMessage::ChannelMsg { channel, from, payload } => self.inject(server, Some(channel), from, payload),
605            ProtocolMessage::Whisper { from, payload, .. } => self.inject(server, None, from, payload),
606            ProtocolMessage::Joined { channel } => {
607                // Record the subscription only now that the server has confirmed it (#20).
608                if let Some(handle) = self.servers.get(server) {
609                    handle.joined.lock().expect("joined mutex poisoned").insert(channel.clone());
610                }
611                self.resolve_pending(server, &format!("joined {channel}"));
612            }
613            ProtocolMessage::ChannelList { channels } => self.resolve_pending(server, &format_channels(&channels)),
614            ProtocolMessage::Presence { channel, sessions } => self.resolve_pending(server, &format_presence(channel.as_deref(), &sessions)),
615            ProtocolMessage::Ack { detail } => self.resolve_pending(server, detail.as_deref().unwrap_or("ok")),
616            ProtocolMessage::InviteToken { token } => self.resolve_pending(server, &format!("invite token: {token}")),
617            ProtocolMessage::Error(error) => self.resolve_error(server, &error),
618            // The post-auth role signal gates the admin tools (DESIGN.md §7).
619            ProtocolMessage::ServerInfo { admin } => {
620                if admin {
621                    self.admin_servers.insert(server.to_owned());
622                } else {
623                    self.admin_servers.remove(server);
624                }
625            }
626            // Keepalive acks and any handshake frames (consumed by the client) are ignored here.
627            _ => {}
628        }
629    }
630
631    fn inject(&self, server: &str, channel: Option<String>, from: SessionPath, payload: Payload) {
632        let body = match payload {
633            Payload::Plain(text) => text,
634            Payload::Encrypted(_) => "<end-to-end-encrypted payload — not supported in v1>".to_owned(),
635        };
636        let scope = channel.as_ref().map_or(Scope::Whisper, |c| Scope::Channel(c.clone()));
637
638        match policy::inbound_delivery(&self.config, server, &scope) {
639            Delivery::Drop => {}
640            Delivery::Inject(level) => self.sink.deliver(&Injection {
641                server: server.to_owned(),
642                channel,
643                from,
644                level,
645                body,
646            }),
647        }
648    }
649
650    fn resolve_pending(&mut self, server: &str, text: &str) {
651        match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
652            Some(Pending::Tool { id, ok }) => self.send_mcp(mcp::tool_text_result(&id, ok.as_deref().unwrap_or(text))),
653            // A re-subscribe `Joined` ack (or an orphan success) — consume it, don't answer a tool.
654            Some(Pending::Resubscribe) | None => {}
655        }
656    }
657
658    fn resolve_error(&mut self, server: &str, error: &ProtocolError) {
659        match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
660            Some(Pending::Tool { id, .. }) => self.send_mcp(mcp::tool_error_result(&id, &error.to_string())),
661            // A re-subscribe `Join` that failed (e.g. the channel was deleted) — consume silently.
662            Some(Pending::Resubscribe) => {}
663            // A stray error with nothing pending — surface it as a notice.
664            None => self.notify(server, "error", &format!("Server `{server}` error: {error}")),
665        }
666    }
667
668    /// Surfaces a system notice (link state, stray errors) into the session as a channel notification.
669    fn notify(&self, server: &str, kind: &str, text: &str) {
670        let mut meta = std::collections::BTreeMap::new();
671        meta.insert("server".to_owned(), server.to_owned());
672        meta.insert("kind".to_owned(), kind.to_owned());
673        self.send_mcp(mcp::channel_notification(text, &meta));
674    }
675
676    /// On a fresh connection, announce a reconnect (if the session was told we dropped) and
677    /// re-subscribe every joined channel, enqueuing a silent `Resubscribe` per `Join` so its
678    /// `Joined` ack never resolves an unrelated tool call (PRD-0008 T-001/T-003).
679    fn link_up(&mut self, server: &str) {
680        self.link_up_at.insert(server.to_owned(), tokio::time::Instant::now());
681        if self.link_down_notified.remove(server) {
682            self.notify(server, "link", &format!("Reconnected to `{server}`."));
683        }
684        let Some(handle) = self.servers.get(server) else { return };
685        let channels: Vec<String> = handle.joined.lock().expect("joined mutex poisoned").iter().cloned().collect();
686        for channel in channels {
687            self.pending.entry(server.to_owned()).or_default().push_back(Pending::Resubscribe);
688            self.send_to_server(server, ProtocolMessage::Join { channel, token: None });
689        }
690    }
691
692    /// On a link drop, fail every pending tool call for `server` (so a deferred call never hangs),
693    /// then surface the disconnect to the session once (PRD-0008 T-002/T-003). Re-subscribe entries
694    /// are simply dropped.
695    fn link_down(&mut self, server: &str) {
696        if let Some(queue) = self.pending.remove(server) {
697            for entry in queue {
698                if let Pending::Tool { id, .. } = entry {
699                    self.send_mcp(mcp::tool_error_result(&id, &format!("connection to `{server}` lost; retry")));
700                }
701            }
702        }
703
704        // Notice policy (PRD-0015 T-002): a run of instant drops is almost always another live
705        // session superseding this handle — diagnose it once, then go quiet until the link
706        // stabilizes, instead of streaming Disconnected/Reconnected pairs forever.
707        let stable = self.link_up_at.remove(server).is_none_or(|up| up.elapsed() >= client::STABLE_UPTIME);
708        if stable {
709            self.rapid_drops.remove(server);
710        } else {
711            let drops = {
712                let count = self.rapid_drops.entry(server.to_owned()).or_insert(0);
713                *count += 1;
714                *count
715            };
716            if drops == RAPID_DROP_DIAGNOSIS_THRESHOLD {
717                self.notify(
718                    server,
719                    "link",
720                    &format!(
721                        "The link to `{server}` keeps dropping right after connecting — if another live session is using the handle `{session}`, the two supersede each other; start one with a distinct `--as`. Going quiet until the link stabilizes.",
722                        session = self.session
723                    ),
724                );
725                return;
726            }
727            if drops > RAPID_DROP_DIAGNOSIS_THRESHOLD {
728                return;
729            }
730        }
731
732        if self.link_down_notified.insert(server.to_owned()) {
733            self.notify(server, "link", &format!("Disconnected from `{server}` — reconnecting."));
734        }
735    }
736
737    /// A startup dedupe hit (PRD-0012 T-003): `server` reaches the same physical server as
738    /// `canonical`, and its link has permanently shut down — two links to one server would
739    /// supersede each other's session forever. Fails anything queued at the dead link, then
740    /// forgets it so tools error with the canonical URL instead of hanging.
741    fn link_duplicate(&mut self, server: &str, canonical: &str) {
742        if let Some(queue) = self.pending.remove(server) {
743            for entry in queue {
744                if let Pending::Tool { id, .. } = entry {
745                    self.send_mcp(mcp::tool_error_result(&id, &format!("`{server}` is the same server as `{canonical}`; target `{canonical}` instead")));
746                }
747            }
748        }
749        self.servers.remove(server);
750        self.admin_servers.remove(server);
751        self.link_down_notified.remove(server);
752        self.notify(
753            server,
754            "link",
755            &format!("`{server}` is the same server as `{canonical}` — this duplicate link is disabled; target `{canonical}` instead."),
756        );
757    }
758
759    // -----------------------------------------------------------------------
760    // Tool set (emit tools gated on `>= converse`, DESIGN.md §9) + helpers.
761    // -----------------------------------------------------------------------
762
763    fn tools(&self) -> Vec<Tool> {
764        let mut tools = vec![join_channel_tool(), leave_channel_tool(), list_channels_tool(), who_tool(), submit_permission_tool(), set_perm_tool()];
765        if self.any_emit_allowed() {
766            tools.push(send_channel_tool());
767            tools.push(whisper_tool());
768        }
769        // Admin tools are offered only when the user is an admin on some connected server (§7).
770        if !self.admin_servers.is_empty() {
771            tools.extend(admin_tools());
772        }
773        tools
774    }
775
776    fn any_emit_allowed(&self) -> bool {
777        let joined: Vec<(String, String)> = self
778            .servers
779            .iter()
780            .flat_map(|(server, handle)| {
781                handle
782                    .joined
783                    .lock()
784                    .expect("joined mutex poisoned")
785                    .iter()
786                    .map(|channel| (server.clone(), channel.clone()))
787                    .collect::<Vec<_>>()
788            })
789            .collect();
790        policy::any_emit_allowed(&self.config, joined.iter().map(|(server, channel)| (server.as_str(), channel.as_str())))
791    }
792
793    /// Resolves the target server: the `server` argument, or the sole connection if unambiguous.
794    fn resolve_server(&self, id: &Value, args: &Value) -> Result<String, Value> {
795        if let Some(server) = arg_str(args, "server") {
796            if self.servers.contains_key(server) {
797                return Ok(server.to_owned());
798            }
799            return Err(mcp::tool_error_result(id, &format!("not connected to server `{server}`")));
800        }
801        match self.servers.keys().next() {
802            Some(only) if self.servers.len() == 1 => Ok(only.clone()),
803            _ => Err(mcp::tool_error_result(id, "multiple servers connected; pass `server`")),
804        }
805    }
806
807    fn our_path(&self, server: &str) -> SessionPath {
808        self.servers.get(server).map_or_else(
809            || SessionPath::new("unknown", "unknown", self.session.clone()),
810            |handle| SessionPath::new(handle.registration.username.clone(), handle.registration.machine.clone(), self.session.clone()),
811        )
812    }
813
814    /// Sets a local permission override for a `(server, scope)`, replacing any prior one. `channel`
815    /// is `Some(name)` for a channel scope or `None` for the whisper scope.
816    fn set_scope_override(&mut self, server: &str, channel: Option<String>, level: PermissionLevel) {
817        self.config.overrides.retain(|o| !(o.server == server && o.channel == channel));
818        self.config.overrides.push(PermissionOverride {
819            server: server.to_owned(),
820            channel,
821            level,
822        });
823    }
824
825    fn send_mcp(&self, message: Value) {
826        let _ = self.to_mcp.send(message);
827    }
828
829    fn send_to_server(&self, server: &str, frame: ProtocolMessage) {
830        if let Some(handle) = self.servers.get(server) {
831            let _ = handle.to_server.send(frame);
832        }
833    }
834}
835
836fn arg_str<'a>(args: &'a Value, key: &str) -> Option<&'a str> {
837    args.get(key).and_then(Value::as_str)
838}
839
840fn format_channels(channels: &[crate::protocol::ChannelInfo]) -> String {
841    if channels.is_empty() {
842        return "no channels visible".to_owned();
843    }
844    channels
845        .iter()
846        .map(|c| format!("{} ({}{})", c.name, c.visibility.as_str(), if c.member { ", member" } else { "" }))
847        .collect::<Vec<_>>()
848        .join("\n")
849}
850
851fn format_presence(channel: Option<&str>, sessions: &[SessionPath]) -> String {
852    let scope = channel.map_or_else(|| "server-wide".to_owned(), |c| format!("#{c}"));
853    if sessions.is_empty() {
854        return format!("{scope}: nobody online");
855    }
856    let who = sessions.iter().map(SessionPath::to_string).collect::<Vec<_>>().join(", ");
857    format!("{scope}: {who}")
858}
859
860// --- Tool definitions ---------------------------------------------------------
861
862fn join_channel_tool() -> Tool {
863    Tool {
864        name: "join_channel",
865        description: "Join a channel on a server and subscribe this session to it.",
866        input_schema: json!({
867            "type": "object",
868            "properties": {
869                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
870                "channel": { "type": "string", "description": "Channel name to join." },
871                "token": { "type": "string", "description": "Invite token, if the channel requires one." },
872                "perm": { "type": "string", "enum": ["mute", "notify", "converse", "act"], "description": "Autonomy level for this channel." }
873            },
874            "required": ["channel"]
875        }),
876    }
877}
878
879fn leave_channel_tool() -> Tool {
880    Tool {
881        name: "leave_channel",
882        description: "Unsubscribe this session from a channel (stays connected to the server).",
883        input_schema: json!({
884            "type": "object",
885            "properties": {
886                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
887                "channel": { "type": "string", "description": "Channel name to leave." }
888            },
889            "required": ["channel"]
890        }),
891    }
892}
893
894fn list_channels_tool() -> Tool {
895    Tool {
896        name: "list_channels",
897        description: "List the channels visible to you on a server.",
898        input_schema: json!({
899            "type": "object",
900            "properties": { "server": { "type": "string", "description": "Server URL (optional if only one is connected)." } }
901        }),
902    }
903}
904
905fn who_tool() -> Tool {
906    Tool {
907        name: "who",
908        description: "List who is present on a server, optionally scoped to a channel.",
909        input_schema: json!({
910            "type": "object",
911            "properties": {
912                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
913                "channel": { "type": "string", "description": "Restrict presence to this channel." }
914            }
915        }),
916    }
917}
918
919fn submit_permission_tool() -> Tool {
920    Tool {
921        name: "submit_permission",
922        description: "Answer a relayed Claude Code permission request (allow or deny).",
923        input_schema: json!({
924            "type": "object",
925            "properties": {
926                "request_id": { "type": "string", "description": "The request_id from the permission prompt." },
927                "decision": { "type": "string", "enum": ["allow", "deny"], "description": "The verdict." }
928            },
929            "required": ["request_id", "decision"]
930        }),
931    }
932}
933
934fn send_channel_tool() -> Tool {
935    Tool {
936        name: "send_channel",
937        description: "Send a message to a channel (allowed only at converse/act).",
938        input_schema: json!({
939            "type": "object",
940            "properties": {
941                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
942                "channel": { "type": "string", "description": "Channel to send to." },
943                "text": { "type": "string", "description": "The message text." }
944            },
945            "required": ["channel", "text"]
946        }),
947    }
948}
949
950fn whisper_tool() -> Tool {
951    Tool {
952        name: "whisper",
953        description: "Send a direct message to exactly one session path (allowed only at converse/act).",
954        input_schema: json!({
955            "type": "object",
956            "properties": {
957                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
958                "target": { "type": "string", "description": "The recipient's full user/machine/session path." },
959                "text": { "type": "string", "description": "The message text." }
960            },
961            "required": ["target", "text"]
962        }),
963    }
964}
965
966fn set_perm_tool() -> Tool {
967    Tool {
968        name: "set_perm",
969        description: "Set your autonomy level live (mute/notify/converse/act) for a channel, the whisper scope, or the machine default. Takes effect on the next inbound message — no reconnect.",
970        input_schema: json!({
971            "type": "object",
972            "properties": {
973                "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
974                "channel": { "type": "string", "description": "Channel to scope to (omit with `whisper` for the whisper scope, or omit both for the machine default)." },
975                "whisper": { "type": "boolean", "description": "Apply to the whisper scope instead of a channel." },
976                "level": { "type": "string", "enum": ["mute", "notify", "converse", "act"] }
977            },
978            "required": ["level"]
979        }),
980    }
981}
982
983/// Admin / moderation tools — offered only to admin users, authorized again by role server-side (§7).
984fn admin_tools() -> Vec<Tool> {
985    let server = json!({ "type": "string", "description": "Server URL (optional if only one is connected)." });
986    vec![
987        Tool {
988            name: "create_channel",
989            description: "Admin: create a channel (visibility public/unlisted/private; default public).",
990            input_schema: json!({
991                "type": "object",
992                "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
993                "required": ["name"]
994            }),
995        },
996        Tool {
997            name: "delete_channel",
998            description: "Admin: delete a channel.",
999            input_schema: json!({ "type": "object", "properties": { "server": server, "name": { "type": "string" } }, "required": ["name"] }),
1000        },
1001        Tool {
1002            name: "set_visibility",
1003            description: "Admin: change a channel's visibility (public/unlisted/private).",
1004            input_schema: json!({
1005                "type": "object",
1006                "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
1007                "required": ["name", "visibility"]
1008            }),
1009        },
1010        Tool {
1011            name: "acl_add",
1012            description: "Admin: add a user to a channel's access-control list.",
1013            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
1014        },
1015        Tool {
1016            name: "acl_remove",
1017            description: "Admin: remove a user from a channel's access-control list.",
1018            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
1019        },
1020        Tool {
1021            name: "invite_create",
1022            description: "Admin: mint an invite token for a channel (optional uses / expires_in_secs).",
1023            input_schema: json!({
1024                "type": "object",
1025                "properties": { "server": server, "channel": { "type": "string" }, "uses": { "type": "integer" }, "expires_in_secs": { "type": "integer" } },
1026                "required": ["channel"]
1027            }),
1028        },
1029        Tool {
1030            name: "invite_revoke",
1031            description: "Admin: revoke an invite token.",
1032            input_schema: json!({ "type": "object", "properties": { "server": server, "token": { "type": "string" } }, "required": ["token"] }),
1033        },
1034        Tool {
1035            name: "kick",
1036            description: "Admin: remove a session path or user from a channel.",
1037            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "target": { "type": "string" } }, "required": ["channel", "target"] }),
1038        },
1039        Tool {
1040            name: "ban",
1041            description: "Admin: ban a user from a channel (drops them and blocks rejoin).",
1042            input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
1043        },
1044    ]
1045}
1046
1047#[cfg(test)]
1048mod tests;