1mod client;
18mod mcp;
19mod policy;
20mod sink;
21
22use std::{
23 collections::{HashMap, HashSet, VecDeque},
24 sync::{Arc, Mutex},
25 time::Duration,
26};
27
28use serde_json::{Value, json};
29use tokio::sync::{Notify, mpsc};
30
31use crate::{
32 base::{PermissionLevel, Res, SessionPath, Visibility, Void},
33 identity::{Config, Identity, PermissionOverride, Scope, ServerRegistration},
34 protocol::{AdminOp, Payload, ProtocolError, ProtocolMessage},
35};
36
37use mcp::{FromMcp, McpSink, Tool};
38use policy::Delivery;
39use sink::{Injection, NotificationSink};
40
41const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(20);
44
45const RAPID_DROP_DIAGNOSIS_THRESHOLD: u32 = 3;
49
50pub struct BridgeSetup {
53 pub identity: Identity,
55 pub config: Config,
57 pub session: String,
59 pub servers: Vec<String>,
61}
62
63pub async fn run(setup: BridgeSetup) -> Void {
69 let registrations = resolve_registrations(&setup.config, &setup.servers)?;
70
71 let (from_mcp_tx, from_mcp_rx) = mpsc::unbounded_channel();
72 let (to_mcp_tx, to_mcp_rx) = mpsc::unbounded_channel();
73 tokio::spawn(mcp::read_loop(tokio::io::stdin(), from_mcp_tx));
74 tokio::spawn(mcp::write_loop(tokio::io::stdout(), to_mcp_rx));
75
76 let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
77 let shutdown = Arc::new(Notify::new());
78 let identity = Arc::new(setup.identity);
79
80 let sink = Box::new(McpSink::new(to_mcp_tx.clone()));
81 let mut core = BridgeCore::new(setup.config.clone(), setup.session.clone(), to_mcp_tx, sink);
82
83 let claims = client::ServerClaims::default();
85 for registration in registrations {
86 let joined = Arc::new(Mutex::new(HashSet::new()));
87 let (out_tx, out_rx) = mpsc::unbounded_channel();
88 core.register_server(registration.clone(), out_tx.clone(), Arc::clone(&joined));
89
90 let identity = Arc::clone(&identity);
91 let url = registration.url.clone();
92 let session = setup.session.clone();
93 let claims = Arc::clone(&claims);
94 let connect = move || {
95 let identity = Arc::clone(&identity);
96 let url = url.clone();
97 let session = session.clone();
98 let claims = Arc::clone(&claims);
99 async move { client::connect_ws(&url, &identity, &session, &claims).await }
100 };
101 tokio::spawn(client::run_link(registration.url.clone(), connect, inbound_tx.clone(), out_rx, Arc::clone(&shutdown)));
102 spawn_keepalive(out_tx, Arc::clone(&shutdown));
103 }
104
105 core.run(from_mcp_rx, inbound_rx, shutdown).await
106}
107
108fn resolve_registrations(config: &Config, requested: &[String]) -> Res<Vec<ServerRegistration>> {
110 let selected: Vec<ServerRegistration> = if requested.is_empty() {
111 config.servers.clone()
112 } else {
113 config.servers.iter().filter(|r| requested.iter().any(|u| u == &r.url)).cloned().collect()
114 };
115 anyhow::ensure!(!selected.is_empty(), "no known server to connect to (register one first, or pass --server)");
116 Ok(selected)
117}
118
119fn spawn_keepalive(to_server: mpsc::UnboundedSender<ProtocolMessage>, shutdown: Arc<Notify>) {
121 tokio::spawn(async move {
122 let mut ticker = tokio::time::interval(KEEPALIVE_INTERVAL);
123 loop {
124 tokio::select! {
125 () = shutdown.notified() => break,
126 _ = ticker.tick() => {
127 if to_server.send(ProtocolMessage::Ping).is_err() {
128 break;
129 }
130 }
131 }
132 }
133 });
134}
135
136struct ServerHandle {
138 registration: ServerRegistration,
139 to_server: mpsc::UnboundedSender<ProtocolMessage>,
140 joined: Arc<Mutex<HashSet<String>>>,
141}
142
143enum Pending {
146 Tool { id: Value, ok: Option<String> },
150 Resubscribe,
152}
153
154struct BridgeCore {
157 config: Config,
158 session: String,
159 to_mcp: mpsc::UnboundedSender<Value>,
160 sink: Box<dyn NotificationSink>,
161 servers: HashMap<String, ServerHandle>,
162 pending: HashMap<String, VecDeque<Pending>>,
164 link_down_notified: HashSet<String>,
167 admin_servers: HashSet<String>,
169 link_up_at: HashMap<String, tokio::time::Instant>,
171 rapid_drops: HashMap<String, u32>,
174}
175
176impl BridgeCore {
177 fn new(config: Config, session: String, to_mcp: mpsc::UnboundedSender<Value>, sink: Box<dyn NotificationSink>) -> Self {
178 Self {
179 config,
180 session,
181 to_mcp,
182 sink,
183 servers: HashMap::new(),
184 pending: HashMap::new(),
185 link_down_notified: HashSet::new(),
186 admin_servers: HashSet::new(),
187 link_up_at: HashMap::new(),
188 rapid_drops: HashMap::new(),
189 }
190 }
191
192 fn register_server(&mut self, registration: ServerRegistration, to_server: mpsc::UnboundedSender<ProtocolMessage>, joined: Arc<Mutex<HashSet<String>>>) {
193 self.servers.insert(registration.url.clone(), ServerHandle { registration, to_server, joined });
194 }
195
196 async fn run(mut self, mut from_mcp: mpsc::UnboundedReceiver<FromMcp>, mut inbound: mpsc::UnboundedReceiver<(String, client::LinkEvent)>, shutdown: Arc<Notify>) -> Void {
198 loop {
199 tokio::select! {
200 () = shutdown.notified() => break,
201 _ = tokio::signal::ctrl_c() => break,
202 event = from_mcp.recv() => match event {
203 Some(event) => self.handle_mcp(event),
204 None => break,
205 },
206 event = inbound.recv() => match event {
207 Some((server, event)) => self.handle_link_event(&server, event),
208 None => break,
209 },
210 }
211 }
212 shutdown.notify_waiters();
213 Ok(())
214 }
215
216 fn handle_link_event(&mut self, server: &str, event: client::LinkEvent) {
218 let before = self.tool_signature();
219 match event {
220 client::LinkEvent::Up => self.link_up(server),
221 client::LinkEvent::Down => self.link_down(server),
222 client::LinkEvent::Duplicate { canonical } => self.link_duplicate(server, &canonical),
223 client::LinkEvent::Frame(frame) => self.handle_inbound(server, frame),
224 }
225 self.notify_tools_changed(before);
226 }
227
228 fn tool_signature(&self) -> (bool, bool) {
231 (self.any_emit_allowed(), self.admin_servers.is_empty())
232 }
233
234 fn notify_tools_changed(&mut self, before: (bool, bool)) {
238 if self.tool_signature() != before {
239 self.send_mcp(mcp::tools_list_changed());
240 }
241 }
242
243 fn handle_mcp(&mut self, event: FromMcp) {
249 let before = self.tool_signature();
250 self.dispatch_mcp(event);
251 self.notify_tools_changed(before);
252 }
253
254 fn dispatch_mcp(&mut self, event: FromMcp) {
255 match event {
256 FromMcp::Initialize { id, protocol_version } => self.send_mcp(mcp::initialize_result(&id, &protocol_version)),
257 FromMcp::ListTools { id } => {
258 let tools = self.tools();
259 self.send_mcp(mcp::tools_list_result(&id, &tools));
260 }
261 FromMcp::Ping { id } => self.send_mcp(mcp::ping_result(&id)),
262 FromMcp::CallTool { id, name, args } => self.dispatch_tool(&id, &name, &args),
263 FromMcp::PermissionRequest { request_id, tool_name, description, .. } => self.relay_permission(&request_id, &tool_name, &description),
264 FromMcp::Initialized => {}
265 FromMcp::UnknownRequest { id } => self.send_mcp(mcp::method_not_found(&id)),
266 }
267 }
268
269 fn dispatch_tool(&mut self, id: &Value, name: &str, args: &Value) {
272 match name {
273 "join_channel" => self.tool_join(id, args),
274 "leave_channel" => self.tool_leave(id, args),
275 "send_channel" => self.tool_send(id, args),
276 "whisper" => self.tool_whisper(id, args),
277 "list_channels" => self.tool_list(id, args),
278 "who" => self.tool_who(id, args),
279 "submit_permission" => self.tool_submit_permission(id, args),
280 "set_perm" => self.tool_set_perm(id, args),
281 "create_channel" => self.tool_create_channel(id, args),
282 "delete_channel" => self.tool_delete_channel(id, args),
283 "set_visibility" => self.tool_set_visibility(id, args),
284 "acl_add" => self.tool_acl(id, args, true),
285 "acl_remove" => self.tool_acl(id, args, false),
286 "invite_create" => self.tool_invite_create(id, args),
287 "invite_revoke" => self.tool_invite_revoke(id, args),
288 "kick" => self.tool_kick(id, args),
289 "ban" => self.tool_ban(id, args),
290 other => self.send_mcp(mcp::tool_error_result(id, &format!("unknown tool `{other}`"))),
291 }
292 }
293
294 fn defer(&mut self, id: &Value, server: &str, ok: Option<String>) {
299 self.pending.entry(server.to_owned()).or_default().push_back(Pending::Tool { id: id.clone(), ok });
300 }
301
302 fn defer_admin(&mut self, id: &Value, server: &str, op: AdminOp) {
303 if !self.admin_servers.contains(server) {
306 return self.send_mcp(mcp::tool_error_result(id, &format!("not an admin on `{server}`")));
307 }
308 self.defer(id, server, None);
309 self.send_to_server(server, ProtocolMessage::Admin(op));
310 }
311
312 fn tool_create_channel(&mut self, id: &Value, args: &Value) {
313 let server = match self.resolve_server(id, args) {
314 Ok(server) => server,
315 Err(error) => return self.send_mcp(error),
316 };
317 let Some(name) = arg_str(args, "name") else {
318 return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
319 };
320 let visibility = arg_str(args, "visibility").and_then(|v| v.parse().ok()).unwrap_or(Visibility::Public);
321 self.defer_admin(id, &server, AdminOp::CreateChannel { name: name.to_owned(), visibility });
322 }
323
324 fn tool_delete_channel(&mut self, id: &Value, args: &Value) {
325 let server = match self.resolve_server(id, args) {
326 Ok(server) => server,
327 Err(error) => return self.send_mcp(error),
328 };
329 let Some(name) = arg_str(args, "name") else {
330 return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
331 };
332 self.defer_admin(id, &server, AdminOp::DeleteChannel { name: name.to_owned() });
333 }
334
335 fn tool_set_visibility(&mut self, id: &Value, args: &Value) {
336 let server = match self.resolve_server(id, args) {
337 Ok(server) => server,
338 Err(error) => return self.send_mcp(error),
339 };
340 let (Some(name), Some(visibility)) = (arg_str(args, "name"), arg_str(args, "visibility").and_then(|v| v.parse::<Visibility>().ok())) else {
341 return self.send_mcp(mcp::tool_error_result(id, "`name` and a valid `visibility` are required"));
342 };
343 self.defer_admin(id, &server, AdminOp::SetVisibility { name: name.to_owned(), visibility });
344 }
345
346 fn tool_acl(&mut self, id: &Value, args: &Value, add: bool) {
347 let server = match self.resolve_server(id, args) {
348 Ok(server) => server,
349 Err(error) => return self.send_mcp(error),
350 };
351 let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
352 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
353 };
354 let op = if add {
355 AdminOp::AclAdd {
356 channel: channel.to_owned(),
357 user: user.to_owned(),
358 }
359 } else {
360 AdminOp::AclRemove {
361 channel: channel.to_owned(),
362 user: user.to_owned(),
363 }
364 };
365 self.defer_admin(id, &server, op);
366 }
367
368 fn tool_invite_create(&mut self, id: &Value, args: &Value) {
369 let server = match self.resolve_server(id, args) {
370 Ok(server) => server,
371 Err(error) => return self.send_mcp(error),
372 };
373 let Some(channel) = arg_str(args, "channel") else {
374 return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
375 };
376 let uses = args.get("uses").and_then(Value::as_u64).and_then(|u| u32::try_from(u).ok());
377 let expires_in_secs = args.get("expires_in_secs").and_then(Value::as_u64);
378 self.defer_admin(
379 id,
380 &server,
381 AdminOp::InviteCreate {
382 channel: channel.to_owned(),
383 uses,
384 expires_in_secs,
385 },
386 );
387 }
388
389 fn tool_invite_revoke(&mut self, id: &Value, args: &Value) {
390 let server = match self.resolve_server(id, args) {
391 Ok(server) => server,
392 Err(error) => return self.send_mcp(error),
393 };
394 let Some(token) = arg_str(args, "token") else {
395 return self.send_mcp(mcp::tool_error_result(id, "`token` is required"));
396 };
397 self.defer_admin(id, &server, AdminOp::InviteRevoke { token: token.to_owned() });
398 }
399
400 fn tool_kick(&mut self, id: &Value, args: &Value) {
401 let server = match self.resolve_server(id, args) {
402 Ok(server) => server,
403 Err(error) => return self.send_mcp(error),
404 };
405 let (Some(channel), Some(target)) = (arg_str(args, "channel"), arg_str(args, "target")) else {
406 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `target` are required"));
407 };
408 self.defer_admin(
409 id,
410 &server,
411 AdminOp::Kick {
412 channel: channel.to_owned(),
413 target: target.to_owned(),
414 },
415 );
416 }
417
418 fn tool_ban(&mut self, id: &Value, args: &Value) {
419 let server = match self.resolve_server(id, args) {
420 Ok(server) => server,
421 Err(error) => return self.send_mcp(error),
422 };
423 let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
424 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
425 };
426 self.defer_admin(
427 id,
428 &server,
429 AdminOp::Ban {
430 channel: channel.to_owned(),
431 user: user.to_owned(),
432 },
433 );
434 }
435
436 fn tool_join(&mut self, id: &Value, args: &Value) {
437 let server = match self.resolve_server(id, args) {
438 Ok(server) => server,
439 Err(error) => return self.send_mcp(error),
440 };
441 let Some(channel) = arg_str(args, "channel") else {
442 return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
443 };
444 let token = arg_str(args, "token").map(str::to_owned);
445
446 if let Some(perm) = arg_str(args, "perm") {
447 match perm.parse::<PermissionLevel>() {
448 Ok(level) => self.set_scope_override(&server, Some(channel.to_owned()), level),
449 Err(err) => return self.send_mcp(mcp::tool_error_result(id, &err.to_string())),
450 }
451 }
452
453 self.defer(id, &server, None);
456 self.send_to_server(&server, ProtocolMessage::Join { channel: channel.to_owned(), token });
457 }
458
459 fn tool_leave(&mut self, id: &Value, args: &Value) {
463 let server = match self.resolve_server(id, args) {
464 Ok(server) => server,
465 Err(error) => return self.send_mcp(error),
466 };
467 let Some(channel) = arg_str(args, "channel") else {
468 return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
469 };
470
471 if let Some(handle) = self.servers.get(&server) {
472 handle.joined.lock().expect("joined mutex poisoned").remove(channel);
473 }
474 self.defer(id, &server, Some(format!("left {channel}")));
475 self.send_to_server(&server, ProtocolMessage::Leave { channel: channel.to_owned() });
476 }
477
478 fn tool_send(&mut self, id: &Value, args: &Value) {
479 let server = match self.resolve_server(id, args) {
480 Ok(server) => server,
481 Err(error) => return self.send_mcp(error),
482 };
483 let (Some(channel), Some(text)) = (arg_str(args, "channel"), arg_str(args, "text")) else {
484 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `text` are required"));
485 };
486
487 if !policy::emit_allowed(&self.config, &server, &Scope::Channel(channel.to_owned())) {
489 return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: `{channel}` on `{server}` is below `converse`")));
490 }
491
492 let from = self.our_path(&server);
493 self.defer(id, &server, Some(format!("sent to {channel}")));
496 self.send_to_server(
497 &server,
498 ProtocolMessage::ChannelMsg {
499 channel: channel.to_owned(),
500 from,
501 payload: Payload::Plain(text.to_owned()),
502 },
503 );
504 }
505
506 fn tool_whisper(&mut self, id: &Value, args: &Value) {
507 let server = match self.resolve_server(id, args) {
508 Ok(server) => server,
509 Err(error) => return self.send_mcp(error),
510 };
511 let (Some(target), Some(text)) = (arg_str(args, "target"), arg_str(args, "text")) else {
512 return self.send_mcp(mcp::tool_error_result(id, "`target` and `text` are required"));
513 };
514 let Ok(target) = target.parse::<SessionPath>() else {
515 return self.send_mcp(mcp::tool_error_result(id, "`target` must be a `user/machine/session` path"));
516 };
517
518 if !policy::emit_allowed(&self.config, &server, &Scope::Whisper) {
519 return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: whispers on `{server}` are below `converse`")));
520 }
521
522 let from = self.our_path(&server);
523 self.defer(id, &server, Some("whisper sent".to_owned()));
526 self.send_to_server(
527 &server,
528 ProtocolMessage::Whisper {
529 from,
530 target,
531 payload: Payload::Plain(text.to_owned()),
532 },
533 );
534 }
535
536 fn tool_list(&mut self, id: &Value, args: &Value) {
537 let server = match self.resolve_server(id, args) {
538 Ok(server) => server,
539 Err(error) => return self.send_mcp(error),
540 };
541 self.defer(id, &server, None);
542 self.send_to_server(&server, ProtocolMessage::ListChannels);
543 }
544
545 fn tool_who(&mut self, id: &Value, args: &Value) {
546 let server = match self.resolve_server(id, args) {
547 Ok(server) => server,
548 Err(error) => return self.send_mcp(error),
549 };
550 let channel = arg_str(args, "channel").map(str::to_owned);
551 self.defer(id, &server, None);
552 self.send_to_server(&server, ProtocolMessage::Who { channel });
553 }
554
555 fn tool_submit_permission(&mut self, id: &Value, args: &Value) {
556 let Some(request_id) = arg_str(args, "request_id") else {
557 return self.send_mcp(mcp::tool_error_result(id, "`request_id` is required"));
558 };
559 let behavior = if arg_str(args, "decision") == Some("allow") { "allow" } else { "deny" };
560 self.send_mcp(mcp::permission_verdict(request_id, behavior));
561 self.send_mcp(mcp::tool_text_result(id, &format!("permission verdict `{behavior}` sent")));
562 }
563
564 fn tool_set_perm(&mut self, id: &Value, args: &Value) {
566 let Some(level) = arg_str(args, "level").and_then(|level| level.parse::<PermissionLevel>().ok()) else {
567 return self.send_mcp(mcp::tool_error_result(id, "`level` must be mute, notify, converse, or act"));
568 };
569 let whisper = args.get("whisper").and_then(Value::as_bool).unwrap_or(false);
570 let channel = arg_str(args, "channel");
571
572 if channel.is_some() || whisper {
574 let server = match self.resolve_server(id, args) {
575 Ok(server) => server,
576 Err(error) => return self.send_mcp(error),
577 };
578 let scope = if whisper { None } else { channel.map(str::to_owned) };
579 self.set_scope_override(&server, scope, level);
580 } else {
581 self.config.default_permission = level;
582 }
583 self.send_mcp(mcp::tool_text_result(id, "permission updated"));
584 }
585
586 fn relay_permission(&self, request_id: &str, tool_name: &str, description: &str) {
589 let mut meta = std::collections::BTreeMap::new();
590 meta.insert("kind".to_owned(), "permission_request".to_owned());
591 meta.insert("request_id".to_owned(), request_id.to_owned());
592 let content = format!(
593 "Claude Code requests approval to run `{tool_name}`: {description}\nAnswer with the submit_permission tool: {{\"request_id\": \"{request_id}\", \"decision\": \"allow\"|\"deny\"}}."
594 );
595 self.send_mcp(mcp::channel_notification(&content, &meta));
596 }
597
598 fn handle_inbound(&mut self, server: &str, frame: ProtocolMessage) {
603 match frame {
604 ProtocolMessage::ChannelMsg { channel, from, payload } => self.inject(server, Some(channel), from, payload),
605 ProtocolMessage::Whisper { from, payload, .. } => self.inject(server, None, from, payload),
606 ProtocolMessage::Joined { channel } => {
607 if let Some(handle) = self.servers.get(server) {
609 handle.joined.lock().expect("joined mutex poisoned").insert(channel.clone());
610 }
611 self.resolve_pending(server, &format!("joined {channel}"));
612 }
613 ProtocolMessage::ChannelList { channels } => self.resolve_pending(server, &format_channels(&channels)),
614 ProtocolMessage::Presence { channel, sessions } => self.resolve_pending(server, &format_presence(channel.as_deref(), &sessions)),
615 ProtocolMessage::Ack { detail } => self.resolve_pending(server, detail.as_deref().unwrap_or("ok")),
616 ProtocolMessage::InviteToken { token } => self.resolve_pending(server, &format!("invite token: {token}")),
617 ProtocolMessage::Error(error) => self.resolve_error(server, &error),
618 ProtocolMessage::ServerInfo { admin } => {
620 if admin {
621 self.admin_servers.insert(server.to_owned());
622 } else {
623 self.admin_servers.remove(server);
624 }
625 }
626 _ => {}
628 }
629 }
630
631 fn inject(&self, server: &str, channel: Option<String>, from: SessionPath, payload: Payload) {
632 let body = match payload {
633 Payload::Plain(text) => text,
634 Payload::Encrypted(_) => "<end-to-end-encrypted payload — not supported in v1>".to_owned(),
635 };
636 let scope = channel.as_ref().map_or(Scope::Whisper, |c| Scope::Channel(c.clone()));
637
638 match policy::inbound_delivery(&self.config, server, &scope) {
639 Delivery::Drop => {}
640 Delivery::Inject(level) => self.sink.deliver(&Injection {
641 server: server.to_owned(),
642 channel,
643 from,
644 level,
645 body,
646 }),
647 }
648 }
649
650 fn resolve_pending(&mut self, server: &str, text: &str) {
651 match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
652 Some(Pending::Tool { id, ok }) => self.send_mcp(mcp::tool_text_result(&id, ok.as_deref().unwrap_or(text))),
653 Some(Pending::Resubscribe) | None => {}
655 }
656 }
657
658 fn resolve_error(&mut self, server: &str, error: &ProtocolError) {
659 match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
660 Some(Pending::Tool { id, .. }) => self.send_mcp(mcp::tool_error_result(&id, &error.to_string())),
661 Some(Pending::Resubscribe) => {}
663 None => self.notify(server, "error", &format!("Server `{server}` error: {error}")),
665 }
666 }
667
668 fn notify(&self, server: &str, kind: &str, text: &str) {
670 let mut meta = std::collections::BTreeMap::new();
671 meta.insert("server".to_owned(), server.to_owned());
672 meta.insert("kind".to_owned(), kind.to_owned());
673 self.send_mcp(mcp::channel_notification(text, &meta));
674 }
675
676 fn link_up(&mut self, server: &str) {
680 self.link_up_at.insert(server.to_owned(), tokio::time::Instant::now());
681 if self.link_down_notified.remove(server) {
682 self.notify(server, "link", &format!("Reconnected to `{server}`."));
683 }
684 let Some(handle) = self.servers.get(server) else { return };
685 let channels: Vec<String> = handle.joined.lock().expect("joined mutex poisoned").iter().cloned().collect();
686 for channel in channels {
687 self.pending.entry(server.to_owned()).or_default().push_back(Pending::Resubscribe);
688 self.send_to_server(server, ProtocolMessage::Join { channel, token: None });
689 }
690 }
691
692 fn link_down(&mut self, server: &str) {
696 if let Some(queue) = self.pending.remove(server) {
697 for entry in queue {
698 if let Pending::Tool { id, .. } = entry {
699 self.send_mcp(mcp::tool_error_result(&id, &format!("connection to `{server}` lost; retry")));
700 }
701 }
702 }
703
704 let stable = self.link_up_at.remove(server).is_none_or(|up| up.elapsed() >= client::STABLE_UPTIME);
708 if stable {
709 self.rapid_drops.remove(server);
710 } else {
711 let drops = {
712 let count = self.rapid_drops.entry(server.to_owned()).or_insert(0);
713 *count += 1;
714 *count
715 };
716 if drops == RAPID_DROP_DIAGNOSIS_THRESHOLD {
717 self.notify(
718 server,
719 "link",
720 &format!(
721 "The link to `{server}` keeps dropping right after connecting — if another live session is using the handle `{session}`, the two supersede each other; start one with a distinct `--as`. Going quiet until the link stabilizes.",
722 session = self.session
723 ),
724 );
725 return;
726 }
727 if drops > RAPID_DROP_DIAGNOSIS_THRESHOLD {
728 return;
729 }
730 }
731
732 if self.link_down_notified.insert(server.to_owned()) {
733 self.notify(server, "link", &format!("Disconnected from `{server}` — reconnecting."));
734 }
735 }
736
737 fn link_duplicate(&mut self, server: &str, canonical: &str) {
742 if let Some(queue) = self.pending.remove(server) {
743 for entry in queue {
744 if let Pending::Tool { id, .. } = entry {
745 self.send_mcp(mcp::tool_error_result(&id, &format!("`{server}` is the same server as `{canonical}`; target `{canonical}` instead")));
746 }
747 }
748 }
749 self.servers.remove(server);
750 self.admin_servers.remove(server);
751 self.link_down_notified.remove(server);
752 self.notify(
753 server,
754 "link",
755 &format!("`{server}` is the same server as `{canonical}` — this duplicate link is disabled; target `{canonical}` instead."),
756 );
757 }
758
759 fn tools(&self) -> Vec<Tool> {
764 let mut tools = vec![join_channel_tool(), leave_channel_tool(), list_channels_tool(), who_tool(), submit_permission_tool(), set_perm_tool()];
765 if self.any_emit_allowed() {
766 tools.push(send_channel_tool());
767 tools.push(whisper_tool());
768 }
769 if !self.admin_servers.is_empty() {
771 tools.extend(admin_tools());
772 }
773 tools
774 }
775
776 fn any_emit_allowed(&self) -> bool {
777 let joined: Vec<(String, String)> = self
778 .servers
779 .iter()
780 .flat_map(|(server, handle)| {
781 handle
782 .joined
783 .lock()
784 .expect("joined mutex poisoned")
785 .iter()
786 .map(|channel| (server.clone(), channel.clone()))
787 .collect::<Vec<_>>()
788 })
789 .collect();
790 policy::any_emit_allowed(&self.config, joined.iter().map(|(server, channel)| (server.as_str(), channel.as_str())))
791 }
792
793 fn resolve_server(&self, id: &Value, args: &Value) -> Result<String, Value> {
795 if let Some(server) = arg_str(args, "server") {
796 if self.servers.contains_key(server) {
797 return Ok(server.to_owned());
798 }
799 return Err(mcp::tool_error_result(id, &format!("not connected to server `{server}`")));
800 }
801 match self.servers.keys().next() {
802 Some(only) if self.servers.len() == 1 => Ok(only.clone()),
803 _ => Err(mcp::tool_error_result(id, "multiple servers connected; pass `server`")),
804 }
805 }
806
807 fn our_path(&self, server: &str) -> SessionPath {
808 self.servers.get(server).map_or_else(
809 || SessionPath::new("unknown", "unknown", self.session.clone()),
810 |handle| SessionPath::new(handle.registration.username.clone(), handle.registration.machine.clone(), self.session.clone()),
811 )
812 }
813
814 fn set_scope_override(&mut self, server: &str, channel: Option<String>, level: PermissionLevel) {
817 self.config.overrides.retain(|o| !(o.server == server && o.channel == channel));
818 self.config.overrides.push(PermissionOverride {
819 server: server.to_owned(),
820 channel,
821 level,
822 });
823 }
824
825 fn send_mcp(&self, message: Value) {
826 let _ = self.to_mcp.send(message);
827 }
828
829 fn send_to_server(&self, server: &str, frame: ProtocolMessage) {
830 if let Some(handle) = self.servers.get(server) {
831 let _ = handle.to_server.send(frame);
832 }
833 }
834}
835
836fn arg_str<'a>(args: &'a Value, key: &str) -> Option<&'a str> {
837 args.get(key).and_then(Value::as_str)
838}
839
840fn format_channels(channels: &[crate::protocol::ChannelInfo]) -> String {
841 if channels.is_empty() {
842 return "no channels visible".to_owned();
843 }
844 channels
845 .iter()
846 .map(|c| format!("{} ({}{})", c.name, c.visibility.as_str(), if c.member { ", member" } else { "" }))
847 .collect::<Vec<_>>()
848 .join("\n")
849}
850
851fn format_presence(channel: Option<&str>, sessions: &[SessionPath]) -> String {
852 let scope = channel.map_or_else(|| "server-wide".to_owned(), |c| format!("#{c}"));
853 if sessions.is_empty() {
854 return format!("{scope}: nobody online");
855 }
856 let who = sessions.iter().map(SessionPath::to_string).collect::<Vec<_>>().join(", ");
857 format!("{scope}: {who}")
858}
859
860fn join_channel_tool() -> Tool {
863 Tool {
864 name: "join_channel",
865 description: "Join a channel on a server and subscribe this session to it.",
866 input_schema: json!({
867 "type": "object",
868 "properties": {
869 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
870 "channel": { "type": "string", "description": "Channel name to join." },
871 "token": { "type": "string", "description": "Invite token, if the channel requires one." },
872 "perm": { "type": "string", "enum": ["mute", "notify", "converse", "act"], "description": "Autonomy level for this channel." }
873 },
874 "required": ["channel"]
875 }),
876 }
877}
878
879fn leave_channel_tool() -> Tool {
880 Tool {
881 name: "leave_channel",
882 description: "Unsubscribe this session from a channel (stays connected to the server).",
883 input_schema: json!({
884 "type": "object",
885 "properties": {
886 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
887 "channel": { "type": "string", "description": "Channel name to leave." }
888 },
889 "required": ["channel"]
890 }),
891 }
892}
893
894fn list_channels_tool() -> Tool {
895 Tool {
896 name: "list_channels",
897 description: "List the channels visible to you on a server.",
898 input_schema: json!({
899 "type": "object",
900 "properties": { "server": { "type": "string", "description": "Server URL (optional if only one is connected)." } }
901 }),
902 }
903}
904
905fn who_tool() -> Tool {
906 Tool {
907 name: "who",
908 description: "List who is present on a server, optionally scoped to a channel.",
909 input_schema: json!({
910 "type": "object",
911 "properties": {
912 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
913 "channel": { "type": "string", "description": "Restrict presence to this channel." }
914 }
915 }),
916 }
917}
918
919fn submit_permission_tool() -> Tool {
920 Tool {
921 name: "submit_permission",
922 description: "Answer a relayed Claude Code permission request (allow or deny).",
923 input_schema: json!({
924 "type": "object",
925 "properties": {
926 "request_id": { "type": "string", "description": "The request_id from the permission prompt." },
927 "decision": { "type": "string", "enum": ["allow", "deny"], "description": "The verdict." }
928 },
929 "required": ["request_id", "decision"]
930 }),
931 }
932}
933
934fn send_channel_tool() -> Tool {
935 Tool {
936 name: "send_channel",
937 description: "Send a message to a channel (allowed only at converse/act).",
938 input_schema: json!({
939 "type": "object",
940 "properties": {
941 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
942 "channel": { "type": "string", "description": "Channel to send to." },
943 "text": { "type": "string", "description": "The message text." }
944 },
945 "required": ["channel", "text"]
946 }),
947 }
948}
949
950fn whisper_tool() -> Tool {
951 Tool {
952 name: "whisper",
953 description: "Send a direct message to exactly one session path (allowed only at converse/act).",
954 input_schema: json!({
955 "type": "object",
956 "properties": {
957 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
958 "target": { "type": "string", "description": "The recipient's full user/machine/session path." },
959 "text": { "type": "string", "description": "The message text." }
960 },
961 "required": ["target", "text"]
962 }),
963 }
964}
965
966fn set_perm_tool() -> Tool {
967 Tool {
968 name: "set_perm",
969 description: "Set your autonomy level live (mute/notify/converse/act) for a channel, the whisper scope, or the machine default. Takes effect on the next inbound message — no reconnect.",
970 input_schema: json!({
971 "type": "object",
972 "properties": {
973 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
974 "channel": { "type": "string", "description": "Channel to scope to (omit with `whisper` for the whisper scope, or omit both for the machine default)." },
975 "whisper": { "type": "boolean", "description": "Apply to the whisper scope instead of a channel." },
976 "level": { "type": "string", "enum": ["mute", "notify", "converse", "act"] }
977 },
978 "required": ["level"]
979 }),
980 }
981}
982
983fn admin_tools() -> Vec<Tool> {
985 let server = json!({ "type": "string", "description": "Server URL (optional if only one is connected)." });
986 vec![
987 Tool {
988 name: "create_channel",
989 description: "Admin: create a channel (visibility public/unlisted/private; default public).",
990 input_schema: json!({
991 "type": "object",
992 "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
993 "required": ["name"]
994 }),
995 },
996 Tool {
997 name: "delete_channel",
998 description: "Admin: delete a channel.",
999 input_schema: json!({ "type": "object", "properties": { "server": server, "name": { "type": "string" } }, "required": ["name"] }),
1000 },
1001 Tool {
1002 name: "set_visibility",
1003 description: "Admin: change a channel's visibility (public/unlisted/private).",
1004 input_schema: json!({
1005 "type": "object",
1006 "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
1007 "required": ["name", "visibility"]
1008 }),
1009 },
1010 Tool {
1011 name: "acl_add",
1012 description: "Admin: add a user to a channel's access-control list.",
1013 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
1014 },
1015 Tool {
1016 name: "acl_remove",
1017 description: "Admin: remove a user from a channel's access-control list.",
1018 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
1019 },
1020 Tool {
1021 name: "invite_create",
1022 description: "Admin: mint an invite token for a channel (optional uses / expires_in_secs).",
1023 input_schema: json!({
1024 "type": "object",
1025 "properties": { "server": server, "channel": { "type": "string" }, "uses": { "type": "integer" }, "expires_in_secs": { "type": "integer" } },
1026 "required": ["channel"]
1027 }),
1028 },
1029 Tool {
1030 name: "invite_revoke",
1031 description: "Admin: revoke an invite token.",
1032 input_schema: json!({ "type": "object", "properties": { "server": server, "token": { "type": "string" } }, "required": ["token"] }),
1033 },
1034 Tool {
1035 name: "kick",
1036 description: "Admin: remove a session path or user from a channel.",
1037 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "target": { "type": "string" } }, "required": ["channel", "target"] }),
1038 },
1039 Tool {
1040 name: "ban",
1041 description: "Admin: ban a user from a channel (drops them and blocks rejoin).",
1042 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
1043 },
1044 ]
1045}
1046
1047#[cfg(test)]
1048mod tests;