pub mod transport;
use crate::config::{McpServerConfig, McpTransportConfig};
use crate::provider::ToolSpec;
use crate::tools::Tool;
use anyhow::{Context, Result, bail};
use async_trait::async_trait;
use serde_json::{Value, json};
use std::borrow::Cow;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use transport::{
HttpTransport, McpTransport, NotificationHandler, ServerRequestHandler, StdioTransport,
};
pub struct RemoteToolSpec {
pub name: String,
pub description: String,
pub input_schema: Value,
}
pub struct McpClient {
name: String,
config: McpServerConfig,
transport: tokio::sync::RwLock<Arc<dyn McpTransport>>,
workspace_root: String,
request_id: Mutex<u64>,
tools_changed: Arc<AtomicBool>,
}
impl McpClient {
pub async fn new(config: &McpServerConfig, workspace_root: &str) -> Result<Self> {
let transport = Self::build_transport(&config.transport).await?;
Ok(Self {
name: config.name.clone(),
config: config.clone(),
transport: tokio::sync::RwLock::new(transport),
workspace_root: workspace_root.to_string(),
request_id: Mutex::new(1),
tools_changed: Arc::new(AtomicBool::new(false)),
})
}
async fn build_transport(transport: &McpTransportConfig) -> Result<Arc<dyn McpTransport>> {
Ok(match transport {
McpTransportConfig::Http { url, api_key } => {
Arc::new(HttpTransport::new(url.clone(), api_key.clone()))
}
McpTransportConfig::Stdio { command, args, env } => {
Arc::new(StdioTransport::new(command, args, env).await?)
}
})
}
pub async fn reconnect(&self) -> Result<()> {
info!("MCP '{}': reconnecting", self.name);
{
let old = self.transport.read().await.clone();
if let Err(e) = old.shutdown().await {
warn!(
"MCP '{}': shutdown during reconnect failed: {e:#}",
self.name
);
}
}
let new_transport = Self::build_transport(&self.config.transport)
.await
.with_context(|| format!("MCP '{}': failed to build new transport", self.name))?;
*self.transport.write().await = new_transport;
*self.request_id.lock().await = 1;
self.tools_changed.store(false, Ordering::Relaxed);
self.connect().await?;
Ok(())
}
pub fn name(&self) -> &str {
&self.name
}
pub fn take_tools_changed(&self) -> bool {
self.tools_changed.swap(false, Ordering::Relaxed)
}
async fn next_id(&self) -> u64 {
let mut id = self.request_id.lock().await;
let current = *id;
*id += 1;
current
}
fn server_request_handler(&self) -> ServerRequestHandler {
let workspace_root = self.workspace_root.clone();
Arc::new(move |method: &str, params: &Value| -> Value {
match method {
"roots/list" => {
json!({
"result": {
"roots": [{
"uri": format!("file://{workspace_root}"),
"name": "workspace"
}]
}
})
}
"elicitation/create" => {
let message = params.get("message").and_then(|v| v.as_str()).unwrap_or("");
json!({
"result": {
"action": "accept",
"content": message
}
})
}
"sampling/createMessage" => {
json!({
"error": {
"code": -32601,
"message": "Sampling is not supported by this client"
}
})
}
_ => {
json!({
"error": {
"code": -32601,
"message": format!("Unknown method: {method}")
}
})
}
}
})
}
fn notification_handler(&self) -> NotificationHandler {
let tools_changed = Arc::clone(&self.tools_changed);
let name = self.name.clone();
Arc::new(move |method: &str, _params: &Value| {
debug!("MCP '{name}': notification: {method}");
if method == "notifications/tools/list_changed" {
info!("MCP '{name}': tool list changed, will refresh");
tools_changed.store(true, Ordering::Relaxed);
}
})
}
async fn send(&self, method: &str, params: Value) -> Result<Value> {
let id = self.next_id().await;
let body = json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
});
let req_handler = self.server_request_handler();
let notif_handler = self.notification_handler();
let transport = self.transport.read().await.clone();
let response = transport
.request(&body, &req_handler, ¬if_handler)
.await?;
if let Some(err) = response.get("error") {
let msg = err["message"].as_str().unwrap_or("unknown error");
let code = err["code"].as_i64().unwrap_or(-1);
bail!("MCP server error {code}: {msg}");
}
Ok(response.get("result").cloned().unwrap_or(Value::Null))
}
pub async fn connect(&self) -> Result<()> {
let params = json!({
"protocolVersion": "2025-03-26",
"capabilities": {
"roots": { "listChanged": false },
"elicitation": {}
},
"clientInfo": {
"name": "sapphire-agent",
"version": env!("CARGO_PKG_VERSION")
}
});
let result = self.send("initialize", params).await?;
info!(
"MCP '{}': connected (server: {})",
self.name,
result
.get("serverInfo")
.and_then(|s| s.get("name"))
.and_then(|n| n.as_str())
.unwrap_or("unknown")
);
let notification = json!({
"jsonrpc": "2.0",
"method": "notifications/initialized",
});
let req_handler = self.server_request_handler();
let notif_handler = self.notification_handler();
let transport = self.transport.read().await.clone();
let _ = transport
.request(¬ification, &req_handler, ¬if_handler)
.await;
Ok(())
}
pub async fn list_tools(&self) -> Result<Vec<RemoteToolSpec>> {
let result = self.send("tools/list", json!({})).await?;
let tools = result
.get("tools")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let specs: Vec<RemoteToolSpec> = tools
.into_iter()
.filter_map(|t| {
let name = t.get("name")?.as_str()?.to_string();
let description = t
.get("description")
.and_then(|d| d.as_str())
.unwrap_or("")
.to_string();
let input_schema = t.get("inputSchema").cloned().unwrap_or(json!({}));
Some(RemoteToolSpec {
name,
description,
input_schema,
})
})
.collect();
info!("MCP '{}': found {} tools", self.name, specs.len());
Ok(specs)
}
pub async fn call_tool(&self, name: &str, arguments: &Value) -> Result<Value> {
let params = json!({
"name": name,
"arguments": arguments,
});
self.send("tools/call", params).await
}
#[allow(dead_code)]
pub async fn shutdown(&self) -> Result<()> {
let transport = self.transport.read().await.clone();
transport.shutdown().await
}
}
pub struct McpTool {
client: Arc<McpClient>,
spec: ToolSpec,
remote_tool_name: String,
}
#[async_trait]
impl Tool for McpTool {
fn spec(&self) -> &ToolSpec {
&self.spec
}
async fn execute(&self, input: &serde_json::Value) -> Result<String> {
let result = self.client.call_tool(&self.remote_tool_name, input).await?;
if let Some(contents) = result.get("content").and_then(|c| c.as_array()) {
let texts: Vec<&str> = contents
.iter()
.filter_map(|c| {
if c.get("type").and_then(|t| t.as_str()) == Some("text") {
c.get("text").and_then(|t| t.as_str())
} else {
None
}
})
.collect();
if !texts.is_empty() {
return Ok(texts.join("\n"));
}
}
Ok(serde_json::to_string_pretty(&result)?)
}
}
pub fn build_tools_for_client(
client: &Arc<McpClient>,
remote_tools: Vec<RemoteToolSpec>,
) -> Vec<Box<dyn Tool>> {
remote_tools
.into_iter()
.map(|rt| {
let tool_name = format!("mcp__{}__{}", client.name(), rt.name);
Box::new(McpTool {
client: Arc::clone(client),
spec: ToolSpec {
name: Cow::Owned(tool_name),
description: Cow::Owned(rt.description),
input_schema: rt.input_schema,
},
remote_tool_name: rt.name,
}) as Box<dyn Tool>
})
.collect()
}
pub async fn create_mcp_tools(
configs: &[McpServerConfig],
workspace_root: &str,
) -> (Vec<Box<dyn Tool>>, Vec<Arc<McpClient>>) {
let mut tools: Vec<Box<dyn Tool>> = Vec::new();
let mut clients: Vec<Arc<McpClient>> = Vec::new();
for config in configs {
let client = match McpClient::new(config, workspace_root).await {
Ok(c) => Arc::new(c),
Err(e) => {
warn!("MCP '{}': failed to create client: {e:#}", config.name);
continue;
}
};
if let Err(e) = client.connect().await {
warn!("MCP '{}': failed to connect: {e:#}", config.name);
continue;
}
match client.list_tools().await {
Ok(remote_tools) => {
tools.extend(build_tools_for_client(&client, remote_tools));
}
Err(e) => {
warn!("MCP '{}': failed to list tools: {e:#}", config.name);
}
}
clients.push(client);
}
(tools, clients)
}