Skip to main content

modular_agent_core/
mcp.rs

1//! Model Context Protocol (MCP) integration for external tool servers.
2//!
3//! This module provides integration with MCP-compliant tool servers, allowing
4//! external tools to be registered and called through the standard tool registry.
5//!
6//! MCP is a protocol for connecting LLM applications with external tool providers.
7//! This module supports loading MCP server configurations from JSON files
8//! (compatible with Claude Desktop format) and manages connection pooling
9//! for efficient server communication.
10//!
11//! # Features
12//!
13//! - Load MCP server configurations from JSON files
14//! - Automatic connection pooling for MCP servers
15//! - Register MCP tools with the global tool registry
16//! - Graceful shutdown of all MCP connections
17//!
18//! # Example
19//!
20//! ```no_run
21//! use modular_agent_core::mcp::{register_tools_from_mcp_json, shutdown_all_mcp_connections};
22//!
23//! #[tokio::main]
24//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
25//!     // Load and register tools from MCP configuration
26//!     let tools = register_tools_from_mcp_json("mcp.json").await?;
27//!     println!("Registered {} MCP tools", tools.len());
28//!
29//!     // ... use tools ...
30//!
31//!     // Clean up connections on shutdown
32//!     shutdown_all_mcp_connections().await?;
33//!     Ok(())
34//! }
35//! ```
36
37#![cfg(feature = "mcp")]
38
39use std::collections::HashMap;
40use std::path::Path;
41use std::sync::{Arc, OnceLock};
42
43use modular_agent_core::{AgentContext, AgentError, AgentValue, async_trait};
44use rmcp::{
45    model::{CallToolRequestParam, CallToolResult},
46    service::ServiceExt,
47    transport::{ConfigureCommandExt, TokioChildProcess},
48};
49use serde::Deserialize;
50use tokio::process::Command;
51use tokio::sync::Mutex as AsyncMutex;
52
53use crate::tool::{Tool, ToolInfo, register_tool};
54
55/// Tool implementation that delegates to an MCP server.
56///
57/// Uses connection pooling to efficiently reuse connections to MCP servers.
58struct MCPTool {
59    /// Name of the MCP server this tool belongs to.
60    server_name: String,
61    /// Configuration for connecting to the MCP server.
62    server_config: MCPServerConfig,
63    /// The underlying MCP tool definition.
64    tool: rmcp::model::Tool,
65    /// Tool metadata for registration.
66    info: ToolInfo,
67}
68
69impl MCPTool {
70    /// Creates a new MCPTool from server configuration and tool definition.
71    ///
72    /// # Arguments
73    ///
74    /// * `name` - The fully qualified tool name (typically "server::tool")
75    /// * `server_name` - Name of the MCP server
76    /// * `server_config` - Configuration for the MCP server
77    /// * `tool` - The MCP tool definition
78    fn new(
79        name: String,
80        server_name: String,
81        server_config: MCPServerConfig,
82        tool: rmcp::model::Tool,
83    ) -> Self {
84        let info = ToolInfo {
85            name,
86            description: tool.description.clone().unwrap_or_default().into_owned(),
87            parameters: serde_json::to_value(&tool.input_schema).ok(),
88        };
89        Self {
90            server_name,
91            server_config,
92            tool,
93            info,
94        }
95    }
96
97    /// Invokes the tool on the MCP server.
98    ///
99    /// Gets or creates a connection from the pool and calls the tool.
100    async fn tool_call(
101        &self,
102        _ctx: AgentContext,
103        value: AgentValue,
104    ) -> Result<AgentValue, AgentError> {
105        // Get or create connection from pool
106        let conn = {
107            let mut pool = connection_pool().lock().await;
108            pool.get_or_create(&self.server_name, &self.server_config)
109                .await?
110        };
111
112        let arguments = value.as_object().map(|obj| {
113            obj.iter()
114                .map(|(k, v)| {
115                    (
116                        k.clone(),
117                        serde_json::to_value(v).unwrap_or(serde_json::Value::Null),
118                    )
119                })
120                .collect::<serde_json::Map<String, serde_json::Value>>()
121        });
122
123        let tool_result = {
124            let connection = conn.lock().await;
125            let service = connection.service.as_ref().ok_or_else(|| {
126                AgentError::Other(format!(
127                    "MCP service for '{}' is not available",
128                    self.server_name
129                ))
130            })?;
131            service
132                .call_tool(CallToolRequestParam {
133                    name: self.tool.name.clone().into(),
134                    arguments,
135                    task: None,
136                })
137                .await
138                .map_err(|e| {
139                    AgentError::Other(format!("Failed to call tool '{}': {e}", self.tool.name))
140                })?
141        };
142
143        Ok(call_tool_result_to_agent_value(tool_result)?)
144    }
145}
146
147#[async_trait]
148impl Tool for MCPTool {
149    fn info(&self) -> &ToolInfo {
150        &self.info
151    }
152
153    async fn call(&self, ctx: AgentContext, args: AgentValue) -> Result<AgentValue, AgentError> {
154        self.tool_call(ctx, args).await
155    }
156}
157
158/// Root configuration structure for MCP servers.
159///
160/// Compatible with the Claude Desktop MCP configuration format (`mcp.json`).
161///
162/// # Example JSON
163///
164/// ```json
165/// {
166///   "mcpServers": {
167///     "filesystem": {
168///       "command": "npx",
169///       "args": ["-y", "@anthropic/mcp-server-filesystem", "/path/to/dir"]
170///     }
171///   }
172/// }
173/// ```
174#[derive(Debug, Deserialize)]
175pub struct MCPConfig {
176    /// Map of server names to their configurations.
177    #[serde(rename = "mcpServers")]
178    pub mcp_servers: HashMap<String, MCPServerConfig>,
179}
180
181/// Configuration for a single MCP server.
182///
183/// Specifies how to start the MCP server process.
184#[derive(Debug, Clone, Deserialize)]
185pub struct MCPServerConfig {
186    /// The command to execute (e.g., "npx", "node", "python").
187    pub command: String,
188
189    /// Arguments to pass to the command.
190    pub args: Vec<String>,
191
192    /// Optional environment variables for the process.
193    #[serde(default)]
194    pub env: Option<HashMap<String, String>>,
195}
196
197/// Type alias for a running MCP service connection.
198type MCPService = rmcp::service::RunningService<rmcp::service::RoleClient, ()>;
199
200/// A single connection to an MCP server.
201struct MCPConnection {
202    /// The running service, or None if not connected.
203    service: Option<MCPService>,
204}
205
206/// Connection pool for managing MCP server connections.
207///
208/// Maintains persistent connections to MCP servers and reuses them
209/// across multiple tool calls for efficiency.
210struct MCPConnectionPool {
211    /// Map of server names to their connections.
212    connections: HashMap<String, Arc<AsyncMutex<MCPConnection>>>,
213}
214
215impl MCPConnectionPool {
216    /// Creates a new empty connection pool.
217    fn new() -> Self {
218        Self {
219            connections: HashMap::new(),
220        }
221    }
222
223    /// Gets an existing connection or creates a new one for the server.
224    ///
225    /// If a connection already exists for the server, it is reused.
226    /// Otherwise, a new MCP server process is started.
227    async fn get_or_create(
228        &mut self,
229        server_name: &str,
230        config: &MCPServerConfig,
231    ) -> Result<Arc<AsyncMutex<MCPConnection>>, AgentError> {
232        // Check if connection already exists
233        if let Some(conn) = self.connections.get(server_name) {
234            log::debug!("Reusing existing MCP connection for '{}'", server_name);
235            return Ok(conn.clone());
236        }
237
238        log::info!(
239            "Starting MCP server '{}' (command: {})",
240            server_name,
241            config.command
242        );
243
244        // Start new MCP service
245        let service = ()
246            .serve(
247                TokioChildProcess::new(Command::new(&config.command).configure(|cmd| {
248                    for arg in &config.args {
249                        cmd.arg(arg);
250                    }
251                    if let Some(env) = &config.env {
252                        for (key, value) in env {
253                            cmd.env(key, value);
254                        }
255                    }
256                }))
257                .map_err(|e| {
258                    log::error!("Failed to start MCP process for '{}': {}", server_name, e);
259                    AgentError::Other(format!(
260                        "Failed to start MCP process for '{}': {e}",
261                        server_name
262                    ))
263                })?,
264            )
265            .await
266            .map_err(|e| {
267                log::error!("Failed to start MCP service for '{}': {}", server_name, e);
268                AgentError::Other(format!(
269                    "Failed to start MCP service for '{}': {e}",
270                    server_name
271                ))
272            })?;
273
274        log::info!("Successfully started MCP server '{}'", server_name);
275
276        let connection = MCPConnection {
277            service: Some(service),
278        };
279
280        let conn_arc = Arc::new(AsyncMutex::new(connection));
281        self.connections
282            .insert(server_name.to_string(), conn_arc.clone());
283        Ok(conn_arc)
284    }
285
286    /// Shuts down all connections in the pool.
287    ///
288    /// Cancels all running MCP services and clears the connection map.
289    async fn shutdown_all(&mut self) -> Result<(), AgentError> {
290        let count = self.connections.len();
291        log::debug!("Shutting down {} MCP server connection(s)", count);
292
293        for (name, conn) in self.connections.drain() {
294            log::debug!("Shutting down MCP server '{}'", name);
295            let mut connection = conn.lock().await;
296            if let Some(service) = connection.service.take() {
297                service.cancel().await.map_err(|e| {
298                    log::error!("Failed to cancel MCP service '{}': {}", name, e);
299                    AgentError::Other(format!("Failed to cancel MCP service: {e}"))
300                })?;
301                log::debug!("Successfully shut down MCP server '{}'", name);
302            }
303        }
304        Ok(())
305    }
306}
307
308/// Global connection pool instance.
309static CONNECTION_POOL: OnceLock<AsyncMutex<MCPConnectionPool>> = OnceLock::new();
310
311/// Returns the global connection pool, initializing it if necessary.
312fn connection_pool() -> &'static AsyncMutex<MCPConnectionPool> {
313    CONNECTION_POOL.get_or_init(|| AsyncMutex::new(MCPConnectionPool::new()))
314}
315
316/// Shuts down all MCP server connections.
317///
318/// Call this during application shutdown to cleanly terminate all
319/// MCP server processes.
320///
321/// # Example
322///
323/// ```no_run
324/// use modular_agent_core::mcp::shutdown_all_mcp_connections;
325///
326/// #[tokio::main]
327/// async fn main() {
328///     // ... use MCP tools ...
329///
330///     // Clean shutdown
331///     shutdown_all_mcp_connections().await.expect("Failed to shutdown MCP");
332/// }
333/// ```
334pub async fn shutdown_all_mcp_connections() -> Result<(), AgentError> {
335    log::info!("Shutting down all MCP server connections");
336    connection_pool().lock().await.shutdown_all().await?;
337    log::info!("All MCP server connections shut down successfully");
338    Ok(())
339}
340
341/// Registers all tools from a single MCP server.
342///
343/// Connects to the MCP server, lists its available tools, and registers
344/// each one with the global tool registry.
345///
346/// # Arguments
347///
348/// * `server_name` - Name of the MCP server
349/// * `server_config` - Configuration for the MCP server
350///
351/// # Returns
352///
353/// A vector of registered tool names in the format "server_name::tool_name".
354async fn register_tools_from_server(
355    server_name: String,
356    server_config: MCPServerConfig,
357) -> Result<Vec<String>, AgentError> {
358    log::debug!("Registering tools from MCP server '{}'", server_name);
359
360    // Get or create connection from pool
361    let conn = {
362        let mut pool = connection_pool().lock().await;
363        pool.get_or_create(&server_name, &server_config).await?
364    };
365
366    // List all available tools from this server
367    log::debug!("Listing tools from MCP server '{}'", server_name);
368    let tools_list = {
369        let connection = conn.lock().await;
370        let service = connection.service.as_ref().ok_or_else(|| {
371            log::error!("MCP service for '{}' is not available", server_name);
372            AgentError::Other(format!(
373                "MCP service for '{}' is not available",
374                server_name
375            ))
376        })?;
377        service.list_tools(Default::default()).await.map_err(|e| {
378            log::error!("Failed to list MCP tools for '{}': {}", server_name, e);
379            AgentError::Other(format!(
380                "Failed to list MCP tools for '{}': {e}",
381                server_name
382            ))
383        })?
384    };
385
386    let mut registered_tool_names = Vec::new();
387
388    // Register all tools from this server using connection pool
389    for tool_info in tools_list.tools {
390        let mcp_tool_name = format!("{}::{}", server_name, tool_info.name);
391        registered_tool_names.push(mcp_tool_name.clone());
392
393        register_tool(MCPTool::new(
394            mcp_tool_name.clone(),
395            server_name.clone(),
396            server_config.clone(),
397            tool_info,
398        ));
399        log::debug!("Registered MCP tool '{}'", mcp_tool_name);
400    }
401
402    log::info!(
403        "Registered {} tools from MCP server '{}'",
404        registered_tool_names.len(),
405        server_name
406    );
407
408    Ok(registered_tool_names)
409}
410
411/// Loads MCP configuration from a JSON file and registers all tools
412///
413/// # Arguments
414/// * `json_path` - Path to the mcp.json file
415///
416/// # Returns
417/// A vector of registered tool names in the format "server_name::tool_name"
418///
419/// # Example
420/// ```no_run
421/// use modular_agent_core::mcp::register_tools_from_mcp_json;
422///
423/// #[tokio::main]
424/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
425///     let tool_names = register_tools_from_mcp_json("mcp.json").await?;
426///     println!("Registered {} tools", tool_names.len());
427///     Ok(())
428/// }
429/// ```
430pub async fn register_tools_from_mcp_json<P: AsRef<Path>>(
431    json_path: P,
432) -> Result<Vec<String>, AgentError> {
433    let path = json_path.as_ref();
434    log::info!("Loading MCP configuration from: {}", path.display());
435
436    // Read the JSON file
437    let json_content = std::fs::read_to_string(path).map_err(|e| {
438        log::error!("Failed to read MCP config file '{}': {}", path.display(), e);
439        AgentError::Other(format!("Failed to read MCP config file: {e}"))
440    })?;
441
442    // Parse the JSON
443    let config: MCPConfig = serde_json::from_str(&json_content).map_err(|e| {
444        log::error!("Failed to parse MCP config JSON: {}", e);
445        AgentError::Other(format!("Failed to parse MCP config JSON: {e}"))
446    })?;
447
448    log::info!("Found {} MCP servers in config", config.mcp_servers.len());
449
450    let mut registered_tool_names = Vec::new();
451
452    // Iterate through each MCP server
453    for (server_name, server_config) in config.mcp_servers {
454        let tools = register_tools_from_server(server_name, server_config).await?;
455        registered_tool_names.extend(tools);
456    }
457
458    log::info!(
459        "Successfully registered {} MCP tools total",
460        registered_tool_names.len()
461    );
462
463    Ok(registered_tool_names)
464}
465
466/// Converts an MCP tool call result to an AgentValue.
467///
468/// Extracts text content from the result and returns it as an array.
469/// If the result indicates an error, returns an AgentError instead.
470fn call_tool_result_to_agent_value(result: CallToolResult) -> Result<AgentValue, AgentError> {
471    let mut contents = Vec::new();
472    for c in result.content.iter() {
473        match &c.raw {
474            rmcp::model::RawContent::Text(text) => {
475                contents.push(AgentValue::string(text.text.clone()));
476            }
477            _ => {
478                // Handle other content types as needed
479            }
480        }
481    }
482    let data = AgentValue::array(contents.into());
483    if result.is_error == Some(true) {
484        return Err(AgentError::Other(
485            serde_json::to_string(&data).map_err(|e| AgentError::InvalidValue(e.to_string()))?,
486        ));
487    }
488    Ok(data)
489}