Skip to main content

mcp_stdio_proxy/model/
global.rs

1use axum::Router;
2use dashmap::DashMap;
3use log::{debug, error, info};
4use moka::future::Cache;
5use once_cell::sync::Lazy;
6use std::sync::Arc;
7use tokio::sync::{Mutex, OwnedMutexGuard};
8use tokio::time::{Duration, Instant};
9use tokio_util::sync::CancellationToken;
10use tracing::warn;
11
12use anyhow::Result;
13
14use crate::proxy::McpHandler;
15
16use super::{CheckMcpStatusResponseStatus, McpConfig, McpProtocol, McpRouterPath, McpType};
17
18// 全局单例路由表
19pub static GLOBAL_ROUTES: Lazy<Arc<DashMap<String, Router>>> =
20    Lazy::new(|| Arc::new(DashMap::new()));
21
22// 全局单例 ProxyHandlerManager
23pub static GLOBAL_PROXY_MANAGER: Lazy<ProxyHandlerManager> =
24    Lazy::new(ProxyHandlerManager::default);
25
26/// 动态路由服务
27#[derive(Clone)]
28pub struct DynamicRouterService(pub McpProtocol);
29
30impl DynamicRouterService {
31    // 注册动态 handler
32    pub fn register_route(path: &str, handler: Router) {
33        debug!("=== Register Route ===");
34        debug!("Registration path: {}", path);
35        GLOBAL_ROUTES.insert(path.to_string(), handler);
36        debug!("=== Route registration completed ===");
37    }
38
39    // 删除动态 handler
40    pub fn delete_route(path: &str) {
41        debug!("=== Delete route ===");
42        debug!("Delete path: {}", path);
43        GLOBAL_ROUTES.remove(path);
44        debug!("=== Route deletion completed ===");
45    }
46
47    // 获取动态 handler
48    pub fn get_route(path: &str) -> Option<Router> {
49        let result = GLOBAL_ROUTES.get(path).map(|entry| entry.value().clone());
50        if result.is_some() {
51            debug!("get_route('{}') = Some(Router)", path);
52        } else {
53            debug!("get_route('{}') = None", path);
54        }
55        result
56    }
57
58    // 获取所有已注册的路由(debug用)
59    pub fn get_all_routes() -> Vec<String> {
60        GLOBAL_ROUTES
61            .iter()
62            .map(|entry| entry.key().clone())
63            .collect()
64    }
65}
66
67impl std::fmt::Debug for DynamicRouterService {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let routes = GLOBAL_ROUTES
70            .iter()
71            .map(|entry| entry.key().clone())
72            .collect::<Vec<_>>();
73        write!(f, "DynamicRouterService {{ routes: {routes:?} }}")
74    }
75}
76
77// =============================================================================
78// RAII 进程管理设计
79// =============================================================================
80//
81// 设计目标:当 mcp_id 从 map 中移除时,自动释放对应的 MCP 进程资源
82//
83// 核心结构:
84// - McpProcessGuard: 进程生命周期守护器,实现 Drop trait 自动取消 CancellationToken
85// - McpService: 封装 McpHandler + McpProcessGuard + 服务状态,作为 map 的 value
86// - ProxyHandlerManager: 使用单一 DashMap<String, McpService> 管理所有服务
87//
88// 资源释放流程:
89// 1. 从 map 中 remove mcp_id
90// 2. McpService 被 drop
91// 3. McpProcessGuard::drop() 被调用
92// 4. CancellationToken 被 cancel
93// 5. 监听该 token 的 SseServer/子进程收到信号,自动退出
94// =============================================================================
95
96/// MCP 进程生命周期守护器
97///
98/// 实现 RAII 模式:当此结构体被 drop 时,自动取消 CancellationToken,
99/// 触发关联的 SseServer 和子进程退出。
100///
101/// # 使用场景
102///
103/// 1. 从 `ProxyHandlerManager` 移除 mcp_id 时,自动清理进程
104/// 2. 服务重启时,旧服务自动被清理
105/// 3. 系统关闭时,所有服务自动清理
106pub struct McpProcessGuard {
107    mcp_id: String,
108    cancellation_token: CancellationToken,
109}
110
111impl McpProcessGuard {
112    pub fn new(mcp_id: String, cancellation_token: CancellationToken) -> Self {
113        debug!("[RAII] Create process daemon: mcp_id={}", mcp_id);
114        Self {
115            mcp_id,
116            cancellation_token,
117        }
118    }
119
120    /// 克隆 CancellationToken(用于传递给异步任务)
121    pub fn clone_token(&self) -> CancellationToken {
122        self.cancellation_token.clone()
123    }
124}
125
126impl Drop for McpProcessGuard {
127    fn drop(&mut self) {
128        info!(
129            "[RAII] The process daemon was dropped and canceled CancellationToken: mcp_id={}",
130            self.mcp_id
131        );
132        self.cancellation_token.cancel();
133    }
134}
135
136// McpProcessGuard 不实现 Clone,确保唯一所有权
137impl std::fmt::Debug for McpProcessGuard {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("McpProcessGuard")
140            .field("mcp_id", &self.mcp_id)
141            .field("is_cancelled", &self.cancellation_token.is_cancelled())
142            .finish()
143    }
144}
145
146/// MCP 服务封装
147///
148/// 将 McpHandler、McpProcessGuard 和服务状态封装在一起,
149/// 作为 `ProxyHandlerManager` 中 DashMap 的 value。
150///
151/// # RAII 保证
152///
153/// 当 McpService 被 drop 时:
154/// 1. McpProcessGuard 被 drop,触发 CancellationToken 取消
155/// 2. 关联的子进程收到信号,自动退出
156pub struct McpService {
157    /// 进程守护器(RAII 核心)
158    process_guard: McpProcessGuard,
159    /// MCP 透明代理处理器(可选,启动中时为 None)
160    handler: Option<McpHandler>,
161    /// 服务状态信息
162    status: McpServiceStatusInfo,
163}
164
165/// MCP 服务状态信息(不包含 CancellationToken,由 McpProcessGuard 管理)
166#[derive(Debug, Clone)]
167pub struct McpServiceStatusInfo {
168    pub mcp_id: String,
169    pub mcp_type: McpType,
170    pub mcp_router_path: McpRouterPath,
171    pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
172    pub last_accessed: Instant,
173    pub mcp_config: Option<McpConfig>,
174    /// 连续健康检查失败次数(用于容错机制)
175    pub consecutive_probe_failures: u32,
176}
177
178impl McpServiceStatusInfo {
179    pub fn new(
180        mcp_id: String,
181        mcp_type: McpType,
182        mcp_router_path: McpRouterPath,
183        check_mcp_status_response_status: CheckMcpStatusResponseStatus,
184    ) -> Self {
185        Self {
186            mcp_id,
187            mcp_type,
188            mcp_router_path,
189            check_mcp_status_response_status,
190            last_accessed: Instant::now(),
191            mcp_config: None,
192            consecutive_probe_failures: 0,
193        }
194    }
195
196    pub fn update_last_accessed(&mut self) {
197        self.last_accessed = Instant::now();
198    }
199
200    /// 重置健康检查失败计数
201    pub fn reset_probe_failures(&mut self) {
202        self.consecutive_probe_failures = 0;
203    }
204
205    /// 增加健康检查失败计数,返回增加后的值
206    pub fn increment_probe_failures(&mut self) -> u32 {
207        self.consecutive_probe_failures += 1;
208        self.consecutive_probe_failures
209    }
210}
211
212impl McpService {
213    /// 创建新的 MCP 服务
214    ///
215    /// # 参数
216    /// - `mcp_id`: 服务唯一标识
217    /// - `mcp_type`: 服务类型
218    /// - `mcp_router_path`: 路由路径
219    /// - `cancellation_token`: 用于控制进程生命周期的取消令牌
220    pub fn new(
221        mcp_id: String,
222        mcp_type: McpType,
223        mcp_router_path: McpRouterPath,
224        cancellation_token: CancellationToken,
225    ) -> Self {
226        let process_guard = McpProcessGuard::new(mcp_id.clone(), cancellation_token);
227        let status = McpServiceStatusInfo::new(
228            mcp_id,
229            mcp_type,
230            mcp_router_path,
231            CheckMcpStatusResponseStatus::Pending,
232        );
233        Self {
234            process_guard,
235            handler: None,
236            status,
237        }
238    }
239
240    /// 设置 MCP Handler
241    pub fn set_handler(&mut self, handler: McpHandler) {
242        self.handler = Some(handler);
243    }
244
245    /// 获取 MCP Handler
246    pub fn handler(&self) -> Option<&McpHandler> {
247        self.handler.as_ref()
248    }
249
250    /// 获取服务状态
251    pub fn status(&self) -> &McpServiceStatusInfo {
252        &self.status
253    }
254
255    /// 获取可变服务状态
256    pub fn status_mut(&mut self) -> &mut McpServiceStatusInfo {
257        &mut self.status
258    }
259
260    /// 克隆 CancellationToken
261    pub fn clone_token(&self) -> CancellationToken {
262        self.process_guard.clone_token()
263    }
264
265    /// 更新服务状态
266    pub fn update_status(&mut self, status: CheckMcpStatusResponseStatus) {
267        self.status.check_mcp_status_response_status = status;
268    }
269
270    /// 更新最后访问时间
271    pub fn update_last_accessed(&mut self) {
272        self.status.update_last_accessed();
273    }
274
275    /// 设置 MCP 配置
276    pub fn set_mcp_config(&mut self, config: McpConfig) {
277        self.status.mcp_config = Some(config);
278    }
279}
280
281impl std::fmt::Debug for McpService {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        f.debug_struct("McpService")
284            .field("process_guard", &self.process_guard)
285            .field("handler", &self.handler.is_some())
286            .field("status", &self.status)
287            .finish()
288    }
289}
290
291// =============================================================================
292// 兼容层:保留 McpServiceStatus 以兼容现有代码
293// =============================================================================
294
295/// MCP 服务状态(兼容层)
296///
297/// 保留此结构体以兼容现有代码,内部委托给 McpServiceStatusInfo
298#[derive(Debug, Clone)]
299pub struct McpServiceStatus {
300    pub mcp_id: String,
301    pub mcp_type: McpType,
302    pub mcp_router_path: McpRouterPath,
303    pub cancellation_token: CancellationToken,
304    pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
305    pub last_accessed: Instant,
306    pub mcp_config: Option<McpConfig>,
307    /// 连续健康检查失败次数
308    pub consecutive_probe_failures: u32,
309}
310
311impl McpServiceStatus {
312    pub fn new(
313        mcp_id: String,
314        mcp_type: McpType,
315        mcp_router_path: McpRouterPath,
316        cancellation_token: CancellationToken,
317        check_mcp_status_response_status: CheckMcpStatusResponseStatus,
318    ) -> Self {
319        Self {
320            mcp_id,
321            mcp_type,
322            mcp_router_path,
323            cancellation_token,
324            check_mcp_status_response_status,
325            last_accessed: Instant::now(),
326            mcp_config: None,
327            consecutive_probe_failures: 0,
328        }
329    }
330
331    pub fn with_mcp_config(mut self, mcp_config: McpConfig) -> Self {
332        self.mcp_config = Some(mcp_config);
333        self
334    }
335
336    pub fn update_last_accessed(&mut self) {
337        self.last_accessed = Instant::now();
338    }
339}
340
341// =============================================================================
342// ProxyHandlerManager:使用 RAII 模式管理 MCP 服务
343// =============================================================================
344
345/// MCP 代理管理器
346///
347/// 使用 RAII 模式管理 MCP 服务:
348/// - 从 map 中移除 mcp_id 时,自动释放对应的进程资源
349/// - 不需要显式调用 cleanup 方法(但仍提供显式清理接口)
350#[derive(Debug)]
351pub struct ProxyHandlerManager {
352    /// 使用单一 DashMap 管理所有 MCP 服务(RAII 核心)
353    services: DashMap<String, McpService>,
354}
355
356impl Default for ProxyHandlerManager {
357    fn default() -> Self {
358        ProxyHandlerManager {
359            services: DashMap::new(),
360        }
361    }
362}
363
364impl ProxyHandlerManager {
365    /// 添加 MCP 服务(RAII 模式)
366    ///
367    /// 使用新的 RAII 结构创建服务,当服务被移除时会自动清理资源
368    pub fn add_mcp_service(
369        &self,
370        mcp_id: String,
371        mcp_type: McpType,
372        mcp_router_path: McpRouterPath,
373        cancellation_token: CancellationToken,
374    ) {
375        let service = McpService::new(
376            mcp_id.clone(),
377            mcp_type,
378            mcp_router_path,
379            cancellation_token,
380        );
381
382        // RAII: 如果已存在同名服务,insert 会返回旧服务,旧服务被 drop 时自动清理
383        if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
384            info!(
385                "[RAII] Overwrite existing services, old services will be automatically cleaned up: mcp_id={}",
386                mcp_id
387            );
388            drop(old_service);
389        }
390    }
391
392    /// 添加 MCP 服务状态(兼容旧 API)
393    ///
394    /// 保持与现有代码的兼容性,内部转换为新的 RAII 结构
395    ///
396    /// 注意:`last_accessed` 会被重置为当前时间(插入视为新访问)
397    pub fn add_mcp_service_status_and_proxy(
398        &self,
399        mcp_service_status: McpServiceStatus,
400        proxy_handler: Option<McpHandler>,
401    ) {
402        let mcp_id = mcp_service_status.mcp_id.clone();
403
404        // 创建 McpService 使用 RAII 模式
405        // 注意:last_accessed 会在 McpServiceStatusInfo::new() 中重置为 Instant::now()
406        let mut service = McpService::new(
407            mcp_id.clone(),
408            mcp_service_status.mcp_type,
409            mcp_service_status.mcp_router_path,
410            mcp_service_status.cancellation_token,
411        );
412
413        // 设置初始状态
414        service.status_mut().check_mcp_status_response_status =
415            mcp_service_status.check_mcp_status_response_status;
416
417        // 设置配置(如果有)
418        if let Some(config) = mcp_service_status.mcp_config {
419            service.set_mcp_config(config);
420        }
421
422        // 设置 handler(如果有)
423        if let Some(handler) = proxy_handler {
424            service.set_handler(handler);
425        }
426
427        // RAII: 如果已存在同名服务,insert 会返回旧服务,旧服务被 drop 时自动清理
428        if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
429            info!(
430                "[RAII] Overwrite existing services, old services will be automatically cleaned up: mcp_id={}",
431                mcp_id
432            );
433            // old_service 在此作用域结束时 drop,触发 McpProcessGuard::drop()
434            drop(old_service);
435        }
436    }
437
438    /// 获取所有的 MCP 服务状态(兼容旧 API)
439    ///
440    /// 优化:先快速收集所有 keys,然后逐个获取详细信息
441    /// 避免 iter() 长时间锁住多个分片,让其他写操作有机会执行
442    pub fn get_all_mcp_service_status(&self) -> Vec<McpServiceStatus> {
443        // 第一步:快速收集所有 keys(只 clone String,锁持有时间短)
444        let keys: Vec<String> = self
445            .services
446            .iter()
447            .map(|entry| entry.key().clone())
448            .collect();
449
450        // 第二步:逐个获取详细信息(每次只锁一个分片)
451        keys.into_iter()
452            .filter_map(|mcp_id| self.get_mcp_service_status(&mcp_id))
453            .collect()
454    }
455
456    /// 获取 MCP 服务状态(兼容旧 API)
457    pub fn get_mcp_service_status(&self, mcp_id: &str) -> Option<McpServiceStatus> {
458        self.services.get(mcp_id).map(|entry| {
459            let service = entry.value();
460            let status = service.status();
461            McpServiceStatus {
462                mcp_id: status.mcp_id.clone(),
463                mcp_type: status.mcp_type.clone(),
464                mcp_router_path: status.mcp_router_path.clone(),
465                cancellation_token: service.clone_token(),
466                check_mcp_status_response_status: status.check_mcp_status_response_status.clone(),
467                last_accessed: status.last_accessed,
468                mcp_config: status.mcp_config.clone(),
469                consecutive_probe_failures: status.consecutive_probe_failures,
470            }
471        })
472    }
473
474    /// 更新最后访问时间
475    ///
476    /// 使用 entry API 确保原子性操作
477    pub fn update_last_accessed(&self, mcp_id: &str) {
478        self.services
479            .entry(mcp_id.to_string())
480            .and_modify(|service| service.update_last_accessed());
481    }
482
483    /// 修改 MCP 服务状态 (Ready/Pending/Error)
484    ///
485    /// 使用 entry API 确保原子性操作
486    pub fn update_mcp_service_status(&self, mcp_id: &str, status: CheckMcpStatusResponseStatus) {
487        self.services
488            .entry(mcp_id.to_string())
489            .and_modify(|service| service.update_status(status));
490    }
491
492    /// 获取 MCP Handler
493    pub fn get_proxy_handler(&self, mcp_id: &str) -> Option<McpHandler> {
494        self.services
495            .get(mcp_id)
496            .and_then(|entry| entry.value().handler().cloned())
497    }
498
499    /// 获取服务的 MCP 配置(用于自动重启)
500    pub fn get_mcp_config(&self, mcp_id: &str) -> Option<McpConfig> {
501        self.services
502            .get(mcp_id)
503            .and_then(|entry| entry.value().status().mcp_config.clone())
504    }
505
506    /// 添加 MCP Handler 到已存在的服务
507    ///
508    /// 使用 entry API 确保原子性操作
509    pub fn add_proxy_handler(&self, mcp_id: &str, proxy_handler: McpHandler) {
510        match self.services.entry(mcp_id.to_string()) {
511            dashmap::mapref::entry::Entry::Occupied(mut entry) => {
512                entry.get_mut().set_handler(proxy_handler);
513            }
514            dashmap::mapref::entry::Entry::Vacant(_) => {
515                warn!(
516                    "[RAII] Trying to add handler to non-existent service: mcp_id={}",
517                    mcp_id
518                );
519            }
520        }
521    }
522
523    /// 检查服务是否存在
524    pub fn contains_service(&self, mcp_id: &str) -> bool {
525        self.services.contains_key(mcp_id)
526    }
527
528    /// 获取服务数量
529    pub fn service_count(&self) -> usize {
530        self.services.len()
531    }
532
533    /// 注册 MCP 配置到缓存
534    pub async fn register_mcp_config(&self, mcp_id: &str, config: McpConfig) {
535        GLOBAL_MCP_CONFIG_CACHE
536            .insert(mcp_id.to_string(), config)
537            .await;
538        info!("MCP configuration registered in cache: {}", mcp_id);
539    }
540
541    /// 从缓存获取 MCP 配置
542    pub async fn get_mcp_config_from_cache(&self, mcp_id: &str) -> Option<McpConfig> {
543        if let Some(config) = GLOBAL_MCP_CONFIG_CACHE.get(mcp_id).await {
544            debug!("Get MCP configuration from cache: {}", mcp_id);
545            Some(config)
546        } else {
547            debug!("MCP configuration not found in cache: {}", mcp_id);
548            None
549        }
550    }
551
552    /// 从缓存删除 MCP 配置
553    pub async fn unregister_mcp_config(&self, mcp_id: &str) {
554        GLOBAL_MCP_CONFIG_CACHE.invalidate(mcp_id).await;
555        info!("MCP configuration removed from cache: {}", mcp_id);
556    }
557
558    /// 清理资源 (RAII 模式简化版)
559    ///
560    /// 通过 RAII 模式,从 DashMap 中移除服务会自动:
561    /// 1. 触发 McpProcessGuard::drop()
562    /// 2. 取消 CancellationToken
563    /// 3. 关联的子进程收到信号退出
564    ///
565    /// 此方法额外清理路由和缓存
566    pub async fn cleanup_resources(&self, mcp_id: &str) -> Result<()> {
567        info!("[RAII] Start cleaning up resources: mcp_id={}", mcp_id);
568
569        // 创建路径以构建要删除的路由路径
570        let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
571            .map_err(|e| {
572                anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
573            })?;
574        let base_sse_path = mcp_sse_router_path.base_path;
575
576        let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
577            .map_err(|e| {
578                anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
579            })?;
580        let base_stream_path = mcp_stream_router_path.base_path;
581
582        // 移除相关路由
583        DynamicRouterService::delete_route(&base_sse_path);
584        DynamicRouterService::delete_route(&base_stream_path);
585
586        // RAII 核心:从 DashMap 移除会触发 McpProcessGuard::drop()
587        // 这会自动取消 CancellationToken,进而触发子进程退出
588        if self.services.remove(mcp_id).is_some() {
589            info!(
590                "[RAII] The service has been removed from the map, McpProcessGuard will automatically cancel the token: mcp_id={}",
591                mcp_id
592            );
593        } else {
594            debug!(
595                "[RAII] Service does not exist, skip removal: mcp_id={}",
596                mcp_id
597            );
598        }
599
600        // 清理配置缓存
601        self.unregister_mcp_config(mcp_id).await;
602
603        // 清理健康状态缓存
604        GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
605
606        info!(
607            "[RAII] MCP service resource cleanup completed: mcp_id={}",
608            mcp_id
609        );
610        Ok(())
611    }
612
613    /// 系统关闭,清理所有资源
614    ///
615    /// RAII 模式下,清除 DashMap 会自动释放所有资源
616    pub async fn cleanup_all_resources(&self) -> Result<()> {
617        info!("[RAII] Start cleaning up all MCP service resources");
618
619        // 收集所有 mcp_id
620        let mcp_ids: Vec<String> = self
621            .services
622            .iter()
623            .map(|entry| entry.key().clone())
624            .collect();
625
626        let count = mcp_ids.len();
627
628        // 逐个清理(包括路由和缓存)
629        for mcp_id in mcp_ids {
630            if let Err(e) = self.cleanup_resources(&mcp_id).await {
631                error!(
632                    "[RAII] Failed to clean up resources: mcp_id={}, error={}",
633                    mcp_id, e
634                );
635                // 继续清理其他资源
636            }
637        }
638
639        info!(
640            "[RAII] All MCP service resources have been cleaned up, and a total of {} services have been cleaned up.",
641            count
642        );
643        Ok(())
644    }
645
646    /// 仅移除服务(依赖 RAII 自动清理进程)
647    ///
648    /// 从 DashMap 中移除服务,触发 RAII 自动清理。
649    /// 不会清理路由和缓存,适用于需要快速移除服务的场景。
650    pub fn remove_service(&self, mcp_id: &str) -> bool {
651        if self.services.remove(mcp_id).is_some() {
652            info!(
653                "[RAII] The service has been removed and the process will be automatically cleaned up: mcp_id={}",
654                mcp_id
655            );
656            true
657        } else {
658            debug!("[RAII] Service does not exist: mcp_id={}", mcp_id);
659            false
660        }
661    }
662
663    /// 重置健康检查失败计数
664    ///
665    /// 使用 get_mut 避免为不存在的服务创建空 entry
666    pub fn reset_probe_failures(&self, mcp_id: &str) {
667        if let Some(mut entry) = self.services.get_mut(mcp_id) {
668            entry.value_mut().status_mut().reset_probe_failures();
669        }
670    }
671
672    /// 增加健康检查失败计数,返回增加后的值
673    ///
674    /// 使用 entry API 确保原子性操作
675    pub fn increment_probe_failures(&self, mcp_id: &str) -> u32 {
676        self.services
677            .get_mut(mcp_id)
678            .map(|mut entry| entry.value_mut().status_mut().increment_probe_failures())
679            .unwrap_or(0)
680    }
681
682    /// 清理资源用于重启(保留配置缓存)
683    ///
684    /// 与 cleanup_resources 不同,此方法不会清理配置缓存,
685    /// 允许后续使用缓存的配置重新启动服务。
686    pub async fn cleanup_resources_for_restart(&self, mcp_id: &str) -> Result<()> {
687        info!(
688            "[RAII] Start cleaning up resources for restart: mcp_id={}",
689            mcp_id
690        );
691
692        // 创建路径以构建要删除的路由路径
693        let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
694            .map_err(|e| {
695                anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
696            })?;
697        let base_sse_path = mcp_sse_router_path.base_path;
698
699        let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
700            .map_err(|e| {
701                anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
702            })?;
703        let base_stream_path = mcp_stream_router_path.base_path;
704
705        // 移除相关路由
706        DynamicRouterService::delete_route(&base_sse_path);
707        DynamicRouterService::delete_route(&base_stream_path);
708
709        // RAII 核心:从 DashMap 移除会触发 McpProcessGuard::drop()
710        if self.services.remove(mcp_id).is_some() {
711            info!(
712                "[RAII] The service has been removed from the map (for restart), McpProcessGuard will automatically cancel the token: mcp_id={}",
713                mcp_id
714            );
715        } else {
716            debug!(
717                "[RAII] Service does not exist, skip removal: mcp_id={}",
718                mcp_id
719            );
720        }
721
722        // 注意:不清理配置缓存,保留用于重启
723        // self.unregister_mcp_config(mcp_id).await; // 不调用
724
725        // 清理健康状态缓存
726        GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
727
728        info!(
729            "[RAII] MCP service resource cleanup completed (for restart): mcp_id={}",
730            mcp_id
731        );
732        Ok(())
733    }
734}
735
736/// MCP 配置缓存(使用 moka 实现 TTL)
737///
738/// ## 存储架构说明
739///
740/// MCP 配置存储在两个位置:
741///
742/// 1. **McpServiceStatus.mcp_config**(服务状态中)
743///    - 存储当前运行服务的配置
744///    - 随服务清理而被删除
745///    - 用于快速访问当前服务的配置
746///
747/// 2. **GLOBAL_MCP_CONFIG_CACHE**(全局缓存)
748///    - 独立于服务状态存储
749///    - 有 TTL(24 小时)
750///    - 用于服务重启时恢复配置
751///
752/// ## 为什么需要两处存储?
753///
754/// - 服务清理后,McpServiceStatus 被删除,但配置仍在缓存中
755/// - 下次请求到来时,可以从缓存恢复配置并重启服务
756/// - 实现了服务的自动重启能力
757///
758/// ## 优先级
759///
760/// 1. 请求 header 中的配置(最新)
761/// 2. 缓存中的配置(兜底)
762///
763/// ## TTL
764///
765/// - 24 小时(可配置)
766/// - max_capacity: 1000(防止内存溢出)
767pub struct McpConfigCache {
768    cache: Cache<String, McpConfig>,
769}
770
771impl McpConfigCache {
772    pub fn new() -> Self {
773        Self {
774            cache: Cache::builder()
775                .time_to_live(Duration::from_secs(24 * 60 * 60)) // 24 小时 TTL
776                .max_capacity(1000) // 最多缓存 1000 个配置,防止内存溢出
777                .build(),
778        }
779    }
780
781    pub async fn insert(&self, mcp_id: String, config: McpConfig) {
782        self.cache.insert(mcp_id.clone(), config).await;
783        info!("MCP configuration cached: {} (TTL: 24h)", mcp_id);
784    }
785
786    pub async fn get(&self, mcp_id: &str) -> Option<McpConfig> {
787        self.cache.get(mcp_id).await
788    }
789
790    pub async fn invalidate(&self, mcp_id: &str) {
791        self.cache.invalidate(mcp_id).await;
792    }
793
794    #[allow(dead_code)]
795    pub fn invalidate_all(&self) {
796        self.cache.invalidate_all();
797    }
798}
799
800impl Default for McpConfigCache {
801    fn default() -> Self {
802        Self::new()
803    }
804}
805
806// 全局配置缓存单例
807pub static GLOBAL_MCP_CONFIG_CACHE: Lazy<McpConfigCache> = Lazy::new(McpConfigCache::default);
808
809/// MCP 服务重启追踪器
810///
811/// 用于防止服务频繁重启导致的无限循环
812///
813/// ## 重启限制
814///
815/// - 最小重启间隔:30 秒
816/// - 如果服务在 30 秒内被标记为需要重启,将跳过重启
817/// - 这防止了服务启动失败时的无限重启循环
818///
819/// ## 健康状态缓存
820///
821/// - 缓存后端健康状态,避免频繁检查
822/// - 缓存时间:5 秒(可配置)
823/// - 用于减少 `is_mcp_server_ready()` 调用频率
824pub struct RestartTracker {
825    // mcp_id -> 最后重启时间
826    last_restart: DashMap<String, Instant>,
827    // mcp_id -> (健康状态, 检查时间)
828    health_status: DashMap<String, (bool, Instant)>,
829    // mcp_id -> 启动锁,防止并发启动同一服务
830    startup_locks: DashMap<String, Arc<Mutex<()>>>,
831}
832
833impl RestartTracker {
834    pub fn new() -> Self {
835        Self {
836            last_restart: DashMap::new(),
837            health_status: DashMap::new(),
838            startup_locks: DashMap::new(),
839        }
840    }
841
842    /// 获取缓存的健康状态
843    ///
844    /// 如果缓存未过期(5秒内),返回缓存值
845    /// 否则返回 None,表示需要重新检查
846    pub fn get_cached_health_status(&self, mcp_id: &str) -> Option<bool> {
847        let cache_duration = Duration::from_secs(5); // 5 秒缓存
848        let now = Instant::now();
849
850        self.health_status.get(mcp_id).and_then(|entry| {
851            let (is_healthy, check_time) = *entry.value();
852            if now.duration_since(check_time) < cache_duration {
853                Some(is_healthy)
854            } else {
855                None
856            }
857        })
858    }
859
860    /// 更新健康状态缓存
861    pub fn update_health_status(&self, mcp_id: &str, is_healthy: bool) {
862        self.health_status
863            .insert(mcp_id.to_string(), (is_healthy, Instant::now()));
864    }
865
866    /// 清除健康状态缓存
867    pub fn clear_health_status(&self, mcp_id: &str) {
868        self.health_status.remove(mcp_id);
869    }
870
871    /// 检查是否可以重启服务
872    ///
873    /// 返回 true 表示可以重启,false 表示在冷却期内
874    ///
875    /// 注意:此方法仅检查是否可以重启,不会自动插入时间戳。
876    /// 时间戳只在服务成功启动后通过 `record_restart()` 方法记录。
877    pub fn can_restart(&self, mcp_id: &str) -> bool {
878        let now = Instant::now();
879        let min_restart_interval = Duration::from_secs(30); // 30 秒最小重启间隔
880
881        // 只检查,不自动插入时间戳
882        if let Some(last_restart) = self.last_restart.get(mcp_id) {
883            let elapsed = now.duration_since(*last_restart);
884            if elapsed < min_restart_interval {
885                warn!(
886                    "Service {} is in the cooldown period and is only {} seconds since its last restart. Restart is skipped.",
887                    mcp_id,
888                    elapsed.as_secs()
889                );
890                return false;
891            }
892        }
893        // 不在冷却期内,但不自动更新时间戳
894        true
895    }
896
897    /// 记录服务成功重启
898    ///
899    /// 此方法应在服务成功启动后调用,用于记录重启时间戳。
900    /// 配合 `can_restart()` 使用,避免在服务启动失败时插入时间戳。
901    pub fn record_restart(&self, mcp_id: &str) {
902        self.last_restart.insert(mcp_id.to_string(), Instant::now());
903        info!(
904            "The service started successfully and the restart time was recorded: {}",
905            mcp_id
906        );
907    }
908
909    /// 清除重启时间戳
910    ///
911    /// 当服务启动失败时,可选择调用此方法清除时间戳,
912    /// 允许立即重试而不必等待冷却期。
913    #[allow(dead_code)]
914    pub fn clear_restart(&self, mcp_id: &str) {
915        self.last_restart.remove(mcp_id);
916        info!("Restart timestamp cleared for service {}", mcp_id);
917    }
918
919    /// 尝试获取服务启动锁
920    ///
921    /// 返回 Some(OwnedMutexGuard) 表示获取成功,可以继续启动服务
922    /// 返回 None 表示服务正在启动中,应该跳过本次启动
923    ///
924    /// # 使用方式
925    ///
926    /// ```ignore
927    /// if let Some(_guard) = GLOBAL_RESTART_TRACKER.try_acquire_startup_lock(&mcp_id) {
928    ///     // 获取到锁,可以启动服务
929    ///     let result = start_service().await;
930    ///     // _guard 在作用域结束时自动释放
931    /// } else {
932    ///     // 未获取到锁,服务正在启动中
933    ///     return Ok(Response::503);
934    /// }
935    /// ```
936    pub fn try_acquire_startup_lock(&self, mcp_id: &str) -> Option<OwnedMutexGuard<()>> {
937        // 使用 entry API 确保原子性,避免竞态条件
938        let lock = self
939            .startup_locks
940            .entry(mcp_id.to_string())
941            .or_insert_with(|| Arc::new(Mutex::new(())))
942            .clone();
943
944        // 尝试获取 owned 锁,锁会一直保持到返回的 guard 被 drop
945        match lock.try_lock_owned() {
946            Ok(guard) => Some(guard),
947            Err(_) => {
948                // 锁被占用,服务正在启动中
949                debug!("Service {} is starting, skip this startup", mcp_id);
950                None
951            }
952        }
953    }
954
955    /// 清理服务启动锁
956    ///
957    /// 当服务启动完成或失败后,应该清理启动锁以允许后续重试
958    /// 注意:正常情况下锁会随 MutexGuard 自动释放,此方法用于异常清理
959    #[allow(dead_code)]
960    pub fn cleanup_startup_lock(&self, mcp_id: &str) {
961        self.startup_locks.remove(mcp_id);
962        debug!("Cleaned startup lock for service {}", mcp_id);
963    }
964}
965
966impl Default for RestartTracker {
967    fn default() -> Self {
968        Self::new()
969    }
970}
971
972// 全局重启追踪器单例
973pub static GLOBAL_RESTART_TRACKER: Lazy<RestartTracker> = Lazy::new(RestartTracker::default);
974
975// 提供一个便捷的函数来获取全局 ProxyHandlerManager
976pub fn get_proxy_manager() -> &'static ProxyHandlerManager {
977    &GLOBAL_PROXY_MANAGER
978}