mcp_stdio_proxy/model/
global.rs1use axum::Router;
2use dashmap::DashMap;
3use log::{debug, error, info};
4use once_cell::sync::Lazy;
5use std::sync::Arc;
6use tokio::time::Instant;
7use tokio_util::sync::CancellationToken;
8
9use anyhow::Result;
10
11use crate::ProxyHandler;
12
13use super::{CheckMcpStatusResponseStatus, McpProtocol, McpRouterPath, McpType};
14
15pub static GLOBAL_ROUTES: Lazy<Arc<DashMap<String, Router>>> =
17 Lazy::new(|| Arc::new(DashMap::new()));
18
19pub static GLOBAL_PROXY_MANAGER: Lazy<ProxyHandlerManager> =
21 Lazy::new(ProxyHandlerManager::default);
22
23#[derive(Clone)]
25pub struct DynamicRouterService(pub McpProtocol);
26
27impl DynamicRouterService {
28 pub fn register_route(path: &str, handler: Router) {
30 debug!("=== 注册路由 ===");
31 debug!("注册路径: {}", path);
32 GLOBAL_ROUTES.insert(path.to_string(), handler);
33 debug!("=== 注册路由完成 ===");
34 }
35
36 pub fn delete_route(path: &str) {
38 debug!("=== 删除路由 ===");
39 debug!("删除路径: {}", path);
40 GLOBAL_ROUTES.remove(path);
41 debug!("=== 删除路由完成 ===");
42 }
43
44 pub fn get_route(path: &str) -> Option<Router> {
46 let result = GLOBAL_ROUTES.get(path).map(|entry| entry.value().clone());
47 if result.is_some() {
48 debug!("get_route('{}') = Some(Router)", path);
49 } else {
50 debug!("get_route('{}') = None", path);
51 }
52 result
53 }
54
55 pub fn get_all_routes() -> Vec<String> {
57 GLOBAL_ROUTES
58 .iter()
59 .map(|entry| entry.key().clone())
60 .collect()
61 }
62}
63
64impl std::fmt::Debug for DynamicRouterService {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let routes = GLOBAL_ROUTES
67 .iter()
68 .map(|entry| entry.key().clone())
69 .collect::<Vec<_>>();
70 write!(f, "DynamicRouterService {{ routes: {routes:?} }}")
71 }
72}
73
74#[derive(Debug, Clone)]
79pub struct ProxyHandlerManager {
80 proxy_handlers: DashMap<String, ProxyHandler>,
82 mcp_service_statuses: DashMap<String, McpServiceStatus>,
84}
85#[derive(Debug, Clone)]
87pub struct McpServiceStatus {
88 pub mcp_id: String,
90 pub mcp_type: McpType,
92 pub mcp_router_path: McpRouterPath,
94 pub cancellation_token: CancellationToken,
96 pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
98 pub last_accessed: Instant,
100}
101
102impl McpServiceStatus {
103 pub fn new(
104 mcp_id: String,
105 mcp_type: McpType,
106 mcp_router_path: McpRouterPath,
107 cancellation_token: CancellationToken,
108 check_mcp_status_response_status: CheckMcpStatusResponseStatus,
109 ) -> Self {
110 Self {
111 mcp_id,
112 mcp_type,
113 mcp_router_path,
114 cancellation_token,
115 check_mcp_status_response_status,
116 last_accessed: Instant::now(),
117 }
118 }
119
120 pub fn update_last_accessed(&mut self) {
122 self.last_accessed = Instant::now();
123 }
124}
125
126impl Default for ProxyHandlerManager {
127 fn default() -> Self {
128 ProxyHandlerManager {
129 proxy_handlers: DashMap::new(),
130 mcp_service_statuses: DashMap::new(),
131 }
132 }
133}
134
135impl ProxyHandlerManager {
136 pub fn add_mcp_service_status_and_proxy(
138 &self,
139 mcp_service_status: McpServiceStatus,
140 proxy_handler: Option<ProxyHandler>,
141 ) {
142 let mcp_id = mcp_service_status.mcp_id.clone();
143 self.mcp_service_statuses
144 .insert(mcp_id.clone(), mcp_service_status);
145 if let Some(proxy_handler) = proxy_handler {
147 self.proxy_handlers.insert(mcp_id, proxy_handler);
148 }
149 }
150 pub fn get_all_mcp_service_status(&self) -> Vec<McpServiceStatus> {
152 self.mcp_service_statuses
153 .iter()
154 .map(|entry| entry.value().clone())
155 .collect()
156 }
157
158 pub fn get_mcp_service_status(&self, mcp_id: &str) -> Option<McpServiceStatus> {
160 self.mcp_service_statuses
161 .get(mcp_id)
162 .map(|entry| entry.value().clone())
163 }
164
165 pub fn update_last_accessed(&self, mcp_id: &str) {
167 self.mcp_service_statuses
168 .get_mut(mcp_id)
169 .iter_mut()
170 .for_each(|entry| {
171 entry.value_mut().update_last_accessed();
172 })
173 }
174
175 pub fn update_mcp_service_status(&self, mcp_id: &str, status: CheckMcpStatusResponseStatus) {
177 if let Some(mut mcp_service_status) = self.mcp_service_statuses.get_mut(mcp_id) {
178 mcp_service_status.check_mcp_status_response_status = status;
179 }
180 }
181
182 pub fn get_proxy_handler(&self, mcp_id: &str) -> Option<ProxyHandler> {
183 self.proxy_handlers
184 .get(mcp_id)
185 .map(|entry| entry.value().clone())
186 }
187
188 pub fn add_proxy_handler(&self, mcp_id: &str, proxy_handler: ProxyHandler) {
189 self.proxy_handlers
190 .insert(mcp_id.to_string(), proxy_handler);
191 }
192
193 pub async fn cleanup_resources(&self, mcp_id: &str) -> Result<()> {
195 let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
197 .map_err(|e| {
198 anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
199 })?;
200 let base_sse_path = mcp_sse_router_path.base_path;
201
202 let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
203 .map_err(|e| {
204 anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
205 })?;
206 let base_stream_path = mcp_stream_router_path.base_path;
207
208 DynamicRouterService::delete_route(&base_sse_path);
210 DynamicRouterService::delete_route(&base_stream_path);
211
212 if let Some(status) = self.mcp_service_statuses.get_mut(mcp_id) {
214 info!("Cleaning up resources for mcp_id: {mcp_id}");
215 status.cancellation_token.cancel();
217 info!("CancellationToken cancelled for mcp_id: {mcp_id}");
218 }
219
220 self.proxy_handlers.remove(mcp_id);
221 self.mcp_service_statuses.remove(mcp_id);
222
223 info!("MCP 服务 {mcp_id} 的资源清理已完成");
224 Ok(())
225 }
226
227 pub async fn cleanup_all_resources(&self) -> Result<()> {
229 for mcp_service_entry in self.mcp_service_statuses.iter() {
230 if let Err(e) = self.cleanup_resources(mcp_service_entry.key()).await {
231 error!(
232 "Failed to cleanup resources for {}: {}",
233 mcp_service_entry.key(),
234 e
235 );
236 }
238 }
239 Ok(())
240 }
241}
242
243pub fn get_proxy_manager() -> &'static ProxyHandlerManager {
245 &GLOBAL_PROXY_MANAGER
246}