use std::collections::BTreeMap;
use serde_json::{Value, json};
use tokio::{
io::{AsyncBufReadExt as _, AsyncRead, AsyncWrite, AsyncWriteExt as _, BufReader},
sync::mpsc,
};
use super::sink::{Injection, NotificationSink};
/// Protocol version assumed when an `initialize` request carries no string
/// `params.protocolVersion` (see `parse`).
const DEFAULT_PROTOCOL_VERSION: &str = "2025-06-18";
/// JSON-RPC 2.0 error code for "Method not found", used by `method_not_found`.
const METHOD_NOT_FOUND: i64 = -32601;
/// One inbound MCP message, as decoded by `parse` from a single line of input.
///
/// Variants that carry an `id` are requests: the `id` is the JSON-RPC request id, kept as a raw
/// [`Value`] so it can be echoed back verbatim in the response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum FromMcp {
    /// `initialize` request. `protocol_version` is the `params.protocolVersion` string, or
    /// `DEFAULT_PROTOCOL_VERSION` when that field is missing or not a string.
    Initialize { id: Value, protocol_version: String },
    /// `tools/list` request.
    ListTools { id: Value },
    /// `tools/call` request for the tool `name`; `args` is `params.arguments`, or `Value::Null`
    /// when absent.
    CallTool { id: Value, name: String, args: Value },
    /// `ping` request.
    Ping { id: Value },
    /// `notifications/claude/channel/permission_request` notification. Each field is the string
    /// found under `params`, or an empty string when it is missing or not a string.
    PermissionRequest {
        request_id: String,
        tool_name: String,
        description: String,
        input_preview: String,
    },
    /// `notifications/initialized` notification.
    Initialized,
    /// A request that could not be mapped to any other variant; only its `id` is kept so that it
    /// can still be answered.
    UnknownRequest { id: Value },
}
/// Static description of one tool, as advertised in a `tools/list` response.
pub(crate) struct Tool {
    /// Tool name, emitted under the `name` key.
    pub name: &'static str,
    /// Tool description, emitted under the `description` key.
    pub description: &'static str,
    /// Schema value for the tool's arguments, emitted under the `inputSchema` key.
    pub input_schema: Value,
}
impl Tool {
    /// Renders this tool as the JSON object used inside a `tools/list` result
    /// (`name`, `description`, `inputSchema`).
    fn to_json(&self) -> Value {
        let Self { name, description, input_schema } = self;
        json!({
            "name": name,
            "description": description,
            "inputSchema": input_schema,
        })
    }
}
/// Reads newline-delimited messages from `reader` and forwards every decoded event to `tx`.
///
/// Blank lines and lines that `parse` maps to `None` are skipped. The loop ends at end of input,
/// on the first read error, or as soon as `tx` can no longer deliver (its receiver is gone).
pub(crate) async fn read_loop<R: AsyncRead + Unpin>(reader: R, tx: mpsc::UnboundedSender<FromMcp>) {
    let mut lines = BufReader::new(reader).lines();
    loop {
        let line = match lines.next_line().await {
            Ok(Some(line)) => line,
            // End of input and read errors both terminate the loop.
            Ok(None) | Err(_) => break,
        };
        if line.trim().is_empty() {
            continue;
        }
        let Some(event) = parse(&line) else {
            continue;
        };
        if tx.send(event).is_err() {
            break;
        }
    }
}
/// Drains `rx`, writing each message to `writer` as one line of JSON followed by `\n`.
///
/// The writer is flushed after every message. A message that fails to serialize is skipped. The
/// loop ends when the channel is closed and drained, or on the first write or flush error.
pub(crate) async fn write_loop<W: AsyncWrite + Unpin>(mut writer: W, mut rx: mpsc::UnboundedReceiver<Value>) {
    while let Some(message) = rx.recv().await {
        let mut frame = match serde_json::to_vec(&message) {
            Ok(bytes) => bytes,
            Err(_) => continue,
        };
        frame.push(b'\n');
        if writer.write_all(&frame).await.is_err() {
            break;
        }
        if writer.flush().await.is_err() {
            break;
        }
    }
}
/// Decodes one line of JSON-RPC text into a [`FromMcp`] event.
///
/// Returns `None` for input that needs no reaction: text that is not valid JSON, messages without
/// a string `method` (for example responses), and notifications (no `id`) other than
/// `notifications/initialized` and `notifications/claude/channel/permission_request`.
///
/// Every message that carries both a string `method` and an `id` yields `Some(..)`, so a request
/// is never silently dropped. Requests that cannot be mapped to a specific variant, including a
/// `tools/call` without a string `params.name`, become [`FromMcp::UnknownRequest`].
fn parse(line: &str) -> Option<FromMcp> {
    let value: Value = serde_json::from_str(line).ok()?;
    let method = value.get("method").and_then(Value::as_str);
    let id = value.get("id").cloned();
    match (method, id) {
        (Some("initialize"), Some(id)) => {
            let protocol_version = value
                .pointer("/params/protocolVersion")
                .and_then(Value::as_str)
                .unwrap_or(DEFAULT_PROTOCOL_VERSION)
                .to_owned();
            Some(FromMcp::Initialize { id, protocol_version })
        }
        (Some("tools/list"), Some(id)) => Some(FromMcp::ListTools { id }),
        (Some("tools/call"), Some(id)) => {
            // A call without a usable tool name cannot be dispatched, but it still carries an id,
            // so the peer is waiting for a reply. Surface it as an unknown request instead of
            // dropping it.
            // NOTE(review): presumably the dispatcher answers `UnknownRequest` with
            // `method_not_found`; a dedicated "invalid params" reply would be more precise but
            // needs a new `FromMcp` variant.
            let Some(name) = value.pointer("/params/name").and_then(Value::as_str) else {
                return Some(FromMcp::UnknownRequest { id });
            };
            let args = value.pointer("/params/arguments").cloned().unwrap_or(Value::Null);
            Some(FromMcp::CallTool { id, name: name.to_owned(), args })
        }
        (Some("ping"), Some(id)) => Some(FromMcp::Ping { id }),
        (Some("notifications/claude/channel/permission_request"), None) => Some(FromMcp::PermissionRequest {
            request_id: string_at(&value, "/params/request_id"),
            tool_name: string_at(&value, "/params/tool_name"),
            description: string_at(&value, "/params/description"),
            input_preview: string_at(&value, "/params/input_preview"),
        }),
        (Some("notifications/initialized"), None) => Some(FromMcp::Initialized),
        // Any other method that carries an id is a request that must be answered.
        (Some(_), Some(id)) => Some(FromMcp::UnknownRequest { id }),
        _ => None,
    }
}
/// Returns an owned copy of the string found at the JSON pointer `pointer` inside `value`, or an
/// empty string when nothing is there or the value found is not a string.
fn string_at(value: &Value, pointer: &str) -> String {
    match value.pointer(pointer) {
        Some(Value::String(text)) => text.clone(),
        _ => String::new(),
    }
}
/// Builds the JSON-RPC response to an `initialize` request.
///
/// `id` is echoed as the response id and `protocol_version` is reported as `protocolVersion`.
/// The result declares the experimental `claude/channel` and `claude/channel/permission`
/// capabilities plus `tools`, identifies the server as `conclave` with the crate version, and
/// carries the usage instructions text.
pub(crate) fn initialize_result(id: &Value, protocol_version: &str) -> Value {
    let capabilities = json!({
        "experimental": { "claude/channel": {}, "claude/channel/permission": {} },
        "tools": {}
    });
    let server_info = json!({ "name": "conclave", "version": env!("CARGO_PKG_VERSION") });
    // The trailing `\` joins the two source lines; leading whitespace on the second is not part
    // of the string.
    let instructions = "Conclave bridge. Inbound channel/whisper traffic is injected as <channel>/<whisper> tags carrying server/channel/from/kind. \
        Reply with the send_channel or whisper tools (offered only when a joined channel is at least `converse`); discover with list_channels / who; connect with join_channel.";
    json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": {
            "protocolVersion": protocol_version,
            "capabilities": capabilities,
            "serverInfo": server_info,
            "instructions": instructions
        }
    })
}
pub(crate) fn tools_list_result(id: &Value, tools: &[Tool]) -> Value {
let tools: Vec<Value> = tools.iter().map(Tool::to_json).collect();
json!({ "jsonrpc": "2.0", "id": id, "result": { "tools": tools } })
}
/// Builds a successful tool-call response for request `id` whose content is a single text item
/// holding `text`.
pub(crate) fn tool_text_result(id: &Value, text: &str) -> Value {
    let content = json!([{ "type": "text", "text": text }]);
    json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": { "content": content }
    })
}
/// Builds a tool-call response for request `id` that reports a tool-level failure: a single text
/// item holding `message`, with `isError` set to `true`.
///
/// The failure travels inside `result`, not as a JSON-RPC `error` object.
pub(crate) fn tool_error_result(id: &Value, message: &str) -> Value {
    let content = json!([{ "type": "text", "text": message }]);
    json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": { "content": content, "isError": true }
    })
}
pub(crate) fn method_not_found(id: &Value) -> Value {
json!({ "jsonrpc": "2.0", "id": id, "error": { "code": METHOD_NOT_FOUND, "message": "method not found" } })
}
/// Builds the response to a `ping` request: an empty `result` object for request `id`.
pub(crate) fn ping_result(id: &Value) -> Value {
    let empty = json!({});
    json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": empty
    })
}
/// Builds a `notifications/claude/channel` notification (no `id`) whose params carry `content`
/// and the `meta` map as a JSON object.
pub(crate) fn channel_notification(content: &str, meta: &BTreeMap<String, String>) -> Value {
    let params = json!({ "content": content, "meta": meta });
    json!({
        "jsonrpc": "2.0",
        "method": "notifications/claude/channel",
        "params": params
    })
}
/// Builds a `notifications/claude/channel/permission` notification (no `id`) carrying
/// `request_id` and `behavior` in its params.
pub(crate) fn permission_verdict(request_id: &str, behavior: &str) -> Value {
    let params = json!({ "request_id": request_id, "behavior": behavior });
    json!({
        "jsonrpc": "2.0",
        "method": "notifications/claude/channel/permission",
        "params": params
    })
}
/// A [`NotificationSink`] that turns each injection into a channel notification and queues it on
/// an outbound message channel.
pub(crate) struct McpSink {
    /// Sending half of the outbound JSON message queue.
    outbound: mpsc::UnboundedSender<Value>,
}
impl McpSink {
pub(crate) fn new(outbound: mpsc::UnboundedSender<Value>) -> Self {
Self { outbound }
}
}
impl NotificationSink for McpSink {
    /// Wraps the injection's content and meta in a channel notification and queues it.
    ///
    /// A failed send is deliberately ignored: delivery is best-effort.
    fn deliver(&self, injection: &Injection) {
        let content = injection.content();
        let meta = injection.meta();
        let notification = channel_notification(&content, &meta);
        let _ = self.outbound.send(notification);
    }
}
#[cfg(test)]
mod tests {
    // Tests unwrap freely: a panic is exactly the failure report wanted here.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use pretty_assertions::assert_eq;

    /// The initialize reply declares both experimental channel capabilities and echoes the
    /// requested protocol version.
    #[test]
    fn bridge_inject_initialize_declares_the_claude_channel_capability() {
        let request_id = json!(1);
        let result = initialize_result(&request_id, "2025-06-18");
        let experimental = result.pointer("/result/capabilities/experimental").unwrap();
        let has_channel = experimental.get("claude/channel").is_some();
        let has_permission = experimental.get("claude/channel/permission").is_some();
        assert!(has_channel, "must declare claude/channel: {result}");
        assert!(has_permission, "must declare claude/channel/permission: {result}");
        let version = result.pointer("/result/protocolVersion").and_then(Value::as_str);
        assert_eq!(version, Some("2025-06-18"));
    }

    /// A channel notification carries the method name, the content and the meta entries.
    #[test]
    fn bridge_inject_channel_notification_has_the_validated_shape() {
        let meta = BTreeMap::from([("server".to_owned(), "s1".to_owned())]);
        let note = channel_notification("hello", &meta);
        let method = note.get("method").and_then(Value::as_str);
        let content = note.pointer("/params/content").and_then(Value::as_str);
        let server = note.pointer("/params/meta/server").and_then(Value::as_str);
        assert_eq!(method, Some("notifications/claude/channel"));
        assert_eq!(content, Some("hello"));
        assert_eq!(server, Some("s1"));
    }

    /// A well-formed `tools/call` line decodes into `CallTool` with id, name and arguments.
    #[test]
    fn bridge_inject_parses_a_tool_call() {
        let line = r#"{"jsonrpc":"2.0","id":7,"method":"tools/call","params":{"name":"send_channel","arguments":{"server":"s1","channel":"ops","text":"hi"}}}"#;
        let parsed = parse(line);
        match parsed {
            Some(FromMcp::CallTool { id, name, args }) => {
                assert_eq!(id, json!(7));
                assert_eq!(name, "send_channel");
                let channel = args.pointer("/channel").and_then(Value::as_str);
                assert_eq!(channel, Some("ops"));
            }
            other => panic!("expected CallTool, got {other:?}"),
        }
    }

    /// A permission-request notification decodes into `PermissionRequest` with all four fields.
    #[test]
    fn bridge_inject_parses_a_permission_request() {
        let line = r#"{"jsonrpc":"2.0","method":"notifications/claude/channel/permission_request","params":{"request_id":"abcde","tool_name":"Bash","description":"run ls","input_preview":"ls -la"}}"#;
        let expected = FromMcp::PermissionRequest {
            request_id: "abcde".to_owned(),
            tool_name: "Bash".to_owned(),
            description: "run ls".to_owned(),
            input_preview: "ls -la".to_owned(),
        };
        assert_eq!(parse(line), Some(expected));
    }

    /// A verdict notification carries the method name, the request id and the behavior.
    #[test]
    fn bridge_inject_verdict_answers_a_request() {
        let verdict = permission_verdict("abcde", "allow");
        let method = verdict.get("method").and_then(Value::as_str);
        let request_id = verdict.pointer("/params/request_id").and_then(Value::as_str);
        let behavior = verdict.pointer("/params/behavior").and_then(Value::as_str);
        assert_eq!(method, Some("notifications/claude/channel/permission"));
        assert_eq!(request_id, Some("abcde"));
        assert_eq!(behavior, Some("allow"));
    }
}