mcp_stdio_proxy/model/
global.rs

1use axum::Router;
2use dashmap::DashMap;
3use log::{debug, error, info};
4use once_cell::sync::Lazy;
5use std::sync::Arc;
6use tokio::time::Instant;
7use tokio_util::sync::CancellationToken;
8
9use anyhow::Result;
10
11use crate::ProxyHandler;
12
13use super::{CheckMcpStatusResponseStatus, McpProtocol, McpRouterPath, McpType};
14
15// 全局单例路由表
16pub static GLOBAL_ROUTES: Lazy<Arc<DashMap<String, Router>>> =
17    Lazy::new(|| Arc::new(DashMap::new()));
18
19// 全局单例 ProxyHandlerManager
20pub static GLOBAL_PROXY_MANAGER: Lazy<ProxyHandlerManager> =
21    Lazy::new(ProxyHandlerManager::default);
22
23/// 动态路由服务
24#[derive(Clone)]
25pub struct DynamicRouterService(pub McpProtocol);
26
27impl DynamicRouterService {
28    // 注册动态 handler
29    pub fn register_route(path: &str, handler: Router) {
30        debug!("=== 注册路由 ===");
31        debug!("注册路径: {}", path);
32        GLOBAL_ROUTES.insert(path.to_string(), handler);
33        debug!("=== 注册路由完成 ===");
34    }
35
36    // 删除动态 handler
37    pub fn delete_route(path: &str) {
38        debug!("=== 删除路由 ===");
39        debug!("删除路径: {}", path);
40        GLOBAL_ROUTES.remove(path);
41        debug!("=== 删除路由完成 ===");
42    }
43
44    // 获取动态 handler
45    pub fn get_route(path: &str) -> Option<Router> {
46        let result = GLOBAL_ROUTES.get(path).map(|entry| entry.value().clone());
47        if result.is_some() {
48            debug!("get_route('{}') = Some(Router)", path);
49        } else {
50            debug!("get_route('{}') = None", path);
51        }
52        result
53    }
54
55    // 获取所有已注册的路由(debug用)
56    pub fn get_all_routes() -> Vec<String> {
57        GLOBAL_ROUTES
58            .iter()
59            .map(|entry| entry.key().clone())
60            .collect()
61    }
62}
63
64impl std::fmt::Debug for DynamicRouterService {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let routes = GLOBAL_ROUTES
67            .iter()
68            .map(|entry| entry.key().clone())
69            .collect::<Vec<_>>();
70        write!(f, "DynamicRouterService {{ routes: {routes:?} }}")
71    }
72}
73
74//mcp 代理管理器,包含路由,取消令牌,透明mcp代理处理器
75
76//根据用户的 mcp_id ,获取对应的 ProxyHandler;
77//定义结构体
78#[derive(Debug, Clone)]
79pub struct ProxyHandlerManager {
80    // 存储 ProxyHandler 透明代理服务
81    proxy_handlers: DashMap<String, ProxyHandler>,
82    // 存储 MCP 服务状态,包含路径,类型,取消令牌,mcp_id,状态
83    mcp_service_statuses: DashMap<String, McpServiceStatus>,
84}
85//定义 mcp服务结构,包含:mcpType,McpRouterPath,CancellationToken,mcp_id,CheckMcpStatusResponseStatus
86#[derive(Debug, Clone)]
87pub struct McpServiceStatus {
88    // mcp_id
89    pub mcp_id: String,
90    // mcp类型
91    pub mcp_type: McpType,
92    // mcp路由路径
93    pub mcp_router_path: McpRouterPath,
94    // 用于控制与此 mcp_id 关联的 SseServer 和 command 终端
95    pub cancellation_token: CancellationToken,
96    // mcp服务状态
97    pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
98    // 最后访问时间
99    pub last_accessed: Instant,
100}
101
102impl McpServiceStatus {
103    pub fn new(
104        mcp_id: String,
105        mcp_type: McpType,
106        mcp_router_path: McpRouterPath,
107        cancellation_token: CancellationToken,
108        check_mcp_status_response_status: CheckMcpStatusResponseStatus,
109    ) -> Self {
110        Self {
111            mcp_id,
112            mcp_type,
113            mcp_router_path,
114            cancellation_token,
115            check_mcp_status_response_status,
116            last_accessed: Instant::now(),
117        }
118    }
119
120    // 更新最后访问时间
121    pub fn update_last_accessed(&mut self) {
122        self.last_accessed = Instant::now();
123    }
124}
125
126impl Default for ProxyHandlerManager {
127    fn default() -> Self {
128        ProxyHandlerManager {
129            proxy_handlers: DashMap::new(),
130            mcp_service_statuses: DashMap::new(),
131        }
132    }
133}
134
135impl ProxyHandlerManager {
136    // 添加 MCP 服务状态
137    pub fn add_mcp_service_status_and_proxy(
138        &self,
139        mcp_service_status: McpServiceStatus,
140        proxy_handler: Option<ProxyHandler>,
141    ) {
142        let mcp_id = mcp_service_status.mcp_id.clone();
143        self.mcp_service_statuses
144            .insert(mcp_id.clone(), mcp_service_status);
145        // 如果 proxy_handler 不为 None,则添加到 proxy_handlers; 为空的时候,是记录一个空代理处理器,正在启动中
146        if let Some(proxy_handler) = proxy_handler {
147            self.proxy_handlers.insert(mcp_id, proxy_handler);
148        }
149    }
150    //获取所有的 mcp 服务状态
151    pub fn get_all_mcp_service_status(&self) -> Vec<McpServiceStatus> {
152        self.mcp_service_statuses
153            .iter()
154            .map(|entry| entry.value().clone())
155            .collect()
156    }
157
158    // 获取 MCP 服务状态
159    pub fn get_mcp_service_status(&self, mcp_id: &str) -> Option<McpServiceStatus> {
160        self.mcp_service_statuses
161            .get(mcp_id)
162            .map(|entry| entry.value().clone())
163    }
164
165    // 更新最后访问时间
166    pub fn update_last_accessed(&self, mcp_id: &str) {
167        self.mcp_service_statuses
168            .get_mut(mcp_id)
169            .iter_mut()
170            .for_each(|entry| {
171                entry.value_mut().update_last_accessed();
172            })
173    }
174
175    //修改 mcp服务状态,Ready/Pending/Error
176    pub fn update_mcp_service_status(&self, mcp_id: &str, status: CheckMcpStatusResponseStatus) {
177        if let Some(mut mcp_service_status) = self.mcp_service_statuses.get_mut(mcp_id) {
178            mcp_service_status.check_mcp_status_response_status = status;
179        }
180    }
181
182    pub fn get_proxy_handler(&self, mcp_id: &str) -> Option<ProxyHandler> {
183        self.proxy_handlers
184            .get(mcp_id)
185            .map(|entry| entry.value().clone())
186    }
187
188    pub fn add_proxy_handler(&self, mcp_id: &str, proxy_handler: ProxyHandler) {
189        self.proxy_handlers
190            .insert(mcp_id.to_string(), proxy_handler);
191    }
192
193    // 清理资源,根据 mcp_id 清理资源
194    pub async fn cleanup_resources(&self, mcp_id: &str) -> Result<()> {
195        // 创建路径以构建要删除的路由路径
196        let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
197            .map_err(|e| {
198                anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
199            })?;
200        let base_sse_path = mcp_sse_router_path.base_path;
201
202        let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
203            .map_err(|e| {
204                anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
205            })?;
206        let base_stream_path = mcp_stream_router_path.base_path;
207
208        // 移除相关路由
209        DynamicRouterService::delete_route(&base_sse_path);
210        DynamicRouterService::delete_route(&base_stream_path);
211
212        // 取消取消令牌并移除资源
213        if let Some(status) = self.mcp_service_statuses.get_mut(mcp_id) {
214            info!("Cleaning up resources for mcp_id: {mcp_id}");
215            // 取消与此 mcp_id 关联的 SseServer/command 终端的 CancellationToken
216            status.cancellation_token.cancel();
217            info!("CancellationToken cancelled for mcp_id: {mcp_id}");
218        }
219
220        self.proxy_handlers.remove(mcp_id);
221        self.mcp_service_statuses.remove(mcp_id);
222
223        info!("MCP 服务 {mcp_id} 的资源清理已完成");
224        Ok(())
225    }
226
227    // 系统关闭,清理所有资源
228    pub async fn cleanup_all_resources(&self) -> Result<()> {
229        for mcp_service_entry in self.mcp_service_statuses.iter() {
230            if let Err(e) = self.cleanup_resources(mcp_service_entry.key()).await {
231                error!(
232                    "Failed to cleanup resources for {}: {}",
233                    mcp_service_entry.key(),
234                    e
235                );
236                // 继续清理其他资源,不中断整个清理过程
237            }
238        }
239        Ok(())
240    }
241}
242
243// 提供一个便捷的函数来获取全局 ProxyHandlerManager
244pub fn get_proxy_manager() -> &'static ProxyHandlerManager {
245    &GLOBAL_PROXY_MANAGER
246}