Skip to main content

mcp_stdio_proxy/model/
global.rs

1use axum::Router;
2use dashmap::DashMap;
3use log::{debug, error, info};
4use moka::future::Cache;
5use once_cell::sync::Lazy;
6use std::sync::Arc;
7use tokio::sync::{Mutex, OwnedMutexGuard};
8use tokio::time::{Duration, Instant};
9use tokio_util::sync::CancellationToken;
10use tracing::warn;
11
12use anyhow::Result;
13
14use crate::proxy::McpHandler;
15
16use super::{CheckMcpStatusResponseStatus, McpConfig, McpProtocol, McpRouterPath, McpType};
17
18// 全局单例路由表
19pub static GLOBAL_ROUTES: Lazy<Arc<DashMap<String, Router>>> =
20    Lazy::new(|| Arc::new(DashMap::new()));
21
22// 全局单例 ProxyHandlerManager
23pub static GLOBAL_PROXY_MANAGER: Lazy<ProxyHandlerManager> =
24    Lazy::new(ProxyHandlerManager::default);
25
26/// 动态路由服务
27#[derive(Clone)]
28pub struct DynamicRouterService(pub McpProtocol);
29
30impl DynamicRouterService {
31    // 注册动态 handler
32    pub fn register_route(path: &str, handler: Router) {
33        debug!("=== 注册路由 ===");
34        debug!("注册路径: {}", path);
35        GLOBAL_ROUTES.insert(path.to_string(), handler);
36        debug!("=== 注册路由完成 ===");
37    }
38
39    // 删除动态 handler
40    pub fn delete_route(path: &str) {
41        debug!("=== 删除路由 ===");
42        debug!("删除路径: {}", path);
43        GLOBAL_ROUTES.remove(path);
44        debug!("=== 删除路由完成 ===");
45    }
46
47    // 获取动态 handler
48    pub fn get_route(path: &str) -> Option<Router> {
49        let result = GLOBAL_ROUTES.get(path).map(|entry| entry.value().clone());
50        if result.is_some() {
51            debug!("get_route('{}') = Some(Router)", path);
52        } else {
53            debug!("get_route('{}') = None", path);
54        }
55        result
56    }
57
58    // 获取所有已注册的路由(debug用)
59    pub fn get_all_routes() -> Vec<String> {
60        GLOBAL_ROUTES
61            .iter()
62            .map(|entry| entry.key().clone())
63            .collect()
64    }
65}
66
67impl std::fmt::Debug for DynamicRouterService {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let routes = GLOBAL_ROUTES
70            .iter()
71            .map(|entry| entry.key().clone())
72            .collect::<Vec<_>>();
73        write!(f, "DynamicRouterService {{ routes: {routes:?} }}")
74    }
75}
76
77// =============================================================================
78// RAII 进程管理设计
79// =============================================================================
80//
81// 设计目标:当 mcp_id 从 map 中移除时,自动释放对应的 MCP 进程资源
82//
83// 核心结构:
84// - McpProcessGuard: 进程生命周期守护器,实现 Drop trait 自动取消 CancellationToken
85// - McpService: 封装 McpHandler + McpProcessGuard + 服务状态,作为 map 的 value
86// - ProxyHandlerManager: 使用单一 DashMap<String, McpService> 管理所有服务
87//
88// 资源释放流程:
89// 1. 从 map 中 remove mcp_id
90// 2. McpService 被 drop
91// 3. McpProcessGuard::drop() 被调用
92// 4. CancellationToken 被 cancel
93// 5. 监听该 token 的 SseServer/子进程收到信号,自动退出
94// =============================================================================
95
96/// MCP 进程生命周期守护器
97///
98/// 实现 RAII 模式:当此结构体被 drop 时,自动取消 CancellationToken,
99/// 触发关联的 SseServer 和子进程退出。
100///
101/// # 使用场景
102///
103/// 1. 从 `ProxyHandlerManager` 移除 mcp_id 时,自动清理进程
104/// 2. 服务重启时,旧服务自动被清理
105/// 3. 系统关闭时,所有服务自动清理
106pub struct McpProcessGuard {
107    mcp_id: String,
108    cancellation_token: CancellationToken,
109}
110
111impl McpProcessGuard {
112    pub fn new(mcp_id: String, cancellation_token: CancellationToken) -> Self {
113        debug!("[RAII] 创建进程守护器: mcp_id={}", mcp_id);
114        Self {
115            mcp_id,
116            cancellation_token,
117        }
118    }
119
120    /// 克隆 CancellationToken(用于传递给异步任务)
121    pub fn clone_token(&self) -> CancellationToken {
122        self.cancellation_token.clone()
123    }
124}
125
126impl Drop for McpProcessGuard {
127    fn drop(&mut self) {
128        info!(
129            "[RAII] 进程守护器被 drop,取消 CancellationToken: mcp_id={}",
130            self.mcp_id
131        );
132        self.cancellation_token.cancel();
133    }
134}
135
136// McpProcessGuard 不实现 Clone,确保唯一所有权
137impl std::fmt::Debug for McpProcessGuard {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("McpProcessGuard")
140            .field("mcp_id", &self.mcp_id)
141            .field("is_cancelled", &self.cancellation_token.is_cancelled())
142            .finish()
143    }
144}
145
146/// MCP 服务封装
147///
148/// 将 McpHandler、McpProcessGuard 和服务状态封装在一起,
149/// 作为 `ProxyHandlerManager` 中 DashMap 的 value。
150///
151/// # RAII 保证
152///
153/// 当 McpService 被 drop 时:
154/// 1. McpProcessGuard 被 drop,触发 CancellationToken 取消
155/// 2. 关联的子进程收到信号,自动退出
156pub struct McpService {
157    /// 进程守护器(RAII 核心)
158    process_guard: McpProcessGuard,
159    /// MCP 透明代理处理器(可选,启动中时为 None)
160    handler: Option<McpHandler>,
161    /// 服务状态信息
162    status: McpServiceStatusInfo,
163}
164
165/// MCP 服务状态信息(不包含 CancellationToken,由 McpProcessGuard 管理)
166#[derive(Debug, Clone)]
167pub struct McpServiceStatusInfo {
168    pub mcp_id: String,
169    pub mcp_type: McpType,
170    pub mcp_router_path: McpRouterPath,
171    pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
172    pub last_accessed: Instant,
173    pub mcp_config: Option<McpConfig>,
174    /// 连续健康检查失败次数(用于容错机制)
175    pub consecutive_probe_failures: u32,
176}
177
178impl McpServiceStatusInfo {
179    pub fn new(
180        mcp_id: String,
181        mcp_type: McpType,
182        mcp_router_path: McpRouterPath,
183        check_mcp_status_response_status: CheckMcpStatusResponseStatus,
184    ) -> Self {
185        Self {
186            mcp_id,
187            mcp_type,
188            mcp_router_path,
189            check_mcp_status_response_status,
190            last_accessed: Instant::now(),
191            mcp_config: None,
192            consecutive_probe_failures: 0,
193        }
194    }
195
196    pub fn update_last_accessed(&mut self) {
197        self.last_accessed = Instant::now();
198    }
199
200    /// 重置健康检查失败计数
201    pub fn reset_probe_failures(&mut self) {
202        self.consecutive_probe_failures = 0;
203    }
204
205    /// 增加健康检查失败计数,返回增加后的值
206    pub fn increment_probe_failures(&mut self) -> u32 {
207        self.consecutive_probe_failures += 1;
208        self.consecutive_probe_failures
209    }
210}
211
212impl McpService {
213    /// 创建新的 MCP 服务
214    ///
215    /// # 参数
216    /// - `mcp_id`: 服务唯一标识
217    /// - `mcp_type`: 服务类型
218    /// - `mcp_router_path`: 路由路径
219    /// - `cancellation_token`: 用于控制进程生命周期的取消令牌
220    pub fn new(
221        mcp_id: String,
222        mcp_type: McpType,
223        mcp_router_path: McpRouterPath,
224        cancellation_token: CancellationToken,
225    ) -> Self {
226        let process_guard = McpProcessGuard::new(mcp_id.clone(), cancellation_token);
227        let status = McpServiceStatusInfo::new(
228            mcp_id,
229            mcp_type,
230            mcp_router_path,
231            CheckMcpStatusResponseStatus::Pending,
232        );
233        Self {
234            process_guard,
235            handler: None,
236            status,
237        }
238    }
239
240    /// 设置 MCP Handler
241    pub fn set_handler(&mut self, handler: McpHandler) {
242        self.handler = Some(handler);
243    }
244
245    /// 获取 MCP Handler
246    pub fn handler(&self) -> Option<&McpHandler> {
247        self.handler.as_ref()
248    }
249
250    /// 获取服务状态
251    pub fn status(&self) -> &McpServiceStatusInfo {
252        &self.status
253    }
254
255    /// 获取可变服务状态
256    pub fn status_mut(&mut self) -> &mut McpServiceStatusInfo {
257        &mut self.status
258    }
259
260    /// 克隆 CancellationToken
261    pub fn clone_token(&self) -> CancellationToken {
262        self.process_guard.clone_token()
263    }
264
265    /// 更新服务状态
266    pub fn update_status(&mut self, status: CheckMcpStatusResponseStatus) {
267        self.status.check_mcp_status_response_status = status;
268    }
269
270    /// 更新最后访问时间
271    pub fn update_last_accessed(&mut self) {
272        self.status.update_last_accessed();
273    }
274
275    /// 设置 MCP 配置
276    pub fn set_mcp_config(&mut self, config: McpConfig) {
277        self.status.mcp_config = Some(config);
278    }
279}
280
281impl std::fmt::Debug for McpService {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        f.debug_struct("McpService")
284            .field("process_guard", &self.process_guard)
285            .field("handler", &self.handler.is_some())
286            .field("status", &self.status)
287            .finish()
288    }
289}
290
291// =============================================================================
292// 兼容层:保留 McpServiceStatus 以兼容现有代码
293// =============================================================================
294
295/// MCP 服务状态(兼容层)
296///
297/// 保留此结构体以兼容现有代码,内部委托给 McpServiceStatusInfo
298#[derive(Debug, Clone)]
299pub struct McpServiceStatus {
300    pub mcp_id: String,
301    pub mcp_type: McpType,
302    pub mcp_router_path: McpRouterPath,
303    pub cancellation_token: CancellationToken,
304    pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
305    pub last_accessed: Instant,
306    pub mcp_config: Option<McpConfig>,
307    /// 连续健康检查失败次数
308    pub consecutive_probe_failures: u32,
309}
310
311impl McpServiceStatus {
312    pub fn new(
313        mcp_id: String,
314        mcp_type: McpType,
315        mcp_router_path: McpRouterPath,
316        cancellation_token: CancellationToken,
317        check_mcp_status_response_status: CheckMcpStatusResponseStatus,
318    ) -> Self {
319        Self {
320            mcp_id,
321            mcp_type,
322            mcp_router_path,
323            cancellation_token,
324            check_mcp_status_response_status,
325            last_accessed: Instant::now(),
326            mcp_config: None,
327            consecutive_probe_failures: 0,
328        }
329    }
330
331    pub fn with_mcp_config(mut self, mcp_config: McpConfig) -> Self {
332        self.mcp_config = Some(mcp_config);
333        self
334    }
335
336    pub fn update_last_accessed(&mut self) {
337        self.last_accessed = Instant::now();
338    }
339}
340
341// =============================================================================
342// ProxyHandlerManager:使用 RAII 模式管理 MCP 服务
343// =============================================================================
344
345/// MCP 代理管理器
346///
347/// 使用 RAII 模式管理 MCP 服务:
348/// - 从 map 中移除 mcp_id 时,自动释放对应的进程资源
349/// - 不需要显式调用 cleanup 方法(但仍提供显式清理接口)
350#[derive(Debug)]
351pub struct ProxyHandlerManager {
352    /// 使用单一 DashMap 管理所有 MCP 服务(RAII 核心)
353    services: DashMap<String, McpService>,
354}
355
356impl Default for ProxyHandlerManager {
357    fn default() -> Self {
358        ProxyHandlerManager {
359            services: DashMap::new(),
360        }
361    }
362}
363
364impl ProxyHandlerManager {
365    /// 添加 MCP 服务(RAII 模式)
366    ///
367    /// 使用新的 RAII 结构创建服务,当服务被移除时会自动清理资源
368    pub fn add_mcp_service(
369        &self,
370        mcp_id: String,
371        mcp_type: McpType,
372        mcp_router_path: McpRouterPath,
373        cancellation_token: CancellationToken,
374    ) {
375        let service = McpService::new(
376            mcp_id.clone(),
377            mcp_type,
378            mcp_router_path,
379            cancellation_token,
380        );
381
382        // RAII: 如果已存在同名服务,insert 会返回旧服务,旧服务被 drop 时自动清理
383        if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
384            info!(
385                "[RAII] 覆盖已存在的服务,旧服务将被自动清理: mcp_id={}",
386                mcp_id
387            );
388            drop(old_service);
389        }
390    }
391
392    /// 添加 MCP 服务状态(兼容旧 API)
393    ///
394    /// 保持与现有代码的兼容性,内部转换为新的 RAII 结构
395    ///
396    /// 注意:`last_accessed` 会被重置为当前时间(插入视为新访问)
397    pub fn add_mcp_service_status_and_proxy(
398        &self,
399        mcp_service_status: McpServiceStatus,
400        proxy_handler: Option<McpHandler>,
401    ) {
402        let mcp_id = mcp_service_status.mcp_id.clone();
403
404        // 创建 McpService 使用 RAII 模式
405        // 注意:last_accessed 会在 McpServiceStatusInfo::new() 中重置为 Instant::now()
406        let mut service = McpService::new(
407            mcp_id.clone(),
408            mcp_service_status.mcp_type,
409            mcp_service_status.mcp_router_path,
410            mcp_service_status.cancellation_token,
411        );
412
413        // 设置初始状态
414        service.status_mut().check_mcp_status_response_status =
415            mcp_service_status.check_mcp_status_response_status;
416
417        // 设置配置(如果有)
418        if let Some(config) = mcp_service_status.mcp_config {
419            service.set_mcp_config(config);
420        }
421
422        // 设置 handler(如果有)
423        if let Some(handler) = proxy_handler {
424            service.set_handler(handler);
425        }
426
427        // RAII: 如果已存在同名服务,insert 会返回旧服务,旧服务被 drop 时自动清理
428        if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
429            info!(
430                "[RAII] 覆盖已存在的服务,旧服务将被自动清理: mcp_id={}",
431                mcp_id
432            );
433            // old_service 在此作用域结束时 drop,触发 McpProcessGuard::drop()
434            drop(old_service);
435        }
436    }
437
438    /// 获取所有的 MCP 服务状态(兼容旧 API)
439    ///
440    /// 优化:先快速收集所有 keys,然后逐个获取详细信息
441    /// 避免 iter() 长时间锁住多个分片,让其他写操作有机会执行
442    pub fn get_all_mcp_service_status(&self) -> Vec<McpServiceStatus> {
443        // 第一步:快速收集所有 keys(只 clone String,锁持有时间短)
444        let keys: Vec<String> = self
445            .services
446            .iter()
447            .map(|entry| entry.key().clone())
448            .collect();
449
450        // 第二步:逐个获取详细信息(每次只锁一个分片)
451        keys.into_iter()
452            .filter_map(|mcp_id| self.get_mcp_service_status(&mcp_id))
453            .collect()
454    }
455
456    /// 获取 MCP 服务状态(兼容旧 API)
457    pub fn get_mcp_service_status(&self, mcp_id: &str) -> Option<McpServiceStatus> {
458        self.services.get(mcp_id).map(|entry| {
459            let service = entry.value();
460            let status = service.status();
461            McpServiceStatus {
462                mcp_id: status.mcp_id.clone(),
463                mcp_type: status.mcp_type.clone(),
464                mcp_router_path: status.mcp_router_path.clone(),
465                cancellation_token: service.clone_token(),
466                check_mcp_status_response_status: status.check_mcp_status_response_status.clone(),
467                last_accessed: status.last_accessed,
468                mcp_config: status.mcp_config.clone(),
469                consecutive_probe_failures: status.consecutive_probe_failures,
470            }
471        })
472    }
473
474    /// 更新最后访问时间
475    ///
476    /// 使用 entry API 确保原子性操作
477    pub fn update_last_accessed(&self, mcp_id: &str) {
478        self.services
479            .entry(mcp_id.to_string())
480            .and_modify(|service| service.update_last_accessed());
481    }
482
483    /// 修改 MCP 服务状态 (Ready/Pending/Error)
484    ///
485    /// 使用 entry API 确保原子性操作
486    pub fn update_mcp_service_status(&self, mcp_id: &str, status: CheckMcpStatusResponseStatus) {
487        self.services
488            .entry(mcp_id.to_string())
489            .and_modify(|service| service.update_status(status));
490    }
491
492    /// 获取 MCP Handler
493    pub fn get_proxy_handler(&self, mcp_id: &str) -> Option<McpHandler> {
494        self.services
495            .get(mcp_id)
496            .and_then(|entry| entry.value().handler().cloned())
497    }
498
499    /// 获取服务的 MCP 配置(用于自动重启)
500    pub fn get_mcp_config(&self, mcp_id: &str) -> Option<McpConfig> {
501        self.services
502            .get(mcp_id)
503            .and_then(|entry| entry.value().status().mcp_config.clone())
504    }
505
506    /// 添加 MCP Handler 到已存在的服务
507    ///
508    /// 使用 entry API 确保原子性操作
509    pub fn add_proxy_handler(&self, mcp_id: &str, proxy_handler: McpHandler) {
510        match self.services.entry(mcp_id.to_string()) {
511            dashmap::mapref::entry::Entry::Occupied(mut entry) => {
512                entry.get_mut().set_handler(proxy_handler);
513            }
514            dashmap::mapref::entry::Entry::Vacant(_) => {
515                warn!("[RAII] 尝试添加 handler 到不存在的服务: mcp_id={}", mcp_id);
516            }
517        }
518    }
519
520    /// 检查服务是否存在
521    pub fn contains_service(&self, mcp_id: &str) -> bool {
522        self.services.contains_key(mcp_id)
523    }
524
525    /// 获取服务数量
526    pub fn service_count(&self) -> usize {
527        self.services.len()
528    }
529
530    /// 注册 MCP 配置到缓存
531    pub async fn register_mcp_config(&self, mcp_id: &str, config: McpConfig) {
532        GLOBAL_MCP_CONFIG_CACHE
533            .insert(mcp_id.to_string(), config)
534            .await;
535        info!("MCP 配置已注册到缓存: {}", mcp_id);
536    }
537
538    /// 从缓存获取 MCP 配置
539    pub async fn get_mcp_config_from_cache(&self, mcp_id: &str) -> Option<McpConfig> {
540        if let Some(config) = GLOBAL_MCP_CONFIG_CACHE.get(mcp_id).await {
541            debug!("从缓存获取 MCP 配置: {}", mcp_id);
542            Some(config)
543        } else {
544            debug!("缓存中未找到 MCP 配置: {}", mcp_id);
545            None
546        }
547    }
548
549    /// 从缓存删除 MCP 配置
550    pub async fn unregister_mcp_config(&self, mcp_id: &str) {
551        GLOBAL_MCP_CONFIG_CACHE.invalidate(mcp_id).await;
552        info!("MCP 配置已从缓存删除: {}", mcp_id);
553    }
554
555    /// 清理资源 (RAII 模式简化版)
556    ///
557    /// 通过 RAII 模式,从 DashMap 中移除服务会自动:
558    /// 1. 触发 McpProcessGuard::drop()
559    /// 2. 取消 CancellationToken
560    /// 3. 关联的子进程收到信号退出
561    ///
562    /// 此方法额外清理路由和缓存
563    pub async fn cleanup_resources(&self, mcp_id: &str) -> Result<()> {
564        info!("[RAII] 开始清理资源: mcp_id={}", mcp_id);
565
566        // 创建路径以构建要删除的路由路径
567        let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
568            .map_err(|e| {
569                anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
570            })?;
571        let base_sse_path = mcp_sse_router_path.base_path;
572
573        let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
574            .map_err(|e| {
575                anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
576            })?;
577        let base_stream_path = mcp_stream_router_path.base_path;
578
579        // 移除相关路由
580        DynamicRouterService::delete_route(&base_sse_path);
581        DynamicRouterService::delete_route(&base_stream_path);
582
583        // RAII 核心:从 DashMap 移除会触发 McpProcessGuard::drop()
584        // 这会自动取消 CancellationToken,进而触发子进程退出
585        if self.services.remove(mcp_id).is_some() {
586            info!(
587                "[RAII] 服务已从 map 移除,McpProcessGuard 将自动取消令牌: mcp_id={}",
588                mcp_id
589            );
590        } else {
591            debug!("[RAII] 服务不存在,跳过移除: mcp_id={}", mcp_id);
592        }
593
594        // 清理配置缓存
595        self.unregister_mcp_config(mcp_id).await;
596
597        // 清理健康状态缓存
598        GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
599
600        info!("[RAII] MCP 服务资源清理完成: mcp_id={}", mcp_id);
601        Ok(())
602    }
603
604    /// 系统关闭,清理所有资源
605    ///
606    /// RAII 模式下,清除 DashMap 会自动释放所有资源
607    pub async fn cleanup_all_resources(&self) -> Result<()> {
608        info!("[RAII] 开始清理所有 MCP 服务资源");
609
610        // 收集所有 mcp_id
611        let mcp_ids: Vec<String> = self
612            .services
613            .iter()
614            .map(|entry| entry.key().clone())
615            .collect();
616
617        let count = mcp_ids.len();
618
619        // 逐个清理(包括路由和缓存)
620        for mcp_id in mcp_ids {
621            if let Err(e) = self.cleanup_resources(&mcp_id).await {
622                error!("[RAII] 清理资源失败: mcp_id={}, error={}", mcp_id, e);
623                // 继续清理其他资源
624            }
625        }
626
627        info!("[RAII] 所有 MCP 服务资源清理完成,共清理 {} 个服务", count);
628        Ok(())
629    }
630
631    /// 仅移除服务(依赖 RAII 自动清理进程)
632    ///
633    /// 从 DashMap 中移除服务,触发 RAII 自动清理。
634    /// 不会清理路由和缓存,适用于需要快速移除服务的场景。
635    pub fn remove_service(&self, mcp_id: &str) -> bool {
636        if self.services.remove(mcp_id).is_some() {
637            info!("[RAII] 服务已移除,进程将自动清理: mcp_id={}", mcp_id);
638            true
639        } else {
640            debug!("[RAII] 服务不存在: mcp_id={}", mcp_id);
641            false
642        }
643    }
644
645    /// 重置健康检查失败计数
646    ///
647    /// 使用 get_mut 避免为不存在的服务创建空 entry
648    pub fn reset_probe_failures(&self, mcp_id: &str) {
649        if let Some(mut entry) = self.services.get_mut(mcp_id) {
650            entry.value_mut().status_mut().reset_probe_failures();
651        }
652    }
653
654    /// 增加健康检查失败计数,返回增加后的值
655    ///
656    /// 使用 entry API 确保原子性操作
657    pub fn increment_probe_failures(&self, mcp_id: &str) -> u32 {
658        self.services
659            .get_mut(mcp_id)
660            .map(|mut entry| entry.value_mut().status_mut().increment_probe_failures())
661            .unwrap_or(0)
662    }
663
664    /// 清理资源用于重启(保留配置缓存)
665    ///
666    /// 与 cleanup_resources 不同,此方法不会清理配置缓存,
667    /// 允许后续使用缓存的配置重新启动服务。
668    pub async fn cleanup_resources_for_restart(&self, mcp_id: &str) -> Result<()> {
669        info!("[RAII] 开始清理资源用于重启: mcp_id={}", mcp_id);
670
671        // 创建路径以构建要删除的路由路径
672        let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
673            .map_err(|e| {
674                anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
675            })?;
676        let base_sse_path = mcp_sse_router_path.base_path;
677
678        let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
679            .map_err(|e| {
680                anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
681            })?;
682        let base_stream_path = mcp_stream_router_path.base_path;
683
684        // 移除相关路由
685        DynamicRouterService::delete_route(&base_sse_path);
686        DynamicRouterService::delete_route(&base_stream_path);
687
688        // RAII 核心:从 DashMap 移除会触发 McpProcessGuard::drop()
689        if self.services.remove(mcp_id).is_some() {
690            info!(
691                "[RAII] 服务已从 map 移除(用于重启),McpProcessGuard 将自动取消令牌: mcp_id={}",
692                mcp_id
693            );
694        } else {
695            debug!("[RAII] 服务不存在,跳过移除: mcp_id={}", mcp_id);
696        }
697
698        // 注意:不清理配置缓存,保留用于重启
699        // self.unregister_mcp_config(mcp_id).await; // 不调用
700
701        // 清理健康状态缓存
702        GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
703
704        info!("[RAII] MCP 服务资源清理完成(用于重启): mcp_id={}", mcp_id);
705        Ok(())
706    }
707}
708
709/// MCP 配置缓存(使用 moka 实现 TTL)
710///
711/// ## 存储架构说明
712///
713/// MCP 配置存储在两个位置:
714///
715/// 1. **McpServiceStatus.mcp_config**(服务状态中)
716///    - 存储当前运行服务的配置
717///    - 随服务清理而被删除
718///    - 用于快速访问当前服务的配置
719///
720/// 2. **GLOBAL_MCP_CONFIG_CACHE**(全局缓存)
721///    - 独立于服务状态存储
722///    - 有 TTL(24 小时)
723///    - 用于服务重启时恢复配置
724///
725/// ## 为什么需要两处存储?
726///
727/// - 服务清理后,McpServiceStatus 被删除,但配置仍在缓存中
728/// - 下次请求到来时,可以从缓存恢复配置并重启服务
729/// - 实现了服务的自动重启能力
730///
731/// ## 优先级
732///
733/// 1. 请求 header 中的配置(最新)
734/// 2. 缓存中的配置(兜底)
735///
736/// ## TTL
737///
738/// - 24 小时(可配置)
739/// - max_capacity: 1000(防止内存溢出)
740pub struct McpConfigCache {
741    cache: Cache<String, McpConfig>,
742}
743
744impl McpConfigCache {
745    pub fn new() -> Self {
746        Self {
747            cache: Cache::builder()
748                .time_to_live(Duration::from_secs(24 * 60 * 60)) // 24 小时 TTL
749                .max_capacity(1000) // 最多缓存 1000 个配置,防止内存溢出
750                .build(),
751        }
752    }
753
754    pub async fn insert(&self, mcp_id: String, config: McpConfig) {
755        self.cache.insert(mcp_id.clone(), config).await;
756        info!("MCP 配置已缓存: {} (TTL: 24h)", mcp_id);
757    }
758
759    pub async fn get(&self, mcp_id: &str) -> Option<McpConfig> {
760        self.cache.get(mcp_id).await
761    }
762
763    pub async fn invalidate(&self, mcp_id: &str) {
764        self.cache.invalidate(mcp_id).await;
765    }
766
767    #[allow(dead_code)]
768    pub fn invalidate_all(&self) {
769        self.cache.invalidate_all();
770    }
771}
772
773impl Default for McpConfigCache {
774    fn default() -> Self {
775        Self::new()
776    }
777}
778
779// 全局配置缓存单例
780pub static GLOBAL_MCP_CONFIG_CACHE: Lazy<McpConfigCache> = Lazy::new(McpConfigCache::default);
781
782/// MCP 服务重启追踪器
783///
784/// 用于防止服务频繁重启导致的无限循环
785///
786/// ## 重启限制
787///
788/// - 最小重启间隔:30 秒
789/// - 如果服务在 30 秒内被标记为需要重启,将跳过重启
790/// - 这防止了服务启动失败时的无限重启循环
791///
792/// ## 健康状态缓存
793///
794/// - 缓存后端健康状态,避免频繁检查
795/// - 缓存时间:5 秒(可配置)
796/// - 用于减少 `is_mcp_server_ready()` 调用频率
797pub struct RestartTracker {
798    // mcp_id -> 最后重启时间
799    last_restart: DashMap<String, Instant>,
800    // mcp_id -> (健康状态, 检查时间)
801    health_status: DashMap<String, (bool, Instant)>,
802    // mcp_id -> 启动锁,防止并发启动同一服务
803    startup_locks: DashMap<String, Arc<Mutex<()>>>,
804}
805
806impl RestartTracker {
807    pub fn new() -> Self {
808        Self {
809            last_restart: DashMap::new(),
810            health_status: DashMap::new(),
811            startup_locks: DashMap::new(),
812        }
813    }
814
815    /// 获取缓存的健康状态
816    ///
817    /// 如果缓存未过期(5秒内),返回缓存值
818    /// 否则返回 None,表示需要重新检查
819    pub fn get_cached_health_status(&self, mcp_id: &str) -> Option<bool> {
820        let cache_duration = Duration::from_secs(5); // 5 秒缓存
821        let now = Instant::now();
822
823        self.health_status.get(mcp_id).and_then(|entry| {
824            let (is_healthy, check_time) = *entry.value();
825            if now.duration_since(check_time) < cache_duration {
826                Some(is_healthy)
827            } else {
828                None
829            }
830        })
831    }
832
833    /// 更新健康状态缓存
834    pub fn update_health_status(&self, mcp_id: &str, is_healthy: bool) {
835        self.health_status
836            .insert(mcp_id.to_string(), (is_healthy, Instant::now()));
837    }
838
839    /// 清除健康状态缓存
840    pub fn clear_health_status(&self, mcp_id: &str) {
841        self.health_status.remove(mcp_id);
842    }
843
844    /// 检查是否可以重启服务
845    ///
846    /// 返回 true 表示可以重启,false 表示在冷却期内
847    ///
848    /// 注意:此方法仅检查是否可以重启,不会自动插入时间戳。
849    /// 时间戳只在服务成功启动后通过 `record_restart()` 方法记录。
850    pub fn can_restart(&self, mcp_id: &str) -> bool {
851        let now = Instant::now();
852        let min_restart_interval = Duration::from_secs(30); // 30 秒最小重启间隔
853
854        // 只检查,不自动插入时间戳
855        if let Some(last_restart) = self.last_restart.get(mcp_id) {
856            let elapsed = now.duration_since(*last_restart);
857            if elapsed < min_restart_interval {
858                warn!(
859                    "服务 {} 在冷却期内,距离上次重启仅 {} 秒,跳过重启",
860                    mcp_id,
861                    elapsed.as_secs()
862                );
863                return false;
864            }
865        }
866        // 不在冷却期内,但不自动更新时间戳
867        true
868    }
869
870    /// 记录服务成功重启
871    ///
872    /// 此方法应在服务成功启动后调用,用于记录重启时间戳。
873    /// 配合 `can_restart()` 使用,避免在服务启动失败时插入时间戳。
874    pub fn record_restart(&self, mcp_id: &str) {
875        self.last_restart.insert(mcp_id.to_string(), Instant::now());
876        info!("服务启动成功,记录重启时间: {}", mcp_id);
877    }
878
879    /// 清除重启时间戳
880    ///
881    /// 当服务启动失败时,可选择调用此方法清除时间戳,
882    /// 允许立即重试而不必等待冷却期。
883    #[allow(dead_code)]
884    pub fn clear_restart(&self, mcp_id: &str) {
885        self.last_restart.remove(mcp_id);
886        info!("已清除服务 {} 的重启时间戳", mcp_id);
887    }
888
889    /// 尝试获取服务启动锁
890    ///
891    /// 返回 Some(OwnedMutexGuard) 表示获取成功,可以继续启动服务
892    /// 返回 None 表示服务正在启动中,应该跳过本次启动
893    ///
894    /// # 使用方式
895    ///
896    /// ```ignore
897    /// if let Some(_guard) = GLOBAL_RESTART_TRACKER.try_acquire_startup_lock(&mcp_id) {
898    ///     // 获取到锁,可以启动服务
899    ///     let result = start_service().await;
900    ///     // _guard 在作用域结束时自动释放
901    /// } else {
902    ///     // 未获取到锁,服务正在启动中
903    ///     return Ok(Response::503);
904    /// }
905    /// ```
906    pub fn try_acquire_startup_lock(&self, mcp_id: &str) -> Option<OwnedMutexGuard<()>> {
907        // 使用 entry API 确保原子性,避免竞态条件
908        let lock = self
909            .startup_locks
910            .entry(mcp_id.to_string())
911            .or_insert_with(|| Arc::new(Mutex::new(())))
912            .clone();
913
914        // 尝试获取 owned 锁,锁会一直保持到返回的 guard 被 drop
915        match lock.try_lock_owned() {
916            Ok(guard) => Some(guard),
917            Err(_) => {
918                // 锁被占用,服务正在启动中
919                debug!("服务 {} 正在启动中,跳过本次启动", mcp_id);
920                None
921            }
922        }
923    }
924
925    /// 清理服务启动锁
926    ///
927    /// 当服务启动完成或失败后,应该清理启动锁以允许后续重试
928    /// 注意:正常情况下锁会随 MutexGuard 自动释放,此方法用于异常清理
929    #[allow(dead_code)]
930    pub fn cleanup_startup_lock(&self, mcp_id: &str) {
931        self.startup_locks.remove(mcp_id);
932        debug!("已清理服务 {} 的启动锁", mcp_id);
933    }
934}
935
936impl Default for RestartTracker {
937    fn default() -> Self {
938        Self::new()
939    }
940}
941
942// 全局重启追踪器单例
943pub static GLOBAL_RESTART_TRACKER: Lazy<RestartTracker> = Lazy::new(RestartTracker::default);
944
945// 提供一个便捷的函数来获取全局 ProxyHandlerManager
946pub fn get_proxy_manager() -> &'static ProxyHandlerManager {
947    &GLOBAL_PROXY_MANAGER
948}