Skip to main content

matrixcode_core/mcp/
lazy.rs

1//! MCP Lazy Loader
2//!
3//! 懒加载 MCP 工具:只在首次调用时启动服务器
4
5use anyhow::{Result, anyhow};
6use async_trait::async_trait;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11
12use super::client::McpClient;
13use super::config::{McpConfig, McpServerConfig};
14use super::proxy::McpToolWrapper;
15use crate::approval::RiskLevel;
16use crate::tools::{Tool, ToolDefinition};
17
18// ============================================================================
19// Lazy MCP Tool
20// ============================================================================
21
22/// 懒加载 MCP 工具 - 调用时才启动服务器
23pub struct LazyMcpTool {
24    /// 工具定义(预先知道)
25    definition: ToolDefinition,
26
27    /// 服务器名称
28    server_name: String,
29
30    /// 服务器配置
31    server_config: McpServerConfig,
32
33    /// 实际工具(启动后填充)
34    actual_tool: Arc<Mutex<Option<Arc<McpToolWrapper>>>>,
35
36    /// 客户端(启动后填充)
37    client: Arc<Mutex<Option<Arc<McpClient>>>>,
38}
39
40impl LazyMcpTool {
41    /// 创建懒加载工具
42    pub fn new(
43        server_name: String,
44        server_config: McpServerConfig,
45        tool_name: String,
46        tool_description: String,
47        tool_input_schema: Value,
48    ) -> Self {
49        // 创建工具定义(去掉服务器前缀)
50        let definition = ToolDefinition {
51            name: tool_name,
52            description: tool_description,
53            parameters: tool_input_schema,
54            is_priority: false,
55        };
56
57        Self {
58            definition,
59            server_name,
60            server_config,
61            actual_tool: Arc::new(Mutex::new(None)),
62            client: Arc::new(Mutex::new(None)),
63        }
64    }
65
66    /// 启动服务器并获取实际工具
67    async fn ensure_started(&self) -> Result<Arc<McpToolWrapper>> {
68        // 检查是否已启动
69        {
70            let tool_lock = self.actual_tool.lock().await;
71            if let Some(tool) = tool_lock.as_ref() {
72                return Ok(tool.clone());
73            }
74        }
75
76        // 启动服务器
77        tracing::info!(
78            "Starting lazy MCP server '{}' for tool '{}'",
79            self.server_name,
80            self.definition.name
81        );
82
83        let transport_config = self.server_config.to_transport_config()?;
84        let client = Arc::new(McpClient::connect(&self.server_name, transport_config).await?);
85
86        // 获取工具列表
87        let mcp_tools = client.list_tools().await?;
88
89        // 找到匹配的工具
90        let tool_name_without_prefix = self.definition.name.clone();
91        let mcp_tool = mcp_tools
92            .into_iter()
93            .find(|t| {
94                // 匹配去掉前缀的名字
95                let name = t.name.clone();
96                name == tool_name_without_prefix
97                    || name == format!("{}_{}", self.server_name, tool_name_without_prefix)
98            })
99            .ok_or_else(|| {
100                anyhow!(
101                    "Tool '{}' not found in MCP server '{}'",
102                    self.definition.name,
103                    self.server_name
104                )
105            })?;
106
107        // 创建工具包装器
108        let wrapper = Arc::new(McpToolWrapper::new(
109            client.clone(),
110            mcp_tool,
111            self.server_name.clone(),
112        ));
113
114        // 缓存
115        {
116            let mut tool_lock = self.actual_tool.lock().await;
117            *tool_lock = Some(wrapper.clone());
118        }
119        {
120            let mut client_lock = self.client.lock().await;
121            *client_lock = Some(client);
122        }
123
124        tracing::info!(
125            "Lazy MCP server '{}' started successfully",
126            self.server_name
127        );
128
129        Ok(wrapper)
130    }
131
132    /// 关闭服务器
133    pub async fn shutdown(&self) -> Result<()> {
134        let client_lock = self.client.lock().await;
135        if let Some(client) = client_lock.as_ref() {
136            client.shutdown().await?;
137            tracing::info!("Lazy MCP server '{}' stopped", self.server_name);
138        }
139        Ok(())
140    }
141}
142
143#[async_trait]
144impl Tool for LazyMcpTool {
145    fn definition(&self) -> ToolDefinition {
146        self.definition.clone()
147    }
148
149    async fn execute(&self, params: Value) -> Result<String> {
150        // 确保服务器已启动
151        let tool = self.ensure_started().await?;
152
153        // 执行工具
154        tool.execute(params).await
155    }
156
157    fn risk_level(&self) -> RiskLevel {
158        // MCP 工具默认为 Mutating(可能修改外部状态)
159        RiskLevel::Mutating
160    }
161}
162
163// ============================================================================
164// Lazy MCP Loader
165// ============================================================================
166
167/// 懒加载 MCP 工具集 - 从配置预先创建工具定义,但不启动服务器
168pub struct LazyMcpLoader {
169    /// MCP 配置
170    config: McpConfig,
171
172    /// 预先创建的懒加载工具(预留字段)
173    #[allow(dead_code)]
174    tools: Vec<LazyMcpTool>,
175}
176
177impl LazyMcpLoader {
178    /// 从配置创建懒加载工具集
179    pub fn from_config(config: McpConfig) -> Self {
180        let tools = Vec::new();
181
182        // 为每个服务器预先创建占位工具
183        // (实际工具定义需要查询服务器,这里无法预先知道)
184        // 所以这个实现需要在首次查询时动态发现
185
186        Self { config, tools }
187    }
188
189    /// 发现工具定义(不启动服务器)
190    /// 返回占位符工具,调用时才启动
191    pub async fn discover_tools(&self) -> Result<Vec<LazyMcpTool>> {
192        // 这个方法需要实际连接服务器才能知道工具列表
193        // 所以懒加载实现需要改变策略
194
195        // 方案:提供一个特殊工具来触发服务器启动
196        // 或者:在配置中预先声明工具
197
198        // 这里返回空,实际实现需要其他策略
199        Ok(Vec::new())
200    }
201
202    /// 获取配置
203    pub fn config(&self) -> &McpConfig {
204        &self.config
205    }
206}
207
208// ============================================================================
209// Alternative: MCP Tool Placeholder
210// ============================================================================
211
212/// MCP 工具占位符 - 用户显式调用时才启动服务器
213pub struct McpToolPlaceholder {
214    /// 服务器名称
215    server_name: String,
216
217    /// 服务器配置
218    server_config: McpServerConfig,
219
220    /// 已启动的工具(None 表示未启动)
221    tools: Arc<RwLock<Option<Vec<Arc<McpToolWrapper>>>>>,
222
223    /// 客户端(None 表示未启动)
224    client: Arc<RwLock<Option<Arc<McpClient>>>>,
225}
226
227impl McpToolPlaceholder {
228    /// 创建占位符
229    pub fn new(server_name: String, server_config: McpServerConfig) -> Self {
230        Self {
231            server_name,
232            server_config,
233            tools: Arc::new(RwLock::new(None)),
234            client: Arc::new(RwLock::new(None)),
235        }
236    }
237
238    /// 启动服务器并返回工具列表
239    pub async fn start(&self) -> Result<Vec<Arc<McpToolWrapper>>> {
240        // 检查是否已启动
241        {
242            let tools_lock = self.tools.read().await;
243            if let Some(tools) = tools_lock.as_ref() {
244                return Ok(tools.clone());
245            }
246        }
247
248        // 启动服务器
249        tracing::info!("Starting MCP server '{}' on demand", self.server_name);
250
251        let transport_config = self.server_config.to_transport_config()?;
252        let client = Arc::new(McpClient::connect(&self.server_name, transport_config).await?);
253
254        // 获取工具列表
255        if !client.supports_tools().await {
256            return Err(anyhow!(
257                "MCP server '{}' does not support tools",
258                self.server_name
259            ));
260        }
261
262        let mcp_tools = client.list_tools().await?;
263
264        // 创建工具包装器
265        let tools: Vec<Arc<McpToolWrapper>> = mcp_tools
266            .into_iter()
267            .map(|tool| {
268                Arc::new(McpToolWrapper::new(
269                    client.clone(),
270                    tool,
271                    self.server_name.clone(),
272                ))
273            })
274            .collect();
275
276        // 缓存
277        {
278            let mut tools_lock = self.tools.write().await;
279            *tools_lock = Some(tools.clone());
280        }
281        {
282            let mut client_lock = self.client.write().await;
283            *client_lock = Some(client);
284        }
285
286        tracing::info!(
287            "MCP server '{}' started with {} tools",
288            self.server_name,
289            tools.len()
290        );
291
292        Ok(tools)
293    }
294
295    /// 关闭服务器
296    pub async fn shutdown(&self) -> Result<()> {
297        let client_lock = self.client.read().await;
298        if let Some(client) = client_lock.as_ref() {
299            client.shutdown().await?;
300        }
301        Ok(())
302    }
303
304    /// 是否已启动
305    pub async fn is_started(&self) -> bool {
306        self.tools.read().await.is_some()
307    }
308
309    /// 获取服务器名称
310    pub fn server_name(&self) -> &str {
311        &self.server_name
312    }
313}
314
315// ============================================================================
316// MCP Tool Registry (Dynamic + Lazy)
317// ============================================================================
318
319/// MCP 工具注册表 - 管理所有懒加载的 MCP 服务器
320///
321/// # 功能
322/// - 从配置文件加载服务器(懒加载)
323/// - 运行时动态添加/移除服务器
324/// - 查询已连接服务器的状态
325/// - 全局单例管理
326pub struct McpToolRegistry {
327    /// 服务器占位符(按名称索引)
328    placeholders: HashMap<String, Arc<McpToolPlaceholder>>,
329}
330
331impl McpToolRegistry {
332    /// 从配置创建注册表
333    pub fn from_config(config: &McpConfig) -> Self {
334        let placeholders = config
335            .servers
336            .iter()
337            .filter(|(_, cfg)| cfg.enabled)
338            .map(|(name, cfg)| {
339                (
340                    name.clone(),
341                    Arc::new(McpToolPlaceholder::new(name.clone(), cfg.clone())),
342                )
343            })
344            .collect();
345
346        Self { placeholders }
347    }
348
349    /// 创建空注册表
350    pub fn new() -> Self {
351        Self {
352            placeholders: HashMap::new(),
353        }
354    }
355
356    /// 动态添加服务器(运行时)
357    pub fn add_server(&mut self, name: String, config: McpServerConfig) {
358        self.placeholders.insert(
359            name.clone(),
360            Arc::new(McpToolPlaceholder::new(name, config)),
361        );
362    }
363
364    /// 移除服务器(如果已启动则先关闭)
365    pub async fn remove_server(&mut self, name: &str) -> Result<()> {
366        if let Some(placeholder) = self.placeholders.remove(name) {
367            placeholder.shutdown().await?;
368        }
369        Ok(())
370    }
371
372    /// 获取服务器占位符
373    pub fn get_server(&self, name: &str) -> Option<Arc<McpToolPlaceholder>> {
374        self.placeholders.get(name).cloned()
375    }
376
377    /// 获取所有服务器名称
378    pub fn server_names(&self) -> Vec<&String> {
379        self.placeholders.keys().collect()
380    }
381
382    /// 获取已启动的服务器列表
383    pub async fn started_servers(&self) -> Vec<String> {
384        let mut started = Vec::new();
385        for (name, placeholder) in &self.placeholders {
386            if placeholder.is_started().await {
387                started.push(name.clone());
388            }
389        }
390        started
391    }
392
393    /// 获取服务器状态(用于 TUI 显示)
394    pub async fn server_status(&self) -> HashMap<String, ServerStatus> {
395        let mut status = HashMap::new();
396        for (name, placeholder) in &self.placeholders {
397            let is_started = placeholder.is_started().await;
398            status.insert(
399                name.clone(),
400                ServerStatus {
401                    name: name.clone(),
402                    is_started,
403                    tool_count: if is_started {
404                        placeholder
405                            .tools
406                            .read()
407                            .await
408                            .as_ref()
409                            .map(|t| t.len())
410                            .unwrap_or(0)
411                    } else {
412                        0
413                    },
414                },
415            );
416        }
417        status
418    }
419
420    /// 启动所有服务器
421    pub async fn start_all(&self) -> Result<HashMap<String, Vec<Arc<McpToolWrapper>>>> {
422        let mut results = HashMap::new();
423
424        for (name, placeholder) in &self.placeholders {
425            let tools = placeholder.start().await?;
426            results.insert(name.clone(), tools);
427        }
428
429        Ok(results)
430    }
431
432    /// 关闭所有服务器
433    pub async fn shutdown_all(&self) {
434        for placeholder in self.placeholders.values() {
435            if let Err(e) = placeholder.shutdown().await {
436                tracing::error!(
437                    "Failed to shutdown MCP server '{}': {}",
438                    placeholder.server_name(),
439                    e
440                );
441            }
442        }
443    }
444
445    /// 从 CLI 参数添加服务器
446    ///
447    /// # 格式
448    /// - `name:command args` → 自定义名称
449    /// - `command args` → 自动推断名称(取命令第一部分)
450    ///
451    /// # Example
452    /// ```ignore
453    /// registry.add_from_cli_arg("playwright:npx -y @playwright/mcp@latest")?;
454    /// registry.add_from_cli_arg("npx -y @modelcontextprotocol/server-filesystem")?;
455    /// ```
456    pub fn add_from_cli_arg(&mut self, arg: &str) -> Result<()> {
457        let (name, command, args) = parse_cli_mcp_arg(arg)?;
458
459        let config = McpServerConfig {
460            command: Some(command),
461            args,
462            url: None,
463            enabled: true,
464            ..Default::default()
465        };
466
467        self.add_server(name, config);
468        Ok(())
469    }
470
471    /// 从 CLI 参数批量添加服务器
472    pub fn add_from_cli_args(&mut self, args: &[String]) -> Result<()> {
473        for arg in args {
474            self.add_from_cli_arg(arg)?;
475        }
476        Ok(())
477    }
478}
479
480impl Default for McpToolRegistry {
481    fn default() -> Self {
482        Self::new()
483    }
484}
485
486/// 服务器状态(用于 UI 显示)
487#[derive(Debug, Clone)]
488pub struct ServerStatus {
489    pub name: String,
490    pub is_started: bool,
491    pub tool_count: usize,
492}
493
494/// 解析 CLI MCP 参数
495///
496/// # 格式
497/// - `name:command args` → (name, command, args)
498/// - `command args` → (command, command, args)
499///
500/// # Example
501/// ```
502/// parse_cli_mcp_arg("playwright:npx -y @playwright/mcp@latest")
503/// // → ("playwright", "npx", vec!["-y", "@playwright/mcp@latest"])
504///
505/// parse_cli_mcp_arg("npx -y @modelcontextprotocol/server-filesystem")
506/// // → ("npx", "npx", vec!["-y", "@modelcontextprotocol/server-filesystem"])
507/// ```
508fn parse_cli_mcp_arg(arg: &str) -> Result<(String, String, Vec<String>)> {
509    let arg = arg.trim();
510    if arg.is_empty() {
511        return Err(anyhow!("Empty MCP server argument"));
512    }
513
514    // 尝试解析 name:command args 格式
515    if let Some((name, rest)) = arg.split_once(':') {
516        let name = name.trim().to_string();
517        let rest = rest.trim();
518
519        // 解析命令和参数
520        let parts =
521            shell_words::split(rest).map_err(|e| anyhow!("Failed to parse command: {}", e))?;
522
523        if parts.is_empty() {
524            return Err(anyhow!("Missing command for MCP server '{}'", name));
525        }
526
527        let command = parts[0].clone();
528        let args = parts[1..].to_vec();
529
530        return Ok((name, command, args));
531    }
532
533    // 无名称前缀,使用命令作为名称
534    let parts = shell_words::split(arg).map_err(|e| anyhow!("Failed to parse command: {}", e))?;
535
536    if parts.is_empty() {
537        return Err(anyhow!("Empty MCP server argument"));
538    }
539
540    let command = parts[0].clone();
541    let name = command.clone(); // 使用命令名作为服务器名
542    let args = parts[1..].to_vec();
543
544    Ok((name, command, args))
545}