1use std::time::Duration;
19
20use async_trait::async_trait;
21
22use super::{Tool, ToolResult};
23use crate::agent::capability::Capability;
24use crate::agent::driver::ToolDefinition;
25
26#[async_trait]
33pub trait McpTransport: Send + Sync {
34 async fn call_tool(&self, tool_name: &str, input: serde_json::Value) -> Result<String, String>;
36
37 fn server_name(&self) -> &str;
39}
40
41pub struct McpClientTool {
43 server_name: String,
45 tool_name: String,
47 description: String,
49 input_schema: serde_json::Value,
51 transport: Box<dyn McpTransport>,
53 timeout: Duration,
55}
56
57impl McpClientTool {
58 pub fn new(
60 server_name: impl Into<String>,
61 tool_name: impl Into<String>,
62 description: impl Into<String>,
63 input_schema: serde_json::Value,
64 transport: Box<dyn McpTransport>,
65 ) -> Self {
66 Self {
67 server_name: server_name.into(),
68 tool_name: tool_name.into(),
69 description: description.into(),
70 input_schema,
71 transport,
72 timeout: Duration::from_secs(60),
73 }
74 }
75
76 #[must_use]
78 pub fn with_timeout(mut self, timeout: Duration) -> Self {
79 self.timeout = timeout;
80 self
81 }
82
83 fn prefixed_name(&self) -> String {
85 format!("mcp_{}_{}", self.server_name, self.tool_name)
86 }
87}
88
89#[async_trait]
90impl Tool for McpClientTool {
91 fn name(&self) -> &'static str {
92 Box::leak(self.prefixed_name().into_boxed_str())
95 }
96
97 fn definition(&self) -> ToolDefinition {
98 ToolDefinition {
99 name: self.prefixed_name(),
100 description: format!("[MCP:{}] {}", self.server_name, self.description),
101 input_schema: self.input_schema.clone(),
102 }
103 }
104
105 async fn execute(&self, input: serde_json::Value) -> ToolResult {
106 match self.transport.call_tool(&self.tool_name, input).await {
107 Ok(content) => ToolResult::success(content),
108 Err(e) => ToolResult::error(format!(
109 "MCP call to {}:{} failed: {}",
110 self.server_name, self.tool_name, e
111 )),
112 }
113 }
114
115 fn required_capability(&self) -> Capability {
116 Capability::Mcp { server: self.server_name.clone(), tool: self.tool_name.clone() }
117 }
118
119 fn timeout(&self) -> Duration {
120 self.timeout
121 }
122}
123
/// MCP transport that talks to a server over a child process's stdin/stdout.
pub struct StdioMcpTransport {
    /// Name of the server, returned by `server_name`.
    server: String,
    /// Program (first element) followed by its arguments.
    command: Vec<String>,
    /// Extra environment variables set on the child process.
    env: std::collections::BTreeMap<String, String>,
}

impl StdioMcpTransport {
    /// Creates a transport for `server` that runs `command` with no extra environment
    /// variables.
    pub fn new(server: impl Into<String>, command: Vec<String>) -> Self {
        Self::new_with_env(server, command, std::collections::BTreeMap::new())
    }

    /// Creates a transport for `server` that runs `command` with the variables in `env`
    /// added to the child's environment.
    pub fn new_with_env(
        server: impl Into<String>,
        command: Vec<String>,
        env: std::collections::BTreeMap<String, String>,
    ) -> Self {
        let server = server.into();
        Self { server, command, env }
    }

    /// Extra environment variables configured for the child process.
    pub fn env(&self) -> &std::collections::BTreeMap<String, String> {
        let Self { env, .. } = self;
        env
    }
}
171
172#[async_trait]
173impl McpTransport for StdioMcpTransport {
174 async fn call_tool(&self, tool_name: &str, input: serde_json::Value) -> Result<String, String> {
175 let request = serde_json::json!({
176 "jsonrpc": "2.0",
177 "id": 1,
178 "method": "tools/call",
179 "params": {
180 "name": tool_name,
181 "arguments": input,
182 }
183 });
184 let response = self.send_jsonrpc(&request).await?;
185 let result = response.get("result").ok_or("no result in response")?;
186 if let Some(content) = result.get("content") {
188 if let Some(arr) = content.as_array() {
189 let texts: Vec<&str> =
190 arr.iter().filter_map(|c| c.get("text").and_then(|t| t.as_str())).collect();
191 if !texts.is_empty() {
192 return Ok(texts.join("\n"));
193 }
194 }
195 }
196 Ok(serde_json::to_string(result)
197 .unwrap_or_else(|e| format!(r#"{{"error": "serialize: {e}"}}"#)))
198 }
199
200 fn server_name(&self) -> &str {
201 &self.server
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct DiscoveredTool {
208 pub name: String,
210 pub description: String,
212 pub input_schema: serde_json::Value,
214}
215
216impl StdioMcpTransport {
217 pub async fn discover_tools(&self) -> Result<Vec<DiscoveredTool>, String> {
219 let request = serde_json::json!({
220 "jsonrpc": "2.0",
221 "id": 1,
222 "method": "tools/list",
223 "params": {}
224 });
225 let response = self.send_jsonrpc(&request).await?;
226 let result = response.get("result").ok_or("no result in tools/list response")?;
227 let tools =
228 result.get("tools").and_then(|t| t.as_array()).ok_or("no tools array in response")?;
229 let mut discovered = Vec::new();
230 for tool in tools {
231 let name = tool.get("name").and_then(|n| n.as_str()).unwrap_or("").to_string();
232 let desc = tool.get("description").and_then(|d| d.as_str()).unwrap_or("").to_string();
233 let schema = tool.get("inputSchema").cloned().unwrap_or(serde_json::json!({}));
234 if !name.is_empty() {
235 discovered.push(DiscoveredTool { name, description: desc, input_schema: schema });
236 }
237 }
238 Ok(discovered)
239 }
240
241 async fn send_jsonrpc(&self, request: &serde_json::Value) -> Result<serde_json::Value, String> {
243 if self.command.is_empty() {
244 return Err("stdio transport: empty command".into());
245 }
246 let request_str =
247 serde_json::to_string(request).map_err(|e| format!("serialize request: {e}"))?;
248 let mut cmd = tokio::process::Command::new(&self.command[0]);
249 cmd.args(&self.command[1..])
250 .stdin(std::process::Stdio::piped())
251 .stdout(std::process::Stdio::piped())
252 .stderr(std::process::Stdio::piped())
253 .kill_on_drop(true);
254 for (k, v) in &self.env {
257 cmd.env(k, v);
258 }
259 let mut child = cmd.spawn().map_err(|e| format!("spawn {}: {e}", self.command[0]))?;
260 if let Some(mut stdin) = child.stdin.take() {
261 use tokio::io::AsyncWriteExt;
262 stdin
263 .write_all(request_str.as_bytes())
264 .await
265 .map_err(|e| format!("write stdin: {e}"))?;
266 stdin.write_all(b"\n").await.map_err(|e| format!("write newline: {e}"))?;
267 drop(stdin);
268 }
269 let result = child.wait_with_output().await.map_err(|e| format!("wait: {e}"))?;
270 if !result.status.success() {
271 let stderr = String::from_utf8_lossy(&result.stderr);
272 return Err(format!("process exited {}: {}", result.status, stderr.trim()));
273 }
274 let stdout = String::from_utf8_lossy(&result.stdout);
275 let response: serde_json::Value =
276 serde_json::from_str(stdout.trim()).map_err(|e| format!("parse response: {e}"))?;
277 if let Some(error) = response.get("error") {
278 let msg = error.get("message").and_then(|m| m.as_str()).unwrap_or("unknown error");
279 return Err(msg.to_string());
280 }
281 Ok(response)
282 }
283}
284
/// Builds an [`McpClientTool`] for every permitted tool of every stdio MCP server in
/// `manifest`.
///
/// Servers whose transport is not stdio are ignored. A server whose discovery fails is
/// logged at warn level and skipped. A tool is kept only when the server's
/// `capabilities` contain its name or `"*"`; others are logged at debug level.
/// All tools of one server share a single [`StdioMcpTransport`].
#[cfg(feature = "agents-mcp")]
pub async fn discover_mcp_tools(
    manifest: &crate::agent::manifest::AgentManifest,
) -> Vec<McpClientTool> {
    use crate::agent::manifest::McpTransport;
    use std::sync::Arc;

    let mut registered = Vec::new();
    for server in &manifest.mcp_servers {
        let is_stdio = matches!(server.transport, McpTransport::Stdio);
        if !is_stdio {
            continue;
        }

        let shared = Arc::new(StdioMcpTransport::new_with_env(
            &server.name,
            server.command.clone(),
            server.env.clone(),
        ));
        let listing = shared.discover_tools().await;
        let found = match listing {
            Err(e) => {
                tracing::warn!(
                    server = %server.name,
                    error = %e,
                    "MCP tool discovery failed"
                );
                continue;
            }
            Ok(found) => found,
        };

        for tool_info in found {
            let permitted =
                server.capabilities.iter().any(|cap| cap == "*" || cap == &tool_info.name);
            if permitted {
                registered.push(McpClientTool::new(
                    &server.name,
                    &tool_info.name,
                    &tool_info.description,
                    tool_info.input_schema,
                    Box::new(SharedTransport(Arc::clone(&shared))),
                ));
            } else {
                tracing::debug!(
                    server = %server.name,
                    tool = %tool_info.name,
                    "MCP tool not in capabilities, skipping"
                );
            }
        }
    }
    registered
}
341
/// Newtype over an `Arc<StdioMcpTransport>` that implements [`McpTransport`] by
/// delegating to the wrapped transport, so one transport can back several boxed
/// trait objects.
#[cfg(feature = "agents-mcp")]
struct SharedTransport(std::sync::Arc<StdioMcpTransport>);

#[cfg(feature = "agents-mcp")]
#[async_trait]
impl McpTransport for SharedTransport {
    /// Delegates to the wrapped transport's `call_tool`.
    async fn call_tool(&self, tool_name: &str, input: serde_json::Value) -> Result<String, String> {
        let inner: &StdioMcpTransport = &self.0;
        inner.call_tool(tool_name, input).await
    }

    /// Delegates to the wrapped transport's `server_name`.
    fn server_name(&self) -> &str {
        let inner: &StdioMcpTransport = &self.0;
        inner.server_name()
    }
}
356
/// Transport that replays a fixed list of responses instead of contacting a server.
pub struct MockMcpTransport {
    /// Name returned by `server_name`.
    server: String,
    /// Responses still to be handed out, in order; guarded by a mutex because
    /// `call_tool` only receives `&self`.
    responses: std::sync::Mutex<Vec<Result<String, String>>>,
}

impl MockMcpTransport {
    /// Creates a mock for `server` that will return `responses` one at a time, first to
    /// last.
    pub fn new(server: impl Into<String>, responses: Vec<Result<String, String>>) -> Self {
        let server = server.into();
        let responses = std::sync::Mutex::new(responses);
        Self { server, responses }
    }
}
369
370#[async_trait]
371impl McpTransport for MockMcpTransport {
372 async fn call_tool(
373 &self,
374 _tool_name: &str,
375 _input: serde_json::Value,
376 ) -> Result<String, String> {
377 let mut responses = self.responses.lock().expect("mock transport lock");
378 if responses.is_empty() {
379 Err("mock transport exhausted".into())
380 } else {
381 responses.remove(0)
382 }
383 }
384
385 fn server_name(&self) -> &str {
386 &self.server
387 }
388}
389
390#[cfg(test)]
391#[path = "mcp_client_tests.rs"]
392mod tests;