Skip to main content

car_engine/
mcp.rs

1//! MCP (Model Context Protocol) server integration.
2//!
3//! Discovers tools from MCP servers via stdin/stdout JSON-RPC and registers
4//! them into the canonical tool registry. MCP tools participate in the same
5//! capability/permission/policy flow as all other tools.
6
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14
15/// Configuration for an MCP server.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct McpServerConfig {
18    /// Display name for this server.
19    pub name: String,
20    /// Command to launch the server.
21    pub command: String,
22    /// Arguments for the command.
23    #[serde(default)]
24    pub args: Vec<String>,
25    /// Environment variables.
26    #[serde(default)]
27    pub env: HashMap<String, String>,
28    /// Working directory.
29    pub cwd: Option<String>,
30}
31
32/// A running MCP server connection.
33pub struct McpServer {
34    config: McpServerConfig,
35    child: Child,
36    stdin: tokio::io::BufWriter<tokio::process::ChildStdin>,
37    stdout: BufReader<tokio::process::ChildStdout>,
38    next_id: u64,
39}
40
41/// An MCP tool discovered from a server.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct McpToolInfo {
44    pub name: String,
45    pub description: Option<String>,
46    #[serde(rename = "inputSchema")]
47    pub input_schema: Option<Value>,
48}
49
50/// MCP JSON-RPC request.
51#[derive(Debug, Serialize)]
52struct McpRequest {
53    jsonrpc: &'static str,
54    method: String,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    params: Option<Value>,
57    id: u64,
58}
59
60/// MCP JSON-RPC response.
61#[derive(Debug, Deserialize)]
62struct McpResponse {
63    result: Option<Value>,
64    error: Option<McpError>,
65    #[allow(dead_code)]
66    id: Option<u64>,
67}
68
69#[derive(Debug, Deserialize)]
70struct McpError {
71    #[allow(dead_code)]
72    code: Option<i64>,
73    message: String,
74}
75
76impl McpServer {
77    /// Start an MCP server and initialize the connection.
78    pub async fn start(config: McpServerConfig) -> Result<Self, String> {
79        let mut cmd = Command::new(&config.command);
80        cmd.args(&config.args)
81            .stdin(std::process::Stdio::piped())
82            .stdout(std::process::Stdio::piped())
83            .stderr(std::process::Stdio::piped());
84
85        if let Some(ref cwd) = config.cwd {
86            cmd.current_dir(cwd);
87        }
88        for (k, v) in &config.env {
89            cmd.env(k, v);
90        }
91
92        let mut child = cmd
93            .spawn()
94            .map_err(|e| format!("failed to start MCP server '{}': {}", config.name, e))?;
95
96        let stdin = child
97            .stdin
98            .take()
99            .ok_or_else(|| "MCP server has no stdin".to_string())?;
100        let stdout = child
101            .stdout
102            .take()
103            .ok_or_else(|| "MCP server has no stdout".to_string())?;
104
105        let mut server = Self {
106            config,
107            child,
108            stdin: tokio::io::BufWriter::new(stdin),
109            stdout: BufReader::new(stdout),
110            next_id: 1,
111        };
112
113        // Send initialize
114        server
115            .send_request(
116                "initialize",
117                Some(serde_json::json!({
118                    "protocolVersion": "2024-11-05",
119                    "capabilities": {},
120                    "clientInfo": {
121                        "name": "car-runtime",
122                        "version": env!("CARGO_PKG_VERSION")
123                    }
124                })),
125            )
126            .await?;
127
128        // Send initialized notification (no id, per MCP spec)
129        let notification = serde_json::json!({
130            "jsonrpc": "2.0",
131            "method": "notifications/initialized"
132        });
133        let msg =
134            serde_json::to_string(&notification).map_err(|e| format!("serialize error: {e}"))?;
135        server
136            .stdin
137            .write_all(msg.as_bytes())
138            .await
139            .map_err(|e| format!("write error: {e}"))?;
140        server
141            .stdin
142            .write_all(b"\n")
143            .await
144            .map_err(|e| format!("write error: {e}"))?;
145        server
146            .stdin
147            .flush()
148            .await
149            .map_err(|e| format!("flush error: {e}"))?;
150
151        Ok(server)
152    }
153
154    async fn send_request(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> {
155        let id = self.next_id;
156        self.next_id += 1;
157
158        let req = McpRequest {
159            jsonrpc: "2.0",
160            method: method.to_string(),
161            params,
162            id,
163        };
164
165        let msg = serde_json::to_string(&req).map_err(|e| format!("serialize error: {e}"))?;
166
167        self.stdin
168            .write_all(msg.as_bytes())
169            .await
170            .map_err(|e| format!("write to MCP server: {e}"))?;
171        self.stdin
172            .write_all(b"\n")
173            .await
174            .map_err(|e| format!("write newline: {e}"))?;
175        self.stdin
176            .flush()
177            .await
178            .map_err(|e| format!("flush: {e}"))?;
179
180        // Read response line
181        let mut line = String::new();
182        self.stdout
183            .read_line(&mut line)
184            .await
185            .map_err(|e| format!("read from MCP server: {e}"))?;
186
187        let resp: McpResponse = serde_json::from_str(&line)
188            .map_err(|e| format!("invalid MCP response: {e} (raw: {})", line.trim()))?;
189
190        if let Some(err) = resp.error {
191            return Err(format!("MCP error: {}", err.message));
192        }
193
194        resp.result
195            .ok_or_else(|| "MCP server returned no result".to_string())
196    }
197
198    /// Discover tools from this MCP server.
199    pub async fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
200        let result = self.send_request("tools/list", None).await?;
201        let tools = result
202            .get("tools")
203            .and_then(|t| t.as_array())
204            .cloned()
205            .unwrap_or_default();
206
207        tools
208            .into_iter()
209            .map(|t| serde_json::from_value(t).map_err(|e| format!("invalid tool definition: {e}")))
210            .collect()
211    }
212
213    /// Call a tool on this MCP server.
214    pub async fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, String> {
215        let result = self
216            .send_request(
217                "tools/call",
218                Some(serde_json::json!({
219                    "name": name,
220                    "arguments": arguments,
221                })),
222            )
223            .await?;
224
225        // Extract text content from MCP response format
226        if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
227            let texts: Vec<&str> = content
228                .iter()
229                .filter_map(|block| {
230                    if block.get("type").and_then(|t| t.as_str()) == Some("text") {
231                        block.get("text").and_then(|t| t.as_str())
232                    } else {
233                        None
234                    }
235                })
236                .collect();
237            if !texts.is_empty() {
238                return Ok(Value::String(texts.join("\n")));
239            }
240        }
241
242        Ok(result)
243    }
244
245    /// Shut down the MCP server gracefully.
246    pub async fn shutdown(mut self) {
247        let _ = self.stdin.shutdown().await;
248        let _ = self.child.kill().await;
249        let _ = self.child.wait().await;
250    }
251
252    /// Get the server name.
253    pub fn name(&self) -> &str {
254        &self.config.name
255    }
256}
257
258/// MCP tool executor -- routes tool calls to the appropriate MCP server.
259pub struct McpToolExecutor {
260    servers: Arc<Mutex<HashMap<String, Arc<Mutex<McpServer>>>>>,
261    /// Maps tool_name -> server_name for routing.
262    tool_routes: Arc<Mutex<HashMap<String, String>>>,
263    /// Optional fallback for non-MCP tools.
264    fallback: Option<Arc<dyn super::ToolExecutor>>,
265}
266
267impl McpToolExecutor {
268    pub fn new() -> Self {
269        Self {
270            servers: Arc::new(Mutex::new(HashMap::new())),
271            tool_routes: Arc::new(Mutex::new(HashMap::new())),
272            fallback: None,
273        }
274    }
275
276    pub fn with_fallback(mut self, fallback: Arc<dyn super::ToolExecutor>) -> Self {
277        self.fallback = Some(fallback);
278        self
279    }
280
281    /// Add an MCP server and discover its tools.
282    /// Returns the list of discovered tool names (canonical form: `mcp_{server}_{tool}`).
283    pub async fn add_server(&self, mut server: McpServer) -> Result<Vec<String>, String> {
284        let server_name = server.config.name.clone();
285        let tools = server.list_tools().await?;
286
287        let tool_names: Vec<String> = tools
288            .iter()
289            .map(|t| format!("mcp_{}_{}", server_name, t.name))
290            .collect();
291
292        // Register tool routes
293        {
294            let mut routes = self.tool_routes.lock().await;
295            for (info, canonical_name) in tools.iter().zip(tool_names.iter()) {
296                routes.insert(canonical_name.clone(), server_name.clone());
297                // Also register the bare name for convenience
298                routes.insert(info.name.clone(), server_name.clone());
299            }
300        }
301
302        // Store server
303        self.servers
304            .lock()
305            .await
306            .insert(server_name, Arc::new(Mutex::new(server)));
307
308        Ok(tool_names)
309    }
310
311    /// Get tool schemas from all connected MCP servers.
312    pub async fn tool_schemas(&self) -> Vec<(String, car_ir::ToolSchema)> {
313        let mut schemas = Vec::new();
314        let servers = self.servers.lock().await;
315        for (server_name, server) in servers.iter() {
316            let mut srv = server.lock().await;
317            if let Ok(tools) = srv.list_tools().await {
318                for tool in tools {
319                    let canonical_name = format!("mcp_{}_{}", server_name, tool.name);
320                    schemas.push((
321                        server_name.clone(),
322                        car_ir::ToolSchema {
323                            name: canonical_name,
324                            description: tool.description.unwrap_or_default(),
325                            parameters: tool
326                                .input_schema
327                                .unwrap_or(serde_json::json!({"type": "object"})),
328                            returns: None,
329                            idempotent: false,
330                            cache_ttl_secs: None,
331                            rate_limit: None,
332                        },
333                    ));
334                }
335            }
336        }
337        schemas
338    }
339
340    /// Shut down all MCP servers.
341    pub async fn shutdown_all(&self) {
342        let mut servers = self.servers.lock().await;
343        // Dropping the Arc<Mutex<McpServer>> will drop the Child, killing the process.
344        servers.drain();
345    }
346}
347
348impl Default for McpToolExecutor {
349    fn default() -> Self {
350        Self::new()
351    }
352}
353
354#[async_trait::async_trait]
355impl super::ToolExecutor for McpToolExecutor {
356    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
357        // Find which server handles this tool
358        let server_name = {
359            let routes = self.tool_routes.lock().await;
360            routes.get(tool).cloned()
361        };
362
363        if let Some(server_name) = server_name {
364            let servers = self.servers.lock().await;
365            if let Some(server) = servers.get(&server_name) {
366                let mut srv = server.lock().await;
367                // Strip the mcp_{server}_ prefix to get the bare tool name
368                let bare_name = tool
369                    .strip_prefix(&format!("mcp_{}_", server_name))
370                    .unwrap_or(tool);
371                return srv.call_tool(bare_name, params.clone()).await;
372            }
373        }
374
375        // Fallback
376        if let Some(ref fallback) = self.fallback {
377            return fallback.execute(tool, params).await;
378        }
379
380        Err(format!("unknown MCP tool: '{}'", tool))
381    }
382}