use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
/// Launch settings for one MCP server that is run as a child process.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpServerConfig {
/// Server name; used in spawn error messages and as the middle part of the
/// `mcp_<name>_<tool>` tool names built by the executor.
pub name: String,
/// Program to execute.
pub command: String,
/// Command-line arguments passed to `command`; defaults to empty when absent.
#[serde(default)]
pub args: Vec<String>,
/// Environment variables set on the child process; defaults to empty when absent.
#[serde(default)]
pub env: HashMap<String, String>,
/// Working directory for the child process; left unchanged when `None`.
pub cwd: Option<String>,
}
/// A running MCP server child process together with the pipes used to
/// exchange newline-delimited JSON-RPC messages with it.
pub struct McpServer {
// Settings the process was started with.
config: McpServerConfig,
// Handle to the spawned process; used to kill and reap it on shutdown.
child: Child,
// Buffered writer over the child's stdin; each outgoing message is flushed explicitly.
stdin: tokio::io::BufWriter<tokio::process::ChildStdin>,
// Buffered reader over the child's stdout; replies are read one line at a time.
stdout: BufReader<tokio::process::ChildStdout>,
// Id assigned to the next request; starts at 1 and is incremented per request.
next_id: u64,
}
/// One tool entry as deserialized from the `tools` array of a `tools/list` result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpToolInfo {
/// Tool name as reported by the server (without any `mcp_<server>_` prefix).
pub name: String,
/// Optional human-readable description.
pub description: Option<String>,
/// Optional schema for the tool's arguments; carried as `inputSchema` in JSON.
#[serde(rename = "inputSchema")]
pub input_schema: Option<Value>,
}
/// Outgoing JSON-RPC request, serialized as a single JSON line.
#[derive(Debug, Serialize)]
struct McpRequest {
// Protocol version tag; the code in this file always sends "2.0".
jsonrpc: &'static str,
// Method name, e.g. "initialize", "tools/list" or "tools/call".
method: String,
// Method parameters; the key is omitted from the JSON entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
params: Option<Value>,
// Request id taken from `McpServer::next_id`.
id: u64,
}
/// Incoming JSON-RPC reply. All fields are optional, so any JSON object
/// deserializes into this type; callers must inspect which fields are set.
#[derive(Debug, Deserialize)]
struct McpResponse {
// Payload of a successful reply.
result: Option<Value>,
// Error object of a failed reply; checked before `result`.
error: Option<McpError>,
// Id echoed by the server. Deserialized but not read through this struct,
// hence the dead_code allowance.
#[allow(dead_code)]
id: Option<u64>,
}
/// Error object carried in the `error` field of a JSON-RPC reply.
#[derive(Debug, Deserialize)]
struct McpError {
// Numeric error code; deserialized but not surfaced to callers,
// hence the dead_code allowance.
#[allow(dead_code)]
code: Option<i64>,
// Human-readable message; this is what ends up in the returned error string.
message: String,
}
impl McpServer {
pub async fn start(config: McpServerConfig) -> Result<Self, String> {
let mut cmd = Command::new(&config.command);
cmd.args(&config.args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
if let Some(ref cwd) = config.cwd {
cmd.current_dir(cwd);
}
for (k, v) in &config.env {
cmd.env(k, v);
}
let mut child = cmd
.spawn()
.map_err(|e| format!("failed to start MCP server '{}': {}", config.name, e))?;
let stdin = child
.stdin
.take()
.ok_or_else(|| "MCP server has no stdin".to_string())?;
let stdout = child
.stdout
.take()
.ok_or_else(|| "MCP server has no stdout".to_string())?;
let mut server = Self {
config,
child,
stdin: tokio::io::BufWriter::new(stdin),
stdout: BufReader::new(stdout),
next_id: 1,
};
server
.send_request(
"initialize",
Some(serde_json::json!({
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "car-runtime",
"version": env!("CARGO_PKG_VERSION")
}
})),
)
.await?;
let notification = serde_json::json!({
"jsonrpc": "2.0",
"method": "notifications/initialized"
});
let msg =
serde_json::to_string(¬ification).map_err(|e| format!("serialize error: {e}"))?;
server
.stdin
.write_all(msg.as_bytes())
.await
.map_err(|e| format!("write error: {e}"))?;
server
.stdin
.write_all(b"\n")
.await
.map_err(|e| format!("write error: {e}"))?;
server
.stdin
.flush()
.await
.map_err(|e| format!("flush error: {e}"))?;
Ok(server)
}
async fn send_request(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> {
let id = self.next_id;
self.next_id += 1;
let req = McpRequest {
jsonrpc: "2.0",
method: method.to_string(),
params,
id,
};
let msg = serde_json::to_string(&req).map_err(|e| format!("serialize error: {e}"))?;
self.stdin
.write_all(msg.as_bytes())
.await
.map_err(|e| format!("write to MCP server: {e}"))?;
self.stdin
.write_all(b"\n")
.await
.map_err(|e| format!("write newline: {e}"))?;
self.stdin
.flush()
.await
.map_err(|e| format!("flush: {e}"))?;
let mut line = String::new();
self.stdout
.read_line(&mut line)
.await
.map_err(|e| format!("read from MCP server: {e}"))?;
let resp: McpResponse = serde_json::from_str(&line)
.map_err(|e| format!("invalid MCP response: {e} (raw: {})", line.trim()))?;
if let Some(err) = resp.error {
return Err(format!("MCP error: {}", err.message));
}
resp.result
.ok_or_else(|| "MCP server returned no result".to_string())
}
pub async fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
let result = self.send_request("tools/list", None).await?;
let tools = result
.get("tools")
.and_then(|t| t.as_array())
.cloned()
.unwrap_or_default();
tools
.into_iter()
.map(|t| serde_json::from_value(t).map_err(|e| format!("invalid tool definition: {e}")))
.collect()
}
pub async fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, String> {
let result = self
.send_request(
"tools/call",
Some(serde_json::json!({
"name": name,
"arguments": arguments,
})),
)
.await?;
if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
let texts: Vec<&str> = content
.iter()
.filter_map(|block| {
if block.get("type").and_then(|t| t.as_str()) == Some("text") {
block.get("text").and_then(|t| t.as_str())
} else {
None
}
})
.collect();
if !texts.is_empty() {
return Ok(Value::String(texts.join("\n")));
}
}
Ok(result)
}
pub async fn shutdown(mut self) {
let _ = self.stdin.shutdown().await;
let _ = self.child.kill().await;
let _ = self.child.wait().await;
}
pub fn name(&self) -> &str {
&self.config.name
}
}
/// Tool executor that dispatches tool calls to registered MCP servers and
/// optionally delegates unknown tools to another executor.
pub struct McpToolExecutor {
// Registered servers keyed by server name; each server has its own lock.
servers: Arc<Mutex<HashMap<String, Arc<Mutex<McpServer>>>>>,
// Maps a tool name (both `mcp_<server>_<tool>` and the bare tool name)
// to the name of the server that provides it.
tool_routes: Arc<Mutex<HashMap<String, String>>>,
// Executor consulted when no route/server is found for a tool.
fallback: Option<Arc<dyn super::ToolExecutor>>,
}
impl McpToolExecutor {
pub fn new() -> Self {
Self {
servers: Arc::new(Mutex::new(HashMap::new())),
tool_routes: Arc::new(Mutex::new(HashMap::new())),
fallback: None,
}
}
pub fn with_fallback(mut self, fallback: Arc<dyn super::ToolExecutor>) -> Self {
self.fallback = Some(fallback);
self
}
pub async fn add_server(&self, mut server: McpServer) -> Result<Vec<String>, String> {
let server_name = server.config.name.clone();
let tools = server.list_tools().await?;
let tool_names: Vec<String> = tools
.iter()
.map(|t| format!("mcp_{}_{}", server_name, t.name))
.collect();
{
let mut routes = self.tool_routes.lock().await;
for (info, canonical_name) in tools.iter().zip(tool_names.iter()) {
routes.insert(canonical_name.clone(), server_name.clone());
routes.insert(info.name.clone(), server_name.clone());
}
}
self.servers
.lock()
.await
.insert(server_name, Arc::new(Mutex::new(server)));
Ok(tool_names)
}
pub async fn tool_schemas(&self) -> Vec<(String, car_ir::ToolSchema)> {
let mut schemas = Vec::new();
let servers = self.servers.lock().await;
for (server_name, server) in servers.iter() {
let mut srv = server.lock().await;
if let Ok(tools) = srv.list_tools().await {
for tool in tools {
let canonical_name = format!("mcp_{}_{}", server_name, tool.name);
schemas.push((
server_name.clone(),
car_ir::ToolSchema {
name: canonical_name,
description: tool.description.unwrap_or_default(),
parameters: tool
.input_schema
.unwrap_or(serde_json::json!({"type": "object"})),
returns: None,
idempotent: false,
cache_ttl_secs: None,
rate_limit: None,
},
));
}
}
}
schemas
}
pub async fn shutdown_all(&self) {
let mut servers = self.servers.lock().await;
servers.drain();
}
}
impl Default for McpToolExecutor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl super::ToolExecutor for McpToolExecutor {
    /// Executes `tool` with an empty action id; see `execute_with_action`.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        self.execute_with_action(tool, params, "").await
    }

    /// Routes a tool call to the MCP server that provides it.
    ///
    /// The tool name is looked up in the route table. When a route and its
    /// server exist, a leading `mcp_<server>_` prefix is stripped and the
    /// call is forwarded to that server. Otherwise the call goes to the
    /// fallback executor if one is configured.
    ///
    /// # Errors
    ///
    /// Returns the server's or the fallback's error, or
    /// `unknown MCP tool: '<tool>'` when neither can handle the tool.
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        action_id: &str,
    ) -> Result<Value, String> {
        let owner = {
            let routes = self.tool_routes.lock().await;
            routes.get(tool).cloned()
        };
        // Clone the server handle and release the map lock right away: only
        // the target server should be locked while the call is in flight, so
        // calls to other servers and registrations are not blocked behind it.
        let server = match owner.as_deref() {
            Some(name) => {
                let servers = self.servers.lock().await;
                servers.get(name).cloned()
            }
            None => None,
        };
        if let (Some(owner), Some(server)) = (owner, server) {
            let prefix = format!("mcp_{}_", owner);
            let bare_name = tool.strip_prefix(prefix.as_str()).unwrap_or(tool);
            let mut srv = server.lock().await;
            return srv.call_tool(bare_name, params.clone()).await;
        }
        if let Some(ref fallback) = self.fallback {
            return fallback.execute_with_action(tool, params, action_id).await;
        }
        Err(format!("unknown MCP tool: '{}'", tool))
    }
}