// Source file: mcp_stdio_proxy/server/mcp_dynamic_router_service.rs

1use std::{
2    convert::Infallible,
3    task::{Context, Poll},
4};
5
6use axum::{
7    body::Body,
8    extract::Request,
9    response::{IntoResponse, Response},
10};
11use futures::future::BoxFuture;
12use log::{debug, error, info, warn};
13use tower::Service;
14
15use crate::{
16    DynamicRouterService, get_proxy_manager, mcp_start_task,
17    model::{
18        CheckMcpStatusResponseStatus, GLOBAL_RESTART_TRACKER, HttpResult, McpConfig, McpRouterPath,
19        McpType,
20    },
21    server::middlewares::extract_trace_id,
22};
23
24impl Service<Request<Body>> for DynamicRouterService {
25    type Response = Response;
26    type Error = Infallible;
27    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
28
29    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
30        Poll::Ready(Ok(()))
31    }
32
33    fn call(&mut self, req: Request<Body>) -> Self::Future {
34        let path = req.uri().path().to_string();
35        let method = req.method().clone();
36        let headers = req.headers().clone();
37
38        // DEBUG: 详细路径解析日志
39        debug!("=== Path analysis begins ===");
40        debug!("Original request path: {}", path);
41        debug!("Path contains wildcard parameters: {:?}", req.extensions());
42
43        // 提取 trace_id
44        let trace_id = extract_trace_id();
45
46        // 创建根 span (使用 debug_span 减少日志量)
47        let span = tracing::debug_span!(
48            "DynamicRouterService",
49            otel.name = "HTTP Request",
50            http.method = %method,
51            http.route = %path,
52            http.url = %req.uri(),
53            mcp.protocol = format!("{:?}", self.0),
54            trace_id = %trace_id,
55        );
56
57        // 记录请求头信息
58        if let Some(content_type) = headers.get("content-type") {
59            span.record("http.request.content_type", format!("{:?}", content_type));
60        }
61        if let Some(content_length) = headers.get("content-length") {
62            span.record(
63                "http.request.content_length",
64                format!("{:?}", content_length),
65            );
66        }
67
68        debug!("Request path: {path}");
69
70        // 解析路由路径
71        let mcp_router_path = McpRouterPath::from_url(&path);
72
73        match mcp_router_path {
74            Some(mcp_router_path) => {
75                let mcp_id = mcp_router_path.mcp_id.clone();
76                let base_path = mcp_router_path.base_path.clone();
77
78                span.record("mcp.id", &mcp_id);
79                span.record("mcp.base_path", &base_path);
80
81                debug!("=== Path analysis results ===");
82                debug!("Parsed mcp_id: {}", mcp_id);
83                debug!("Parsed base_path: {}", base_path);
84                debug!("Request path: {} vs base_path: {}", path, base_path);
85                debug!("=== Path analysis ends ===");
86
87                Box::pin(async move {
88                    let _guard = span.enter();
89
90                    // 先尝试查找已注册的路由
91                    debug!("===Route lookup process ===");
92                    debug!("Find base_path: '{}'", base_path);
93
94                    if let Some(router_entry) = DynamicRouterService::get_route(&base_path) {
95                        debug!(
96                            "✅ Find the registered route: base_path={}, path={}",
97                            base_path, path
98                        );
99
100                        // ===== 检查后端健康状态 =====
101                        let mcp_id_for_check = McpRouterPath::from_url(&path);
102                        if let Some(router_path) = mcp_id_for_check {
103                            let proxy_manager = get_proxy_manager();
104
105                            // ===== 首先检查服务状态 =====
106                            // 如果服务状态是 Pending,说明服务正在初始化中(uvx/npx 下载中)
107                            // 此时不应该做健康检查,应该等待
108                            if let Some(service_status) =
109                                proxy_manager.get_mcp_service_status(&router_path.mcp_id)
110                            {
111                                match &service_status.check_mcp_status_response_status {
112                                    CheckMcpStatusResponseStatus::Pending => {
113                                        debug!(
114                                            "[MCP status check] mcp_id={} The status is Pending, the service is being initialized, and 503 is returned.",
115                                            router_path.mcp_id
116                                        );
117                                        let message = format!(
118                                            "服务 {} 正在初始化中,请稍后再试",
119                                            router_path.mcp_id
120                                        );
121                                        let http_result: HttpResult<String> =
122                                            HttpResult::error("0003", &message, None);
123                                        return Ok(http_result.into_response());
124                                    }
125                                    CheckMcpStatusResponseStatus::Error(err) => {
126                                        // Error 状态:只清理,不重启
127                                        // 避免有问题的 MCP 服务无限重启循环
128                                        warn!(
129                                            "[MCP status check] mcp_id={} status is Error: {}, clean up resources and return error",
130                                            router_path.mcp_id, err
131                                        );
132                                        // 清理资源
133                                        if let Err(e) = proxy_manager
134                                            .cleanup_resources(&router_path.mcp_id)
135                                            .await
136                                        {
137                                            error!(
138                                                "[MCP status check] mcp_id={} Failed to clean up resources: {}",
139                                                router_path.mcp_id, e
140                                            );
141                                        }
142                                        // 返回错误,不尝试重启
143                                        let message = format!(
144                                            "服务 {} 启动失败: {}",
145                                            router_path.mcp_id, err
146                                        );
147                                        let http_result: HttpResult<String> =
148                                            HttpResult::error("0005", &message, None);
149                                        return Ok(http_result.into_response());
150                                    }
151                                    CheckMcpStatusResponseStatus::Ready => {
152                                        debug!(
153                                            "[MCP status check] mcp_id={} status is Ready, continue to check the backend health status",
154                                            router_path.mcp_id
155                                        );
156                                    }
157                                }
158                            }
159
160                            // ===== 检查启动锁状态 =====
161                            // 如果锁被占用,说明服务正在启动中
162                            let startup_guard = GLOBAL_RESTART_TRACKER
163                                .try_acquire_startup_lock(&router_path.mcp_id);
164
165                            if startup_guard.is_none() {
166                                // 锁被占用,服务正在启动中,返回 503
167                                debug!(
168                                    "[Startup lock check] mcp_id={} The startup lock is occupied, the service is starting, and 503 is returned.",
169                                    router_path.mcp_id
170                                );
171                                span.record("mcp.startup_in_progress", true);
172                                let message =
173                                    format!("服务 {} 正在启动中,请稍后再试", router_path.mcp_id);
174                                let http_result: HttpResult<String> =
175                                    HttpResult::error("0003", &message, None);
176                                span.record("http.response.status_code", 503u16);
177                                return Ok(http_result.into_response());
178                            }
179
180                            // 获取到锁,现在可以安全地检查健康状态
181                            let _startup_guard = startup_guard.unwrap();
182                            debug!(
183                                "[Start lock check] mcp_id={} Successfully obtained the startup lock and started health check",
184                                router_path.mcp_id
185                            );
186
187                            if let Some(handler) =
188                                proxy_manager.get_proxy_handler(&router_path.mcp_id)
189                            {
190                                // ===== 健康检查(带缓存)=====
191                                let is_healthy = if let Some(cached) = GLOBAL_RESTART_TRACKER
192                                    .get_cached_health_status(&router_path.mcp_id)
193                                {
194                                    debug!(
195                                        "[Health Check] mcp_id={} Use cache status: is_healthy={}",
196                                        router_path.mcp_id, cached
197                                    );
198                                    cached
199                                } else {
200                                    debug!(
201                                        "[Health Check] mcp_id={} Cache miss, start actual health check...",
202                                        router_path.mcp_id
203                                    );
204                                    let status = handler.is_mcp_server_ready().await;
205                                    GLOBAL_RESTART_TRACKER
206                                        .update_health_status(&router_path.mcp_id, status);
207                                    debug!(
208                                        "[Health Check] mcp_id={} Actual health check result: is_healthy={}",
209                                        router_path.mcp_id, status
210                                    );
211                                    status
212                                };
213
214                                if is_healthy {
215                                    debug!(
216                                        "[Health Check] mcp_id={} The backend service is normal, release the lock and use routing",
217                                        router_path.mcp_id
218                                    );
219                                    // 释放锁,使用路由
220                                    drop(_startup_guard);
221                                    debug!("=== Route search ended (successful) ===");
222                                    return handle_request_with_router(req, router_entry, &path)
223                                        .await;
224                                }
225
226                                // 不健康,获取服务类型以决定是否重启
227                                let mcp_type = proxy_manager
228                                    .get_mcp_service_status(&router_path.mcp_id)
229                                    .map(|s| s.mcp_type.clone());
230
231                                // 清理资源
232                                warn!(
233                                    "[Health check] mcp_id={} The backend service is unhealthy, clean up resources.",
234                                    router_path.mcp_id
235                                );
236                                if let Err(e) =
237                                    proxy_manager.cleanup_resources(&router_path.mcp_id).await
238                                {
239                                    error!(
240                                        "[Clean up resources] mcp_id={} Failed to clean up resources: error={}",
241                                        router_path.mcp_id, e
242                                    );
243                                } else {
244                                    debug!(
245                                        "[Clean up resources] mcp_id={} Clean up resources successfully",
246                                        router_path.mcp_id
247                                    );
248                                }
249
250                                // OneShot 类型:只清理,不重启
251                                // OneShot 服务执行完成后进程会退出,这是正常行为,不应该自动重启
252                                // 用户需要通过 check_status 接口显式启动新的 OneShot 服务
253                                if matches!(mcp_type, Some(McpType::OneShot)) {
254                                    debug!(
255                                        "[Health Check] mcp_id={} is a OneShot type, does not automatically restart, and returns that the service has ended",
256                                        router_path.mcp_id
257                                    );
258                                    let message = format!(
259                                        "OneShot 服务 {} 已结束,请重新启动",
260                                        router_path.mcp_id
261                                    );
262                                    let http_result: HttpResult<String> =
263                                        HttpResult::error("0006", &message, None);
264                                    return Ok(http_result.into_response());
265                                }
266
267                                // Persistent 类型:清理后重启
268                                info!(
269                                    "[Restart process] mcp_id={} is Persistent type, start to restart the service",
270                                    router_path.mcp_id
271                                );
272
273                                // 从配置获取 mcp_config 并启动服务
274                                // 优先从请求 header 获取配置
275                                if let Some(mcp_config) =
276                                    req.extensions().get::<McpConfig>().cloned()
277                                    && mcp_config.mcp_json_config.is_some()
278                                {
279                                    info!(
280                                        "[Restart process] mcp_id={} Use the request header to configure the restart service",
281                                        mcp_config.mcp_id
282                                    );
283                                    proxy_manager
284                                        .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
285                                        .await;
286                                    return start_mcp_and_handle_request(req, mcp_config).await;
287                                }
288
289                                // 从缓存获取配置
290                                if let Some(mcp_config) = proxy_manager
291                                    .get_mcp_config_from_cache(&router_path.mcp_id)
292                                    .await
293                                {
294                                    info!(
295                                        "[Restart process] mcp_id={} Restart the service using cache configuration",
296                                        router_path.mcp_id
297                                    );
298                                    return start_mcp_and_handle_request(req, mcp_config).await;
299                                }
300
301                                // 无法获取配置
302                                warn!(
303                                    "[Restart Process] mcp_id={} Unable to obtain the configuration and unable to restart the service",
304                                    router_path.mcp_id
305                                );
306                                let message =
307                                    format!("服务 {} 不健康且无法获取配置", router_path.mcp_id);
308                                let http_result: HttpResult<String> =
309                                    HttpResult::error("0004", &message, None);
310                                return Ok(http_result.into_response());
311                            } else {
312                                // handler 不存在,但路由存在
313                                // 检查服务类型,OneShot 不自动重启
314                                let mcp_type = proxy_manager
315                                    .get_mcp_service_status(&router_path.mcp_id)
316                                    .map(|s| s.mcp_type.clone());
317
318                                if matches!(mcp_type, Some(McpType::OneShot)) {
319                                    debug!(
320                                        "[Service Check] mcp_id={} is OneShot type and the handler does not exist, so it will not restart automatically.",
321                                        router_path.mcp_id
322                                    );
323                                    // 清理残留状态
324                                    if let Err(e) =
325                                        proxy_manager.cleanup_resources(&router_path.mcp_id).await
326                                    {
327                                        error!(
328                                            "[Clean up resources] mcp_id={} Failed to clean up resources: {}",
329                                            router_path.mcp_id, e
330                                        );
331                                    }
332                                    let message = format!(
333                                        "OneShot 服务 {} 已结束,请重新启动",
334                                        router_path.mcp_id
335                                    );
336                                    let http_result: HttpResult<String> =
337                                        HttpResult::error("0006", &message, None);
338                                    return Ok(http_result.into_response());
339                                }
340
341                                // Persistent 类型:继续进入启动流程
342                                warn!(
343                                    "The route exists but the handler does not exist. Enter the restart process: base_path={}",
344                                    base_path
345                                );
346                            }
347                        } else {
348                            // 无法解析路由路径,直接使用路由
349                            debug!("=== Route search ended (successful) ===");
350                            return handle_request_with_router(req, router_entry, &path).await;
351                        }
352                    } else {
353                        debug!(
354                            "❌ No registered route found: base_path='{}', path='{}'",
355                            base_path, path
356                        );
357
358                        // 显示所有已注册的路由
359                        let all_routes = DynamicRouterService::get_all_routes();
360                        debug!("Currently registered route: {:?}", all_routes);
361                        debug!("=== Route search ended (failed) ===");
362                    }
363
364                    // 未找到路由,尝试启动服务
365                    warn!(
366                        "No matching path found, try to start the service: base_path={base_path}, path={path}"
367                    );
368                    span.record("error.route_not_found", true);
369
370                    // ===== 提前解析 mcp_id 用于配置获取 =====
371                    let mcp_router_path_for_config = McpRouterPath::from_url(&path);
372
373                    // ===== 配置获取优先级 =====
374                    let proxy_manager = get_proxy_manager();
375
376                    // 优先级 1: 从请求 header 中获取配置(最新)
377                    if let Some(mcp_config) = req.extensions().get::<McpConfig>().cloned()
378                        && mcp_config.mcp_json_config.is_some()
379                    {
380                        // 检查重启限制(防止无限循环)
381                        if !GLOBAL_RESTART_TRACKER.can_restart(&mcp_config.mcp_id) {
382                            warn!(
383                                "Service {} skips startup during restart cooldown period",
384                                mcp_config.mcp_id
385                            );
386                            span.record("error.restart_in_cooldown", true);
387                            let message =
388                                format!("服务 {} 在重启冷却期内,请稍后再试", mcp_config.mcp_id);
389                            let http_result: HttpResult<String> =
390                                HttpResult::error("0002", &message, None);
391                            span.record("http.response.status_code", 429u16); // Too Many Requests
392                            return Ok(http_result.into_response());
393                        }
394
395                        // 尝试获取启动锁,防止并发启动同一服务
396                        let _startup_guard = match GLOBAL_RESTART_TRACKER
397                            .try_acquire_startup_lock(&mcp_config.mcp_id)
398                        {
399                            Some(guard) => guard,
400                            None => {
401                                warn!(
402                                    "Service {} is starting, skip this startup",
403                                    mcp_config.mcp_id
404                                );
405                                span.record("error.startup_in_progress", true);
406                                let message =
407                                    format!("服务 {} 正在启动中,请稍后再试", mcp_config.mcp_id);
408                                let http_result: HttpResult<String> =
409                                    HttpResult::error("0003", &message, None);
410                                span.record("http.response.status_code", 503u16); // Service Unavailable
411                                return Ok(http_result.into_response());
412                            }
413                        };
414
415                        info!(
416                            "Use request header configuration to start the service: {}",
417                            mcp_config.mcp_id
418                        );
419                        // 同时更新缓存
420                        proxy_manager
421                            .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
422                            .await;
423
424                        // _startup_guard 会在作用域结束时自动释放
425                        return start_mcp_and_handle_request(req, mcp_config).await;
426                    }
427
428                    // 优先级 2: 从 moka 缓存中获取配置(兜底)
429                    // 注意:OneShot 类型不从缓存自动启动,需要用户显式请求(带 header 配置)
430                    if let Some(mcp_id_for_cache) =
431                        mcp_router_path_for_config.as_ref().map(|p| &p.mcp_id)
432                        && let Some(mcp_config) = proxy_manager
433                            .get_mcp_config_from_cache(mcp_id_for_cache)
434                            .await
435                    {
436                        // OneShot 类型不从缓存自动启动
437                        // 避免已回收的 OneShot 服务被意外重启
438                        if matches!(mcp_config.mcp_type, McpType::OneShot) {
439                            info!(
440                                "[Startup check] mcp_id={} is a OneShot type, does not automatically start from the cache, and requires an explicit request from the user",
441                                mcp_id_for_cache
442                            );
443                            let message = format!(
444                                "OneShot 服务 {} 需要通过 check_status 接口启动",
445                                mcp_id_for_cache
446                            );
447                            let http_result: HttpResult<String> =
448                                HttpResult::error("0007", &message, None);
449                            return Ok(http_result.into_response());
450                        }
451
452                        // 检查重启限制(防止无限循环)
453                        if !GLOBAL_RESTART_TRACKER.can_restart(mcp_id_for_cache) {
454                            warn!(
455                                "Service {} skips startup during restart cooldown period",
456                                mcp_id_for_cache
457                            );
458                            span.record("error.restart_in_cooldown", true);
459                            let message =
460                                format!("服务 {} 在重启冷却期内,请稍后再试", mcp_id_for_cache);
461                            let http_result: HttpResult<String> =
462                                HttpResult::error("0002", &message, None);
463                            span.record("http.response.status_code", 429u16); // Too Many Requests
464                            return Ok(http_result.into_response());
465                        }
466
467                        // 尝试获取启动锁,防止并发启动同一服务
468                        let _startup_guard = match GLOBAL_RESTART_TRACKER
469                            .try_acquire_startup_lock(mcp_id_for_cache)
470                        {
471                            Some(guard) => guard,
472                            None => {
473                                warn!(
474                                    "Service {} is starting, skip this startup",
475                                    mcp_id_for_cache
476                                );
477                                span.record("error.startup_in_progress", true);
478                                let message =
479                                    format!("服务 {} 正在启动中,请稍后再试", mcp_id_for_cache);
480                                let http_result: HttpResult<String> =
481                                    HttpResult::error("0003", &message, None);
482                                span.record("http.response.status_code", 503u16); // Service Unavailable
483                                return Ok(http_result.into_response());
484                            }
485                        };
486
487                        info!(
488                            "Start the service using cached configuration: {}",
489                            mcp_id_for_cache
490                        );
491                        // _startup_guard 会在作用域结束时自动释放
492                        return start_mcp_and_handle_request(req, mcp_config).await;
493                    }
494
495                    // 优先级 3: 无法获取配置,返回错误
496                    warn!(
497                        "No matching path was found, and the configuration was not obtained, so the MCP service could not be started: {path}"
498                    );
499                    span.record("error.mcp_config_missing", true);
500
501                    let message =
502                        format!("未找到匹配的路径,且未获取到配置,无法启动MCP服务: {path}");
503                    let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
504                    span.record("http.response.status_code", 404u16);
505                    span.record("error.message", &message);
506                    Ok(http_result.into_response())
507                })
508            }
509            None => {
510                warn!("Request path resolution failed: {path}");
511                span.record("error.path_parse_failed", true);
512
513                let message = format!("请求路径解析失败: {path}");
514                let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
515                Box::pin(async move {
516                    let _guard = span.enter();
517                    span.record("http.response.status_code", 400u16);
518                    span.record("error.message", &message);
519                    Ok(http_result.into_response())
520                })
521            }
522        }
523    }
524}
525
526/// 使用给定的路由处理请求
527async fn handle_request_with_router(
528    req: Request<Body>,
529    router_entry: axum::Router,
530    path: &str,
531) -> Result<Response, Infallible> {
532    // 获取匹配路径的Router,并处理请求
533    let trace_id = extract_trace_id();
534
535    let method = req.method().clone();
536    let uri = req.uri().clone();
537
538    info!(
539        "[handle_request_with_router] Handle request: {} {}",
540        method, path
541    );
542
543    // 记录请求头中的关键信息
544    if let Some(content_type) = req.headers().get("content-type")
545        && let Ok(content_type_str) = content_type.to_str()
546    {
547        debug!(
548            "[handle_request_with_router] Content-Type: {}",
549            content_type_str
550        );
551    }
552
553    if let Some(content_length) = req.headers().get("content-length")
554        && let Ok(content_length_str) = content_length.to_str()
555    {
556        debug!(
557            "[handle_request_with_router] Content-Length: {}",
558            content_length_str
559        );
560    }
561
562    // 记录 x-mcp-json 头信息(如果存在)
563    if let Some(mcp_json) = req.headers().get("x-mcp-json")
564        && let Ok(mcp_json_str) = mcp_json.to_str()
565    {
566        debug!(
567            "[handle_request_with_router] MCP-JSON Header: {}",
568            mcp_json_str
569        );
570    }
571
572    // 记录查询参数
573    if let Some(query) = uri.query() {
574        debug!("[handle_request_with_router] Query: {}", query);
575    }
576
577    // 使用 debug_span 减少日志量,因为 DynamicRouterService 已经记录了请求信息
578    // 移除 #[tracing::instrument] 避免 span 嵌套导致的日志膨胀问题
579    let span = tracing::debug_span!(
580        "handle_request_with_router",
581        otel.name = "Handle Request with Router",
582        component = "router",
583        trace_id = %trace_id,
584        http.method = %method,
585        http.path = %path,
586    );
587
588    let _guard = span.enter();
589
590    let mut service = router_entry.into_service();
591    match service.call(req).await {
592        Ok(response) => {
593            let status = response.status();
594
595            // 记录响应头信息
596            debug!(
597                "[handle_request_with_router]Response status: {}, response header: {response:?}",
598                status
599            );
600
601            span.record("http.response.status_code", status.as_u16());
602            Ok(response)
603        }
604        Err(error) => {
605            span.record("error.router_service_error", true);
606            span.record("error.message", format!("{:?}", error));
607            error!("[handle_request_with_router] error: {error:?}");
608            Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
609        }
610    }
611}
612
613/// 启动MCP服务并处理请求
614async fn start_mcp_and_handle_request(
615    req: Request<Body>,
616    mcp_config: McpConfig,
617) -> Result<Response, Infallible> {
618    let request_path = req.uri().path().to_string();
619    let trace_id = extract_trace_id();
620    debug!("Request path: {request_path}");
621
622    // 使用 debug_span 减少日志量,移除 #[tracing::instrument] 避免 span 嵌套
623    let span = tracing::debug_span!(
624        "start_mcp_and_handle_request",
625        otel.name = "Start MCP and Handle Request",
626        component = "mcp_startup",
627        mcp.id = %mcp_config.mcp_id,
628        mcp.type = ?mcp_config.mcp_type,
629        mcp.config.has_config = mcp_config.mcp_json_config.is_some(),
630        trace_id = %trace_id,
631    );
632
633    let _guard = span.enter();
634
635    let ret = mcp_start_task(mcp_config).await;
636
637    if let Ok((router, _)) = ret {
638        span.record("mcp.startup.success", true);
639        handle_request_with_router(req, router, &request_path).await
640    } else {
641        span.record("mcp.startup.failed", true);
642        span.record("error.mcp_startup_failed", true);
643        span.record("error.message", format!("{:?}", ret));
644        warn!("MCP service startup failed: {ret:?}");
645        Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
646    }
647}