1use crate::error::Result;
4use crate::impl_tool_factory;
5use crate::tools::{Tool, ToolCall, ToolExample, ToolResult};
6use async_trait::async_trait;
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use std::process::Stdio;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14use tokio::time::{timeout, Duration};
15
/// Launch settings for a single MCP server process.
#[derive(Clone, Debug)]
pub struct McpServerConfig {
    /// Name the server is registered under.
    pub name: String,
    /// Program to run (first element) followed by its leading arguments.
    pub command: Vec<String>,
    /// Extra arguments appended after those contained in `command`.
    pub args: Vec<String>,
    /// Environment variables set on the child process.
    pub env: HashMap<String, String>,
    /// How long to wait for the response to each request, in seconds.
    pub timeout_seconds: u64,
}
25
26pub struct McpServer {
28 config: McpServerConfig,
29 process: Option<Child>,
30 request_id: Arc<std::sync::Mutex<u64>>,
31 started: bool,
32}
33
34impl McpServer {
35 pub fn new(config: McpServerConfig) -> Self {
36 Self {
37 config,
38 process: None,
39 request_id: Arc::new(std::sync::Mutex::new(0)),
40 started: false,
41 }
42 }
43
44 pub async fn start(&mut self) -> Result<()> {
46 if self.started {
47 return Ok(());
48 }
49
50 let mut cmd = Command::new(&self.config.command[0]);
51 if self.config.command.len() > 1 {
52 cmd.args(&self.config.command[1..]);
53 }
54 cmd.args(&self.config.args);
55
56 for (key, value) in &self.config.env {
58 cmd.env(key, value);
59 }
60
61 cmd.stdin(Stdio::piped())
62 .stdout(Stdio::piped())
63 .stderr(Stdio::piped());
64
65 self.process = Some(cmd.spawn()?);
66 self.started = true;
67
68 self.initialize().await?;
70
71 Ok(())
72 }
73
74 pub fn stop(&mut self) {
76 if let Some(mut process) = self.process.take() {
77 std::mem::drop(process.kill());
78 }
79 self.started = false;
80 }
81
82 async fn initialize(&mut self) -> Result<()> {
84 let init_request = json!({
85 "jsonrpc": "2.0",
86 "id": self.next_request_id(),
87 "method": "initialize",
88 "params": {
89 "protocolVersion": "2024-11-05",
90 "capabilities": {
91 "tools": {}
92 },
93 "clientInfo": {
94 "name": "coro",
95 "version": "0.1.0"
96 }
97 }
98 });
99
100 self.send_request(init_request).await?;
101 Ok(())
102 }
103
104 fn next_request_id(&self) -> u64 {
106 let mut id = self.request_id.lock().unwrap();
107 *id += 1;
108 *id
109 }
110
111 async fn send_request(&mut self, request: Value) -> Result<Value> {
113 if !self.started || self.process.is_none() {
114 return Err("MCP server not started".into());
115 }
116
117 let process = self.process.as_mut().unwrap();
118
119 if let Some(stdin) = process.stdin.as_mut() {
121 let request_str = serde_json::to_string(&request)?;
122 stdin.write_all(request_str.as_bytes()).await?;
123 stdin.write_all(b"\n").await?;
124 stdin.flush().await?;
125 } else {
126 return Err("No stdin available for MCP server".into());
127 }
128
129 let response = timeout(
131 Duration::from_secs(self.config.timeout_seconds),
132 self.read_response(),
133 )
134 .await??;
135
136 Ok(response)
137 }
138
139 async fn read_response(&mut self) -> Result<Value> {
141 if let Some(process) = self.process.as_mut() {
142 if let Some(stdout) = process.stdout.as_mut() {
143 let mut reader = BufReader::new(stdout);
144 let mut line = String::new();
145 reader.read_line(&mut line).await?;
146
147 if line.trim().is_empty() {
148 return Err("Empty response from MCP server".into());
149 }
150
151 let response: Value = serde_json::from_str(line.trim())?;
152 Ok(response)
153 } else {
154 Err("No stdout available for MCP server".into())
155 }
156 } else {
157 Err("MCP server process not available".into())
158 }
159 }
160
161 pub async fn list_tools(&mut self) -> Result<Vec<Value>> {
163 let request = json!({
164 "jsonrpc": "2.0",
165 "id": self.next_request_id(),
166 "method": "tools/list"
167 });
168
169 let response = self.send_request(request).await?;
170
171 if let Some(result) = response.get("result") {
172 if let Some(tools) = result.get("tools") {
173 if let Some(tools_array) = tools.as_array() {
174 return Ok(tools_array.clone());
175 }
176 }
177 }
178
179 Ok(Vec::new())
180 }
181
182 pub async fn call_tool(&mut self, tool_name: &str, arguments: Value) -> Result<Value> {
184 let request = json!({
185 "jsonrpc": "2.0",
186 "id": self.next_request_id(),
187 "method": "tools/call",
188 "params": {
189 "name": tool_name,
190 "arguments": arguments
191 }
192 });
193
194 let response = self.send_request(request).await?;
195
196 if let Some(error) = response.get("error") {
197 return Err(format!("MCP tool error: {}", error).into());
198 }
199
200 if let Some(result) = response.get("result") {
201 return Ok(result.clone());
202 }
203
204 Err("No result in MCP response".into())
205 }
206}
207
/// Tool that keeps a registry of MCP servers, keyed by server name, and
/// routes operations to them.
pub struct McpTool {
    /// Registered servers by name, behind a shared async mutex.
    servers: Arc<Mutex<HashMap<String, McpServer>>>,
}
212
213impl Default for McpTool {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219impl McpTool {
220 pub fn new() -> Self {
221 Self {
222 servers: Arc::new(Mutex::new(HashMap::new())),
223 }
224 }
225}
226
227#[async_trait]
228impl Tool for McpTool {
229 fn name(&self) -> &str {
230 "mcp_tool"
231 }
232
233 fn description(&self) -> &str {
234 "Tool for interacting with MCP (Model Context Protocol) servers\n\
235 * Manages connections to external MCP servers\n\
236 * Provides access to tools exposed by MCP servers\n\
237 * Supports server lifecycle management (start, stop, restart)\n\
238 * Handles JSON-RPC communication with MCP servers\n\
239 \n\
240 Operations:\n\
241 - `start_server`: Start an MCP server with given configuration\n\
242 - `stop_server`: Stop a running MCP server\n\
243 - `list_servers`: List all configured MCP servers\n\
244 - `list_tools`: List tools available from a specific MCP server\n\
245 - `call_tool`: Call a tool on a specific MCP server\n\
246 \n\
247 MCP servers are external processes that expose tools and resources\n\
248 through the Model Context Protocol. This allows integration with\n\
249 various external systems and services."
250 }
251
252 fn parameters_schema(&self) -> serde_json::Value {
253 json!({
254 "type": "object",
255 "properties": {
256 "operation": {
257 "type": "string",
258 "enum": ["start_server", "stop_server", "list_servers", "list_tools", "call_tool"],
259 "description": "The operation to perform"
260 },
261 "server_name": {
262 "type": "string",
263 "description": "Name of the MCP server (required for most operations)"
264 },
265 "command": {
266 "type": "array",
267 "items": {"type": "string"},
268 "description": "Command to start the MCP server (required for start_server)"
269 },
270 "args": {
271 "type": "array",
272 "items": {"type": "string"},
273 "description": "Arguments for the MCP server command"
274 },
275 "env": {
276 "type": "object",
277 "description": "Environment variables for the MCP server"
278 },
279 "timeout_seconds": {
280 "type": "integer",
281 "description": "Timeout for MCP server operations in seconds (default: 30)"
282 },
283 "tool_name": {
284 "type": "string",
285 "description": "Name of the tool to call (required for call_tool)"
286 },
287 "tool_arguments": {
288 "type": "object",
289 "description": "Arguments to pass to the tool (required for call_tool)"
290 }
291 },
292 "required": ["operation"]
293 })
294 }
295
296 async fn execute(&self, call: ToolCall) -> Result<ToolResult> {
297 let operation: String = call.get_parameter("operation")?;
298
299 match operation.as_str() {
300 "start_server" => {
301 let server_name: String = call.get_parameter("server_name")?;
302 let command: Vec<String> = call.get_parameter("command")?;
303 let args: Vec<String> = call.get_parameter_or("args", Vec::new());
304 let env: HashMap<String, String> = call.get_parameter_or("env", HashMap::new());
305 let timeout_seconds: u64 = call.get_parameter_or("timeout_seconds", 30);
306 self.start_server(&call.id, server_name, command, args, env, timeout_seconds).await
307 }
308 "stop_server" => {
309 let server_name: String = call.get_parameter("server_name")?;
310 self.stop_server(&call.id, server_name).await
311 }
312 "list_servers" => {
313 self.list_servers(&call.id).await
314 }
315 "list_tools" => {
316 let server_name: String = call.get_parameter("server_name")?;
317 self.list_tools(&call.id, server_name).await
318 }
319 "call_tool" => {
320 let server_name: String = call.get_parameter("server_name")?;
321 let tool_name: String = call.get_parameter("tool_name")?;
322 let tool_arguments: Value = call.get_parameter("tool_arguments")?;
323 self.call_tool(&call.id, server_name, tool_name, tool_arguments).await
324 }
325 _ => Ok(ToolResult::error(&call.id, &format!(
326 "Unknown operation: {}. Supported operations: start_server, stop_server, list_servers, list_tools, call_tool",
327 operation
328 ))),
329 }
330 }
331
332 fn examples(&self) -> Vec<ToolExample> {
333 vec![
334 ToolExample {
335 description: "Start an MCP server".to_string(),
336 parameters: json!({
337 "operation": "start_server",
338 "server_name": "filesystem",
339 "command": ["node", "/path/to/mcp-server.js"],
340 "args": ["--port", "3000"],
341 "env": {"NODE_ENV": "production"}
342 }),
343 expected_result: "MCP server started successfully".to_string(),
344 },
345 ToolExample {
346 description: "List tools from an MCP server".to_string(),
347 parameters: json!({
348 "operation": "list_tools",
349 "server_name": "filesystem"
350 }),
351 expected_result: "List of available tools".to_string(),
352 },
353 ToolExample {
354 description: "Call a tool on an MCP server".to_string(),
355 parameters: json!({
356 "operation": "call_tool",
357 "server_name": "filesystem",
358 "tool_name": "read_file",
359 "tool_arguments": {"path": "/path/to/file.txt"}
360 }),
361 expected_result: "Tool execution result".to_string(),
362 },
363 ]
364 }
365}
366
367impl McpTool {
368 async fn start_server(
370 &self,
371 call_id: &str,
372 server_name: String,
373 command: Vec<String>,
374 args: Vec<String>,
375 env: HashMap<String, String>,
376 timeout_seconds: u64,
377 ) -> Result<ToolResult> {
378 if command.is_empty() {
379 return Ok(ToolResult::error(call_id, "Command cannot be empty"));
380 }
381
382 let config = McpServerConfig {
383 name: server_name.clone(),
384 command,
385 args,
386 env,
387 timeout_seconds,
388 };
389
390 let mut server = McpServer::new(config);
391
392 match server.start().await {
393 Ok(()) => {
394 let mut servers = self.servers.lock().await;
395 servers.insert(server_name.clone(), server);
396
397 Ok(ToolResult::success(
398 call_id,
399 &format!("MCP server '{}' started successfully", server_name),
400 ))
401 }
402 Err(e) => Ok(ToolResult::error(
403 call_id,
404 &format!("Failed to start MCP server '{}': {}", server_name, e),
405 )),
406 }
407 }
408
409 async fn stop_server(&self, call_id: &str, server_name: String) -> Result<ToolResult> {
411 let mut servers = self.servers.lock().await;
412
413 if let Some(mut server) = servers.remove(&server_name) {
414 server.stop();
415 Ok(ToolResult::success(
416 call_id,
417 &format!("MCP server '{}' stopped successfully", server_name),
418 ))
419 } else {
420 Ok(ToolResult::error(
421 call_id,
422 &format!("MCP server '{}' not found", server_name),
423 ))
424 }
425 }
426
427 async fn list_servers(&self, call_id: &str) -> Result<ToolResult> {
429 let servers = self.servers.lock().await;
430
431 if servers.is_empty() {
432 return Ok(ToolResult::success(
433 call_id,
434 "No MCP servers are currently running",
435 ));
436 }
437
438 let mut result = String::from("Running MCP servers:\n\n");
439 for (name, server) in servers.iter() {
440 result.push_str(&format!(
441 "- {} (command: {:?}, started: {})\n",
442 name, server.config.command, server.started
443 ));
444 }
445
446 Ok(ToolResult::success(call_id, &result))
447 }
448
449 async fn list_tools(&self, call_id: &str, server_name: String) -> Result<ToolResult> {
451 let mut servers = self.servers.lock().await;
452
453 if let Some(server) = servers.get_mut(&server_name) {
454 match server.list_tools().await {
455 Ok(tools) => {
456 if tools.is_empty() {
457 Ok(ToolResult::success(
458 call_id,
459 &format!("No tools available from MCP server '{}'", server_name),
460 ))
461 } else {
462 let mut result =
463 format!("Tools available from MCP server '{}':\n\n", server_name);
464
465 for (i, tool) in tools.iter().enumerate() {
466 if let Some(name) = tool.get("name").and_then(|n| n.as_str()) {
467 result.push_str(&format!("{}. {}", i + 1, name));
468
469 if let Some(description) =
470 tool.get("description").and_then(|d| d.as_str())
471 {
472 result.push_str(&format!(" - {}", description));
473 }
474 result.push('\n');
475
476 if let Some(input_schema) = tool.get("inputSchema") {
477 result.push_str(&format!(
478 " Input schema: {}\n",
479 serde_json::to_string_pretty(input_schema)
480 .unwrap_or_default()
481 ));
482 }
483 result.push('\n');
484 }
485 }
486
487 Ok(ToolResult::success(call_id, &result))
488 }
489 }
490 Err(e) => Ok(ToolResult::error(
491 call_id,
492 &format!(
493 "Failed to list tools from MCP server '{}': {}",
494 server_name, e
495 ),
496 )),
497 }
498 } else {
499 Ok(ToolResult::error(
500 call_id,
501 &format!("MCP server '{}' not found", server_name),
502 ))
503 }
504 }
505
506 async fn call_tool(
508 &self,
509 call_id: &str,
510 server_name: String,
511 tool_name: String,
512 tool_arguments: Value,
513 ) -> Result<ToolResult> {
514 let mut servers = self.servers.lock().await;
515
516 if let Some(server) = servers.get_mut(&server_name) {
517 match server.call_tool(&tool_name, tool_arguments).await {
518 Ok(result) => {
519 let result_str = if result.is_string() {
520 result.as_str().unwrap_or("").to_string()
521 } else {
522 serde_json::to_string_pretty(&result).unwrap_or_default()
523 };
524
525 Ok(ToolResult::success(
526 call_id,
527 &format!(
528 "Tool '{}' executed successfully on MCP server '{}':\n\n{}",
529 tool_name, server_name, result_str
530 ),
531 ))
532 }
533 Err(e) => Ok(ToolResult::error(
534 call_id,
535 &format!(
536 "Failed to call tool '{}' on MCP server '{}': {}",
537 tool_name, server_name, e
538 ),
539 )),
540 }
541 } else {
542 Ok(ToolResult::error(
543 call_id,
544 &format!("MCP server '{}' not found", server_name),
545 ))
546 }
547 }
548}
549
// Invokes the crate's `impl_tool_factory!` macro for this tool. Presumably it
// generates `McpToolFactory`, a factory producing `McpTool` under the name
// "mcp_tool" with the description below — confirm against the macro definition.
impl_tool_factory!(
    McpToolFactory,
    McpTool,
    "mcp_tool",
    "Tool for interacting with MCP (Model Context Protocol) servers"
);