use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStderr, Command};
use tokio::sync::{oneshot, Mutex};
/// How to launch one MCP server subprocess.
///
/// (De)serializable with serde: `args` and `env` may be omitted and default to
/// empty, and `cwd` may be omitted or `null`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct McpServerConfig {
    /// Identifier used in logs, error messages and `mcp_{name}_{tool}` tool names.
    pub name: String,
    /// Executable to spawn.
    pub command: String,
    /// Arguments passed to `command`.
    #[serde(default)]
    pub args: Vec<String>,
    /// Extra environment variables set on the child process.
    #[serde(default)]
    pub env: HashMap<String, String>,
    /// Working directory for the child; the parent's directory when `None`.
    pub cwd: Option<String>,
}
/// Waiter table: each in-flight JSON-RPC request id maps to the sender half of
/// the one-shot channel its caller is awaiting.
type Waiters = StdMutex<HashMap<u64, oneshot::Sender<McpResponse>>>;
/// The waiter table, shared between request senders and the stdout reader task.
type Pending = Arc<Waiters>;
/// A live connection to one MCP server child process, exchanging
/// newline-delimited JSON-RPC messages over its stdin/stdout.
pub struct McpServer {
// Launch parameters; kept so `reconnect` can respawn the process.
config: McpServerConfig,
// Handle to the spawned server process.
child: Child,
// Buffered writer over the child's stdin; every request is written here.
stdin: tokio::io::BufWriter<tokio::process::ChildStdin>,
// Id for the next outgoing request (starts at 1, incremented per request).
next_id: u64,
// Waiters for in-flight requests keyed by id: inserted by `send_request`,
// removed by the stdout reader task when the matching response arrives.
pending: Pending,
// Task running `reader_loop` over the child's stdout.
reader: tokio::task::JoinHandle<()>,
// Task running `stderr_drain_loop` over the child's stderr.
stderr_reader: tokio::task::JoinHandle<()>,
// Set to false by the reader task when stdout closes; `send_request` checks
// it to decide whether to reconnect first.
alive: Arc<AtomicBool>,
// Maximum time to wait for a single response (set to 120 s by `start`).
request_timeout: Duration,
}
impl Drop for McpServer {
    /// Best-effort teardown: cancels both pipe-reading tasks, then signals the
    /// child to die without waiting for it to exit.
    fn drop(&mut self) {
        for task in [&self.reader, &self.stderr_reader] {
            task.abort();
        }
        // The process may already be gone; there is nothing useful to do with an error here.
        let _ = self.child.start_kill();
    }
}
/// One entry of a `tools/list` response.
///
/// Field names follow the protocol's camelCase spelling on the wire, so
/// `input_schema` is (de)serialized as `inputSchema`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct McpToolInfo {
    /// Tool name as the server knows it (without any `mcp_` prefix).
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// JSON Schema describing the tool's arguments, when provided.
    pub input_schema: Option<Value>,
}
/// Outgoing JSON-RPC request frame.
///
/// Fields serialize in declaration order; `params` is left out entirely when
/// it is `None`.
#[derive(Serialize, Debug)]
struct McpRequest {
    /// Protocol tag (the only call site passes `"2.0"`).
    jsonrpc: &'static str,
    /// Method name, e.g. `tools/call`.
    method: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    params: Option<Value>,
    /// Request id echoed back by the server to correlate the response.
    id: u64,
}
/// Incoming JSON-RPC frame as decoded by the router.
///
/// Unlisted fields (such as `jsonrpc`) are ignored, and `id` is absent on
/// notifications. Note that serde decodes an explicit `"result": null` as
/// `None`, the same as a missing `result`.
#[derive(Deserialize, Debug)]
struct McpResponse {
    /// Payload of a successful response.
    result: Option<Value>,
    /// Error object of a failed response.
    error: Option<McpError>,
    /// Id of the request this frame answers.
    id: Option<u64>,
}
/// JSON-RPC error object carried by a failed response.
#[derive(Deserialize, Debug)]
struct McpError {
    /// Numeric error code. Nothing reads it except the derived `Debug` impl,
    /// which the dead-code lint does not count, hence the allowance.
    #[allow(dead_code)]
    code: Option<i64>,
    /// Human-readable description, surfaced to callers in error strings.
    message: String,
}
fn route_line(line: &str, pending: &StdMutex<HashMap<u64, oneshot::Sender<McpResponse>>>) {
let resp: McpResponse = match serde_json::from_str(line) {
Ok(r) => r,
Err(_) => return, };
if let Some(id) = resp.id {
if let Some(tx) = pending.lock().unwrap().remove(&id) {
let _ = tx.send(resp);
}
}
}
async fn reader_loop<R: AsyncBufRead + Unpin>(
mut stdout: R,
pending: Pending,
alive: Arc<AtomicBool>,
server_name: String,
) {
let mut line = String::new();
loop {
line.clear();
match stdout.read_line(&mut line).await {
Ok(0) | Err(_) => break, Ok(_) => route_line(&line, &pending),
}
}
alive.store(false, Ordering::SeqCst);
pending.lock().unwrap().clear(); tracing::debug!(server = %server_name, "MCP reader exited; connection closed");
}
/// Continuously drains the server's stderr, logging each line at debug level.
///
/// The pipe must be read until EOF. If it stops being drained, a server that
/// writes enough to stderr blocks on a full pipe and stops answering. Lines are
/// therefore read as raw bytes and decoded lossily, so non-UTF-8 output no
/// longer ends the loop. Only EOF or a genuine I/O error does.
async fn stderr_drain_loop(stderr: ChildStderr, server_name: String) {
    let mut reader = BufReader::new(stderr);
    let mut buf = Vec::new();
    loop {
        buf.clear();
        match reader.read_until(b'\n', &mut buf).await {
            Ok(0) | Err(_) => break,
            Ok(_) => {
                // Strip a trailing "\n" or "\r\n" before logging.
                let mut raw: &[u8] = &buf;
                if let Some(rest) = raw.strip_suffix(b"\n") {
                    raw = rest.strip_suffix(b"\r").unwrap_or(rest);
                }
                let line = String::from_utf8_lossy(raw);
                tracing::debug!(server = %server_name, "mcp stderr: {line}");
            }
        }
    }
}
impl McpServer {
    /// Spawns the configured command and performs the MCP handshake.
    ///
    /// The child is spawned with piped stdio, and background tasks are started
    /// to read its stdout and drain its stderr. Then the `initialize` request is
    /// sent, followed by the `notifications/initialized` notification.
    ///
    /// # Errors
    ///
    /// Returns a message if the process cannot be spawned, a stdio pipe is
    /// missing, or the handshake fails or times out. On failure the child is
    /// killed.
    pub async fn start(config: McpServerConfig) -> Result<Self, String> {
        let mut cmd = Command::new(&config.command);
        cmd.args(&config.args)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            // Covers the early returns below, before `Self` (and its `Drop`) exists.
            .kill_on_drop(true);
        if let Some(ref cwd) = config.cwd {
            cmd.current_dir(cwd);
        }
        for (k, v) in &config.env {
            cmd.env(k, v);
        }
        let mut child = cmd
            .spawn()
            .map_err(|e| format!("failed to start MCP server '{}': {}", config.name, e))?;
        let stdin = child
            .stdin
            .take()
            .ok_or_else(|| "MCP server has no stdin".to_string())?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| "MCP server has no stdout".to_string())?;
        let stderr = child
            .stderr
            .take()
            .ok_or_else(|| "MCP server has no stderr".to_string())?;
        let pending: Pending = Arc::new(StdMutex::new(HashMap::new()));
        let alive = Arc::new(AtomicBool::new(true));
        let reader = tokio::spawn(reader_loop(
            BufReader::new(stdout),
            Arc::clone(&pending),
            Arc::clone(&alive),
            config.name.clone(),
        ));
        let stderr_reader = tokio::spawn(stderr_drain_loop(stderr, config.name.clone()));
        let mut server = Self {
            config,
            child,
            stdin: tokio::io::BufWriter::new(stdin),
            next_id: 1,
            pending,
            reader,
            stderr_reader,
            alive,
            request_timeout: Duration::from_secs(120),
        };
        // Use `request_once`, not `send_request`. The latter reconnects on a dead
        // session, and reconnecting calls `start` again, so a server that dies
        // immediately could be respawned without bound.
        server
            .request_once(
                "initialize",
                Some(serde_json::json!({
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {
                        "name": "car-runtime",
                        "version": env!("CARGO_PKG_VERSION")
                    }
                })),
            )
            .await?;
        let notification = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "notifications/initialized"
        });
        let msg =
            serde_json::to_string(&notification).map_err(|e| format!("serialize error: {e}"))?;
        server.write_message(&msg).await?;
        Ok(server)
    }

    /// Sets the maximum time to wait for each subsequent response (120 s by
    /// default). The value is kept when the session is re-established by an
    /// automatic reconnect.
    pub fn set_request_timeout(&mut self, timeout: Duration) {
        self.request_timeout = timeout;
    }

    /// Tears down the current process and replaces `self` with a freshly
    /// started session from the same config. Any server-side state is lost.
    async fn reconnect(&mut self) -> Result<(), String> {
        tracing::warn!(server = %self.config.name, "MCP connection closed; reconnecting (server-side state is lost)");
        self.reader.abort();
        self.stderr_reader.abort();
        let _ = self.child.kill().await;
        // Boxed so the large `start` future is heap-allocated instead of being
        // inlined into every `send_request` future.
        let mut fresh = Box::pin(Self::start(self.config.clone())).await?;
        // `start` resets settings to defaults; carry over the caller's timeout.
        fresh.request_timeout = self.request_timeout;
        *self = fresh;
        Ok(())
    }

    /// Writes `msg` followed by a newline to the server's stdin and flushes.
    async fn write_message(&mut self, msg: &str) -> Result<(), String> {
        self.stdin
            .write_all(msg.as_bytes())
            .await
            .map_err(|e| format!("write to MCP server: {e}"))?;
        self.stdin
            .write_all(b"\n")
            .await
            .map_err(|e| format!("write newline: {e}"))?;
        self.stdin.flush().await.map_err(|e| format!("flush: {e}"))?;
        Ok(())
    }

    /// Sends a request and waits for its result. If the session is known to be
    /// dead, it first transparently reconnects.
    async fn send_request(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> {
        if !self.alive.load(Ordering::SeqCst) {
            self.reconnect().await.map_err(|e| {
                format!(
                    "MCP session '{}' is dead and reconnect failed: {e}",
                    self.config.name
                )
            })?;
        }
        self.request_once(method, params).await
    }

    /// Sends one request on the current session, never reconnecting, and waits
    /// up to `request_timeout` for the matching response.
    ///
    /// # Errors
    ///
    /// Fails if the write fails (the session is then marked dead), the
    /// connection closes before a reply arrives, the wait times out, or the
    /// server replies with a JSON-RPC error or without a result.
    async fn request_once(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> {
        let id = self.next_id;
        self.next_id += 1;
        let req = McpRequest {
            jsonrpc: "2.0",
            method: method.to_string(),
            params,
            id,
        };
        // Serialize before registering the waiter so a failure cannot strand it.
        let msg = serde_json::to_string(&req).map_err(|e| format!("serialize error: {e}"))?;
        let (tx, rx) = oneshot::channel();
        self.pending.lock().unwrap().insert(id, tx);
        // The reader marks the session dead *before* sweeping the waiter table.
        // If it exited between the caller's liveness check and the insert above,
        // it will never sweep our waiter, and we would otherwise wait for the
        // full timeout. Re-checking here catches that window.
        if !self.alive.load(Ordering::SeqCst) {
            self.pending.lock().unwrap().remove(&id);
            return Err(format!(
                "MCP server '{}' closed the connection",
                self.config.name
            ));
        }
        if let Err(e) = self.write_message(&msg).await {
            self.pending.lock().unwrap().remove(&id);
            self.alive.store(false, Ordering::SeqCst);
            return Err(e);
        }
        let resp = match tokio::time::timeout(self.request_timeout, rx).await {
            Ok(Ok(resp)) => resp,
            Ok(Err(_)) => {
                return Err(format!(
                    "MCP server '{}' closed the connection",
                    self.config.name
                ))
            }
            Err(_) => {
                self.pending.lock().unwrap().remove(&id);
                return Err(format!("MCP request '{method}' timed out"));
            }
        };
        if let Some(err) = resp.error {
            return Err(format!("MCP error: {}", err.message));
        }
        resp.result
            .ok_or_else(|| "MCP server returned no result".to_string())
    }

    /// Fetches the server's complete tool list via `tools/list`, following
    /// `nextCursor` pagination until no further cursor is returned.
    ///
    /// A missing, empty, or already-seen cursor ends the loop, so a misbehaving
    /// server cannot make this spin forever. A page without a `tools` array
    /// contributes nothing.
    ///
    /// # Errors
    ///
    /// Fails if any page request fails or any entry is not a valid
    /// [`McpToolInfo`].
    pub async fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
        let mut tools: Vec<McpToolInfo> = Vec::new();
        let mut cursor: Option<String> = None;
        let mut seen_cursors = std::collections::HashSet::<String>::new();
        loop {
            // The first page is requested without params.
            let params = cursor.take().map(|c| serde_json::json!({ "cursor": c }));
            let mut result = self.send_request("tools/list", params).await?;
            if let Some(Value::Array(page)) = result.get_mut("tools").map(std::mem::take) {
                for entry in page {
                    tools.push(
                        serde_json::from_value(entry)
                            .map_err(|e| format!("invalid tool definition: {e}"))?,
                    );
                }
            }
            match result.get("nextCursor").and_then(Value::as_str) {
                Some(next) if !next.is_empty() && seen_cursors.insert(next.to_string()) => {
                    cursor = Some(next.to_string());
                }
                _ => break,
            }
        }
        Ok(tools)
    }

    /// Invokes tool `name` with `arguments` via `tools/call`.
    ///
    /// If the result's `content` holds any `text` blocks, they are joined with
    /// newlines and returned as a string. Otherwise the raw result is returned.
    ///
    /// # Errors
    ///
    /// Besides transport and protocol errors, a result flagged `isError: true`
    /// becomes `Err`, carrying the joined text (or the raw result when there is
    /// no text).
    pub async fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, String> {
        let result = self
            .send_request(
                "tools/call",
                Some(serde_json::json!({
                    "name": name,
                    "arguments": arguments,
                })),
            )
            .await?;
        let text = result
            .get("content")
            .and_then(Value::as_array)
            .map(|blocks| {
                blocks
                    .iter()
                    .filter_map(|block| {
                        if block.get("type").and_then(Value::as_str) == Some("text") {
                            block.get("text").and_then(Value::as_str)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<&str>>()
            })
            .filter(|texts| !texts.is_empty())
            .map(|texts| texts.join("\n"));
        // Tool execution failures are reported in-band, not as JSON-RPC errors.
        let is_error = result
            .get("isError")
            .and_then(Value::as_bool)
            .unwrap_or(false);
        if is_error {
            let detail = text.unwrap_or_else(|| result.to_string());
            return Err(format!("MCP tool '{name}' reported an error: {detail}"));
        }
        Ok(text.map(Value::String).unwrap_or(result))
    }

    /// Closes stdin, kills the server process, and waits for it to be reaped.
    pub async fn shutdown(mut self) {
        let _ = self.stdin.shutdown().await;
        let _ = self.child.kill().await;
        let _ = self.child.wait().await;
    }

    /// The configured server name.
    pub fn name(&self) -> &str {
        &self.config.name
    }
}
/// Routes tool calls to registered MCP servers, optionally delegating tools no
/// server owns to a fallback executor.
pub struct McpToolExecutor {
// Registered servers keyed by server name; each sits behind its own async
// lock, so calls to a single server are serialized.
servers: Arc<Mutex<HashMap<String, Arc<Mutex<McpServer>>>>>,
// Tool name (canonical `mcp_{server}_{tool}` or bare) -> owning server name.
tool_routes: Arc<Mutex<HashMap<String, String>>>,
// Executor consulted when no MCP route resolves the requested tool.
fallback: Option<Arc<dyn super::ToolExecutor>>,
}
impl McpToolExecutor {
pub fn new() -> Self {
Self {
servers: Arc::new(Mutex::new(HashMap::new())),
tool_routes: Arc::new(Mutex::new(HashMap::new())),
fallback: None,
}
}
pub fn with_fallback(mut self, fallback: Arc<dyn super::ToolExecutor>) -> Self {
self.fallback = Some(fallback);
self
}
pub async fn add_server(&self, mut server: McpServer) -> Result<Vec<String>, String> {
let server_name = server.config.name.clone();
let tools = server.list_tools().await?;
let tool_names: Vec<String> = tools
.iter()
.map(|t| format!("mcp_{}_{}", server_name, t.name))
.collect();
{
let mut routes = self.tool_routes.lock().await;
for (info, canonical_name) in tools.iter().zip(tool_names.iter()) {
routes.insert(canonical_name.clone(), server_name.clone());
routes.insert(info.name.clone(), server_name.clone());
}
}
self.servers
.lock()
.await
.insert(server_name, Arc::new(Mutex::new(server)));
Ok(tool_names)
}
pub async fn tool_schemas(&self) -> Vec<(String, car_ir::ToolSchema)> {
let mut schemas = Vec::new();
let servers = self.servers.lock().await;
for (server_name, server) in servers.iter() {
let mut srv = server.lock().await;
if let Ok(tools) = srv.list_tools().await {
for tool in tools {
let canonical_name = format!("mcp_{}_{}", server_name, tool.name);
schemas.push((
server_name.clone(),
car_ir::ToolSchema {
name: canonical_name,
description: tool.description.unwrap_or_default(),
parameters: tool
.input_schema
.unwrap_or(serde_json::json!({"type": "object"})),
returns: None,
idempotent: false,
cache_ttl_secs: None,
rate_limit: None,
},
));
}
}
}
schemas
}
pub async fn shutdown_all(&self) {
let mut servers = self.servers.lock().await;
servers.drain();
}
}
impl Default for McpToolExecutor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl super::ToolExecutor for McpToolExecutor {
    /// Executes `tool` with no action id and no timeout override.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        self.execute_with_action(tool, params, "", None).await
    }

    /// Dispatches `tool` to the MCP server that owns it, else to the fallback.
    ///
    /// Accepts both canonical (`mcp_{server}_{tool}`) and bare tool names.
    /// `action_id` and `timeout_ms` are only forwarded to the fallback; MCP
    /// calls are bounded by the server's own request timeout.
    ///
    /// # Errors
    ///
    /// Returns the server's or fallback's error, or `unknown MCP tool` when
    /// neither can handle `tool`.
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        action_id: &str,
        timeout_ms: Option<u64>,
    ) -> Result<Value, String> {
        let server_name = self.tool_routes.lock().await.get(tool).cloned();
        if let Some(server_name) = server_name {
            // Clone the server handle and release the registry lock before the
            // call. Holding it would serialize every MCP call across all servers
            // and block registration for as long as a tool runs.
            let server = self.servers.lock().await.get(&server_name).cloned();
            if let Some(server) = server {
                let prefix = format!("mcp_{}_", server_name);
                let bare_name = tool.strip_prefix(prefix.as_str()).unwrap_or(tool);
                return server.lock().await.call_tool(bare_name, params.clone()).await;
            }
        }
        if let Some(ref fallback) = self.fallback {
            return fallback
                .execute_with_action(tool, params, action_id, timeout_ms)
                .await;
        }
        Err(format!("unknown MCP tool: '{}'", tool))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    type Table = StdMutex<HashMap<u64, oneshot::Sender<McpResponse>>>;

    /// An empty waiter table for driving `route_line` directly.
    fn pending() -> Table {
        Table::default()
    }

    /// Registers a waiter for `id` in `table` and returns the receiving end.
    fn expect_reply(table: &Table, id: u64) -> oneshot::Receiver<McpResponse> {
        let (tx, rx) = oneshot::channel();
        table.lock().unwrap().insert(id, tx);
        rx
    }

    /// Runs `reader_loop` to completion over an in-memory stdout stream.
    async fn drive_reader(input: &[u8], table: &Pending, alive: &Arc<AtomicBool>) {
        reader_loop(
            BufReader::new(input),
            Arc::clone(table),
            Arc::clone(alive),
            "t".into(),
        )
        .await;
    }

    #[tokio::test]
    async fn routes_response_to_matching_waiter() {
        let table = pending();
        let rx = expect_reply(&table, 7);
        route_line(r#"{"jsonrpc":"2.0","id":7,"result":{"value":42}}"#, &table);
        let delivered = rx.await.expect("waiter delivered");
        assert!(delivered.result.is_some());
        assert!(table.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn unknown_id_is_discarded_without_disturbing_other_waiters() {
        let table = pending();
        let _still_waiting = expect_reply(&table, 1);
        route_line(r#"{"jsonrpc":"2.0","id":999,"result":{}}"#, &table);
        assert!(table.lock().unwrap().contains_key(&1));
    }

    #[test]
    fn notifications_and_garbage_are_ignored() {
        let table = pending();
        for noise in [
            r#"{"jsonrpc":"2.0","method":"notifications/progress","params":{}}"#,
            "not json at all",
        ] {
            route_line(noise, &table);
        }
        assert!(table.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn error_response_is_routed_for_send_request_to_surface() {
        let table = pending();
        let rx = expect_reply(&table, 3);
        route_line(
            r#"{"jsonrpc":"2.0","id":3,"error":{"code":-1,"message":"tool failed"}}"#,
            &table,
        );
        let err = rx.await.unwrap().error.expect("error object present");
        assert_eq!(err.message, "tool failed");
    }

    #[tokio::test]
    async fn reader_loop_routes_then_marks_dead_and_clears_on_eof() {
        let table: Pending = Arc::default();
        let alive = Arc::new(AtomicBool::new(true));
        let first = expect_reply(&table, 1);
        let second = expect_reply(&table, 2);
        drive_reader(
            b"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"ok\":true}}\n",
            &table,
            &alive,
        )
        .await;
        assert!(first.await.unwrap().result.is_some(), "id 1 routed");
        assert!(!alive.load(Ordering::SeqCst), "EOF marks the session dead");
        assert!(table.lock().unwrap().is_empty(), "waiters swept on EOF");
        assert!(second.await.is_err());
    }

    #[tokio::test]
    async fn reader_loop_skips_noise_without_desync() {
        let table: Pending = Arc::default();
        let alive = Arc::new(AtomicBool::new(true));
        let reply = expect_reply(&table, 5);
        let stream = concat!(
            "garbage not json\n",
            "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/progress\",\"params\":{}}\n",
            "{\"jsonrpc\":\"2.0\",\"id\":5,\"result\":{\"done\":true}}\n",
        );
        drive_reader(stream.as_bytes(), &table, &alive).await;
        assert!(reply.await.unwrap().result.is_some(), "id 5 delivered past noise");
    }
}