Skip to main content

batuta/agent/tool/
mcp_client.rs

1//! MCP Client Tool — wraps external MCP server tools.
2//!
3//! Each `McpClientTool` represents a single tool discovered from
4//! an external MCP server. The tool proxies execute calls through
5//! an `McpTransport` trait, which abstracts over stdio/SSE/HTTP.
6//!
7//! # Privacy Enforcement (Poka-Yoke)
8//!
9//! MCP servers are subject to `PrivacyTier` rules:
10//! - **Sovereign**: Only `stdio` transport allowed (local process)
11//! - **Private/Standard**: All transports allowed
12//!
13//! # References
14//!
15//! - arXiv:2505.02279 — MCP interoperability survey
16//! - arXiv:2503.23278 — MCP security analysis
17
18use std::time::Duration;
19
20use async_trait::async_trait;
21
22use super::{Tool, ToolResult};
23use crate::agent::capability::Capability;
24use crate::agent::driver::ToolDefinition;
25
26/// Transport abstraction for MCP server communication.
27///
28/// Separates the tool from the transport layer so that:
29/// - Tests use `MockMcpTransport`
30/// - Production uses `StdioMcpTransport` (Phase 2: `pmcp::Client` v2.3)
31/// - Future: SSE / WebSocket transports (both available in pmcp v2.3)
32#[async_trait]
33pub trait McpTransport: Send + Sync {
34    /// Call a tool on the MCP server.
35    async fn call_tool(&self, tool_name: &str, input: serde_json::Value) -> Result<String, String>;
36
37    /// Server name for capability matching.
38    fn server_name(&self) -> &str;
39}
40
41/// MCP client tool that proxies calls to an external MCP server.
42pub struct McpClientTool {
43    /// MCP server name (for capability matching).
44    server_name: String,
45    /// Tool name on the MCP server.
46    tool_name: String,
47    /// Tool description.
48    description: String,
49    /// JSON Schema for tool input.
50    input_schema: serde_json::Value,
51    /// Transport for calling the MCP server.
52    transport: Box<dyn McpTransport>,
53    /// Execution timeout.
54    timeout: Duration,
55}
56
57impl McpClientTool {
58    /// Create a new MCP client tool.
59    pub fn new(
60        server_name: impl Into<String>,
61        tool_name: impl Into<String>,
62        description: impl Into<String>,
63        input_schema: serde_json::Value,
64        transport: Box<dyn McpTransport>,
65    ) -> Self {
66        Self {
67            server_name: server_name.into(),
68            tool_name: tool_name.into(),
69            description: description.into(),
70            input_schema,
71            transport,
72            timeout: Duration::from_secs(60),
73        }
74    }
75
76    /// Set the execution timeout.
77    #[must_use]
78    pub fn with_timeout(mut self, timeout: Duration) -> Self {
79        self.timeout = timeout;
80        self
81    }
82
83    /// The prefixed tool name: `mcp_{server}_{tool}`.
84    fn prefixed_name(&self) -> String {
85        format!("mcp_{}_{}", self.server_name, self.tool_name)
86    }
87}
88
89#[async_trait]
90impl Tool for McpClientTool {
91    fn name(&self) -> &'static str {
92        // Leak the name to get 'static lifetime.
93        // This is safe because tool names live for the process.
94        Box::leak(self.prefixed_name().into_boxed_str())
95    }
96
97    fn definition(&self) -> ToolDefinition {
98        ToolDefinition {
99            name: self.prefixed_name(),
100            description: format!("[MCP:{}] {}", self.server_name, self.description),
101            input_schema: self.input_schema.clone(),
102        }
103    }
104
105    async fn execute(&self, input: serde_json::Value) -> ToolResult {
106        match self.transport.call_tool(&self.tool_name, input).await {
107            Ok(content) => ToolResult::success(content),
108            Err(e) => ToolResult::error(format!(
109                "MCP call to {}:{} failed: {}",
110                self.server_name, self.tool_name, e
111            )),
112        }
113    }
114
115    fn required_capability(&self) -> Capability {
116        Capability::Mcp { server: self.server_name.clone(), tool: self.tool_name.clone() }
117    }
118
119    fn timeout(&self) -> Duration {
120        self.timeout
121    }
122}
123
/// MCP transport that talks to a locally spawned subprocess over stdin/stdout.
///
/// The subprocess is expected to speak JSON-RPC 2.0 with MCP tools/call messages.
/// Every `call_tool` writes one request line and reads the response back.
///
/// # Privacy
///
/// Permitted in the Sovereign tier: the subprocess runs locally, so the
/// transport itself performs no network egress.
pub struct StdioMcpTransport {
    /// Name of the MCP server this transport represents.
    server: String,
    /// Full command line: program followed by its arguments.
    command: Vec<String>,
    /// Environment variables applied to the spawned subprocess.
    /// PMAT-CODE-MCP-ENV-001: threaded from `McpServerConfig.env`.
    /// An empty map leaves the inherited parent env untouched.
    env: std::collections::BTreeMap<String, String>,
}

impl StdioMcpTransport {
    /// Create a stdio transport for `server` with no environment overrides.
    ///
    /// `command` is the full command line (e.g., `["node", "server.js"]`).
    /// The subprocess inherits the parent's env unchanged; use
    /// [`Self::new_with_env`] for per-server overrides.
    pub fn new(server: impl Into<String>, command: Vec<String>) -> Self {
        Self::new_with_env(server, command, std::collections::BTreeMap::new())
    }

    /// Create a stdio transport whose subprocess receives the given
    /// environment variables (PMAT-CODE-MCP-ENV-001).
    ///
    /// The parent's env is still inherited; each `(key, value)` in `env`
    /// is layered on top. An empty map behaves exactly like [`Self::new`].
    pub fn new_with_env(
        server: impl Into<String>,
        command: Vec<String>,
        env: std::collections::BTreeMap<String, String>,
    ) -> Self {
        let server = server.into();
        Self { server, command, env }
    }

    /// Borrow the env override map (for tests + observability).
    pub fn env(&self) -> &std::collections::BTreeMap<String, String> {
        &self.env
    }
}
171
172#[async_trait]
173impl McpTransport for StdioMcpTransport {
174    async fn call_tool(&self, tool_name: &str, input: serde_json::Value) -> Result<String, String> {
175        let request = serde_json::json!({
176            "jsonrpc": "2.0",
177            "id": 1,
178            "method": "tools/call",
179            "params": {
180                "name": tool_name,
181                "arguments": input,
182            }
183        });
184        let response = self.send_jsonrpc(&request).await?;
185        let result = response.get("result").ok_or("no result in response")?;
186        // MCP tools/call returns { content: [{ text: "..." }] }
187        if let Some(content) = result.get("content") {
188            if let Some(arr) = content.as_array() {
189                let texts: Vec<&str> =
190                    arr.iter().filter_map(|c| c.get("text").and_then(|t| t.as_str())).collect();
191                if !texts.is_empty() {
192                    return Ok(texts.join("\n"));
193                }
194            }
195        }
196        Ok(serde_json::to_string(result)
197            .unwrap_or_else(|e| format!(r#"{{"error": "serialize: {e}"}}"#)))
198    }
199
200    fn server_name(&self) -> &str {
201        &self.server
202    }
203}
204
205/// Discovered tool info from MCP `tools/list`.
206#[derive(Debug, Clone)]
207pub struct DiscoveredTool {
208    /// Tool name on the MCP server.
209    pub name: String,
210    /// Human-readable description.
211    pub description: String,
212    /// JSON Schema for input parameters.
213    pub input_schema: serde_json::Value,
214}
215
216impl StdioMcpTransport {
217    /// Discover available tools via MCP `tools/list`.
218    pub async fn discover_tools(&self) -> Result<Vec<DiscoveredTool>, String> {
219        let request = serde_json::json!({
220            "jsonrpc": "2.0",
221            "id": 1,
222            "method": "tools/list",
223            "params": {}
224        });
225        let response = self.send_jsonrpc(&request).await?;
226        let result = response.get("result").ok_or("no result in tools/list response")?;
227        let tools =
228            result.get("tools").and_then(|t| t.as_array()).ok_or("no tools array in response")?;
229        let mut discovered = Vec::new();
230        for tool in tools {
231            let name = tool.get("name").and_then(|n| n.as_str()).unwrap_or("").to_string();
232            let desc = tool.get("description").and_then(|d| d.as_str()).unwrap_or("").to_string();
233            let schema = tool.get("inputSchema").cloned().unwrap_or(serde_json::json!({}));
234            if !name.is_empty() {
235                discovered.push(DiscoveredTool { name, description: desc, input_schema: schema });
236            }
237        }
238        Ok(discovered)
239    }
240
241    /// Send a JSON-RPC request and return the parsed response.
242    async fn send_jsonrpc(&self, request: &serde_json::Value) -> Result<serde_json::Value, String> {
243        if self.command.is_empty() {
244            return Err("stdio transport: empty command".into());
245        }
246        let request_str =
247            serde_json::to_string(request).map_err(|e| format!("serialize request: {e}"))?;
248        let mut cmd = tokio::process::Command::new(&self.command[0]);
249        cmd.args(&self.command[1..])
250            .stdin(std::process::Stdio::piped())
251            .stdout(std::process::Stdio::piped())
252            .stderr(std::process::Stdio::piped())
253            .kill_on_drop(true);
254        // PMAT-CODE-MCP-ENV-001: layer env overrides on top of inherited
255        // parent env. Each (k, v) wins on collision; empty map = no-op.
256        for (k, v) in &self.env {
257            cmd.env(k, v);
258        }
259        let mut child = cmd.spawn().map_err(|e| format!("spawn {}: {e}", self.command[0]))?;
260        if let Some(mut stdin) = child.stdin.take() {
261            use tokio::io::AsyncWriteExt;
262            stdin
263                .write_all(request_str.as_bytes())
264                .await
265                .map_err(|e| format!("write stdin: {e}"))?;
266            stdin.write_all(b"\n").await.map_err(|e| format!("write newline: {e}"))?;
267            drop(stdin);
268        }
269        let result = child.wait_with_output().await.map_err(|e| format!("wait: {e}"))?;
270        if !result.status.success() {
271            let stderr = String::from_utf8_lossy(&result.stderr);
272            return Err(format!("process exited {}: {}", result.status, stderr.trim()));
273        }
274        let stdout = String::from_utf8_lossy(&result.stdout);
275        let response: serde_json::Value =
276            serde_json::from_str(stdout.trim()).map_err(|e| format!("parse response: {e}"))?;
277        if let Some(error) = response.get("error") {
278            let msg = error.get("message").and_then(|m| m.as_str()).unwrap_or("unknown error");
279            return Err(msg.to_string());
280        }
281        Ok(response)
282    }
283}
284
/// Build `McpClientTool`s for every permitted tool of the manifest's MCP servers.
///
/// Only servers configured with the `stdio` transport are considered. For
/// each one the subprocess is launched, `tools/list` is called, and every
/// discovered tool that the server's `capabilities` allow (by exact name or
/// the `"*"` wildcard) is wrapped as an `McpClientTool`. Servers whose
/// discovery fails are logged and skipped.
#[cfg(feature = "agents-mcp")]
pub async fn discover_mcp_tools(
    manifest: &crate::agent::manifest::AgentManifest,
) -> Vec<McpClientTool> {
    use crate::agent::manifest::McpTransport;
    use std::sync::Arc;

    let mut registered = Vec::new();
    for server in &manifest.mcp_servers {
        match server.transport {
            McpTransport::Stdio => {}
            _ => continue,
        }
        // PMAT-CODE-MCP-ENV-001: pass env from server config so .mcp.json
        // `env` field is honored at spawn time (was previously discarded).
        let transport = Arc::new(StdioMcpTransport::new_with_env(
            &server.name,
            server.command.clone(),
            server.env.clone(),
        ));
        let candidates = match transport.discover_tools().await {
            Ok(found) => found,
            Err(e) => {
                tracing::warn!(
                    server = %server.name,
                    error = %e,
                    "MCP tool discovery failed"
                );
                continue;
            }
        };
        for candidate in candidates {
            let permitted = server.capabilities.iter().any(|c| c == "*" || c == &candidate.name);
            if permitted {
                // All tools of one server share the same transport instance.
                registered.push(McpClientTool::new(
                    &server.name,
                    &candidate.name,
                    &candidate.description,
                    candidate.input_schema,
                    Box::new(SharedTransport(Arc::clone(&transport))),
                ));
            } else {
                tracing::debug!(
                    server = %server.name,
                    tool = %candidate.name,
                    "MCP tool not in capabilities, skipping"
                );
            }
        }
    }
    registered
}
341
/// Newtype that lets one `Arc<StdioMcpTransport>` be handed out as a
/// `Box<dyn McpTransport>` to several tools.
#[cfg(feature = "agents-mcp")]
struct SharedTransport(std::sync::Arc<StdioMcpTransport>);

#[cfg(feature = "agents-mcp")]
#[async_trait]
impl McpTransport for SharedTransport {
    /// Delegate the call to the shared stdio transport.
    async fn call_tool(&self, tool_name: &str, input: serde_json::Value) -> Result<String, String> {
        let Self(inner) = self;
        inner.call_tool(tool_name, input).await
    }

    /// Server name of the shared stdio transport.
    fn server_name(&self) -> &str {
        let Self(inner) = self;
        inner.server_name()
    }
}
356
/// In-memory MCP transport for tests, backed by a list of canned responses.
pub struct MockMcpTransport {
    /// Server name this mock reports.
    server: String,
    /// Canned responses, stored behind a mutex.
    responses: std::sync::Mutex<Vec<Result<String, String>>>,
}

impl MockMcpTransport {
    /// Build a mock for `server` holding the given pre-configured `responses`.
    pub fn new(server: impl Into<String>, responses: Vec<Result<String, String>>) -> Self {
        let server = server.into();
        let responses = std::sync::Mutex::new(responses);
        Self { server, responses }
    }
}
369
370#[async_trait]
371impl McpTransport for MockMcpTransport {
372    async fn call_tool(
373        &self,
374        _tool_name: &str,
375        _input: serde_json::Value,
376    ) -> Result<String, String> {
377        let mut responses = self.responses.lock().expect("mock transport lock");
378        if responses.is_empty() {
379            Err("mock transport exhausted".into())
380        } else {
381            responses.remove(0)
382        }
383    }
384
385    fn server_name(&self) -> &str {
386        &self.server
387    }
388}
389
390#[cfg(test)]
391#[path = "mcp_client_tests.rs"]
392mod tests;