use crate::mcp::client::{self, McpPeer};
use anyhow::Result;
use std::collections::BTreeMap;
use tokio::sync::Mutex;
use wcore::model::Tool;
struct ConnectedPeer {
name: String,
peer: McpPeer,
tools: Vec<String>,
}
pub struct McpBridge {
peers: Mutex<Vec<ConnectedPeer>>,
tool_cache: Mutex<BTreeMap<String, Tool>>,
}
impl Default for McpBridge {
fn default() -> Self {
Self::new()
}
}
impl McpBridge {
pub fn new() -> Self {
Self {
peers: Mutex::new(Vec::new()),
tool_cache: Mutex::new(BTreeMap::new()),
}
}
pub async fn connect_stdio(&self, command: tokio::process::Command) -> Result<()> {
let name = command
.as_std()
.get_program()
.to_string_lossy()
.into_owned();
self.connect_stdio_named(name, command).await?;
Ok(())
}
pub async fn connect_stdio_named(
&self,
name: String,
command: tokio::process::Command,
) -> Result<Vec<String>> {
self.register_peer(name, McpPeer::stdio(command)?).await
}
pub async fn connect_http_named(&self, name: String, url: &str) -> Result<Vec<String>> {
self.register_peer(name, McpPeer::http(url)).await
}
async fn register_peer(&self, name: String, mut peer: McpPeer) -> Result<Vec<String>> {
peer.initialize().await?;
let mcp_tools = peer.list_all_tools().await?;
let mut tool_names = Vec::with_capacity(mcp_tools.len());
{
let mut cache = self.tool_cache.lock().await;
for mcp_tool in &mcp_tools {
let ct_tool = convert_tool(mcp_tool);
let tool_name = ct_tool.function.name.clone();
use std::collections::btree_map::Entry;
match cache.entry(tool_name.clone()) {
Entry::Occupied(_) => tracing::warn!(
"MCP tool '{}' from server '{}' conflicts with already-registered tool, skipping",
tool_name,
name
),
Entry::Vacant(e) => {
tool_names.push(tool_name);
e.insert(ct_tool);
}
}
}
}
self.peers.lock().await.push(ConnectedPeer {
name,
peer,
tools: tool_names.clone(),
});
Ok(tool_names)
}
pub async fn clear(&self) {
self.peers.lock().await.clear();
self.tool_cache.lock().await.clear();
}
pub async fn remove_server(&self, name: &str) -> Vec<String> {
let mut peers = self.peers.lock().await;
let mut removed_tools = Vec::new();
peers.retain(|p| {
if p.name.as_str() == name {
removed_tools.extend(p.tools.iter().cloned());
false
} else {
true
}
});
let mut cache = self.tool_cache.lock().await;
for tool_name in &removed_tools {
cache.remove(tool_name);
}
removed_tools
}
pub async fn list_servers(&self) -> Vec<(String, Vec<String>)> {
self.peers
.lock()
.await
.iter()
.map(|p| (p.name.clone(), p.tools.clone()))
.collect()
}
pub async fn tools(&self) -> Vec<Tool> {
self.tool_cache.lock().await.values().cloned().collect()
}
pub fn try_tools(&self) -> Vec<Tool> {
self.tool_cache
.try_lock()
.map(|cache| cache.values().cloned().collect())
.unwrap_or_default()
}
pub async fn call(&self, name: &str, arguments: &str) -> Result<String, String> {
let mut peers = self.peers.lock().await;
let connected = peers
.iter_mut()
.find(|p| p.tools.iter().any(|t| t.as_str() == name));
let Some(connected) = connected else {
return Err(format!("mcp tool '{name}' not available"));
};
let args: Option<serde_json::Map<String, serde_json::Value>> = if arguments.is_empty() {
None
} else {
Some(
serde_json::from_str(arguments)
.map_err(|e| format!("invalid tool arguments: {e}"))?,
)
};
match connected.peer.call_tool(name, args).await {
Ok(result) => {
let text = extract_text(&result.content);
if result.is_error == Some(true) {
Err(format!("mcp tool error: {text}"))
} else {
Ok(text)
}
}
Err(e) => Err(format!("mcp call failed: {e}")),
}
}
}
fn convert_tool(mcp_tool: &client::McpTool) -> Tool {
use wcore::model::{FunctionDef, ToolType};
Tool {
kind: ToolType::Function,
function: FunctionDef {
name: mcp_tool.name.clone(),
description: mcp_tool.description.clone(),
parameters: mcp_tool.input_schema.clone(),
},
strict: None,
}
}
fn extract_text(content: &[client::ContentItem]) -> String {
content
.iter()
.filter(|c| c.content_type == "text")
.filter_map(|c| c.text.as_deref())
.collect::<Vec<_>>()
.join("\n")
}