1use axum::Router;
2use dashmap::DashMap;
3use log::{debug, error, info};
4use moka::future::Cache;
5use once_cell::sync::Lazy;
6use std::sync::Arc;
7use tokio::sync::{Mutex, OwnedMutexGuard};
8use tokio::time::{Duration, Instant};
9use tokio_util::sync::CancellationToken;
10use tracing::warn;
11
12use anyhow::Result;
13
14use crate::proxy::McpHandler;
15
16use super::{CheckMcpStatusResponseStatus, McpConfig, McpProtocol, McpRouterPath, McpType};
17
18pub static GLOBAL_ROUTES: Lazy<Arc<DashMap<String, Router>>> =
20 Lazy::new(|| Arc::new(DashMap::new()));
21
22pub static GLOBAL_PROXY_MANAGER: Lazy<ProxyHandlerManager> =
24 Lazy::new(ProxyHandlerManager::default);
25
26#[derive(Clone)]
28pub struct DynamicRouterService(pub McpProtocol);
29
30impl DynamicRouterService {
31 pub fn register_route(path: &str, handler: Router) {
33 debug!("=== Register Route ===");
34 debug!("Registration path: {}", path);
35 GLOBAL_ROUTES.insert(path.to_string(), handler);
36 debug!("=== Route registration completed ===");
37 }
38
39 pub fn delete_route(path: &str) {
41 debug!("=== Delete route ===");
42 debug!("Delete path: {}", path);
43 GLOBAL_ROUTES.remove(path);
44 debug!("=== Route deletion completed ===");
45 }
46
47 pub fn get_route(path: &str) -> Option<Router> {
49 let result = GLOBAL_ROUTES.get(path).map(|entry| entry.value().clone());
50 if result.is_some() {
51 debug!("get_route('{}') = Some(Router)", path);
52 } else {
53 debug!("get_route('{}') = None", path);
54 }
55 result
56 }
57
58 pub fn get_all_routes() -> Vec<String> {
60 GLOBAL_ROUTES
61 .iter()
62 .map(|entry| entry.key().clone())
63 .collect()
64 }
65}
66
67impl std::fmt::Debug for DynamicRouterService {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let routes = GLOBAL_ROUTES
70 .iter()
71 .map(|entry| entry.key().clone())
72 .collect::<Vec<_>>();
73 write!(f, "DynamicRouterService {{ routes: {routes:?} }}")
74 }
75}
76
77pub struct McpProcessGuard {
107 mcp_id: String,
108 cancellation_token: CancellationToken,
109}
110
111impl McpProcessGuard {
112 pub fn new(mcp_id: String, cancellation_token: CancellationToken) -> Self {
113 debug!("[RAII] Create process daemon: mcp_id={}", mcp_id);
114 Self {
115 mcp_id,
116 cancellation_token,
117 }
118 }
119
120 pub fn clone_token(&self) -> CancellationToken {
122 self.cancellation_token.clone()
123 }
124}
125
126impl Drop for McpProcessGuard {
127 fn drop(&mut self) {
128 info!(
129 "[RAII] The process daemon was dropped and canceled CancellationToken: mcp_id={}",
130 self.mcp_id
131 );
132 self.cancellation_token.cancel();
133 }
134}
135
136impl std::fmt::Debug for McpProcessGuard {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("McpProcessGuard")
140 .field("mcp_id", &self.mcp_id)
141 .field("is_cancelled", &self.cancellation_token.is_cancelled())
142 .finish()
143 }
144}
145
/// One registered MCP service: its cancellation guard, its optional proxy
/// handler and its status record.
pub struct McpService {
    /// Guard owning the service's cancellation token; the token is cancelled
    /// when the guard (and therefore this service) is dropped.
    process_guard: McpProcessGuard,
    /// Proxy handler for the service; `None` until `set_handler` is called.
    handler: Option<McpHandler>,
    /// Mutable status information (type, router path, health, last access).
    status: McpServiceStatusInfo,
}
164
165#[derive(Debug, Clone)]
167pub struct McpServiceStatusInfo {
168 pub mcp_id: String,
169 pub mcp_type: McpType,
170 pub mcp_router_path: McpRouterPath,
171 pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
172 pub last_accessed: Instant,
173 pub mcp_config: Option<McpConfig>,
174 pub consecutive_probe_failures: u32,
176}
177
178impl McpServiceStatusInfo {
179 pub fn new(
180 mcp_id: String,
181 mcp_type: McpType,
182 mcp_router_path: McpRouterPath,
183 check_mcp_status_response_status: CheckMcpStatusResponseStatus,
184 ) -> Self {
185 Self {
186 mcp_id,
187 mcp_type,
188 mcp_router_path,
189 check_mcp_status_response_status,
190 last_accessed: Instant::now(),
191 mcp_config: None,
192 consecutive_probe_failures: 0,
193 }
194 }
195
196 pub fn update_last_accessed(&mut self) {
197 self.last_accessed = Instant::now();
198 }
199
200 pub fn reset_probe_failures(&mut self) {
202 self.consecutive_probe_failures = 0;
203 }
204
205 pub fn increment_probe_failures(&mut self) -> u32 {
207 self.consecutive_probe_failures += 1;
208 self.consecutive_probe_failures
209 }
210}
211
212impl McpService {
213 pub fn new(
221 mcp_id: String,
222 mcp_type: McpType,
223 mcp_router_path: McpRouterPath,
224 cancellation_token: CancellationToken,
225 ) -> Self {
226 let process_guard = McpProcessGuard::new(mcp_id.clone(), cancellation_token);
227 let status = McpServiceStatusInfo::new(
228 mcp_id,
229 mcp_type,
230 mcp_router_path,
231 CheckMcpStatusResponseStatus::Pending,
232 );
233 Self {
234 process_guard,
235 handler: None,
236 status,
237 }
238 }
239
240 pub fn set_handler(&mut self, handler: McpHandler) {
242 self.handler = Some(handler);
243 }
244
245 pub fn handler(&self) -> Option<&McpHandler> {
247 self.handler.as_ref()
248 }
249
250 pub fn status(&self) -> &McpServiceStatusInfo {
252 &self.status
253 }
254
255 pub fn status_mut(&mut self) -> &mut McpServiceStatusInfo {
257 &mut self.status
258 }
259
260 pub fn clone_token(&self) -> CancellationToken {
262 self.process_guard.clone_token()
263 }
264
265 pub fn update_status(&mut self, status: CheckMcpStatusResponseStatus) {
267 self.status.check_mcp_status_response_status = status;
268 }
269
270 pub fn update_last_accessed(&mut self) {
272 self.status.update_last_accessed();
273 }
274
275 pub fn set_mcp_config(&mut self, config: McpConfig) {
277 self.status.mcp_config = Some(config);
278 }
279}
280
281impl std::fmt::Debug for McpService {
282 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283 f.debug_struct("McpService")
284 .field("process_guard", &self.process_guard)
285 .field("handler", &self.handler.is_some())
286 .field("status", &self.status)
287 .finish()
288 }
289}
290
291#[derive(Debug, Clone)]
299pub struct McpServiceStatus {
300 pub mcp_id: String,
301 pub mcp_type: McpType,
302 pub mcp_router_path: McpRouterPath,
303 pub cancellation_token: CancellationToken,
304 pub check_mcp_status_response_status: CheckMcpStatusResponseStatus,
305 pub last_accessed: Instant,
306 pub mcp_config: Option<McpConfig>,
307 pub consecutive_probe_failures: u32,
309}
310
311impl McpServiceStatus {
312 pub fn new(
313 mcp_id: String,
314 mcp_type: McpType,
315 mcp_router_path: McpRouterPath,
316 cancellation_token: CancellationToken,
317 check_mcp_status_response_status: CheckMcpStatusResponseStatus,
318 ) -> Self {
319 Self {
320 mcp_id,
321 mcp_type,
322 mcp_router_path,
323 cancellation_token,
324 check_mcp_status_response_status,
325 last_accessed: Instant::now(),
326 mcp_config: None,
327 consecutive_probe_failures: 0,
328 }
329 }
330
331 pub fn with_mcp_config(mut self, mcp_config: McpConfig) -> Self {
332 self.mcp_config = Some(mcp_config);
333 self
334 }
335
336 pub fn update_last_accessed(&mut self) {
337 self.last_accessed = Instant::now();
338 }
339}
340
341#[derive(Debug)]
351pub struct ProxyHandlerManager {
352 services: DashMap<String, McpService>,
354}
355
356impl Default for ProxyHandlerManager {
357 fn default() -> Self {
358 ProxyHandlerManager {
359 services: DashMap::new(),
360 }
361 }
362}
363
364impl ProxyHandlerManager {
365 pub fn add_mcp_service(
369 &self,
370 mcp_id: String,
371 mcp_type: McpType,
372 mcp_router_path: McpRouterPath,
373 cancellation_token: CancellationToken,
374 ) {
375 let service = McpService::new(
376 mcp_id.clone(),
377 mcp_type,
378 mcp_router_path,
379 cancellation_token,
380 );
381
382 if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
384 info!(
385 "[RAII] Overwrite existing services, old services will be automatically cleaned up: mcp_id={}",
386 mcp_id
387 );
388 drop(old_service);
389 }
390 }
391
392 pub fn add_mcp_service_status_and_proxy(
398 &self,
399 mcp_service_status: McpServiceStatus,
400 proxy_handler: Option<McpHandler>,
401 ) {
402 let mcp_id = mcp_service_status.mcp_id.clone();
403
404 let mut service = McpService::new(
407 mcp_id.clone(),
408 mcp_service_status.mcp_type,
409 mcp_service_status.mcp_router_path,
410 mcp_service_status.cancellation_token,
411 );
412
413 service.status_mut().check_mcp_status_response_status =
415 mcp_service_status.check_mcp_status_response_status;
416
417 if let Some(config) = mcp_service_status.mcp_config {
419 service.set_mcp_config(config);
420 }
421
422 if let Some(handler) = proxy_handler {
424 service.set_handler(handler);
425 }
426
427 if let Some(old_service) = self.services.insert(mcp_id.clone(), service) {
429 info!(
430 "[RAII] Overwrite existing services, old services will be automatically cleaned up: mcp_id={}",
431 mcp_id
432 );
433 drop(old_service);
435 }
436 }
437
438 pub fn get_all_mcp_service_status(&self) -> Vec<McpServiceStatus> {
443 let keys: Vec<String> = self
445 .services
446 .iter()
447 .map(|entry| entry.key().clone())
448 .collect();
449
450 keys.into_iter()
452 .filter_map(|mcp_id| self.get_mcp_service_status(&mcp_id))
453 .collect()
454 }
455
456 pub fn get_mcp_service_status(&self, mcp_id: &str) -> Option<McpServiceStatus> {
458 self.services.get(mcp_id).map(|entry| {
459 let service = entry.value();
460 let status = service.status();
461 McpServiceStatus {
462 mcp_id: status.mcp_id.clone(),
463 mcp_type: status.mcp_type.clone(),
464 mcp_router_path: status.mcp_router_path.clone(),
465 cancellation_token: service.clone_token(),
466 check_mcp_status_response_status: status.check_mcp_status_response_status.clone(),
467 last_accessed: status.last_accessed,
468 mcp_config: status.mcp_config.clone(),
469 consecutive_probe_failures: status.consecutive_probe_failures,
470 }
471 })
472 }
473
474 pub fn update_last_accessed(&self, mcp_id: &str) {
478 self.services
479 .entry(mcp_id.to_string())
480 .and_modify(|service| service.update_last_accessed());
481 }
482
483 pub fn update_mcp_service_status(&self, mcp_id: &str, status: CheckMcpStatusResponseStatus) {
487 self.services
488 .entry(mcp_id.to_string())
489 .and_modify(|service| service.update_status(status));
490 }
491
492 pub fn get_proxy_handler(&self, mcp_id: &str) -> Option<McpHandler> {
494 self.services
495 .get(mcp_id)
496 .and_then(|entry| entry.value().handler().cloned())
497 }
498
499 pub fn get_mcp_config(&self, mcp_id: &str) -> Option<McpConfig> {
501 self.services
502 .get(mcp_id)
503 .and_then(|entry| entry.value().status().mcp_config.clone())
504 }
505
506 pub fn add_proxy_handler(&self, mcp_id: &str, proxy_handler: McpHandler) {
510 match self.services.entry(mcp_id.to_string()) {
511 dashmap::mapref::entry::Entry::Occupied(mut entry) => {
512 entry.get_mut().set_handler(proxy_handler);
513 }
514 dashmap::mapref::entry::Entry::Vacant(_) => {
515 warn!(
516 "[RAII] Trying to add handler to non-existent service: mcp_id={}",
517 mcp_id
518 );
519 }
520 }
521 }
522
523 pub fn contains_service(&self, mcp_id: &str) -> bool {
525 self.services.contains_key(mcp_id)
526 }
527
528 pub fn service_count(&self) -> usize {
530 self.services.len()
531 }
532
533 pub async fn register_mcp_config(&self, mcp_id: &str, config: McpConfig) {
535 GLOBAL_MCP_CONFIG_CACHE
536 .insert(mcp_id.to_string(), config)
537 .await;
538 info!("MCP configuration registered in cache: {}", mcp_id);
539 }
540
541 pub async fn get_mcp_config_from_cache(&self, mcp_id: &str) -> Option<McpConfig> {
543 if let Some(config) = GLOBAL_MCP_CONFIG_CACHE.get(mcp_id).await {
544 debug!("Get MCP configuration from cache: {}", mcp_id);
545 Some(config)
546 } else {
547 debug!("MCP configuration not found in cache: {}", mcp_id);
548 None
549 }
550 }
551
552 pub async fn unregister_mcp_config(&self, mcp_id: &str) {
554 GLOBAL_MCP_CONFIG_CACHE.invalidate(mcp_id).await;
555 info!("MCP configuration removed from cache: {}", mcp_id);
556 }
557
558 pub async fn cleanup_resources(&self, mcp_id: &str) -> Result<()> {
567 info!("[RAII] Start cleaning up resources: mcp_id={}", mcp_id);
568
569 let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
571 .map_err(|e| {
572 anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
573 })?;
574 let base_sse_path = mcp_sse_router_path.base_path;
575
576 let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
577 .map_err(|e| {
578 anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
579 })?;
580 let base_stream_path = mcp_stream_router_path.base_path;
581
582 DynamicRouterService::delete_route(&base_sse_path);
584 DynamicRouterService::delete_route(&base_stream_path);
585
586 if self.services.remove(mcp_id).is_some() {
589 info!(
590 "[RAII] The service has been removed from the map, McpProcessGuard will automatically cancel the token: mcp_id={}",
591 mcp_id
592 );
593 } else {
594 debug!(
595 "[RAII] Service does not exist, skip removal: mcp_id={}",
596 mcp_id
597 );
598 }
599
600 self.unregister_mcp_config(mcp_id).await;
602
603 GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
605
606 info!(
607 "[RAII] MCP service resource cleanup completed: mcp_id={}",
608 mcp_id
609 );
610 Ok(())
611 }
612
613 pub async fn cleanup_all_resources(&self) -> Result<()> {
617 info!("[RAII] Start cleaning up all MCP service resources");
618
619 let mcp_ids: Vec<String> = self
621 .services
622 .iter()
623 .map(|entry| entry.key().clone())
624 .collect();
625
626 let count = mcp_ids.len();
627
628 for mcp_id in mcp_ids {
630 if let Err(e) = self.cleanup_resources(&mcp_id).await {
631 error!(
632 "[RAII] Failed to clean up resources: mcp_id={}, error={}",
633 mcp_id, e
634 );
635 }
637 }
638
639 info!(
640 "[RAII] All MCP service resources have been cleaned up, and a total of {} services have been cleaned up.",
641 count
642 );
643 Ok(())
644 }
645
646 pub fn remove_service(&self, mcp_id: &str) -> bool {
651 if self.services.remove(mcp_id).is_some() {
652 info!(
653 "[RAII] The service has been removed and the process will be automatically cleaned up: mcp_id={}",
654 mcp_id
655 );
656 true
657 } else {
658 debug!("[RAII] Service does not exist: mcp_id={}", mcp_id);
659 false
660 }
661 }
662
663 pub fn reset_probe_failures(&self, mcp_id: &str) {
667 if let Some(mut entry) = self.services.get_mut(mcp_id) {
668 entry.value_mut().status_mut().reset_probe_failures();
669 }
670 }
671
672 pub fn increment_probe_failures(&self, mcp_id: &str) -> u32 {
676 self.services
677 .get_mut(mcp_id)
678 .map(|mut entry| entry.value_mut().status_mut().increment_probe_failures())
679 .unwrap_or(0)
680 }
681
682 pub async fn cleanup_resources_for_restart(&self, mcp_id: &str) -> Result<()> {
687 info!(
688 "[RAII] Start cleaning up resources for restart: mcp_id={}",
689 mcp_id
690 );
691
692 let mcp_sse_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Sse)
694 .map_err(|e| {
695 anyhow::anyhow!("Failed to create SSE router path for {}: {}", mcp_id, e)
696 })?;
697 let base_sse_path = mcp_sse_router_path.base_path;
698
699 let mcp_stream_router_path = McpRouterPath::new(mcp_id.to_string(), McpProtocol::Stream)
700 .map_err(|e| {
701 anyhow::anyhow!("Failed to create Stream router path for {}: {}", mcp_id, e)
702 })?;
703 let base_stream_path = mcp_stream_router_path.base_path;
704
705 DynamicRouterService::delete_route(&base_sse_path);
707 DynamicRouterService::delete_route(&base_stream_path);
708
709 if self.services.remove(mcp_id).is_some() {
711 info!(
712 "[RAII] The service has been removed from the map (for restart), McpProcessGuard will automatically cancel the token: mcp_id={}",
713 mcp_id
714 );
715 } else {
716 debug!(
717 "[RAII] Service does not exist, skip removal: mcp_id={}",
718 mcp_id
719 );
720 }
721
722 GLOBAL_RESTART_TRACKER.clear_health_status(mcp_id);
727
728 info!(
729 "[RAII] MCP service resource cleanup completed (for restart): mcp_id={}",
730 mcp_id
731 );
732 Ok(())
733 }
734}
735
736pub struct McpConfigCache {
768 cache: Cache<String, McpConfig>,
769}
770
771impl McpConfigCache {
772 pub fn new() -> Self {
773 Self {
774 cache: Cache::builder()
775 .time_to_live(Duration::from_secs(24 * 60 * 60)) .max_capacity(1000) .build(),
778 }
779 }
780
781 pub async fn insert(&self, mcp_id: String, config: McpConfig) {
782 self.cache.insert(mcp_id.clone(), config).await;
783 info!("MCP configuration cached: {} (TTL: 24h)", mcp_id);
784 }
785
786 pub async fn get(&self, mcp_id: &str) -> Option<McpConfig> {
787 self.cache.get(mcp_id).await
788 }
789
790 pub async fn invalidate(&self, mcp_id: &str) {
791 self.cache.invalidate(mcp_id).await;
792 }
793
794 #[allow(dead_code)]
795 pub fn invalidate_all(&self) {
796 self.cache.invalidate_all();
797 }
798}
799
800impl Default for McpConfigCache {
801 fn default() -> Self {
802 Self::new()
803 }
804}
805
806pub static GLOBAL_MCP_CONFIG_CACHE: Lazy<McpConfigCache> = Lazy::new(McpConfigCache::default);
808
809pub struct RestartTracker {
825 last_restart: DashMap<String, Instant>,
827 health_status: DashMap<String, (bool, Instant)>,
829 startup_locks: DashMap<String, Arc<Mutex<()>>>,
831}
832
833impl RestartTracker {
834 pub fn new() -> Self {
835 Self {
836 last_restart: DashMap::new(),
837 health_status: DashMap::new(),
838 startup_locks: DashMap::new(),
839 }
840 }
841
842 pub fn get_cached_health_status(&self, mcp_id: &str) -> Option<bool> {
847 let cache_duration = Duration::from_secs(5); let now = Instant::now();
849
850 self.health_status.get(mcp_id).and_then(|entry| {
851 let (is_healthy, check_time) = *entry.value();
852 if now.duration_since(check_time) < cache_duration {
853 Some(is_healthy)
854 } else {
855 None
856 }
857 })
858 }
859
860 pub fn update_health_status(&self, mcp_id: &str, is_healthy: bool) {
862 self.health_status
863 .insert(mcp_id.to_string(), (is_healthy, Instant::now()));
864 }
865
866 pub fn clear_health_status(&self, mcp_id: &str) {
868 self.health_status.remove(mcp_id);
869 }
870
871 pub fn can_restart(&self, mcp_id: &str) -> bool {
878 let now = Instant::now();
879 let min_restart_interval = Duration::from_secs(30); if let Some(last_restart) = self.last_restart.get(mcp_id) {
883 let elapsed = now.duration_since(*last_restart);
884 if elapsed < min_restart_interval {
885 warn!(
886 "Service {} is in the cooldown period and is only {} seconds since its last restart. Restart is skipped.",
887 mcp_id,
888 elapsed.as_secs()
889 );
890 return false;
891 }
892 }
893 true
895 }
896
897 pub fn record_restart(&self, mcp_id: &str) {
902 self.last_restart.insert(mcp_id.to_string(), Instant::now());
903 info!(
904 "The service started successfully and the restart time was recorded: {}",
905 mcp_id
906 );
907 }
908
909 #[allow(dead_code)]
914 pub fn clear_restart(&self, mcp_id: &str) {
915 self.last_restart.remove(mcp_id);
916 info!("Restart timestamp cleared for service {}", mcp_id);
917 }
918
919 pub fn try_acquire_startup_lock(&self, mcp_id: &str) -> Option<OwnedMutexGuard<()>> {
937 let lock = self
939 .startup_locks
940 .entry(mcp_id.to_string())
941 .or_insert_with(|| Arc::new(Mutex::new(())))
942 .clone();
943
944 match lock.try_lock_owned() {
946 Ok(guard) => Some(guard),
947 Err(_) => {
948 debug!("Service {} is starting, skip this startup", mcp_id);
950 None
951 }
952 }
953 }
954
955 #[allow(dead_code)]
960 pub fn cleanup_startup_lock(&self, mcp_id: &str) {
961 self.startup_locks.remove(mcp_id);
962 debug!("Cleaned startup lock for service {}", mcp_id);
963 }
964}
965
966impl Default for RestartTracker {
967 fn default() -> Self {
968 Self::new()
969 }
970}
971
972pub static GLOBAL_RESTART_TRACKER: Lazy<RestartTracker> = Lazy::new(RestartTracker::default);
974
975pub fn get_proxy_manager() -> &'static ProxyHandlerManager {
977 &GLOBAL_PROXY_MANAGER
978}