1use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct McpServerConfig {
18 pub name: String,
20 pub command: String,
22 #[serde(default)]
24 pub args: Vec<String>,
25 #[serde(default)]
27 pub env: HashMap<String, String>,
28 pub cwd: Option<String>,
30}
31
32pub struct McpServer {
34 config: McpServerConfig,
35 child: Child,
36 stdin: tokio::io::BufWriter<tokio::process::ChildStdin>,
37 stdout: BufReader<tokio::process::ChildStdout>,
38 next_id: u64,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct McpToolInfo {
44 pub name: String,
45 pub description: Option<String>,
46 #[serde(rename = "inputSchema")]
47 pub input_schema: Option<Value>,
48}
49
50#[derive(Debug, Serialize)]
52struct McpRequest {
53 jsonrpc: &'static str,
54 method: String,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 params: Option<Value>,
57 id: u64,
58}
59
60#[derive(Debug, Deserialize)]
62struct McpResponse {
63 result: Option<Value>,
64 error: Option<McpError>,
65 #[allow(dead_code)]
66 id: Option<u64>,
67}
68
69#[derive(Debug, Deserialize)]
70struct McpError {
71 #[allow(dead_code)]
72 code: Option<i64>,
73 message: String,
74}
75
76impl McpServer {
77 pub async fn start(config: McpServerConfig) -> Result<Self, String> {
79 let mut cmd = Command::new(&config.command);
80 cmd.args(&config.args)
81 .stdin(std::process::Stdio::piped())
82 .stdout(std::process::Stdio::piped())
83 .stderr(std::process::Stdio::piped());
84
85 if let Some(ref cwd) = config.cwd {
86 cmd.current_dir(cwd);
87 }
88 for (k, v) in &config.env {
89 cmd.env(k, v);
90 }
91
92 let mut child = cmd
93 .spawn()
94 .map_err(|e| format!("failed to start MCP server '{}': {}", config.name, e))?;
95
96 let stdin = child
97 .stdin
98 .take()
99 .ok_or_else(|| "MCP server has no stdin".to_string())?;
100 let stdout = child
101 .stdout
102 .take()
103 .ok_or_else(|| "MCP server has no stdout".to_string())?;
104
105 let mut server = Self {
106 config,
107 child,
108 stdin: tokio::io::BufWriter::new(stdin),
109 stdout: BufReader::new(stdout),
110 next_id: 1,
111 };
112
113 server
115 .send_request(
116 "initialize",
117 Some(serde_json::json!({
118 "protocolVersion": "2024-11-05",
119 "capabilities": {},
120 "clientInfo": {
121 "name": "car-runtime",
122 "version": env!("CARGO_PKG_VERSION")
123 }
124 })),
125 )
126 .await?;
127
128 let notification = serde_json::json!({
130 "jsonrpc": "2.0",
131 "method": "notifications/initialized"
132 });
133 let msg =
134 serde_json::to_string(¬ification).map_err(|e| format!("serialize error: {e}"))?;
135 server
136 .stdin
137 .write_all(msg.as_bytes())
138 .await
139 .map_err(|e| format!("write error: {e}"))?;
140 server
141 .stdin
142 .write_all(b"\n")
143 .await
144 .map_err(|e| format!("write error: {e}"))?;
145 server
146 .stdin
147 .flush()
148 .await
149 .map_err(|e| format!("flush error: {e}"))?;
150
151 Ok(server)
152 }
153
154 async fn send_request(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> {
155 let id = self.next_id;
156 self.next_id += 1;
157
158 let req = McpRequest {
159 jsonrpc: "2.0",
160 method: method.to_string(),
161 params,
162 id,
163 };
164
165 let msg = serde_json::to_string(&req).map_err(|e| format!("serialize error: {e}"))?;
166
167 self.stdin
168 .write_all(msg.as_bytes())
169 .await
170 .map_err(|e| format!("write to MCP server: {e}"))?;
171 self.stdin
172 .write_all(b"\n")
173 .await
174 .map_err(|e| format!("write newline: {e}"))?;
175 self.stdin
176 .flush()
177 .await
178 .map_err(|e| format!("flush: {e}"))?;
179
180 let mut line = String::new();
182 self.stdout
183 .read_line(&mut line)
184 .await
185 .map_err(|e| format!("read from MCP server: {e}"))?;
186
187 let resp: McpResponse = serde_json::from_str(&line)
188 .map_err(|e| format!("invalid MCP response: {e} (raw: {})", line.trim()))?;
189
190 if let Some(err) = resp.error {
191 return Err(format!("MCP error: {}", err.message));
192 }
193
194 resp.result
195 .ok_or_else(|| "MCP server returned no result".to_string())
196 }
197
198 pub async fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
200 let result = self.send_request("tools/list", None).await?;
201 let tools = result
202 .get("tools")
203 .and_then(|t| t.as_array())
204 .cloned()
205 .unwrap_or_default();
206
207 tools
208 .into_iter()
209 .map(|t| serde_json::from_value(t).map_err(|e| format!("invalid tool definition: {e}")))
210 .collect()
211 }
212
213 pub async fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, String> {
215 let result = self
216 .send_request(
217 "tools/call",
218 Some(serde_json::json!({
219 "name": name,
220 "arguments": arguments,
221 })),
222 )
223 .await?;
224
225 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
227 let texts: Vec<&str> = content
228 .iter()
229 .filter_map(|block| {
230 if block.get("type").and_then(|t| t.as_str()) == Some("text") {
231 block.get("text").and_then(|t| t.as_str())
232 } else {
233 None
234 }
235 })
236 .collect();
237 if !texts.is_empty() {
238 return Ok(Value::String(texts.join("\n")));
239 }
240 }
241
242 Ok(result)
243 }
244
245 pub async fn shutdown(mut self) {
247 let _ = self.stdin.shutdown().await;
248 let _ = self.child.kill().await;
249 let _ = self.child.wait().await;
250 }
251
252 pub fn name(&self) -> &str {
254 &self.config.name
255 }
256}
257
258pub struct McpToolExecutor {
260 servers: Arc<Mutex<HashMap<String, Arc<Mutex<McpServer>>>>>,
261 tool_routes: Arc<Mutex<HashMap<String, String>>>,
263 fallback: Option<Arc<dyn super::ToolExecutor>>,
265}
266
267impl McpToolExecutor {
268 pub fn new() -> Self {
269 Self {
270 servers: Arc::new(Mutex::new(HashMap::new())),
271 tool_routes: Arc::new(Mutex::new(HashMap::new())),
272 fallback: None,
273 }
274 }
275
276 pub fn with_fallback(mut self, fallback: Arc<dyn super::ToolExecutor>) -> Self {
277 self.fallback = Some(fallback);
278 self
279 }
280
281 pub async fn add_server(&self, mut server: McpServer) -> Result<Vec<String>, String> {
284 let server_name = server.config.name.clone();
285 let tools = server.list_tools().await?;
286
287 let tool_names: Vec<String> = tools
288 .iter()
289 .map(|t| format!("mcp_{}_{}", server_name, t.name))
290 .collect();
291
292 {
294 let mut routes = self.tool_routes.lock().await;
295 for (info, canonical_name) in tools.iter().zip(tool_names.iter()) {
296 routes.insert(canonical_name.clone(), server_name.clone());
297 routes.insert(info.name.clone(), server_name.clone());
299 }
300 }
301
302 self.servers
304 .lock()
305 .await
306 .insert(server_name, Arc::new(Mutex::new(server)));
307
308 Ok(tool_names)
309 }
310
311 pub async fn tool_schemas(&self) -> Vec<(String, car_ir::ToolSchema)> {
313 let mut schemas = Vec::new();
314 let servers = self.servers.lock().await;
315 for (server_name, server) in servers.iter() {
316 let mut srv = server.lock().await;
317 if let Ok(tools) = srv.list_tools().await {
318 for tool in tools {
319 let canonical_name = format!("mcp_{}_{}", server_name, tool.name);
320 schemas.push((
321 server_name.clone(),
322 car_ir::ToolSchema {
323 name: canonical_name,
324 description: tool.description.unwrap_or_default(),
325 parameters: tool
326 .input_schema
327 .unwrap_or(serde_json::json!({"type": "object"})),
328 returns: None,
329 idempotent: false,
330 cache_ttl_secs: None,
331 rate_limit: None,
332 },
333 ));
334 }
335 }
336 }
337 schemas
338 }
339
340 pub async fn shutdown_all(&self) {
342 let mut servers = self.servers.lock().await;
343 servers.drain();
345 }
346}
347
348impl Default for McpToolExecutor {
349 fn default() -> Self {
350 Self::new()
351 }
352}
353
354#[async_trait::async_trait]
355impl super::ToolExecutor for McpToolExecutor {
356 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
357 let server_name = {
359 let routes = self.tool_routes.lock().await;
360 routes.get(tool).cloned()
361 };
362
363 if let Some(server_name) = server_name {
364 let servers = self.servers.lock().await;
365 if let Some(server) = servers.get(&server_name) {
366 let mut srv = server.lock().await;
367 let bare_name = tool
369 .strip_prefix(&format!("mcp_{}_", server_name))
370 .unwrap_or(tool);
371 return srv.call_tool(bare_name, params.clone()).await;
372 }
373 }
374
375 if let Some(ref fallback) = self.fallback {
377 return fallback.execute(tool, params).await;
378 }
379
380 Err(format!("unknown MCP tool: '{}'", tool))
381 }
382}