coro_core/tools/builtin/
mcp.rs

1//! MCP (Model Context Protocol) tool support
2
3use crate::error::Result;
4use crate::impl_tool_factory;
5use crate::tools::{Tool, ToolCall, ToolExample, ToolResult};
6use async_trait::async_trait;
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use std::process::Stdio;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14use tokio::time::{timeout, Duration};
15
/// Launch settings for one external MCP server process.
#[derive(Debug, Clone)]
pub struct McpServerConfig {
    /// Name the server is known by.
    pub name: String,
    /// Program to execute, optionally followed by leading arguments; the first element is the
    /// executable.
    pub command: Vec<String>,
    /// Additional arguments, appended after any arguments carried in `command`.
    pub args: Vec<String>,
    /// Environment variables set on the child process.
    pub env: HashMap<String, String>,
    /// How long, in seconds, to wait for the response to each request.
    pub timeout_seconds: u64,
}
25
26/// MCP server instance
27pub struct McpServer {
28    config: McpServerConfig,
29    process: Option<Child>,
30    request_id: Arc<std::sync::Mutex<u64>>,
31    started: bool,
32}
33
34impl McpServer {
35    pub fn new(config: McpServerConfig) -> Self {
36        Self {
37            config,
38            process: None,
39            request_id: Arc::new(std::sync::Mutex::new(0)),
40            started: false,
41        }
42    }
43
44    /// Start the MCP server process
45    pub async fn start(&mut self) -> Result<()> {
46        if self.started {
47            return Ok(());
48        }
49
50        let mut cmd = Command::new(&self.config.command[0]);
51        if self.config.command.len() > 1 {
52            cmd.args(&self.config.command[1..]);
53        }
54        cmd.args(&self.config.args);
55
56        // Set environment variables
57        for (key, value) in &self.config.env {
58            cmd.env(key, value);
59        }
60
61        cmd.stdin(Stdio::piped())
62            .stdout(Stdio::piped())
63            .stderr(Stdio::piped());
64
65        self.process = Some(cmd.spawn()?);
66        self.started = true;
67
68        // Send initialization request
69        self.initialize().await?;
70
71        Ok(())
72    }
73
74    /// Stop the MCP server
75    pub fn stop(&mut self) {
76        if let Some(mut process) = self.process.take() {
77            std::mem::drop(process.kill());
78        }
79        self.started = false;
80    }
81
82    /// Send initialization request to MCP server
83    async fn initialize(&mut self) -> Result<()> {
84        let init_request = json!({
85            "jsonrpc": "2.0",
86            "id": self.next_request_id(),
87            "method": "initialize",
88            "params": {
89                "protocolVersion": "2024-11-05",
90                "capabilities": {
91                    "tools": {}
92                },
93                "clientInfo": {
94                    "name": "coro",
95                    "version": "0.1.0"
96                }
97            }
98        });
99
100        self.send_request(init_request).await?;
101        Ok(())
102    }
103
104    /// Get next request ID
105    fn next_request_id(&self) -> u64 {
106        let mut id = self.request_id.lock().unwrap();
107        *id += 1;
108        *id
109    }
110
111    /// Send a JSON-RPC request to the MCP server
112    async fn send_request(&mut self, request: Value) -> Result<Value> {
113        if !self.started || self.process.is_none() {
114            return Err("MCP server not started".into());
115        }
116
117        let process = self.process.as_mut().unwrap();
118
119        // Send request
120        if let Some(stdin) = process.stdin.as_mut() {
121            let request_str = serde_json::to_string(&request)?;
122            stdin.write_all(request_str.as_bytes()).await?;
123            stdin.write_all(b"\n").await?;
124            stdin.flush().await?;
125        } else {
126            return Err("No stdin available for MCP server".into());
127        }
128
129        // Read response with timeout
130        let response = timeout(
131            Duration::from_secs(self.config.timeout_seconds),
132            self.read_response(),
133        )
134        .await??;
135
136        Ok(response)
137    }
138
139    /// Read JSON-RPC response from MCP server
140    async fn read_response(&mut self) -> Result<Value> {
141        if let Some(process) = self.process.as_mut() {
142            if let Some(stdout) = process.stdout.as_mut() {
143                let mut reader = BufReader::new(stdout);
144                let mut line = String::new();
145                reader.read_line(&mut line).await?;
146
147                if line.trim().is_empty() {
148                    return Err("Empty response from MCP server".into());
149                }
150
151                let response: Value = serde_json::from_str(line.trim())?;
152                Ok(response)
153            } else {
154                Err("No stdout available for MCP server".into())
155            }
156        } else {
157            Err("MCP server process not available".into())
158        }
159    }
160
161    /// List available tools from MCP server
162    pub async fn list_tools(&mut self) -> Result<Vec<Value>> {
163        let request = json!({
164            "jsonrpc": "2.0",
165            "id": self.next_request_id(),
166            "method": "tools/list"
167        });
168
169        let response = self.send_request(request).await?;
170
171        if let Some(result) = response.get("result") {
172            if let Some(tools) = result.get("tools") {
173                if let Some(tools_array) = tools.as_array() {
174                    return Ok(tools_array.clone());
175                }
176            }
177        }
178
179        Ok(Vec::new())
180    }
181
182    /// Call a tool on the MCP server
183    pub async fn call_tool(&mut self, tool_name: &str, arguments: Value) -> Result<Value> {
184        let request = json!({
185            "jsonrpc": "2.0",
186            "id": self.next_request_id(),
187            "method": "tools/call",
188            "params": {
189                "name": tool_name,
190                "arguments": arguments
191            }
192        });
193
194        let response = self.send_request(request).await?;
195
196        if let Some(error) = response.get("error") {
197            return Err(format!("MCP tool error: {}", error).into());
198        }
199
200        if let Some(result) = response.get("result") {
201            return Ok(result.clone());
202        }
203
204        Err("No result in MCP response".into())
205    }
206}
207
208/// Tool for interacting with MCP servers
209pub struct McpTool {
210    servers: Arc<Mutex<HashMap<String, McpServer>>>,
211}
212
213impl Default for McpTool {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl McpTool {
220    pub fn new() -> Self {
221        Self {
222            servers: Arc::new(Mutex::new(HashMap::new())),
223        }
224    }
225}
226
227#[async_trait]
228impl Tool for McpTool {
229    fn name(&self) -> &str {
230        "mcp_tool"
231    }
232
233    fn description(&self) -> &str {
234        "Tool for interacting with MCP (Model Context Protocol) servers\n\
235         * Manages connections to external MCP servers\n\
236         * Provides access to tools exposed by MCP servers\n\
237         * Supports server lifecycle management (start, stop, restart)\n\
238         * Handles JSON-RPC communication with MCP servers\n\
239         \n\
240         Operations:\n\
241         - `start_server`: Start an MCP server with given configuration\n\
242         - `stop_server`: Stop a running MCP server\n\
243         - `list_servers`: List all configured MCP servers\n\
244         - `list_tools`: List tools available from a specific MCP server\n\
245         - `call_tool`: Call a tool on a specific MCP server\n\
246         \n\
247         MCP servers are external processes that expose tools and resources\n\
248         through the Model Context Protocol. This allows integration with\n\
249         various external systems and services."
250    }
251
252    fn parameters_schema(&self) -> serde_json::Value {
253        json!({
254            "type": "object",
255            "properties": {
256                "operation": {
257                    "type": "string",
258                    "enum": ["start_server", "stop_server", "list_servers", "list_tools", "call_tool"],
259                    "description": "The operation to perform"
260                },
261                "server_name": {
262                    "type": "string",
263                    "description": "Name of the MCP server (required for most operations)"
264                },
265                "command": {
266                    "type": "array",
267                    "items": {"type": "string"},
268                    "description": "Command to start the MCP server (required for start_server)"
269                },
270                "args": {
271                    "type": "array",
272                    "items": {"type": "string"},
273                    "description": "Arguments for the MCP server command"
274                },
275                "env": {
276                    "type": "object",
277                    "description": "Environment variables for the MCP server"
278                },
279                "timeout_seconds": {
280                    "type": "integer",
281                    "description": "Timeout for MCP server operations in seconds (default: 30)"
282                },
283                "tool_name": {
284                    "type": "string",
285                    "description": "Name of the tool to call (required for call_tool)"
286                },
287                "tool_arguments": {
288                    "type": "object",
289                    "description": "Arguments to pass to the tool (required for call_tool)"
290                }
291            },
292            "required": ["operation"]
293        })
294    }
295
296    async fn execute(&self, call: ToolCall) -> Result<ToolResult> {
297        let operation: String = call.get_parameter("operation")?;
298
299        match operation.as_str() {
300            "start_server" => {
301                let server_name: String = call.get_parameter("server_name")?;
302                let command: Vec<String> = call.get_parameter("command")?;
303                let args: Vec<String> = call.get_parameter_or("args", Vec::new());
304                let env: HashMap<String, String> = call.get_parameter_or("env", HashMap::new());
305                let timeout_seconds: u64 = call.get_parameter_or("timeout_seconds", 30);
306                self.start_server(&call.id, server_name, command, args, env, timeout_seconds).await
307            }
308            "stop_server" => {
309                let server_name: String = call.get_parameter("server_name")?;
310                self.stop_server(&call.id, server_name).await
311            }
312            "list_servers" => {
313                self.list_servers(&call.id).await
314            }
315            "list_tools" => {
316                let server_name: String = call.get_parameter("server_name")?;
317                self.list_tools(&call.id, server_name).await
318            }
319            "call_tool" => {
320                let server_name: String = call.get_parameter("server_name")?;
321                let tool_name: String = call.get_parameter("tool_name")?;
322                let tool_arguments: Value = call.get_parameter("tool_arguments")?;
323                self.call_tool(&call.id, server_name, tool_name, tool_arguments).await
324            }
325            _ => Ok(ToolResult::error(&call.id, &format!(
326                "Unknown operation: {}. Supported operations: start_server, stop_server, list_servers, list_tools, call_tool", 
327                operation
328            ))),
329        }
330    }
331
332    fn examples(&self) -> Vec<ToolExample> {
333        vec![
334            ToolExample {
335                description: "Start an MCP server".to_string(),
336                parameters: json!({
337                    "operation": "start_server",
338                    "server_name": "filesystem",
339                    "command": ["node", "/path/to/mcp-server.js"],
340                    "args": ["--port", "3000"],
341                    "env": {"NODE_ENV": "production"}
342                }),
343                expected_result: "MCP server started successfully".to_string(),
344            },
345            ToolExample {
346                description: "List tools from an MCP server".to_string(),
347                parameters: json!({
348                    "operation": "list_tools",
349                    "server_name": "filesystem"
350                }),
351                expected_result: "List of available tools".to_string(),
352            },
353            ToolExample {
354                description: "Call a tool on an MCP server".to_string(),
355                parameters: json!({
356                    "operation": "call_tool",
357                    "server_name": "filesystem",
358                    "tool_name": "read_file",
359                    "tool_arguments": {"path": "/path/to/file.txt"}
360                }),
361                expected_result: "Tool execution result".to_string(),
362            },
363        ]
364    }
365}
366
367impl McpTool {
368    /// Start an MCP server
369    async fn start_server(
370        &self,
371        call_id: &str,
372        server_name: String,
373        command: Vec<String>,
374        args: Vec<String>,
375        env: HashMap<String, String>,
376        timeout_seconds: u64,
377    ) -> Result<ToolResult> {
378        if command.is_empty() {
379            return Ok(ToolResult::error(call_id, "Command cannot be empty"));
380        }
381
382        let config = McpServerConfig {
383            name: server_name.clone(),
384            command,
385            args,
386            env,
387            timeout_seconds,
388        };
389
390        let mut server = McpServer::new(config);
391
392        match server.start().await {
393            Ok(()) => {
394                let mut servers = self.servers.lock().await;
395                servers.insert(server_name.clone(), server);
396
397                Ok(ToolResult::success(
398                    call_id,
399                    &format!("MCP server '{}' started successfully", server_name),
400                ))
401            }
402            Err(e) => Ok(ToolResult::error(
403                call_id,
404                &format!("Failed to start MCP server '{}': {}", server_name, e),
405            )),
406        }
407    }
408
409    /// Stop an MCP server
410    async fn stop_server(&self, call_id: &str, server_name: String) -> Result<ToolResult> {
411        let mut servers = self.servers.lock().await;
412
413        if let Some(mut server) = servers.remove(&server_name) {
414            server.stop();
415            Ok(ToolResult::success(
416                call_id,
417                &format!("MCP server '{}' stopped successfully", server_name),
418            ))
419        } else {
420            Ok(ToolResult::error(
421                call_id,
422                &format!("MCP server '{}' not found", server_name),
423            ))
424        }
425    }
426
427    /// List all MCP servers
428    async fn list_servers(&self, call_id: &str) -> Result<ToolResult> {
429        let servers = self.servers.lock().await;
430
431        if servers.is_empty() {
432            return Ok(ToolResult::success(
433                call_id,
434                "No MCP servers are currently running",
435            ));
436        }
437
438        let mut result = String::from("Running MCP servers:\n\n");
439        for (name, server) in servers.iter() {
440            result.push_str(&format!(
441                "- {} (command: {:?}, started: {})\n",
442                name, server.config.command, server.started
443            ));
444        }
445
446        Ok(ToolResult::success(call_id, &result))
447    }
448
449    /// List tools from an MCP server
450    async fn list_tools(&self, call_id: &str, server_name: String) -> Result<ToolResult> {
451        let mut servers = self.servers.lock().await;
452
453        if let Some(server) = servers.get_mut(&server_name) {
454            match server.list_tools().await {
455                Ok(tools) => {
456                    if tools.is_empty() {
457                        Ok(ToolResult::success(
458                            call_id,
459                            &format!("No tools available from MCP server '{}'", server_name),
460                        ))
461                    } else {
462                        let mut result =
463                            format!("Tools available from MCP server '{}':\n\n", server_name);
464
465                        for (i, tool) in tools.iter().enumerate() {
466                            if let Some(name) = tool.get("name").and_then(|n| n.as_str()) {
467                                result.push_str(&format!("{}. {}", i + 1, name));
468
469                                if let Some(description) =
470                                    tool.get("description").and_then(|d| d.as_str())
471                                {
472                                    result.push_str(&format!(" - {}", description));
473                                }
474                                result.push('\n');
475
476                                if let Some(input_schema) = tool.get("inputSchema") {
477                                    result.push_str(&format!(
478                                        "   Input schema: {}\n",
479                                        serde_json::to_string_pretty(input_schema)
480                                            .unwrap_or_default()
481                                    ));
482                                }
483                                result.push('\n');
484                            }
485                        }
486
487                        Ok(ToolResult::success(call_id, &result))
488                    }
489                }
490                Err(e) => Ok(ToolResult::error(
491                    call_id,
492                    &format!(
493                        "Failed to list tools from MCP server '{}': {}",
494                        server_name, e
495                    ),
496                )),
497            }
498        } else {
499            Ok(ToolResult::error(
500                call_id,
501                &format!("MCP server '{}' not found", server_name),
502            ))
503        }
504    }
505
506    /// Call a tool on an MCP server
507    async fn call_tool(
508        &self,
509        call_id: &str,
510        server_name: String,
511        tool_name: String,
512        tool_arguments: Value,
513    ) -> Result<ToolResult> {
514        let mut servers = self.servers.lock().await;
515
516        if let Some(server) = servers.get_mut(&server_name) {
517            match server.call_tool(&tool_name, tool_arguments).await {
518                Ok(result) => {
519                    let result_str = if result.is_string() {
520                        result.as_str().unwrap_or("").to_string()
521                    } else {
522                        serde_json::to_string_pretty(&result).unwrap_or_default()
523                    };
524
525                    Ok(ToolResult::success(
526                        call_id,
527                        &format!(
528                            "Tool '{}' executed successfully on MCP server '{}':\n\n{}",
529                            tool_name, server_name, result_str
530                        ),
531                    ))
532                }
533                Err(e) => Ok(ToolResult::error(
534                    call_id,
535                    &format!(
536                        "Failed to call tool '{}' on MCP server '{}': {}",
537                        tool_name, server_name, e
538                    ),
539                )),
540            }
541        } else {
542            Ok(ToolResult::error(
543                call_id,
544                &format!("MCP server '{}' not found", server_name),
545            ))
546        }
547    }
548}
549
550impl_tool_factory!(
551    McpToolFactory,
552    McpTool,
553    "mcp_tool",
554    "Tool for interacting with MCP (Model Context Protocol) servers"
555);