1mod client;
18mod mcp;
19mod policy;
20mod sink;
21
22use std::{
23 collections::{HashMap, HashSet, VecDeque},
24 sync::{Arc, Mutex},
25 time::Duration,
26};
27
28use serde_json::{Value, json};
29use tokio::sync::{Notify, mpsc};
30
31use crate::{
32 base::{PermissionLevel, Res, SessionPath, Visibility, Void},
33 identity::{Config, Identity, PermissionOverride, Scope, ServerRegistration},
34 protocol::{AdminOp, Payload, ProtocolError, ProtocolMessage},
35};
36
37use mcp::{FromMcp, McpSink, Tool};
38use policy::Delivery;
39use sink::{Injection, NotificationSink};
40
/// Period of the keepalive ticker: every server link is sent a `ProtocolMessage::Ping` this often.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(20);
44
/// Everything [`run`] needs to start a bridge.
pub struct BridgeSetup {
    /// Identity handed to `client::connect_ws` whenever a server link is dialed.
    pub identity: Identity,
    /// Configuration: supplies the known server registrations and is consulted by the policy checks.
    pub config: Config,
    /// Session name; passed to `connect_ws` and used as the session component of our `SessionPath`.
    pub session: String,
    /// URLs of the servers to connect to. When empty, every server in `config.servers` is used.
    pub servers: Vec<String>,
}
57
58pub async fn run(setup: BridgeSetup) -> Void {
64 let registrations = resolve_registrations(&setup.config, &setup.servers)?;
65
66 let (from_mcp_tx, from_mcp_rx) = mpsc::unbounded_channel();
67 let (to_mcp_tx, to_mcp_rx) = mpsc::unbounded_channel();
68 tokio::spawn(mcp::read_loop(tokio::io::stdin(), from_mcp_tx));
69 tokio::spawn(mcp::write_loop(tokio::io::stdout(), to_mcp_rx));
70
71 let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
72 let shutdown = Arc::new(Notify::new());
73 let identity = Arc::new(setup.identity);
74
75 let sink = Box::new(McpSink::new(to_mcp_tx.clone()));
76 let mut core = BridgeCore::new(setup.config.clone(), setup.session.clone(), to_mcp_tx, sink);
77
78 for registration in registrations {
79 let joined = Arc::new(Mutex::new(HashSet::new()));
80 let (out_tx, out_rx) = mpsc::unbounded_channel();
81 core.register_server(registration.clone(), out_tx.clone(), Arc::clone(&joined));
82
83 let identity = Arc::clone(&identity);
84 let url = registration.url.clone();
85 let session = setup.session.clone();
86 let connect = move || {
87 let identity = Arc::clone(&identity);
88 let url = url.clone();
89 let session = session.clone();
90 async move { client::connect_ws(&url, &identity, &session).await }
91 };
92 tokio::spawn(client::run_link(registration.url.clone(), connect, inbound_tx.clone(), out_rx, Arc::clone(&shutdown)));
93 spawn_keepalive(out_tx, Arc::clone(&shutdown));
94 }
95
96 core.run(from_mcp_rx, inbound_rx, shutdown).await
97}
98
99fn resolve_registrations(config: &Config, requested: &[String]) -> Res<Vec<ServerRegistration>> {
101 let selected: Vec<ServerRegistration> = if requested.is_empty() {
102 config.servers.clone()
103 } else {
104 config.servers.iter().filter(|r| requested.iter().any(|u| u == &r.url)).cloned().collect()
105 };
106 anyhow::ensure!(!selected.is_empty(), "no known server to connect to (register one first, or pass --server)");
107 Ok(selected)
108}
109
110fn spawn_keepalive(to_server: mpsc::UnboundedSender<ProtocolMessage>, shutdown: Arc<Notify>) {
112 tokio::spawn(async move {
113 let mut ticker = tokio::time::interval(KEEPALIVE_INTERVAL);
114 loop {
115 tokio::select! {
116 () = shutdown.notified() => break,
117 _ = ticker.tick() => {
118 if to_server.send(ProtocolMessage::Ping).is_err() {
119 break;
120 }
121 }
122 }
123 }
124 });
125}
126
/// Per-server state held by [`BridgeCore`], keyed by the server URL.
struct ServerHandle {
    /// The registration this link was created from (source of our username / machine).
    registration: ServerRegistration,
    /// Outbound frame queue for this server's link.
    to_server: mpsc::UnboundedSender<ProtocolMessage>,
    /// Channels currently considered joined on this server; re-joined on every `link_up`.
    joined: Arc<Mutex<HashSet<String>>>,
}
133
/// One request sent to a server whose reply has not arrived yet (queued FIFO per server).
enum Pending {
    /// An MCP tool call to answer under request `id`. When `ok` is set, it replaces the
    /// server-provided text in the success reply.
    Tool { id: Value, ok: Option<String> },
    /// A `Join` re-sent by `link_up`; its reply is consumed without answering any tool call.
    Resubscribe,
}
144
/// The bridge's single-task state machine: turns MCP events into server frames and back.
struct BridgeCore {
    /// Live configuration; permission overrides and the default level are edited in place.
    config: Config,
    /// Session name used as the last component of our `SessionPath`.
    session: String,
    /// Queue of JSON messages going out to the MCP peer.
    to_mcp: mpsc::UnboundedSender<Value>,
    /// Receives inbound chat messages that policy allows to be injected.
    sink: Box<dyn NotificationSink>,
    /// Registered servers, keyed by URL.
    servers: HashMap<String, ServerHandle>,
    /// Per-server FIFO of requests awaiting a reply.
    pending: HashMap<String, VecDeque<Pending>>,
    /// Servers for which a "disconnected" notice has been sent and no "reconnected" one yet.
    link_down_notified: HashSet<String>,
    /// Servers whose latest `ServerInfo` frame reported `admin: true`.
    admin_servers: HashSet<String>,
}
161
162impl BridgeCore {
163 fn new(config: Config, session: String, to_mcp: mpsc::UnboundedSender<Value>, sink: Box<dyn NotificationSink>) -> Self {
164 Self {
165 config,
166 session,
167 to_mcp,
168 sink,
169 servers: HashMap::new(),
170 pending: HashMap::new(),
171 link_down_notified: HashSet::new(),
172 admin_servers: HashSet::new(),
173 }
174 }
175
176 fn register_server(&mut self, registration: ServerRegistration, to_server: mpsc::UnboundedSender<ProtocolMessage>, joined: Arc<Mutex<HashSet<String>>>) {
177 self.servers.insert(registration.url.clone(), ServerHandle { registration, to_server, joined });
178 }
179
180 async fn run(mut self, mut from_mcp: mpsc::UnboundedReceiver<FromMcp>, mut inbound: mpsc::UnboundedReceiver<(String, client::LinkEvent)>, shutdown: Arc<Notify>) -> Void {
182 loop {
183 tokio::select! {
184 () = shutdown.notified() => break,
185 _ = tokio::signal::ctrl_c() => break,
186 event = from_mcp.recv() => match event {
187 Some(event) => self.handle_mcp(event),
188 None => break,
189 },
190 event = inbound.recv() => match event {
191 Some((server, event)) => self.handle_link_event(&server, event),
192 None => break,
193 },
194 }
195 }
196 shutdown.notify_waiters();
197 Ok(())
198 }
199
200 fn handle_link_event(&mut self, server: &str, event: client::LinkEvent) {
202 match event {
203 client::LinkEvent::Up => self.link_up(server),
204 client::LinkEvent::Down => self.link_down(server),
205 client::LinkEvent::Frame(frame) => self.handle_inbound(server, frame),
206 }
207 }
208
209 fn handle_mcp(&mut self, event: FromMcp) {
214 match event {
215 FromMcp::Initialize { id, protocol_version } => self.send_mcp(mcp::initialize_result(&id, &protocol_version)),
216 FromMcp::ListTools { id } => {
217 let tools = self.tools();
218 self.send_mcp(mcp::tools_list_result(&id, &tools));
219 }
220 FromMcp::Ping { id } => self.send_mcp(mcp::ping_result(&id)),
221 FromMcp::CallTool { id, name, args } => self.dispatch_tool(&id, &name, &args),
222 FromMcp::PermissionRequest { request_id, tool_name, description, .. } => self.relay_permission(&request_id, &tool_name, &description),
223 FromMcp::Initialized => {}
224 FromMcp::UnknownRequest { id } => self.send_mcp(mcp::method_not_found(&id)),
225 }
226 }
227
228 fn dispatch_tool(&mut self, id: &Value, name: &str, args: &Value) {
231 match name {
232 "join_channel" => self.tool_join(id, args),
233 "leave_channel" => self.tool_leave(id, args),
234 "send_channel" => self.tool_send(id, args),
235 "whisper" => self.tool_whisper(id, args),
236 "list_channels" => self.tool_list(id, args),
237 "who" => self.tool_who(id, args),
238 "submit_permission" => self.tool_submit_permission(id, args),
239 "set_perm" => self.tool_set_perm(id, args),
240 "create_channel" => self.tool_create_channel(id, args),
241 "delete_channel" => self.tool_delete_channel(id, args),
242 "set_visibility" => self.tool_set_visibility(id, args),
243 "acl_add" => self.tool_acl(id, args, true),
244 "acl_remove" => self.tool_acl(id, args, false),
245 "invite_create" => self.tool_invite_create(id, args),
246 "invite_revoke" => self.tool_invite_revoke(id, args),
247 "kick" => self.tool_kick(id, args),
248 "ban" => self.tool_ban(id, args),
249 other => self.send_mcp(mcp::tool_error_result(id, &format!("unknown tool `{other}`"))),
250 }
251 }
252
253 fn defer(&mut self, id: &Value, server: &str, ok: Option<String>) {
258 self.pending.entry(server.to_owned()).or_default().push_back(Pending::Tool { id: id.clone(), ok });
259 }
260
261 fn defer_admin(&mut self, id: &Value, server: &str, op: AdminOp) {
262 if !self.admin_servers.contains(server) {
265 return self.send_mcp(mcp::tool_error_result(id, &format!("not an admin on `{server}`")));
266 }
267 self.defer(id, server, None);
268 self.send_to_server(server, ProtocolMessage::Admin(op));
269 }
270
271 fn tool_create_channel(&mut self, id: &Value, args: &Value) {
272 let server = match self.resolve_server(id, args) {
273 Ok(server) => server,
274 Err(error) => return self.send_mcp(error),
275 };
276 let Some(name) = arg_str(args, "name") else {
277 return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
278 };
279 let visibility = arg_str(args, "visibility").and_then(|v| v.parse().ok()).unwrap_or(Visibility::Public);
280 self.defer_admin(id, &server, AdminOp::CreateChannel { name: name.to_owned(), visibility });
281 }
282
283 fn tool_delete_channel(&mut self, id: &Value, args: &Value) {
284 let server = match self.resolve_server(id, args) {
285 Ok(server) => server,
286 Err(error) => return self.send_mcp(error),
287 };
288 let Some(name) = arg_str(args, "name") else {
289 return self.send_mcp(mcp::tool_error_result(id, "`name` is required"));
290 };
291 self.defer_admin(id, &server, AdminOp::DeleteChannel { name: name.to_owned() });
292 }
293
294 fn tool_set_visibility(&mut self, id: &Value, args: &Value) {
295 let server = match self.resolve_server(id, args) {
296 Ok(server) => server,
297 Err(error) => return self.send_mcp(error),
298 };
299 let (Some(name), Some(visibility)) = (arg_str(args, "name"), arg_str(args, "visibility").and_then(|v| v.parse::<Visibility>().ok())) else {
300 return self.send_mcp(mcp::tool_error_result(id, "`name` and a valid `visibility` are required"));
301 };
302 self.defer_admin(id, &server, AdminOp::SetVisibility { name: name.to_owned(), visibility });
303 }
304
305 fn tool_acl(&mut self, id: &Value, args: &Value, add: bool) {
306 let server = match self.resolve_server(id, args) {
307 Ok(server) => server,
308 Err(error) => return self.send_mcp(error),
309 };
310 let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
311 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
312 };
313 let op = if add {
314 AdminOp::AclAdd {
315 channel: channel.to_owned(),
316 user: user.to_owned(),
317 }
318 } else {
319 AdminOp::AclRemove {
320 channel: channel.to_owned(),
321 user: user.to_owned(),
322 }
323 };
324 self.defer_admin(id, &server, op);
325 }
326
327 fn tool_invite_create(&mut self, id: &Value, args: &Value) {
328 let server = match self.resolve_server(id, args) {
329 Ok(server) => server,
330 Err(error) => return self.send_mcp(error),
331 };
332 let Some(channel) = arg_str(args, "channel") else {
333 return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
334 };
335 let uses = args.get("uses").and_then(Value::as_u64).and_then(|u| u32::try_from(u).ok());
336 let expires_in_secs = args.get("expires_in_secs").and_then(Value::as_u64);
337 self.defer_admin(
338 id,
339 &server,
340 AdminOp::InviteCreate {
341 channel: channel.to_owned(),
342 uses,
343 expires_in_secs,
344 },
345 );
346 }
347
348 fn tool_invite_revoke(&mut self, id: &Value, args: &Value) {
349 let server = match self.resolve_server(id, args) {
350 Ok(server) => server,
351 Err(error) => return self.send_mcp(error),
352 };
353 let Some(token) = arg_str(args, "token") else {
354 return self.send_mcp(mcp::tool_error_result(id, "`token` is required"));
355 };
356 self.defer_admin(id, &server, AdminOp::InviteRevoke { token: token.to_owned() });
357 }
358
359 fn tool_kick(&mut self, id: &Value, args: &Value) {
360 let server = match self.resolve_server(id, args) {
361 Ok(server) => server,
362 Err(error) => return self.send_mcp(error),
363 };
364 let (Some(channel), Some(target)) = (arg_str(args, "channel"), arg_str(args, "target")) else {
365 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `target` are required"));
366 };
367 self.defer_admin(
368 id,
369 &server,
370 AdminOp::Kick {
371 channel: channel.to_owned(),
372 target: target.to_owned(),
373 },
374 );
375 }
376
377 fn tool_ban(&mut self, id: &Value, args: &Value) {
378 let server = match self.resolve_server(id, args) {
379 Ok(server) => server,
380 Err(error) => return self.send_mcp(error),
381 };
382 let (Some(channel), Some(user)) = (arg_str(args, "channel"), arg_str(args, "user")) else {
383 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `user` are required"));
384 };
385 self.defer_admin(
386 id,
387 &server,
388 AdminOp::Ban {
389 channel: channel.to_owned(),
390 user: user.to_owned(),
391 },
392 );
393 }
394
395 fn tool_join(&mut self, id: &Value, args: &Value) {
396 let server = match self.resolve_server(id, args) {
397 Ok(server) => server,
398 Err(error) => return self.send_mcp(error),
399 };
400 let Some(channel) = arg_str(args, "channel") else {
401 return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
402 };
403 let token = arg_str(args, "token").map(str::to_owned);
404
405 if let Some(perm) = arg_str(args, "perm") {
406 match perm.parse::<PermissionLevel>() {
407 Ok(level) => self.set_scope_override(&server, Some(channel.to_owned()), level),
408 Err(err) => return self.send_mcp(mcp::tool_error_result(id, &err.to_string())),
409 }
410 }
411
412 self.defer(id, &server, None);
415 self.send_to_server(&server, ProtocolMessage::Join { channel: channel.to_owned(), token });
416 }
417
418 fn tool_leave(&mut self, id: &Value, args: &Value) {
422 let server = match self.resolve_server(id, args) {
423 Ok(server) => server,
424 Err(error) => return self.send_mcp(error),
425 };
426 let Some(channel) = arg_str(args, "channel") else {
427 return self.send_mcp(mcp::tool_error_result(id, "`channel` is required"));
428 };
429
430 if let Some(handle) = self.servers.get(&server) {
431 handle.joined.lock().expect("joined mutex poisoned").remove(channel);
432 }
433 self.defer(id, &server, Some(format!("left {channel}")));
434 self.send_to_server(&server, ProtocolMessage::Leave { channel: channel.to_owned() });
435 }
436
437 fn tool_send(&mut self, id: &Value, args: &Value) {
438 let server = match self.resolve_server(id, args) {
439 Ok(server) => server,
440 Err(error) => return self.send_mcp(error),
441 };
442 let (Some(channel), Some(text)) = (arg_str(args, "channel"), arg_str(args, "text")) else {
443 return self.send_mcp(mcp::tool_error_result(id, "`channel` and `text` are required"));
444 };
445
446 if !policy::emit_allowed(&self.config, &server, &Scope::Channel(channel.to_owned())) {
448 return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: `{channel}` on `{server}` is below `converse`")));
449 }
450
451 let from = self.our_path(&server);
452 self.defer(id, &server, Some(format!("sent to {channel}")));
455 self.send_to_server(
456 &server,
457 ProtocolMessage::ChannelMsg {
458 channel: channel.to_owned(),
459 from,
460 payload: Payload::Plain(text.to_owned()),
461 },
462 );
463 }
464
465 fn tool_whisper(&mut self, id: &Value, args: &Value) {
466 let server = match self.resolve_server(id, args) {
467 Ok(server) => server,
468 Err(error) => return self.send_mcp(error),
469 };
470 let (Some(target), Some(text)) = (arg_str(args, "target"), arg_str(args, "text")) else {
471 return self.send_mcp(mcp::tool_error_result(id, "`target` and `text` are required"));
472 };
473 let Ok(target) = target.parse::<SessionPath>() else {
474 return self.send_mcp(mcp::tool_error_result(id, "`target` must be a `user/machine/session` path"));
475 };
476
477 if !policy::emit_allowed(&self.config, &server, &Scope::Whisper) {
478 return self.send_mcp(mcp::tool_error_result(id, &format!("permission denied: whispers on `{server}` are below `converse`")));
479 }
480
481 let from = self.our_path(&server);
482 self.defer(id, &server, Some("whisper sent".to_owned()));
485 self.send_to_server(
486 &server,
487 ProtocolMessage::Whisper {
488 from,
489 target,
490 payload: Payload::Plain(text.to_owned()),
491 },
492 );
493 }
494
495 fn tool_list(&mut self, id: &Value, args: &Value) {
496 let server = match self.resolve_server(id, args) {
497 Ok(server) => server,
498 Err(error) => return self.send_mcp(error),
499 };
500 self.defer(id, &server, None);
501 self.send_to_server(&server, ProtocolMessage::ListChannels);
502 }
503
504 fn tool_who(&mut self, id: &Value, args: &Value) {
505 let server = match self.resolve_server(id, args) {
506 Ok(server) => server,
507 Err(error) => return self.send_mcp(error),
508 };
509 let channel = arg_str(args, "channel").map(str::to_owned);
510 self.defer(id, &server, None);
511 self.send_to_server(&server, ProtocolMessage::Who { channel });
512 }
513
514 fn tool_submit_permission(&mut self, id: &Value, args: &Value) {
515 let Some(request_id) = arg_str(args, "request_id") else {
516 return self.send_mcp(mcp::tool_error_result(id, "`request_id` is required"));
517 };
518 let behavior = if arg_str(args, "decision") == Some("allow") { "allow" } else { "deny" };
519 self.send_mcp(mcp::permission_verdict(request_id, behavior));
520 self.send_mcp(mcp::tool_text_result(id, &format!("permission verdict `{behavior}` sent")));
521 }
522
523 fn tool_set_perm(&mut self, id: &Value, args: &Value) {
525 let Some(level) = arg_str(args, "level").and_then(|level| level.parse::<PermissionLevel>().ok()) else {
526 return self.send_mcp(mcp::tool_error_result(id, "`level` must be mute, notify, converse, or act"));
527 };
528 let whisper = args.get("whisper").and_then(Value::as_bool).unwrap_or(false);
529 let channel = arg_str(args, "channel");
530
531 if channel.is_some() || whisper {
533 let server = match self.resolve_server(id, args) {
534 Ok(server) => server,
535 Err(error) => return self.send_mcp(error),
536 };
537 let scope = if whisper { None } else { channel.map(str::to_owned) };
538 self.set_scope_override(&server, scope, level);
539 } else {
540 self.config.default_permission = level;
541 }
542 self.send_mcp(mcp::tool_text_result(id, "permission updated"));
543 }
544
545 fn relay_permission(&self, request_id: &str, tool_name: &str, description: &str) {
548 let mut meta = std::collections::BTreeMap::new();
549 meta.insert("kind".to_owned(), "permission_request".to_owned());
550 meta.insert("request_id".to_owned(), request_id.to_owned());
551 let content = format!(
552 "Claude Code requests approval to run `{tool_name}`: {description}\nAnswer with the submit_permission tool: {{\"request_id\": \"{request_id}\", \"decision\": \"allow\"|\"deny\"}}."
553 );
554 self.send_mcp(mcp::channel_notification(&content, &meta));
555 }
556
557 fn handle_inbound(&mut self, server: &str, frame: ProtocolMessage) {
562 match frame {
563 ProtocolMessage::ChannelMsg { channel, from, payload } => self.inject(server, Some(channel), from, payload),
564 ProtocolMessage::Whisper { from, payload, .. } => self.inject(server, None, from, payload),
565 ProtocolMessage::Joined { channel } => {
566 if let Some(handle) = self.servers.get(server) {
568 handle.joined.lock().expect("joined mutex poisoned").insert(channel.clone());
569 }
570 self.resolve_pending(server, &format!("joined {channel}"));
571 }
572 ProtocolMessage::ChannelList { channels } => self.resolve_pending(server, &format_channels(&channels)),
573 ProtocolMessage::Presence { channel, sessions } => self.resolve_pending(server, &format_presence(channel.as_deref(), &sessions)),
574 ProtocolMessage::Ack { detail } => self.resolve_pending(server, detail.as_deref().unwrap_or("ok")),
575 ProtocolMessage::InviteToken { token } => self.resolve_pending(server, &format!("invite token: {token}")),
576 ProtocolMessage::Error(error) => self.resolve_error(server, &error),
577 ProtocolMessage::ServerInfo { admin } => {
579 if admin {
580 self.admin_servers.insert(server.to_owned());
581 } else {
582 self.admin_servers.remove(server);
583 }
584 }
585 _ => {}
587 }
588 }
589
590 fn inject(&self, server: &str, channel: Option<String>, from: SessionPath, payload: Payload) {
591 let body = match payload {
592 Payload::Plain(text) => text,
593 Payload::Encrypted(_) => "<end-to-end-encrypted payload — not supported in v1>".to_owned(),
594 };
595 let scope = channel.as_ref().map_or(Scope::Whisper, |c| Scope::Channel(c.clone()));
596
597 match policy::inbound_delivery(&self.config, server, &scope) {
598 Delivery::Drop => {}
599 Delivery::Inject(level) => self.sink.deliver(&Injection {
600 server: server.to_owned(),
601 channel,
602 from,
603 level,
604 body,
605 }),
606 }
607 }
608
609 fn resolve_pending(&mut self, server: &str, text: &str) {
610 match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
611 Some(Pending::Tool { id, ok }) => self.send_mcp(mcp::tool_text_result(&id, ok.as_deref().unwrap_or(text))),
612 Some(Pending::Resubscribe) | None => {}
614 }
615 }
616
617 fn resolve_error(&mut self, server: &str, error: &ProtocolError) {
618 match self.pending.get_mut(server).and_then(VecDeque::pop_front) {
619 Some(Pending::Tool { id, .. }) => self.send_mcp(mcp::tool_error_result(&id, &error.to_string())),
620 Some(Pending::Resubscribe) => {}
622 None => self.notify(server, "error", &format!("Server `{server}` error: {error}")),
624 }
625 }
626
627 fn notify(&self, server: &str, kind: &str, text: &str) {
629 let mut meta = std::collections::BTreeMap::new();
630 meta.insert("server".to_owned(), server.to_owned());
631 meta.insert("kind".to_owned(), kind.to_owned());
632 self.send_mcp(mcp::channel_notification(text, &meta));
633 }
634
635 fn link_up(&mut self, server: &str) {
639 if self.link_down_notified.remove(server) {
640 self.notify(server, "link", &format!("Reconnected to `{server}`."));
641 }
642 let Some(handle) = self.servers.get(server) else { return };
643 let channels: Vec<String> = handle.joined.lock().expect("joined mutex poisoned").iter().cloned().collect();
644 for channel in channels {
645 self.pending.entry(server.to_owned()).or_default().push_back(Pending::Resubscribe);
646 self.send_to_server(server, ProtocolMessage::Join { channel, token: None });
647 }
648 }
649
650 fn link_down(&mut self, server: &str) {
654 if let Some(queue) = self.pending.remove(server) {
655 for entry in queue {
656 if let Pending::Tool { id, .. } = entry {
657 self.send_mcp(mcp::tool_error_result(&id, &format!("connection to `{server}` lost; retry")));
658 }
659 }
660 }
661 if self.link_down_notified.insert(server.to_owned()) {
662 self.notify(server, "link", &format!("Disconnected from `{server}` — reconnecting."));
663 }
664 }
665
666 fn tools(&self) -> Vec<Tool> {
671 let mut tools = vec![join_channel_tool(), leave_channel_tool(), list_channels_tool(), who_tool(), submit_permission_tool(), set_perm_tool()];
672 if self.any_emit_allowed() {
673 tools.push(send_channel_tool());
674 tools.push(whisper_tool());
675 }
676 if !self.admin_servers.is_empty() {
678 tools.extend(admin_tools());
679 }
680 tools
681 }
682
683 fn any_emit_allowed(&self) -> bool {
684 let joined: Vec<(String, String)> = self
685 .servers
686 .iter()
687 .flat_map(|(server, handle)| {
688 handle
689 .joined
690 .lock()
691 .expect("joined mutex poisoned")
692 .iter()
693 .map(|channel| (server.clone(), channel.clone()))
694 .collect::<Vec<_>>()
695 })
696 .collect();
697 policy::any_emit_allowed(&self.config, joined.iter().map(|(server, channel)| (server.as_str(), channel.as_str())))
698 }
699
700 fn resolve_server(&self, id: &Value, args: &Value) -> Result<String, Value> {
702 if let Some(server) = arg_str(args, "server") {
703 if self.servers.contains_key(server) {
704 return Ok(server.to_owned());
705 }
706 return Err(mcp::tool_error_result(id, &format!("not connected to server `{server}`")));
707 }
708 match self.servers.keys().next() {
709 Some(only) if self.servers.len() == 1 => Ok(only.clone()),
710 _ => Err(mcp::tool_error_result(id, "multiple servers connected; pass `server`")),
711 }
712 }
713
714 fn our_path(&self, server: &str) -> SessionPath {
715 self.servers.get(server).map_or_else(
716 || SessionPath::new("unknown", "unknown", self.session.clone()),
717 |handle| SessionPath::new(handle.registration.username.clone(), handle.registration.machine.clone(), self.session.clone()),
718 )
719 }
720
721 fn set_scope_override(&mut self, server: &str, channel: Option<String>, level: PermissionLevel) {
724 self.config.overrides.retain(|o| !(o.server == server && o.channel == channel));
725 self.config.overrides.push(PermissionOverride {
726 server: server.to_owned(),
727 channel,
728 level,
729 });
730 }
731
732 fn send_mcp(&self, message: Value) {
733 let _ = self.to_mcp.send(message);
734 }
735
736 fn send_to_server(&self, server: &str, frame: ProtocolMessage) {
737 if let Some(handle) = self.servers.get(server) {
738 let _ = handle.to_server.send(frame);
739 }
740 }
741}
742
743fn arg_str<'a>(args: &'a Value, key: &str) -> Option<&'a str> {
744 args.get(key).and_then(Value::as_str)
745}
746
747fn format_channels(channels: &[crate::protocol::ChannelInfo]) -> String {
748 if channels.is_empty() {
749 return "no channels visible".to_owned();
750 }
751 channels
752 .iter()
753 .map(|c| format!("{} ({}{})", c.name, c.visibility.as_str(), if c.member { ", member" } else { "" }))
754 .collect::<Vec<_>>()
755 .join("\n")
756}
757
758fn format_presence(channel: Option<&str>, sessions: &[SessionPath]) -> String {
759 let scope = channel.map_or_else(|| "server-wide".to_owned(), |c| format!("#{c}"));
760 if sessions.is_empty() {
761 return format!("{scope}: nobody online");
762 }
763 let who = sessions.iter().map(SessionPath::to_string).collect::<Vec<_>>().join(", ");
764 format!("{scope}: {who}")
765}
766
767fn join_channel_tool() -> Tool {
770 Tool {
771 name: "join_channel",
772 description: "Join a channel on a server and subscribe this session to it.",
773 input_schema: json!({
774 "type": "object",
775 "properties": {
776 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
777 "channel": { "type": "string", "description": "Channel name to join." },
778 "token": { "type": "string", "description": "Invite token, if the channel requires one." },
779 "perm": { "type": "string", "enum": ["mute", "notify", "converse", "act"], "description": "Autonomy level for this channel." }
780 },
781 "required": ["channel"]
782 }),
783 }
784}
785
786fn leave_channel_tool() -> Tool {
787 Tool {
788 name: "leave_channel",
789 description: "Unsubscribe this session from a channel (stays connected to the server).",
790 input_schema: json!({
791 "type": "object",
792 "properties": {
793 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
794 "channel": { "type": "string", "description": "Channel name to leave." }
795 },
796 "required": ["channel"]
797 }),
798 }
799}
800
801fn list_channels_tool() -> Tool {
802 Tool {
803 name: "list_channels",
804 description: "List the channels visible to you on a server.",
805 input_schema: json!({
806 "type": "object",
807 "properties": { "server": { "type": "string", "description": "Server URL (optional if only one is connected)." } }
808 }),
809 }
810}
811
812fn who_tool() -> Tool {
813 Tool {
814 name: "who",
815 description: "List who is present on a server, optionally scoped to a channel.",
816 input_schema: json!({
817 "type": "object",
818 "properties": {
819 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
820 "channel": { "type": "string", "description": "Restrict presence to this channel." }
821 }
822 }),
823 }
824}
825
826fn submit_permission_tool() -> Tool {
827 Tool {
828 name: "submit_permission",
829 description: "Answer a relayed Claude Code permission request (allow or deny).",
830 input_schema: json!({
831 "type": "object",
832 "properties": {
833 "request_id": { "type": "string", "description": "The request_id from the permission prompt." },
834 "decision": { "type": "string", "enum": ["allow", "deny"], "description": "The verdict." }
835 },
836 "required": ["request_id", "decision"]
837 }),
838 }
839}
840
841fn send_channel_tool() -> Tool {
842 Tool {
843 name: "send_channel",
844 description: "Send a message to a channel (allowed only at converse/act).",
845 input_schema: json!({
846 "type": "object",
847 "properties": {
848 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
849 "channel": { "type": "string", "description": "Channel to send to." },
850 "text": { "type": "string", "description": "The message text." }
851 },
852 "required": ["channel", "text"]
853 }),
854 }
855}
856
857fn whisper_tool() -> Tool {
858 Tool {
859 name: "whisper",
860 description: "Send a direct message to exactly one session path (allowed only at converse/act).",
861 input_schema: json!({
862 "type": "object",
863 "properties": {
864 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
865 "target": { "type": "string", "description": "The recipient's full user/machine/session path." },
866 "text": { "type": "string", "description": "The message text." }
867 },
868 "required": ["target", "text"]
869 }),
870 }
871}
872
873fn set_perm_tool() -> Tool {
874 Tool {
875 name: "set_perm",
876 description: "Set your autonomy level live (mute/notify/converse/act) for a channel, the whisper scope, or the machine default. Takes effect on the next inbound message — no reconnect.",
877 input_schema: json!({
878 "type": "object",
879 "properties": {
880 "server": { "type": "string", "description": "Server URL (optional if only one is connected)." },
881 "channel": { "type": "string", "description": "Channel to scope to (omit with `whisper` for the whisper scope, or omit both for the machine default)." },
882 "whisper": { "type": "boolean", "description": "Apply to the whisper scope instead of a channel." },
883 "level": { "type": "string", "enum": ["mute", "notify", "converse", "act"] }
884 },
885 "required": ["level"]
886 }),
887 }
888}
889
890fn admin_tools() -> Vec<Tool> {
892 let server = json!({ "type": "string", "description": "Server URL (optional if only one is connected)." });
893 vec![
894 Tool {
895 name: "create_channel",
896 description: "Admin: create a channel (visibility public/unlisted/private; default public).",
897 input_schema: json!({
898 "type": "object",
899 "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
900 "required": ["name"]
901 }),
902 },
903 Tool {
904 name: "delete_channel",
905 description: "Admin: delete a channel.",
906 input_schema: json!({ "type": "object", "properties": { "server": server, "name": { "type": "string" } }, "required": ["name"] }),
907 },
908 Tool {
909 name: "set_visibility",
910 description: "Admin: change a channel's visibility (public/unlisted/private).",
911 input_schema: json!({
912 "type": "object",
913 "properties": { "server": server, "name": { "type": "string" }, "visibility": { "type": "string", "enum": ["public", "unlisted", "private"] } },
914 "required": ["name", "visibility"]
915 }),
916 },
917 Tool {
918 name: "acl_add",
919 description: "Admin: add a user to a channel's access-control list.",
920 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
921 },
922 Tool {
923 name: "acl_remove",
924 description: "Admin: remove a user from a channel's access-control list.",
925 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
926 },
927 Tool {
928 name: "invite_create",
929 description: "Admin: mint an invite token for a channel (optional uses / expires_in_secs).",
930 input_schema: json!({
931 "type": "object",
932 "properties": { "server": server, "channel": { "type": "string" }, "uses": { "type": "integer" }, "expires_in_secs": { "type": "integer" } },
933 "required": ["channel"]
934 }),
935 },
936 Tool {
937 name: "invite_revoke",
938 description: "Admin: revoke an invite token.",
939 input_schema: json!({ "type": "object", "properties": { "server": server, "token": { "type": "string" } }, "required": ["token"] }),
940 },
941 Tool {
942 name: "kick",
943 description: "Admin: remove a session path or user from a channel.",
944 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "target": { "type": "string" } }, "required": ["channel", "target"] }),
945 },
946 Tool {
947 name: "ban",
948 description: "Admin: ban a user from a channel (drops them and blocks rejoin).",
949 input_schema: json!({ "type": "object", "properties": { "server": server, "channel": { "type": "string" }, "user": { "type": "string" } }, "required": ["channel", "user"] }),
950 },
951 ]
952}
953
954#[cfg(test)]
955mod tests;