Skip to main content

matrixcode_core/mcp/
lazy.rs

1//! MCP Lazy Loader
2//!
3//! 懒加载 MCP 工具:只在首次调用时启动服务器
4
5use anyhow::{anyhow, Result};
6use async_trait::async_trait;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11
12use super::client::McpClient;
13use super::config::{McpConfig, McpServerConfig};
14use super::proxy::McpToolWrapper;
15use crate::tools::{ToolDefinition, Tool};
16use crate::approval::RiskLevel;
17
18// ============================================================================
19// Lazy MCP Tool
20// ============================================================================
21
22/// 懒加载 MCP 工具 - 调用时才启动服务器
23pub struct LazyMcpTool {
24    /// 工具定义(预先知道)
25    definition: ToolDefinition,
26    
27    /// 服务器名称
28    server_name: String,
29    
30    /// 服务器配置
31    server_config: McpServerConfig,
32    
33    /// 实际工具(启动后填充)
34    actual_tool: Arc<Mutex<Option<Arc<McpToolWrapper>>>>,
35    
36    /// 客户端(启动后填充)
37    client: Arc<Mutex<Option<Arc<McpClient>>>>,
38}
39
40impl LazyMcpTool {
41    /// 创建懒加载工具
42    pub fn new(
43        server_name: String,
44        server_config: McpServerConfig,
45        tool_name: String,
46        tool_description: String,
47        tool_input_schema: Value,
48    ) -> Self {
49        // 创建工具定义(去掉服务器前缀)
50        let definition = ToolDefinition {
51            name: tool_name,
52            description: tool_description,
53            parameters: tool_input_schema,
54            is_priority: false,
55        };
56        
57        Self {
58            definition,
59            server_name,
60            server_config,
61            actual_tool: Arc::new(Mutex::new(None)),
62            client: Arc::new(Mutex::new(None)),
63        }
64    }
65    
66    /// 启动服务器并获取实际工具
67    async fn ensure_started(&self) -> Result<Arc<McpToolWrapper>> {
68        // 检查是否已启动
69        {
70            let tool_lock = self.actual_tool.lock().await;
71            if let Some(tool) = tool_lock.as_ref() {
72                return Ok(tool.clone());
73            }
74        }
75        
76        // 启动服务器
77        tracing::info!("Starting lazy MCP server '{}' for tool '{}'", 
78            self.server_name, self.definition.name);
79        
80        let transport_config = self.server_config.to_transport_config()?;
81        let client = Arc::new(McpClient::connect(&self.server_name, transport_config).await?);
82        
83        // 获取工具列表
84        let mcp_tools = client.list_tools().await?;
85        
86        // 找到匹配的工具
87        let tool_name_without_prefix = self.definition.name.clone();
88        let mcp_tool = mcp_tools.into_iter()
89            .find(|t| {
90                // 匹配去掉前缀的名字
91                let name = t.name.clone();
92                name == tool_name_without_prefix || 
93                name == format!("{}_{}", self.server_name, tool_name_without_prefix)
94            })
95            .ok_or_else(|| anyhow!(
96                "Tool '{}' not found in MCP server '{}'", 
97                self.definition.name, self.server_name
98            ))?;
99        
100        // 创建工具包装器
101        let wrapper = Arc::new(McpToolWrapper::new(
102            client.clone(),
103            mcp_tool,
104            self.server_name.clone()
105        ));
106        
107        // 缓存
108        {
109            let mut tool_lock = self.actual_tool.lock().await;
110            *tool_lock = Some(wrapper.clone());
111        }
112        {
113            let mut client_lock = self.client.lock().await;
114            *client_lock = Some(client);
115        }
116        
117        tracing::info!("Lazy MCP server '{}' started successfully", self.server_name);
118        
119        Ok(wrapper)
120    }
121    
122    /// 关闭服务器
123    pub async fn shutdown(&self) -> Result<()> {
124        let client_lock = self.client.lock().await;
125        if let Some(client) = client_lock.as_ref() {
126            client.shutdown().await?;
127            tracing::info!("Lazy MCP server '{}' stopped", self.server_name);
128        }
129        Ok(())
130    }
131}
132
133#[async_trait]
134impl Tool for LazyMcpTool {
135    fn definition(&self) -> ToolDefinition {
136        self.definition.clone()
137    }
138    
139    async fn execute(&self, params: Value) -> Result<String> {
140        // 确保服务器已启动
141        let tool = self.ensure_started().await?;
142        
143        // 执行工具
144        tool.execute(params).await
145    }
146    
147    fn risk_level(&self) -> RiskLevel {
148        // MCP 工具默认为 Mutating(可能修改外部状态)
149        RiskLevel::Mutating
150    }
151}
152
153// ============================================================================
154// Lazy MCP Loader
155// ============================================================================
156
157/// 懒加载 MCP 工具集 - 从配置预先创建工具定义,但不启动服务器
158pub struct LazyMcpLoader {
159    /// MCP 配置
160    config: McpConfig,
161    
162    /// 预先创建的懒加载工具(预留字段)
163    #[allow(dead_code)]
164    tools: Vec<LazyMcpTool>,
165}
166
167impl LazyMcpLoader {
168    /// 从配置创建懒加载工具集
169    pub fn from_config(config: McpConfig) -> Self {
170        let tools = Vec::new();
171        
172        // 为每个服务器预先创建占位工具
173        // (实际工具定义需要查询服务器,这里无法预先知道)
174        // 所以这个实现需要在首次查询时动态发现
175        
176        Self {
177            config,
178            tools,
179        }
180    }
181    
182    /// 发现工具定义(不启动服务器)
183    /// 返回占位符工具,调用时才启动
184    pub async fn discover_tools(&self) -> Result<Vec<LazyMcpTool>> {
185        // 这个方法需要实际连接服务器才能知道工具列表
186        // 所以懒加载实现需要改变策略
187        
188        // 方案:提供一个特殊工具来触发服务器启动
189        // 或者:在配置中预先声明工具
190        
191        // 这里返回空,实际实现需要其他策略
192        Ok(Vec::new())
193    }
194    
195    /// 获取配置
196    pub fn config(&self) -> &McpConfig {
197        &self.config
198    }
199}
200
201// ============================================================================
202// Alternative: MCP Tool Placeholder
203// ============================================================================
204
205/// MCP 工具占位符 - 用户显式调用时才启动服务器
206pub struct McpToolPlaceholder {
207    /// 服务器名称
208    server_name: String,
209    
210    /// 服务器配置
211    server_config: McpServerConfig,
212    
213    /// 已启动的工具(None 表示未启动)
214    tools: Arc<RwLock<Option<Vec<Arc<McpToolWrapper>>>>>,
215    
216    /// 客户端(None 表示未启动)
217    client: Arc<RwLock<Option<Arc<McpClient>>>>,
218}
219
220impl McpToolPlaceholder {
221    /// 创建占位符
222    pub fn new(server_name: String, server_config: McpServerConfig) -> Self {
223        Self {
224            server_name,
225            server_config,
226            tools: Arc::new(RwLock::new(None)),
227            client: Arc::new(RwLock::new(None)),
228        }
229    }
230    
231    /// 启动服务器并返回工具列表
232    pub async fn start(&self) -> Result<Vec<Arc<McpToolWrapper>>> {
233        // 检查是否已启动
234        {
235            let tools_lock = self.tools.read().await;
236            if let Some(tools) = tools_lock.as_ref() {
237                return Ok(tools.clone());
238            }
239        }
240        
241        // 启动服务器
242        tracing::info!("Starting MCP server '{}' on demand", self.server_name);
243        
244        let transport_config = self.server_config.to_transport_config()?;
245        let client = Arc::new(McpClient::connect(&self.server_name, transport_config).await?);
246        
247        // 获取工具列表
248        if !client.supports_tools().await {
249            return Err(anyhow!("MCP server '{}' does not support tools", self.server_name));
250        }
251        
252        let mcp_tools = client.list_tools().await?;
253        
254        // 创建工具包装器
255        let tools: Vec<Arc<McpToolWrapper>> = mcp_tools.into_iter()
256            .map(|tool| Arc::new(McpToolWrapper::new(
257                client.clone(),
258                tool,
259                self.server_name.clone()
260            )))
261            .collect();
262        
263        // 缓存
264        {
265            let mut tools_lock = self.tools.write().await;
266            *tools_lock = Some(tools.clone());
267        }
268        {
269            let mut client_lock = self.client.write().await;
270            *client_lock = Some(client);
271        }
272        
273        tracing::info!("MCP server '{}' started with {} tools", self.server_name, tools.len());
274        
275        Ok(tools)
276    }
277    
278    /// 关闭服务器
279    pub async fn shutdown(&self) -> Result<()> {
280        let client_lock = self.client.read().await;
281        if let Some(client) = client_lock.as_ref() {
282            client.shutdown().await?;
283        }
284        Ok(())
285    }
286    
287    /// 是否已启动
288    pub async fn is_started(&self) -> bool {
289        self.tools.read().await.is_some()
290    }
291    
292    /// 获取服务器名称
293    pub fn server_name(&self) -> &str {
294        &self.server_name
295    }
296}
297
298// ============================================================================
299// MCP Tool Registry (Dynamic + Lazy)
300// ============================================================================
301
302/// MCP 工具注册表 - 管理所有懒加载的 MCP 服务器
303/// 
304/// # 功能
305/// - 从配置文件加载服务器(懒加载)
306/// - 运行时动态添加/移除服务器
307/// - 查询已连接服务器的状态
308/// - 全局单例管理
309pub struct McpToolRegistry {
310    /// 服务器占位符(按名称索引)
311    placeholders: HashMap<String, Arc<McpToolPlaceholder>>,
312}
313
314impl McpToolRegistry {
315    /// 从配置创建注册表
316    pub fn from_config(config: &McpConfig) -> Self {
317        let placeholders = config.servers
318            .iter()
319            .filter(|(_, cfg)| cfg.enabled)
320            .map(|(name, cfg)| {
321                (name.clone(), Arc::new(McpToolPlaceholder::new(
322                    name.clone(),
323                    cfg.clone()
324                )))
325            })
326            .collect();
327        
328        Self { placeholders }
329    }
330    
331    /// 创建空注册表
332    pub fn new() -> Self {
333        Self {
334            placeholders: HashMap::new(),
335        }
336    }
337    
338    /// 动态添加服务器(运行时)
339    pub fn add_server(&mut self, name: String, config: McpServerConfig) {
340        self.placeholders.insert(
341            name.clone(),
342            Arc::new(McpToolPlaceholder::new(name, config))
343        );
344    }
345    
346    /// 移除服务器(如果已启动则先关闭)
347    pub async fn remove_server(&mut self, name: &str) -> Result<()> {
348        if let Some(placeholder) = self.placeholders.remove(name) {
349            placeholder.shutdown().await?;
350        }
351        Ok(())
352    }
353    
354    /// 获取服务器占位符
355    pub fn get_server(&self, name: &str) -> Option<Arc<McpToolPlaceholder>> {
356        self.placeholders.get(name).cloned()
357    }
358    
359    /// 获取所有服务器名称
360    pub fn server_names(&self) -> Vec<&String> {
361        self.placeholders.keys().collect()
362    }
363    
364    /// 获取已启动的服务器列表
365    pub async fn started_servers(&self) -> Vec<String> {
366        let mut started = Vec::new();
367        for (name, placeholder) in &self.placeholders {
368            if placeholder.is_started().await {
369                started.push(name.clone());
370            }
371        }
372        started
373    }
374    
375    /// 获取服务器状态(用于 TUI 显示)
376    pub async fn server_status(&self) -> HashMap<String, ServerStatus> {
377        let mut status = HashMap::new();
378        for (name, placeholder) in &self.placeholders {
379            let is_started = placeholder.is_started().await;
380            status.insert(name.clone(), ServerStatus {
381                name: name.clone(),
382                is_started,
383                tool_count: if is_started {
384                    placeholder.tools.read().await.as_ref().map(|t| t.len()).unwrap_or(0)
385                } else {
386                    0
387                },
388            });
389        }
390        status
391    }
392    
393    /// 启动所有服务器
394    pub async fn start_all(&self) -> Result<HashMap<String, Vec<Arc<McpToolWrapper>>>> {
395        let mut results = HashMap::new();
396        
397        for (name, placeholder) in &self.placeholders {
398            let tools = placeholder.start().await?;
399            results.insert(name.clone(), tools);
400        }
401        
402        Ok(results)
403    }
404    
405    /// 关闭所有服务器
406    pub async fn shutdown_all(&self) {
407        for placeholder in self.placeholders.values() {
408            if let Err(e) = placeholder.shutdown().await {
409                tracing::error!("Failed to shutdown MCP server '{}': {}", 
410                    placeholder.server_name(), e);
411            }
412        }
413    }
414    
415    /// 从 CLI 参数添加服务器
416    /// 
417    /// # 格式
418    /// - `name:command args` → 自定义名称
419    /// - `command args` → 自动推断名称(取命令第一部分)
420    /// 
421    /// # Example
422    /// ```ignore
423    /// registry.add_from_cli_arg("playwright:npx -y @playwright/mcp@latest")?;
424    /// registry.add_from_cli_arg("npx -y @modelcontextprotocol/server-filesystem")?;
425    /// ```
426    pub fn add_from_cli_arg(&mut self, arg: &str) -> Result<()> {
427        let (name, command, args) = parse_cli_mcp_arg(arg)?;
428        
429        let config = McpServerConfig {
430            command: Some(command),
431            args,
432            url: None,
433            enabled: true,
434            ..Default::default()
435        };
436        
437        self.add_server(name, config);
438        Ok(())
439    }
440    
441    /// 从 CLI 参数批量添加服务器
442    pub fn add_from_cli_args(&mut self, args: &[String]) -> Result<()> {
443        for arg in args {
444            self.add_from_cli_arg(arg)?;
445        }
446        Ok(())
447    }
448}
449
450impl Default for McpToolRegistry {
451    fn default() -> Self {
452        Self::new()
453    }
454}
455
456/// 服务器状态(用于 UI 显示)
457#[derive(Debug, Clone)]
458pub struct ServerStatus {
459    pub name: String,
460    pub is_started: bool,
461    pub tool_count: usize,
462}
463
464/// 解析 CLI MCP 参数
465/// 
466/// # 格式
467/// - `name:command args` → (name, command, args)
468/// - `command args` → (command, command, args)
469/// 
470/// # Example
471/// ```
472/// parse_cli_mcp_arg("playwright:npx -y @playwright/mcp@latest")
473/// // → ("playwright", "npx", vec!["-y", "@playwright/mcp@latest"])
474/// 
475/// parse_cli_mcp_arg("npx -y @modelcontextprotocol/server-filesystem")
476/// // → ("npx", "npx", vec!["-y", "@modelcontextprotocol/server-filesystem"])
477/// ```
478fn parse_cli_mcp_arg(arg: &str) -> Result<(String, String, Vec<String>)> {
479    let arg = arg.trim();
480    if arg.is_empty() {
481        return Err(anyhow!("Empty MCP server argument"));
482    }
483    
484    // 尝试解析 name:command args 格式
485    if let Some((name, rest)) = arg.split_once(':') {
486        let name = name.trim().to_string();
487        let rest = rest.trim();
488        
489        // 解析命令和参数
490        let parts = shell_words::split(rest)
491            .map_err(|e| anyhow!("Failed to parse command: {}", e))?;
492        
493        if parts.is_empty() {
494            return Err(anyhow!("Missing command for MCP server '{}'", name));
495        }
496        
497        let command = parts[0].clone();
498        let args = parts[1..].to_vec();
499        
500        return Ok((name, command, args));
501    }
502    
503    // 无名称前缀,使用命令作为名称
504    let parts = shell_words::split(arg)
505        .map_err(|e| anyhow!("Failed to parse command: {}", e))?;
506    
507    if parts.is_empty() {
508        return Err(anyhow!("Empty MCP server argument"));
509    }
510    
511    let command = parts[0].clone();
512    let name = command.clone();  // 使用命令名作为服务器名
513    let args = parts[1..].to_vec();
514    
515    Ok((name, command, args))
516}