1use std::{
2 convert::Infallible,
3 task::{Context, Poll},
4};
5
6use axum::{
7 body::Body,
8 extract::Request,
9 response::{IntoResponse, Response},
10};
11use futures::future::BoxFuture;
12use log::{debug, error, info, warn};
13use tower::Service;
14
15use crate::{
16 DynamicRouterService, get_proxy_manager, mcp_start_task,
17 model::{
18 CheckMcpStatusResponseStatus, GLOBAL_RESTART_TRACKER, HttpResult, McpConfig, McpRouterPath,
19 McpType,
20 },
21 server::middlewares::extract_trace_id,
22};
23
24impl Service<Request<Body>> for DynamicRouterService {
25 type Response = Response;
26 type Error = Infallible;
27 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
28
29 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
30 Poll::Ready(Ok(()))
31 }
32
33 fn call(&mut self, req: Request<Body>) -> Self::Future {
34 let path = req.uri().path().to_string();
35 let method = req.method().clone();
36 let headers = req.headers().clone();
37
38 debug!("=== 路径解析开始 ===");
40 debug!("原始请求路径: {}", path);
41 debug!("路径包含的通配符参数: {:?}", req.extensions());
42
43 let trace_id = extract_trace_id();
45
46 let span = tracing::debug_span!(
48 "DynamicRouterService",
49 otel.name = "HTTP Request",
50 http.method = %method,
51 http.route = %path,
52 http.url = %req.uri(),
53 mcp.protocol = format!("{:?}", self.0),
54 trace_id = %trace_id,
55 );
56
57 if let Some(content_type) = headers.get("content-type") {
59 span.record("http.request.content_type", format!("{:?}", content_type));
60 }
61 if let Some(content_length) = headers.get("content-length") {
62 span.record(
63 "http.request.content_length",
64 format!("{:?}", content_length),
65 );
66 }
67
68 debug!("请求路径: {path}");
69
70 let mcp_router_path = McpRouterPath::from_url(&path);
72
73 match mcp_router_path {
74 Some(mcp_router_path) => {
75 let mcp_id = mcp_router_path.mcp_id.clone();
76 let base_path = mcp_router_path.base_path.clone();
77
78 span.record("mcp.id", &mcp_id);
79 span.record("mcp.base_path", &base_path);
80
81 debug!("=== 路径解析结果 ===");
82 debug!("解析出的mcp_id: {}", mcp_id);
83 debug!("解析出的base_path: {}", base_path);
84 debug!("请求路径: {} vs base_path: {}", path, base_path);
85 debug!("=== 路径解析结束 ===");
86
87 Box::pin(async move {
88 let _guard = span.enter();
89
90 debug!("=== 路由查找过程 ===");
92 debug!("查找base_path: '{}'", base_path);
93
94 if let Some(router_entry) = DynamicRouterService::get_route(&base_path) {
95 debug!(
96 "✅ 找到已注册的路由: base_path={}, path={}",
97 base_path, path
98 );
99
100 let mcp_id_for_check = McpRouterPath::from_url(&path);
102 if let Some(router_path) = mcp_id_for_check {
103 let proxy_manager = get_proxy_manager();
104
105 if let Some(service_status) =
109 proxy_manager.get_mcp_service_status(&router_path.mcp_id)
110 {
111 match &service_status.check_mcp_status_response_status {
112 CheckMcpStatusResponseStatus::Pending => {
113 debug!(
114 "[MCP状态检查] mcp_id={} 状态为 Pending,服务正在初始化中,返回 503",
115 router_path.mcp_id
116 );
117 let message = format!(
118 "服务 {} 正在初始化中,请稍后再试",
119 router_path.mcp_id
120 );
121 let http_result: HttpResult<String> =
122 HttpResult::error("0003", &message, None);
123 return Ok(http_result.into_response());
124 }
125 CheckMcpStatusResponseStatus::Error(err) => {
126 warn!(
129 "[MCP状态检查] mcp_id={} 状态为 Error: {},清理资源并返回错误",
130 router_path.mcp_id, err
131 );
132 if let Err(e) = proxy_manager
134 .cleanup_resources(&router_path.mcp_id)
135 .await
136 {
137 error!(
138 "[MCP状态检查] mcp_id={} 清理资源失败: {}",
139 router_path.mcp_id, e
140 );
141 }
142 let message = format!(
144 "服务 {} 启动失败: {}",
145 router_path.mcp_id, err
146 );
147 let http_result: HttpResult<String> =
148 HttpResult::error("0005", &message, None);
149 return Ok(http_result.into_response());
150 }
151 CheckMcpStatusResponseStatus::Ready => {
152 debug!(
153 "[MCP状态检查] mcp_id={} 状态为 Ready,继续检查后端健康状态",
154 router_path.mcp_id
155 );
156 }
157 }
158 }
159
160 let startup_guard = GLOBAL_RESTART_TRACKER
163 .try_acquire_startup_lock(&router_path.mcp_id);
164
165 if startup_guard.is_none() {
166 debug!(
168 "[启动锁检查] mcp_id={} 启动锁被占用,服务正在启动中,返回 503",
169 router_path.mcp_id
170 );
171 span.record("mcp.startup_in_progress", true);
172 let message =
173 format!("服务 {} 正在启动中,请稍后再试", router_path.mcp_id);
174 let http_result: HttpResult<String> =
175 HttpResult::error("0003", &message, None);
176 span.record("http.response.status_code", 503u16);
177 return Ok(http_result.into_response());
178 }
179
180 let _startup_guard = startup_guard.unwrap();
182 debug!(
183 "[启动锁检查] mcp_id={} 成功获取启动锁,开始健康检查",
184 router_path.mcp_id
185 );
186
187 if let Some(handler) =
188 proxy_manager.get_proxy_handler(&router_path.mcp_id)
189 {
190 let is_healthy = if let Some(cached) = GLOBAL_RESTART_TRACKER
192 .get_cached_health_status(&router_path.mcp_id)
193 {
194 debug!(
195 "[健康检查] mcp_id={} 使用缓存状态: is_healthy={}",
196 router_path.mcp_id, cached
197 );
198 cached
199 } else {
200 debug!(
201 "[健康检查] mcp_id={} 缓存未命中,开始实际健康检查...",
202 router_path.mcp_id
203 );
204 let status = handler.is_mcp_server_ready().await;
205 GLOBAL_RESTART_TRACKER
206 .update_health_status(&router_path.mcp_id, status);
207 debug!(
208 "[健康检查] mcp_id={} 实际健康检查结果: is_healthy={}",
209 router_path.mcp_id, status
210 );
211 status
212 };
213
214 if is_healthy {
215 debug!(
216 "[健康检查] mcp_id={} 后端服务正常,释放锁并使用路由",
217 router_path.mcp_id
218 );
219 drop(_startup_guard);
221 debug!("=== 路由查找结束(成功) ===");
222 return handle_request_with_router(req, router_entry, &path)
223 .await;
224 }
225
226 let mcp_type = proxy_manager
228 .get_mcp_service_status(&router_path.mcp_id)
229 .map(|s| s.mcp_type.clone());
230
231 warn!(
233 "[健康检查] mcp_id={} 后端服务不健康,清理资源",
234 router_path.mcp_id
235 );
236 if let Err(e) =
237 proxy_manager.cleanup_resources(&router_path.mcp_id).await
238 {
239 error!(
240 "[清理资源] mcp_id={} 清理资源失败: error={}",
241 router_path.mcp_id, e
242 );
243 } else {
244 debug!("[清理资源] mcp_id={} 清理资源成功", router_path.mcp_id);
245 }
246
247 if matches!(mcp_type, Some(McpType::OneShot)) {
251 debug!(
252 "[健康检查] mcp_id={} 是 OneShot 类型,不自动重启,返回服务已结束",
253 router_path.mcp_id
254 );
255 let message = format!(
256 "OneShot 服务 {} 已结束,请重新启动",
257 router_path.mcp_id
258 );
259 let http_result: HttpResult<String> =
260 HttpResult::error("0006", &message, None);
261 return Ok(http_result.into_response());
262 }
263
264 info!(
266 "[重启流程] mcp_id={} 是 Persistent 类型,开始重启服务",
267 router_path.mcp_id
268 );
269
270 if let Some(mcp_config) =
273 req.extensions().get::<McpConfig>().cloned()
274 && mcp_config.mcp_json_config.is_some()
275 {
276 info!(
277 "[重启流程] mcp_id={} 使用请求 header 配置重启服务",
278 mcp_config.mcp_id
279 );
280 proxy_manager
281 .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
282 .await;
283 return start_mcp_and_handle_request(req, mcp_config).await;
284 }
285
286 if let Some(mcp_config) = proxy_manager
288 .get_mcp_config_from_cache(&router_path.mcp_id)
289 .await
290 {
291 info!(
292 "[重启流程] mcp_id={} 使用缓存配置重启服务",
293 router_path.mcp_id
294 );
295 return start_mcp_and_handle_request(req, mcp_config).await;
296 }
297
298 warn!(
300 "[重启流程] mcp_id={} 无法获取配置,无法重启服务",
301 router_path.mcp_id
302 );
303 let message =
304 format!("服务 {} 不健康且无法获取配置", router_path.mcp_id);
305 let http_result: HttpResult<String> =
306 HttpResult::error("0004", &message, None);
307 return Ok(http_result.into_response());
308 } else {
309 let mcp_type = proxy_manager
312 .get_mcp_service_status(&router_path.mcp_id)
313 .map(|s| s.mcp_type.clone());
314
315 if matches!(mcp_type, Some(McpType::OneShot)) {
316 debug!(
317 "[服务检查] mcp_id={} 是 OneShot 类型且 handler 不存在,不自动重启",
318 router_path.mcp_id
319 );
320 if let Err(e) =
322 proxy_manager.cleanup_resources(&router_path.mcp_id).await
323 {
324 error!(
325 "[清理资源] mcp_id={} 清理资源失败: {}",
326 router_path.mcp_id, e
327 );
328 }
329 let message = format!(
330 "OneShot 服务 {} 已结束,请重新启动",
331 router_path.mcp_id
332 );
333 let http_result: HttpResult<String> =
334 HttpResult::error("0006", &message, None);
335 return Ok(http_result.into_response());
336 }
337
338 warn!(
340 "路由存在但 handler 不存在,进入重启流程: base_path={}",
341 base_path
342 );
343 }
344 } else {
345 debug!("=== 路由查找结束(成功) ===");
347 return handle_request_with_router(req, router_entry, &path).await;
348 }
349 } else {
350 debug!(
351 "❌ 未找到已注册的路由: base_path='{}', path='{}'",
352 base_path, path
353 );
354
355 let all_routes = DynamicRouterService::get_all_routes();
357 debug!("当前已注册的路由: {:?}", all_routes);
358 debug!("=== 路由查找结束(失败) ===");
359 }
360
361 warn!("未找到匹配的路径,尝试启动服务:base_path={base_path},path={path}");
363 span.record("error.route_not_found", true);
364
365 let mcp_router_path_for_config = McpRouterPath::from_url(&path);
367
368 let proxy_manager = get_proxy_manager();
370
371 if let Some(mcp_config) = req.extensions().get::<McpConfig>().cloned()
373 && mcp_config.mcp_json_config.is_some()
374 {
375 if !GLOBAL_RESTART_TRACKER.can_restart(&mcp_config.mcp_id) {
377 warn!("服务 {} 在重启冷却期内,跳过启动", mcp_config.mcp_id);
378 span.record("error.restart_in_cooldown", true);
379 let message =
380 format!("服务 {} 在重启冷却期内,请稍后再试", mcp_config.mcp_id);
381 let http_result: HttpResult<String> =
382 HttpResult::error("0002", &message, None);
383 span.record("http.response.status_code", 429u16); return Ok(http_result.into_response());
385 }
386
387 let _startup_guard = match GLOBAL_RESTART_TRACKER
389 .try_acquire_startup_lock(&mcp_config.mcp_id)
390 {
391 Some(guard) => guard,
392 None => {
393 warn!("服务 {} 正在启动中,跳过本次启动", mcp_config.mcp_id);
394 span.record("error.startup_in_progress", true);
395 let message =
396 format!("服务 {} 正在启动中,请稍后再试", mcp_config.mcp_id);
397 let http_result: HttpResult<String> =
398 HttpResult::error("0003", &message, None);
399 span.record("http.response.status_code", 503u16); return Ok(http_result.into_response());
401 }
402 };
403
404 info!("使用请求 header 配置启动服务: {}", mcp_config.mcp_id);
405 proxy_manager
407 .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
408 .await;
409
410 return start_mcp_and_handle_request(req, mcp_config).await;
412 }
413
414 if let Some(mcp_id_for_cache) =
417 mcp_router_path_for_config.as_ref().map(|p| &p.mcp_id)
418 && let Some(mcp_config) = proxy_manager
419 .get_mcp_config_from_cache(mcp_id_for_cache)
420 .await
421 {
422 if matches!(mcp_config.mcp_type, McpType::OneShot) {
425 info!(
426 "[启动检查] mcp_id={} 是 OneShot 类型,不从缓存自动启动,需要用户显式请求",
427 mcp_id_for_cache
428 );
429 let message = format!(
430 "OneShot 服务 {} 需要通过 check_status 接口启动",
431 mcp_id_for_cache
432 );
433 let http_result: HttpResult<String> =
434 HttpResult::error("0007", &message, None);
435 return Ok(http_result.into_response());
436 }
437
438 if !GLOBAL_RESTART_TRACKER.can_restart(mcp_id_for_cache) {
440 warn!("服务 {} 在重启冷却期内,跳过启动", mcp_id_for_cache);
441 span.record("error.restart_in_cooldown", true);
442 let message =
443 format!("服务 {} 在重启冷却期内,请稍后再试", mcp_id_for_cache);
444 let http_result: HttpResult<String> =
445 HttpResult::error("0002", &message, None);
446 span.record("http.response.status_code", 429u16); return Ok(http_result.into_response());
448 }
449
450 let _startup_guard = match GLOBAL_RESTART_TRACKER
452 .try_acquire_startup_lock(mcp_id_for_cache)
453 {
454 Some(guard) => guard,
455 None => {
456 warn!("服务 {} 正在启动中,跳过本次启动", mcp_id_for_cache);
457 span.record("error.startup_in_progress", true);
458 let message =
459 format!("服务 {} 正在启动中,请稍后再试", mcp_id_for_cache);
460 let http_result: HttpResult<String> =
461 HttpResult::error("0003", &message, None);
462 span.record("http.response.status_code", 503u16); return Ok(http_result.into_response());
464 }
465 };
466
467 info!("使用缓存配置启动服务: {}", mcp_id_for_cache);
468 return start_mcp_and_handle_request(req, mcp_config).await;
470 }
471
472 warn!("未找到匹配的路径,且未获取到配置,无法启动MCP服务: {path}");
474 span.record("error.mcp_config_missing", true);
475
476 let message =
477 format!("未找到匹配的路径,且未获取到配置,无法启动MCP服务: {path}");
478 let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
479 span.record("http.response.status_code", 404u16);
480 span.record("error.message", &message);
481 Ok(http_result.into_response())
482 })
483 }
484 None => {
485 warn!("请求路径解析失败: {path}");
486 span.record("error.path_parse_failed", true);
487
488 let message = format!("请求路径解析失败: {path}");
489 let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
490 Box::pin(async move {
491 let _guard = span.enter();
492 span.record("http.response.status_code", 400u16);
493 span.record("error.message", &message);
494 Ok(http_result.into_response())
495 })
496 }
497 }
498 }
499}
500
501async fn handle_request_with_router(
503 req: Request<Body>,
504 router_entry: axum::Router,
505 path: &str,
506) -> Result<Response, Infallible> {
507 let trace_id = extract_trace_id();
509
510 let method = req.method().clone();
511 let uri = req.uri().clone();
512
513 info!("[handle_request_with_router]处理请求: {} {}", method, path);
514
515 if let Some(content_type) = req.headers().get("content-type")
517 && let Ok(content_type_str) = content_type.to_str()
518 {
519 debug!(
520 "[handle_request_with_router] Content-Type: {}",
521 content_type_str
522 );
523 }
524
525 if let Some(content_length) = req.headers().get("content-length")
526 && let Ok(content_length_str) = content_length.to_str()
527 {
528 debug!(
529 "[handle_request_with_router] Content-Length: {}",
530 content_length_str
531 );
532 }
533
534 if let Some(mcp_json) = req.headers().get("x-mcp-json")
536 && let Ok(mcp_json_str) = mcp_json.to_str()
537 {
538 debug!(
539 "[handle_request_with_router] MCP-JSON Header: {}",
540 mcp_json_str
541 );
542 }
543
544 if let Some(query) = uri.query() {
546 debug!("[handle_request_with_router] Query: {}", query);
547 }
548
549 let span = tracing::debug_span!(
552 "handle_request_with_router",
553 otel.name = "Handle Request with Router",
554 component = "router",
555 trace_id = %trace_id,
556 http.method = %method,
557 http.path = %path,
558 );
559
560 let _guard = span.enter();
561
562 let mut service = router_entry.into_service();
563 match service.call(req).await {
564 Ok(response) => {
565 let status = response.status();
566
567 debug!(
569 "[handle_request_with_router]响应状态: {}, 响应头: {response:?}",
570 status
571 );
572
573 span.record("http.response.status_code", status.as_u16());
574 Ok(response)
575 }
576 Err(error) => {
577 span.record("error.router_service_error", true);
578 span.record("error.message", format!("{:?}", error));
579 error!("[handle_request_with_router]错误: {error:?}");
580 Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
581 }
582 }
583}
584
585async fn start_mcp_and_handle_request(
587 req: Request<Body>,
588 mcp_config: McpConfig,
589) -> Result<Response, Infallible> {
590 let request_path = req.uri().path().to_string();
591 let trace_id = extract_trace_id();
592 debug!("请求路径: {request_path}");
593
594 let span = tracing::debug_span!(
596 "start_mcp_and_handle_request",
597 otel.name = "Start MCP and Handle Request",
598 component = "mcp_startup",
599 mcp.id = %mcp_config.mcp_id,
600 mcp.type = ?mcp_config.mcp_type,
601 mcp.config.has_config = mcp_config.mcp_json_config.is_some(),
602 trace_id = %trace_id,
603 );
604
605 let _guard = span.enter();
606
607 let ret = mcp_start_task(mcp_config).await;
608
609 if let Ok((router, _)) = ret {
610 span.record("mcp.startup.success", true);
611 handle_request_with_router(req, router, &request_path).await
612 } else {
613 span.record("mcp.startup.failed", true);
614 span.record("error.mcp_startup_failed", true);
615 span.record("error.message", format!("{:?}", ret));
616 warn!("MCP服务启动失败: {ret:?}");
617 Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
618 }
619}