Skip to main content

mcp_stdio_proxy/server/
mcp_dynamic_router_service.rs

1use std::{
2    convert::Infallible,
3    task::{Context, Poll},
4};
5
6use axum::{
7    body::Body,
8    extract::Request,
9    response::{IntoResponse, Response},
10};
11use futures::future::BoxFuture;
12use log::{debug, error, info, warn};
13use tower::Service;
14
15use crate::{
16    DynamicRouterService, get_proxy_manager, mcp_start_task,
17    model::{
18        CheckMcpStatusResponseStatus, GLOBAL_RESTART_TRACKER, HttpResult, McpConfig, McpRouterPath,
19        McpType,
20    },
21    server::middlewares::extract_trace_id,
22};
23
24impl Service<Request<Body>> for DynamicRouterService {
25    type Response = Response;
26    type Error = Infallible;
27    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
28
29    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
30        Poll::Ready(Ok(()))
31    }
32
33    fn call(&mut self, req: Request<Body>) -> Self::Future {
34        let path = req.uri().path().to_string();
35        let method = req.method().clone();
36        let headers = req.headers().clone();
37
38        // DEBUG: 详细路径解析日志
39        debug!("=== 路径解析开始 ===");
40        debug!("原始请求路径: {}", path);
41        debug!("路径包含的通配符参数: {:?}", req.extensions());
42
43        // 提取 trace_id
44        let trace_id = extract_trace_id();
45
46        // 创建根 span (使用 debug_span 减少日志量)
47        let span = tracing::debug_span!(
48            "DynamicRouterService",
49            otel.name = "HTTP Request",
50            http.method = %method,
51            http.route = %path,
52            http.url = %req.uri(),
53            mcp.protocol = format!("{:?}", self.0),
54            trace_id = %trace_id,
55        );
56
57        // 记录请求头信息
58        if let Some(content_type) = headers.get("content-type") {
59            span.record("http.request.content_type", format!("{:?}", content_type));
60        }
61        if let Some(content_length) = headers.get("content-length") {
62            span.record(
63                "http.request.content_length",
64                format!("{:?}", content_length),
65            );
66        }
67
68        debug!("请求路径: {path}");
69
70        // 解析路由路径
71        let mcp_router_path = McpRouterPath::from_url(&path);
72
73        match mcp_router_path {
74            Some(mcp_router_path) => {
75                let mcp_id = mcp_router_path.mcp_id.clone();
76                let base_path = mcp_router_path.base_path.clone();
77
78                span.record("mcp.id", &mcp_id);
79                span.record("mcp.base_path", &base_path);
80
81                debug!("=== 路径解析结果 ===");
82                debug!("解析出的mcp_id: {}", mcp_id);
83                debug!("解析出的base_path: {}", base_path);
84                debug!("请求路径: {} vs base_path: {}", path, base_path);
85                debug!("=== 路径解析结束 ===");
86
87                Box::pin(async move {
88                    let _guard = span.enter();
89
90                    // 先尝试查找已注册的路由
91                    debug!("=== 路由查找过程 ===");
92                    debug!("查找base_path: '{}'", base_path);
93
94                    if let Some(router_entry) = DynamicRouterService::get_route(&base_path) {
95                        debug!(
96                            "✅ 找到已注册的路由: base_path={}, path={}",
97                            base_path, path
98                        );
99
100                        // ===== 检查后端健康状态 =====
101                        let mcp_id_for_check = McpRouterPath::from_url(&path);
102                        if let Some(router_path) = mcp_id_for_check {
103                            let proxy_manager = get_proxy_manager();
104
105                            // ===== 首先检查服务状态 =====
106                            // 如果服务状态是 Pending,说明服务正在初始化中(uvx/npx 下载中)
107                            // 此时不应该做健康检查,应该等待
108                            if let Some(service_status) =
109                                proxy_manager.get_mcp_service_status(&router_path.mcp_id)
110                            {
111                                match &service_status.check_mcp_status_response_status {
112                                    CheckMcpStatusResponseStatus::Pending => {
113                                        debug!(
114                                            "[MCP状态检查] mcp_id={} 状态为 Pending,服务正在初始化中,返回 503",
115                                            router_path.mcp_id
116                                        );
117                                        let message = format!(
118                                            "服务 {} 正在初始化中,请稍后再试",
119                                            router_path.mcp_id
120                                        );
121                                        let http_result: HttpResult<String> =
122                                            HttpResult::error("0003", &message, None);
123                                        return Ok(http_result.into_response());
124                                    }
125                                    CheckMcpStatusResponseStatus::Error(err) => {
126                                        // Error 状态:只清理,不重启
127                                        // 避免有问题的 MCP 服务无限重启循环
128                                        warn!(
129                                            "[MCP状态检查] mcp_id={} 状态为 Error: {},清理资源并返回错误",
130                                            router_path.mcp_id, err
131                                        );
132                                        // 清理资源
133                                        if let Err(e) = proxy_manager
134                                            .cleanup_resources(&router_path.mcp_id)
135                                            .await
136                                        {
137                                            error!(
138                                                "[MCP状态检查] mcp_id={} 清理资源失败: {}",
139                                                router_path.mcp_id, e
140                                            );
141                                        }
142                                        // 返回错误,不尝试重启
143                                        let message = format!(
144                                            "服务 {} 启动失败: {}",
145                                            router_path.mcp_id, err
146                                        );
147                                        let http_result: HttpResult<String> =
148                                            HttpResult::error("0005", &message, None);
149                                        return Ok(http_result.into_response());
150                                    }
151                                    CheckMcpStatusResponseStatus::Ready => {
152                                        debug!(
153                                            "[MCP状态检查] mcp_id={} 状态为 Ready,继续检查后端健康状态",
154                                            router_path.mcp_id
155                                        );
156                                    }
157                                }
158                            }
159
160                            // ===== 检查启动锁状态 =====
161                            // 如果锁被占用,说明服务正在启动中
162                            let startup_guard = GLOBAL_RESTART_TRACKER
163                                .try_acquire_startup_lock(&router_path.mcp_id);
164
165                            if startup_guard.is_none() {
166                                // 锁被占用,服务正在启动中,返回 503
167                                debug!(
168                                    "[启动锁检查] mcp_id={} 启动锁被占用,服务正在启动中,返回 503",
169                                    router_path.mcp_id
170                                );
171                                span.record("mcp.startup_in_progress", true);
172                                let message =
173                                    format!("服务 {} 正在启动中,请稍后再试", router_path.mcp_id);
174                                let http_result: HttpResult<String> =
175                                    HttpResult::error("0003", &message, None);
176                                span.record("http.response.status_code", 503u16);
177                                return Ok(http_result.into_response());
178                            }
179
180                            // 获取到锁,现在可以安全地检查健康状态
181                            let _startup_guard = startup_guard.unwrap();
182                            debug!(
183                                "[启动锁检查] mcp_id={} 成功获取启动锁,开始健康检查",
184                                router_path.mcp_id
185                            );
186
187                            if let Some(handler) =
188                                proxy_manager.get_proxy_handler(&router_path.mcp_id)
189                            {
190                                // ===== 健康检查(带缓存)=====
191                                let is_healthy = if let Some(cached) = GLOBAL_RESTART_TRACKER
192                                    .get_cached_health_status(&router_path.mcp_id)
193                                {
194                                    debug!(
195                                        "[健康检查] mcp_id={} 使用缓存状态: is_healthy={}",
196                                        router_path.mcp_id, cached
197                                    );
198                                    cached
199                                } else {
200                                    debug!(
201                                        "[健康检查] mcp_id={} 缓存未命中,开始实际健康检查...",
202                                        router_path.mcp_id
203                                    );
204                                    let status = handler.is_mcp_server_ready().await;
205                                    GLOBAL_RESTART_TRACKER
206                                        .update_health_status(&router_path.mcp_id, status);
207                                    debug!(
208                                        "[健康检查] mcp_id={} 实际健康检查结果: is_healthy={}",
209                                        router_path.mcp_id, status
210                                    );
211                                    status
212                                };
213
214                                if is_healthy {
215                                    debug!(
216                                        "[健康检查] mcp_id={} 后端服务正常,释放锁并使用路由",
217                                        router_path.mcp_id
218                                    );
219                                    // 释放锁,使用路由
220                                    drop(_startup_guard);
221                                    debug!("=== 路由查找结束(成功) ===");
222                                    return handle_request_with_router(req, router_entry, &path)
223                                        .await;
224                                }
225
226                                // 不健康,获取服务类型以决定是否重启
227                                let mcp_type = proxy_manager
228                                    .get_mcp_service_status(&router_path.mcp_id)
229                                    .map(|s| s.mcp_type.clone());
230
231                                // 清理资源
232                                warn!(
233                                    "[健康检查] mcp_id={} 后端服务不健康,清理资源",
234                                    router_path.mcp_id
235                                );
236                                if let Err(e) =
237                                    proxy_manager.cleanup_resources(&router_path.mcp_id).await
238                                {
239                                    error!(
240                                        "[清理资源] mcp_id={} 清理资源失败: error={}",
241                                        router_path.mcp_id, e
242                                    );
243                                } else {
244                                    debug!("[清理资源] mcp_id={} 清理资源成功", router_path.mcp_id);
245                                }
246
247                                // OneShot 类型:只清理,不重启
248                                // OneShot 服务执行完成后进程会退出,这是正常行为,不应该自动重启
249                                // 用户需要通过 check_status 接口显式启动新的 OneShot 服务
250                                if matches!(mcp_type, Some(McpType::OneShot)) {
251                                    debug!(
252                                        "[健康检查] mcp_id={} 是 OneShot 类型,不自动重启,返回服务已结束",
253                                        router_path.mcp_id
254                                    );
255                                    let message = format!(
256                                        "OneShot 服务 {} 已结束,请重新启动",
257                                        router_path.mcp_id
258                                    );
259                                    let http_result: HttpResult<String> =
260                                        HttpResult::error("0006", &message, None);
261                                    return Ok(http_result.into_response());
262                                }
263
264                                // Persistent 类型:清理后重启
265                                info!(
266                                    "[重启流程] mcp_id={} 是 Persistent 类型,开始重启服务",
267                                    router_path.mcp_id
268                                );
269
270                                // 从配置获取 mcp_config 并启动服务
271                                // 优先从请求 header 获取配置
272                                if let Some(mcp_config) =
273                                    req.extensions().get::<McpConfig>().cloned()
274                                    && mcp_config.mcp_json_config.is_some()
275                                {
276                                    info!(
277                                        "[重启流程] mcp_id={} 使用请求 header 配置重启服务",
278                                        mcp_config.mcp_id
279                                    );
280                                    proxy_manager
281                                        .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
282                                        .await;
283                                    return start_mcp_and_handle_request(req, mcp_config).await;
284                                }
285
286                                // 从缓存获取配置
287                                if let Some(mcp_config) = proxy_manager
288                                    .get_mcp_config_from_cache(&router_path.mcp_id)
289                                    .await
290                                {
291                                    info!(
292                                        "[重启流程] mcp_id={} 使用缓存配置重启服务",
293                                        router_path.mcp_id
294                                    );
295                                    return start_mcp_and_handle_request(req, mcp_config).await;
296                                }
297
298                                // 无法获取配置
299                                warn!(
300                                    "[重启流程] mcp_id={} 无法获取配置,无法重启服务",
301                                    router_path.mcp_id
302                                );
303                                let message =
304                                    format!("服务 {} 不健康且无法获取配置", router_path.mcp_id);
305                                let http_result: HttpResult<String> =
306                                    HttpResult::error("0004", &message, None);
307                                return Ok(http_result.into_response());
308                            } else {
309                                // handler 不存在,但路由存在
310                                // 检查服务类型,OneShot 不自动重启
311                                let mcp_type = proxy_manager
312                                    .get_mcp_service_status(&router_path.mcp_id)
313                                    .map(|s| s.mcp_type.clone());
314
315                                if matches!(mcp_type, Some(McpType::OneShot)) {
316                                    debug!(
317                                        "[服务检查] mcp_id={} 是 OneShot 类型且 handler 不存在,不自动重启",
318                                        router_path.mcp_id
319                                    );
320                                    // 清理残留状态
321                                    if let Err(e) =
322                                        proxy_manager.cleanup_resources(&router_path.mcp_id).await
323                                    {
324                                        error!(
325                                            "[清理资源] mcp_id={} 清理资源失败: {}",
326                                            router_path.mcp_id, e
327                                        );
328                                    }
329                                    let message = format!(
330                                        "OneShot 服务 {} 已结束,请重新启动",
331                                        router_path.mcp_id
332                                    );
333                                    let http_result: HttpResult<String> =
334                                        HttpResult::error("0006", &message, None);
335                                    return Ok(http_result.into_response());
336                                }
337
338                                // Persistent 类型:继续进入启动流程
339                                warn!(
340                                    "路由存在但 handler 不存在,进入重启流程: base_path={}",
341                                    base_path
342                                );
343                            }
344                        } else {
345                            // 无法解析路由路径,直接使用路由
346                            debug!("=== 路由查找结束(成功) ===");
347                            return handle_request_with_router(req, router_entry, &path).await;
348                        }
349                    } else {
350                        debug!(
351                            "❌ 未找到已注册的路由: base_path='{}', path='{}'",
352                            base_path, path
353                        );
354
355                        // 显示所有已注册的路由
356                        let all_routes = DynamicRouterService::get_all_routes();
357                        debug!("当前已注册的路由: {:?}", all_routes);
358                        debug!("=== 路由查找结束(失败) ===");
359                    }
360
361                    // 未找到路由,尝试启动服务
362                    warn!("未找到匹配的路径,尝试启动服务:base_path={base_path},path={path}");
363                    span.record("error.route_not_found", true);
364
365                    // ===== 提前解析 mcp_id 用于配置获取 =====
366                    let mcp_router_path_for_config = McpRouterPath::from_url(&path);
367
368                    // ===== 配置获取优先级 =====
369                    let proxy_manager = get_proxy_manager();
370
371                    // 优先级 1: 从请求 header 中获取配置(最新)
372                    if let Some(mcp_config) = req.extensions().get::<McpConfig>().cloned()
373                        && mcp_config.mcp_json_config.is_some()
374                    {
375                        // 检查重启限制(防止无限循环)
376                        if !GLOBAL_RESTART_TRACKER.can_restart(&mcp_config.mcp_id) {
377                            warn!("服务 {} 在重启冷却期内,跳过启动", mcp_config.mcp_id);
378                            span.record("error.restart_in_cooldown", true);
379                            let message =
380                                format!("服务 {} 在重启冷却期内,请稍后再试", mcp_config.mcp_id);
381                            let http_result: HttpResult<String> =
382                                HttpResult::error("0002", &message, None);
383                            span.record("http.response.status_code", 429u16); // Too Many Requests
384                            return Ok(http_result.into_response());
385                        }
386
387                        // 尝试获取启动锁,防止并发启动同一服务
388                        let _startup_guard = match GLOBAL_RESTART_TRACKER
389                            .try_acquire_startup_lock(&mcp_config.mcp_id)
390                        {
391                            Some(guard) => guard,
392                            None => {
393                                warn!("服务 {} 正在启动中,跳过本次启动", mcp_config.mcp_id);
394                                span.record("error.startup_in_progress", true);
395                                let message =
396                                    format!("服务 {} 正在启动中,请稍后再试", mcp_config.mcp_id);
397                                let http_result: HttpResult<String> =
398                                    HttpResult::error("0003", &message, None);
399                                span.record("http.response.status_code", 503u16); // Service Unavailable
400                                return Ok(http_result.into_response());
401                            }
402                        };
403
404                        info!("使用请求 header 配置启动服务: {}", mcp_config.mcp_id);
405                        // 同时更新缓存
406                        proxy_manager
407                            .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
408                            .await;
409
410                        // _startup_guard 会在作用域结束时自动释放
411                        return start_mcp_and_handle_request(req, mcp_config).await;
412                    }
413
414                    // 优先级 2: 从 moka 缓存中获取配置(兜底)
415                    // 注意:OneShot 类型不从缓存自动启动,需要用户显式请求(带 header 配置)
416                    if let Some(mcp_id_for_cache) =
417                        mcp_router_path_for_config.as_ref().map(|p| &p.mcp_id)
418                        && let Some(mcp_config) = proxy_manager
419                            .get_mcp_config_from_cache(mcp_id_for_cache)
420                            .await
421                    {
422                        // OneShot 类型不从缓存自动启动
423                        // 避免已回收的 OneShot 服务被意外重启
424                        if matches!(mcp_config.mcp_type, McpType::OneShot) {
425                            info!(
426                                "[启动检查] mcp_id={} 是 OneShot 类型,不从缓存自动启动,需要用户显式请求",
427                                mcp_id_for_cache
428                            );
429                            let message = format!(
430                                "OneShot 服务 {} 需要通过 check_status 接口启动",
431                                mcp_id_for_cache
432                            );
433                            let http_result: HttpResult<String> =
434                                HttpResult::error("0007", &message, None);
435                            return Ok(http_result.into_response());
436                        }
437
438                        // 检查重启限制(防止无限循环)
439                        if !GLOBAL_RESTART_TRACKER.can_restart(mcp_id_for_cache) {
440                            warn!("服务 {} 在重启冷却期内,跳过启动", mcp_id_for_cache);
441                            span.record("error.restart_in_cooldown", true);
442                            let message =
443                                format!("服务 {} 在重启冷却期内,请稍后再试", mcp_id_for_cache);
444                            let http_result: HttpResult<String> =
445                                HttpResult::error("0002", &message, None);
446                            span.record("http.response.status_code", 429u16); // Too Many Requests
447                            return Ok(http_result.into_response());
448                        }
449
450                        // 尝试获取启动锁,防止并发启动同一服务
451                        let _startup_guard = match GLOBAL_RESTART_TRACKER
452                            .try_acquire_startup_lock(mcp_id_for_cache)
453                        {
454                            Some(guard) => guard,
455                            None => {
456                                warn!("服务 {} 正在启动中,跳过本次启动", mcp_id_for_cache);
457                                span.record("error.startup_in_progress", true);
458                                let message =
459                                    format!("服务 {} 正在启动中,请稍后再试", mcp_id_for_cache);
460                                let http_result: HttpResult<String> =
461                                    HttpResult::error("0003", &message, None);
462                                span.record("http.response.status_code", 503u16); // Service Unavailable
463                                return Ok(http_result.into_response());
464                            }
465                        };
466
467                        info!("使用缓存配置启动服务: {}", mcp_id_for_cache);
468                        // _startup_guard 会在作用域结束时自动释放
469                        return start_mcp_and_handle_request(req, mcp_config).await;
470                    }
471
472                    // 优先级 3: 无法获取配置,返回错误
473                    warn!("未找到匹配的路径,且未获取到配置,无法启动MCP服务: {path}");
474                    span.record("error.mcp_config_missing", true);
475
476                    let message =
477                        format!("未找到匹配的路径,且未获取到配置,无法启动MCP服务: {path}");
478                    let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
479                    span.record("http.response.status_code", 404u16);
480                    span.record("error.message", &message);
481                    Ok(http_result.into_response())
482                })
483            }
484            None => {
485                warn!("请求路径解析失败: {path}");
486                span.record("error.path_parse_failed", true);
487
488                let message = format!("请求路径解析失败: {path}");
489                let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
490                Box::pin(async move {
491                    let _guard = span.enter();
492                    span.record("http.response.status_code", 400u16);
493                    span.record("error.message", &message);
494                    Ok(http_result.into_response())
495                })
496            }
497        }
498    }
499}
500
501/// 使用给定的路由处理请求
502async fn handle_request_with_router(
503    req: Request<Body>,
504    router_entry: axum::Router,
505    path: &str,
506) -> Result<Response, Infallible> {
507    // 获取匹配路径的Router,并处理请求
508    let trace_id = extract_trace_id();
509
510    let method = req.method().clone();
511    let uri = req.uri().clone();
512
513    info!("[handle_request_with_router]处理请求: {} {}", method, path);
514
515    // 记录请求头中的关键信息
516    if let Some(content_type) = req.headers().get("content-type")
517        && let Ok(content_type_str) = content_type.to_str()
518    {
519        debug!(
520            "[handle_request_with_router] Content-Type: {}",
521            content_type_str
522        );
523    }
524
525    if let Some(content_length) = req.headers().get("content-length")
526        && let Ok(content_length_str) = content_length.to_str()
527    {
528        debug!(
529            "[handle_request_with_router] Content-Length: {}",
530            content_length_str
531        );
532    }
533
534    // 记录 x-mcp-json 头信息(如果存在)
535    if let Some(mcp_json) = req.headers().get("x-mcp-json")
536        && let Ok(mcp_json_str) = mcp_json.to_str()
537    {
538        debug!(
539            "[handle_request_with_router] MCP-JSON Header: {}",
540            mcp_json_str
541        );
542    }
543
544    // 记录查询参数
545    if let Some(query) = uri.query() {
546        debug!("[handle_request_with_router] Query: {}", query);
547    }
548
549    // 使用 debug_span 减少日志量,因为 DynamicRouterService 已经记录了请求信息
550    // 移除 #[tracing::instrument] 避免 span 嵌套导致的日志膨胀问题
551    let span = tracing::debug_span!(
552        "handle_request_with_router",
553        otel.name = "Handle Request with Router",
554        component = "router",
555        trace_id = %trace_id,
556        http.method = %method,
557        http.path = %path,
558    );
559
560    let _guard = span.enter();
561
562    let mut service = router_entry.into_service();
563    match service.call(req).await {
564        Ok(response) => {
565            let status = response.status();
566
567            // 记录响应头信息
568            debug!(
569                "[handle_request_with_router]响应状态: {}, 响应头: {response:?}",
570                status
571            );
572
573            span.record("http.response.status_code", status.as_u16());
574            Ok(response)
575        }
576        Err(error) => {
577            span.record("error.router_service_error", true);
578            span.record("error.message", format!("{:?}", error));
579            error!("[handle_request_with_router]错误: {error:?}");
580            Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
581        }
582    }
583}
584
585/// 启动MCP服务并处理请求
586async fn start_mcp_and_handle_request(
587    req: Request<Body>,
588    mcp_config: McpConfig,
589) -> Result<Response, Infallible> {
590    let request_path = req.uri().path().to_string();
591    let trace_id = extract_trace_id();
592    debug!("请求路径: {request_path}");
593
594    // 使用 debug_span 减少日志量,移除 #[tracing::instrument] 避免 span 嵌套
595    let span = tracing::debug_span!(
596        "start_mcp_and_handle_request",
597        otel.name = "Start MCP and Handle Request",
598        component = "mcp_startup",
599        mcp.id = %mcp_config.mcp_id,
600        mcp.type = ?mcp_config.mcp_type,
601        mcp.config.has_config = mcp_config.mcp_json_config.is_some(),
602        trace_id = %trace_id,
603    );
604
605    let _guard = span.enter();
606
607    let ret = mcp_start_task(mcp_config).await;
608
609    if let Ok((router, _)) = ret {
610        span.record("mcp.startup.success", true);
611        handle_request_with_router(req, router, &request_path).await
612    } else {
613        span.record("mcp.startup.failed", true);
614        span.record("error.mcp_startup_failed", true);
615        span.record("error.message", format!("{:?}", ret));
616        warn!("MCP服务启动失败: {ret:?}");
617        Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
618    }
619}