1use axum::Router;
2use dashmap::DashMap;
3use log::{debug, error, info};
4use moka::future::Cache;
5use once_cell::sync::Lazy;
6use std::sync::Arc;
7use tokio::sync::{Mutex, OwnedMutexGuard};
8use tokio::time::{Duration, Instant};
9use tokio_util::sync::CancellationToken;
10use tracing::warn;
11
12use anyhow::Result;
13
14use crate::proxy::McpHandler;
15
16use super::{CheckMcpStatusResponseStatus, McpConfig, McpProtocol, McpRouterPath, McpType};
17
18pub static GLOBAL_ROUTES: Lazy<Arc<DashMap<String, Router>>> =
20 Lazy::new(|| Arc::new(DashMap::new()));
21
22pub static GLOBAL_PROXY_MANAGER: Lazy<ProxyHandlerManager> =
24 Lazy::new(ProxyHandlerManager::default);
25
26#[derive(Clone)]
28pub struct DynamicRouterService(pub McpProtocol);
29
30impl DynamicRouterService {
31 pub fn register_route(path: &str, handler: Router) {
33 debug!("=== 注册路由 ===");
34 debug!("注册路径: {}", path);
35 GLOBAL_ROUTES.insert(path.to_string(), handler);
36 debug!("=== 注册路由完成 ===");
37 }
38
39 pub fn delete_route(path: &str) {
41 debug!("=== 删除路由 ===");
42 debug!("删除路径: {}", path);
43 GLOBAL_ROUTES.remove(path);
44 debug!("=== 删除路由完成 ===");
45 }
46
47 pub fn get_route(path: &str) -> Option<Router> {
49 let result = GLOBAL_ROUTES.get(path).map(|entry| entry.value().clone());
50 if result.is_some() {
51 debug!("get_route('{}') = Some(Router)", path);
52 } else {
53 debug!("get_route('{}') = None", path);
54 }
55 result
56 }
57
58 pub fn get_all_routes() -> Vec<String> {
60 GLOBAL_ROUTES
61 .iter()
62 .map(|entry| entry.key().clone())
63 .collect()
64 }
65}
66
67impl std::fmt::Debug for DynamicRouterService {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let routes = GLOBAL_ROUTES
70 .iter()
71 .map(|entry| entry.key().clone())
72 .collect::<Vec<_>>();
73 write!(f, "DynamicRouterService {{ routes: {routes:?} }}")
74 }
75}
76
77pub struct McpProcessGuard {
107 mcp_id: String,
108 cancellation_token: CancellationToken,
109}
110
111impl McpProcessGuard {
112 pub fn new(mcp_id: String, cancellation_token: CancellationToken) -> Self {
113 debug!("[RAII] 创建进程守护器: mcp_id={}", mcp_id);
114 Self {
115 mcp_id,
116 cancellation_token,
117 }
118 }
119
120 pub fn clone_token(&self) -> CancellationToken {
122 self.cancellation_token.clone()
123 }
124}
125
126impl Drop for McpProcessGuard {
127 fn drop(&mut self) {
128 info!(
129 "[RAII] 进程守护器被 drop,取消 CancellationToken: mcp_id={}",
130 self.mcp_id
131 );
132 self.cancellation_token.cancel();
133 }
134}
135
136impl std::fmt::Debug for McpProcessGuard {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("McpProcessGuard")
140 .field("mcp_id", &self.mcp_id)
141 .field("is_cancelled", &self.cancellation_token.is_cancelled())
142 .finish()
143 }
144}
145
146pub struct McpService {
157 process_guard: McpProcessGuard,
159 handler: Option<McpHandler>,
161 status: McpServiceStatusInfo,
163}
164
165#[derive(Debug, Clone)]
167pub struct McpServiceStatusInfo {
168 pub mcp_id: String,
169 pub mcp_type: McpType,
170 pub mcp_router_path: McpRouterPath,
171 pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
172 pub last_accessed: Instant,
173 pub mcp_config: Option<McpConfig>,
174 pub consecutive_probe_failures: u32,
176}
177
178impl McpServiceStatusInfo {
179 pub fn new(
180 mcp_id: String,
181 mcp_type: McpType,
182 mcp_router_path: McpRouterPath,
183 check_mcp_status_response_status: CheckMcpStatusResponseStatus,
184 ) -> Self {
185 Self {
186 mcp_id,
187 mcp_type,
188 mcp_router_path,
189 check_mcp_status_response_status,
190 last_accessed: Instant::now(),
191 mcp_config: None,
192 consecutive_probe_failures: 0,
193 }
194 }
195
196 pub fn update_last_accessed(&mut self) {
197 self.last_accessed = Instant::now();
198 }
199
200 pub fn reset_probe_failures(&mut self) {
202 self.consecutive_probe_failures = 0;
203 }
204
205 pub fn increment_probe_failures(&mut self) -> u32 {
207 self.consecutive_probe_failures += 1;
208 self.consecutive_probe_failures
209 }
210}
211
212impl McpService {
213 pub fn new(
221 mcp_id: String,
222 mcp_type: McpType,
223 mcp_router_path: McpRouterPath,
224 cancellation_token: CancellationToken,
225 ) -> Self {
226 let process_guard = McpProcessGuard::new(mcp_id.clone(), cancellation_token);
227 let status = McpServiceStatusInfo::new(
228 mcp_id,
229 mcp_type,
230 mcp_router_path,
231 CheckMcpStatusResponseStatus::Pending,
232 );
233 Self {
234 process_guard,
235 handler: None,
236 status,
237 }
238 }
239
240 pub fn set_handler(&mut self, handler: McpHandler) {
242 self.handler = Some(handler);
243 }
244
245 pub fn handler(&self) -> Option<&McpHandler> {
247 self.handler.as_ref()
248 }
249
250 pub fn status(&self) -> &McpServiceStatusInfo {
252 &self.status
253 }
254
255 pub fn status_mut(&mut self) -> &mut McpServiceStatusInfo {
257 &mut self.status
258 }
259
260 pub fn clone_token(&self) -> CancellationToken {
262 self.process_guard.clone_token()
263 }
264
265 pub fn update_status(&mut self, status: CheckMcpStatusResponseStatus) {
267 self.status.check_mcp_status_response_status = status;
268 }
269
270 pub fn update_last_accessed(&mut self) {
272 self.status.update_last_accessed();
273 }
274
275 pub fn set_mcp_config(&mut self, config: McpConfig) {
277 self.status.mcp_config = Some(config);
278 }
279}
280
281impl std::fmt::Debug for McpService {
282 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283 f.debug_struct("McpService")
284 .field("process_guard", &self.process_guard)
285 .field("handler", &self.handler.is_some())
286 .field("status", &self.status)
287 .finish()
288 }
289}
290
291#[derive(Debug, Clone)]
299pub struct McpServiceStatus {
300 pub mcp_id: String,
301 pub mcp_type: McpType,
302 pub mcp_router_path: McpRouterPath,
303 pub cancellation_token: CancellationToken,
304 pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
305 pub last_accessed: Instant,
306 pub mcp_config: Option<McpConfig>,
307 pub consecutive_probe_failures: u32,
309}
310
311impl McpServiceStatus {
312 pub fn new(
313 mcp_id: String,
314 mcp_type: McpType,
315 mcp_router_path: McpRouterPath,
316 cancellation_token: CancellationToken,
317 check_mcp_status_response_status: CheckMcpStatusResponseStatus,
318 ) -> Self {
319 Self {
320 mcp_id,
321 mcp_type,
322 mcp_router_path,
323 cancellation_token,
324 check_mcp_status_response_status,
325 last_accessed: Instant::now(),
326 mcp_config: None,
327 consecutive_probe_failures: 0,
328 }
329 }
330
331 pub fn with_mcp_config(mut self, mcp_config: McpConfig) -> Self {
332 self.mcp_config = Some(mcp_config);
333 self
334 }
335
336 pub fn update_last_accessed(&mut self) {
337 self.last_accessed = Instant::now();
338 }
339}
340
341#[derive(Debug)]
351pub struct ProxyHandlerManager {
352 services: DashMap<String, McpService>,
354}
355
356impl Default for ProxyHandlerManager {
357 fn default() -> Self {
358 ProxyHandlerManager {
359 services: DashMap::new(),
360 }
361 }
362}
363
364impl ProxyHandlerManager {
365 pub fn add_mcp_service(
369 &self,
370 mcp_id: String,
371 mcp_type: McpType,
372 mcp_router_path: McpRouterPath,
373 cancellation_token: CancellationToken,
374 ) {
375 let service = McpService::new(
376 mcp_id.clone(),
377 mcp_type,
378 mcp_router_path,
379 cancellation_token,
380 );
381
382 if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
384 info!(
385 "[RAII] 覆盖已存在的服务,旧服务将被自动清理: mcp_id={}",
386 mcp_id
387 );
388 drop(old_service);
389 }
390 }
391
392 pub fn add_mcp_service_status_and_proxy(
398 &self,
399 mcp_service_status: McpServiceStatus,
400 proxy_handler: Option<McpHandler>,
401 ) {
402 let mcp_id = mcp_service_status.mcp_id.clone();
403
404 let mut service = McpService::new(
407 mcp_id.clone(),
408 mcp_service_status.mcp_type,
409 mcp_service_status.mcp_router_path,
410 mcp_service_status.cancellation_token,
411 );
412
413 service.status_mut().check_mcp_status_response_status =
415 mcp_service_status.check_mcp_status_response_status;
416
417 if let Some(config) = mcp_service_status.mcp_config {
419 service.set_mcp_config(config);
420 }
421
422 if let Some(handler) = proxy_handler {
424 service.set_handler(handler);
425 }
426
427 if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
429 info!(
430 "[RAII] 覆盖已存在的服务,旧服务将被自动清理: mcp_id={}",
431 mcp_id
432 );
433 drop(old_service);
435 }
436 }
437
438 pub fn get_all_mcp_service_status(&self) -> Vec<McpServiceStatus> {
443 let keys: Vec<String> = self
445 .services
446 .iter()
447 .map(|entry| entry.key().clone())
448 .collect();
449
450 keys.into_iter()
452 .filter_map(|mcp_id| self.get_mcp_service_status(&mcp_id))
453 .collect()
454 }
455
456 pub fn get_mcp_service_status(&self, mcp_id: &str) -> Option<McpServiceStatus> {
458 self.services.get(mcp_id).map(|entry| {
459 let service = entry.value();
460 let status = service.status();
461 McpServiceStatus {
462 mcp_id: status.mcp_id.clone(),
463 mcp_type: status.mcp_type.clone(),
464 mcp_router_path: status.mcp_router_path.clone(),
465 cancellation_token: service.clone_token(),
466 check_mcp_status_response_status: status.check_mcp_status_response_status.clone(),
467 last_accessed: status.last_accessed,
468 mcp_config: status.mcp_config.clone(),
469 consecutive_probe_failures: status.consecutive_probe_failures,
470 }
471 })
472 }
473
474 pub fn update_last_accessed(&self, mcp_id: &str) {
478 self.services
479 .entry(mcp_id.to_string())
480 .and_modify(|service| service.update_last_accessed());
481 }
482
483 pub fn update_mcp_service_status(&self, mcp_id: &str, status: CheckMcpStatusResponseStatus) {
487 self.services
488 .entry(mcp_id.to_string())
489 .and_modify(|service| service.update_status(status));
490 }
491
492 pub fn get_proxy_handler(&self, mcp_id: &str) -> Option<McpHandler> {
494 self.services
495 .get(mcp_id)
496 .and_then(|entry| entry.value().handler().cloned())
497 }
498
499 pub fn get_mcp_config(&self, mcp_id: &str) -> Option<McpConfig> {
501 self.services
502 .get(mcp_id)
503 .and_then(|entry| entry.value().status().mcp_config.clone())
504 }
505
506 pub fn add_proxy_handler(&self, mcp_id: &str, proxy_handler: McpHandler) {
510 match self.services.entry(mcp_id.to_string()) {
511 dashmap::mapref::entry::Entry::Occupied(mut entry) => {
512 entry.get_mut().set_handler(proxy_handler);
513 }
514 dashmap::mapref::entry::Entry::Vacant(_) => {
515 warn!("[RAII] 尝试添加 handler 到不存在的服务: mcp_id={}", mcp_id);
516 }
517 }
518 }
519
520 pub fn contains_service(&self, mcp_id: &str) -> bool {
522 self.services.contains_key(mcp_id)
523 }
524
525 pub fn service_count(&self) -> usize {
527 self.services.len()
528 }
529
530 pub async fn register_mcp_config(&self, mcp_id: &str, config: McpConfig) {
532 GLOBAL_MCP_CONFIG_CACHE
533 .insert(mcp_id.to_string(), config)
534 .await;
535 info!("MCP 配置已注册到缓存: {}", mcp_id);
536 }
537
538 pub async fn get_mcp_config_from_cache(&self, mcp_id: &str) -> Option<McpConfig> {
540 if let Some(config) = GLOBAL_MCP_CONFIG_CACHE.get(mcp_id).await {
541 debug!("从缓存获取 MCP 配置: {}", mcp_id);
542 Some(config)
543 } else {
544 debug!("缓存中未找到 MCP 配置: {}", mcp_id);
545 None
546 }
547 }
548
549 pub async fn unregister_mcp_config(&self, mcp_id: &str) {
551 GLOBAL_MCP_CONFIG_CACHE.invalidate(mcp_id).await;
552 info!("MCP 配置已从缓存删除: {}", mcp_id);
553 }
554
555 pub async fn cleanup_resources(&self, mcp_id: &str) -> Result<()> {
564 info!("[RAII] 开始清理资源: mcp_id={}", mcp_id);
565
566 let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
568 .map_err(|e| {
569 anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
570 })?;
571 let base_sse_path = mcp_sse_router_path.base_path;
572
573 let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
574 .map_err(|e| {
575 anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
576 })?;
577 let base_stream_path = mcp_stream_router_path.base_path;
578
579 DynamicRouterService::delete_route(&base_sse_path);
581 DynamicRouterService::delete_route(&base_stream_path);
582
583 if self.services.remove(mcp_id).is_some() {
586 info!(
587 "[RAII] 服务已从 map 移除,McpProcessGuard 将自动取消令牌: mcp_id={}",
588 mcp_id
589 );
590 } else {
591 debug!("[RAII] 服务不存在,跳过移除: mcp_id={}", mcp_id);
592 }
593
594 self.unregister_mcp_config(mcp_id).await;
596
597 GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
599
600 info!("[RAII] MCP 服务资源清理完成: mcp_id={}", mcp_id);
601 Ok(())
602 }
603
604 pub async fn cleanup_all_resources(&self) -> Result<()> {
608 info!("[RAII] 开始清理所有 MCP 服务资源");
609
610 let mcp_ids: Vec<String> = self
612 .services
613 .iter()
614 .map(|entry| entry.key().clone())
615 .collect();
616
617 let count = mcp_ids.len();
618
619 for mcp_id in mcp_ids {
621 if let Err(e) = self.cleanup_resources(&mcp_id).await {
622 error!("[RAII] 清理资源失败: mcp_id={}, error={}", mcp_id, e);
623 }
625 }
626
627 info!("[RAII] 所有 MCP 服务资源清理完成,共清理 {} 个服务", count);
628 Ok(())
629 }
630
631 pub fn remove_service(&self, mcp_id: &str) -> bool {
636 if self.services.remove(mcp_id).is_some() {
637 info!("[RAII] 服务已移除,进程将自动清理: mcp_id={}", mcp_id);
638 true
639 } else {
640 debug!("[RAII] 服务不存在: mcp_id={}", mcp_id);
641 false
642 }
643 }
644
645 pub fn reset_probe_failures(&self, mcp_id: &str) {
649 if let Some(mut entry) = self.services.get_mut(mcp_id) {
650 entry.value_mut().status_mut().reset_probe_failures();
651 }
652 }
653
654 pub fn increment_probe_failures(&self, mcp_id: &str) -> u32 {
658 self.services
659 .get_mut(mcp_id)
660 .map(|mut entry| entry.value_mut().status_mut().increment_probe_failures())
661 .unwrap_or(0)
662 }
663
664 pub async fn cleanup_resources_for_restart(&self, mcp_id: &str) -> Result<()> {
669 info!("[RAII] 开始清理资源用于重启: mcp_id={}", mcp_id);
670
671 let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
673 .map_err(|e| {
674 anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
675 })?;
676 let base_sse_path = mcp_sse_router_path.base_path;
677
678 let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
679 .map_err(|e| {
680 anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
681 })?;
682 let base_stream_path = mcp_stream_router_path.base_path;
683
684 DynamicRouterService::delete_route(&base_sse_path);
686 DynamicRouterService::delete_route(&base_stream_path);
687
688 if self.services.remove(mcp_id).is_some() {
690 info!(
691 "[RAII] 服务已从 map 移除(用于重启),McpProcessGuard 将自动取消令牌: mcp_id={}",
692 mcp_id
693 );
694 } else {
695 debug!("[RAII] 服务不存在,跳过移除: mcp_id={}", mcp_id);
696 }
697
698 GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
703
704 info!("[RAII] MCP 服务资源清理完成(用于重启): mcp_id={}", mcp_id);
705 Ok(())
706 }
707}
708
709pub struct McpConfigCache {
741 cache: Cache<String, McpConfig>,
742}
743
744impl McpConfigCache {
745 pub fn new() -> Self {
746 Self {
747 cache: Cache::builder()
748 .time_to_live(Duration::from_secs(24 * 60 * 60)) .max_capacity(1000) .build(),
751 }
752 }
753
754 pub async fn insert(&self, mcp_id: String, config: McpConfig) {
755 self.cache.insert(mcp_id.clone(), config).await;
756 info!("MCP 配置已缓存: {} (TTL: 24h)", mcp_id);
757 }
758
759 pub async fn get(&self, mcp_id: &str) -> Option<McpConfig> {
760 self.cache.get(mcp_id).await
761 }
762
763 pub async fn invalidate(&self, mcp_id: &str) {
764 self.cache.invalidate(mcp_id).await;
765 }
766
767 #[allow(dead_code)]
768 pub fn invalidate_all(&self) {
769 self.cache.invalidate_all();
770 }
771}
772
773impl Default for McpConfigCache {
774 fn default() -> Self {
775 Self::new()
776 }
777}
778
779pub static GLOBAL_MCP_CONFIG_CACHE: Lazy<McpConfigCache> = Lazy::new(McpConfigCache::default);
781
782pub struct RestartTracker {
798 last_restart: DashMap<String, Instant>,
800 health_status: DashMap<String, (bool, Instant)>,
802 startup_locks: DashMap<String, Arc<Mutex<()>>>,
804}
805
806impl RestartTracker {
807 pub fn new() -> Self {
808 Self {
809 last_restart: DashMap::new(),
810 health_status: DashMap::new(),
811 startup_locks: DashMap::new(),
812 }
813 }
814
815 pub fn get_cached_health_status(&self, mcp_id: &str) -> Option<bool> {
820 let cache_duration = Duration::from_secs(5); let now = Instant::now();
822
823 self.health_status.get(mcp_id).and_then(|entry| {
824 let (is_healthy, check_time) = *entry.value();
825 if now.duration_since(check_time) < cache_duration {
826 Some(is_healthy)
827 } else {
828 None
829 }
830 })
831 }
832
833 pub fn update_health_status(&self, mcp_id: &str, is_healthy: bool) {
835 self.health_status
836 .insert(mcp_id.to_string(), (is_healthy, Instant::now()));
837 }
838
839 pub fn clear_health_status(&self, mcp_id: &str) {
841 self.health_status.remove(mcp_id);
842 }
843
844 pub fn can_restart(&self, mcp_id: &str) -> bool {
851 let now = Instant::now();
852 let min_restart_interval = Duration::from_secs(30); if let Some(last_restart) = self.last_restart.get(mcp_id) {
856 let elapsed = now.duration_since(*last_restart);
857 if elapsed < min_restart_interval {
858 warn!(
859 "服务 {} 在冷却期内,距离上次重启仅 {} 秒,跳过重启",
860 mcp_id,
861 elapsed.as_secs()
862 );
863 return false;
864 }
865 }
866 true
868 }
869
870 pub fn record_restart(&self, mcp_id: &str) {
875 self.last_restart.insert(mcp_id.to_string(), Instant::now());
876 info!("服务启动成功,记录重启时间: {}", mcp_id);
877 }
878
879 #[allow(dead_code)]
884 pub fn clear_restart(&self, mcp_id: &str) {
885 self.last_restart.remove(mcp_id);
886 info!("已清除服务 {} 的重启时间戳", mcp_id);
887 }
888
889 pub fn try_acquire_startup_lock(&self, mcp_id: &str) -> Option<OwnedMutexGuard<()>> {
907 let lock = self
909 .startup_locks
910 .entry(mcp_id.to_string())
911 .or_insert_with(|| Arc::new(Mutex::new(())))
912 .clone();
913
914 match lock.try_lock_owned() {
916 Ok(guard) => Some(guard),
917 Err(_) => {
918 debug!("服务 {} 正在启动中,跳过本次启动", mcp_id);
920 None
921 }
922 }
923 }
924
925 #[allow(dead_code)]
930 pub fn cleanup_startup_lock(&self, mcp_id: &str) {
931 self.startup_locks.remove(mcp_id);
932 debug!("已清理服务 {} 的启动锁", mcp_id);
933 }
934}
935
936impl Default for RestartTracker {
937 fn default() -> Self {
938 Self::new()
939 }
940}
941
942pub static GLOBAL_RESTART_TRACKER: Lazy<RestartTracker> = Lazy::new(RestartTracker::default);
944
945pub fn get_proxy_manager() -> &'static ProxyHandlerManager {
947 &GLOBAL_PROXY_MANAGER
948}