1use std::{
2 convert::Infallible,
3 task::{Context, Poll},
4};
5
6use axum::{
7 body::Body,
8 extract::Request,
9 response::{IntoResponse, Response},
10};
11use futures::future::BoxFuture;
12use log::{debug, error, info, warn};
13use tower::Service;
14
15use crate::{
16 DynamicRouterService, get_proxy_manager, mcp_start_task,
17 model::{
18 CheckMcpStatusResponseStatus, GLOBAL_RESTART_TRACKER, HttpResult, McpConfig, McpRouterPath,
19 McpType,
20 },
21 server::middlewares::extract_trace_id,
22};
23
24impl Service<Request<Body>> for DynamicRouterService {
25 type Response = Response;
26 type Error = Infallible;
27 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
28
29 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
30 Poll::Ready(Ok(()))
31 }
32
33 fn call(&mut self, req: Request<Body>) -> Self::Future {
34 let path = req.uri().path().to_string();
35 let method = req.method().clone();
36 let headers = req.headers().clone();
37
38 debug!("=== Path analysis begins ===");
40 debug!("Original request path: {}", path);
41 debug!("Path contains wildcard parameters: {:?}", req.extensions());
42
43 let trace_id = extract_trace_id();
45
46 let span = tracing::debug_span!(
48 "DynamicRouterService",
49 otel.name = "HTTP Request",
50 http.method = %method,
51 http.route = %path,
52 http.url = %req.uri(),
53 mcp.protocol = format!("{:?}", self.0),
54 trace_id = %trace_id,
55 );
56
57 if let Some(content_type) = headers.get("content-type") {
59 span.record("http.request.content_type", format!("{:?}", content_type));
60 }
61 if let Some(content_length) = headers.get("content-length") {
62 span.record(
63 "http.request.content_length",
64 format!("{:?}", content_length),
65 );
66 }
67
68 debug!("Request path: {path}");
69
70 let mcp_router_path = McpRouterPath::from_url(&path);
72
73 match mcp_router_path {
74 Some(mcp_router_path) => {
75 let mcp_id = mcp_router_path.mcp_id.clone();
76 let base_path = mcp_router_path.base_path.clone();
77
78 span.record("mcp.id", &mcp_id);
79 span.record("mcp.base_path", &base_path);
80
81 debug!("=== Path analysis results ===");
82 debug!("Parsed mcp_id: {}", mcp_id);
83 debug!("Parsed base_path: {}", base_path);
84 debug!("Request path: {} vs base_path: {}", path, base_path);
85 debug!("=== Path analysis ends ===");
86
87 Box::pin(async move {
88 let _guard = span.enter();
89
90 debug!("===Route lookup process ===");
92 debug!("Find base_path: '{}'", base_path);
93
94 if let Some(router_entry) = DynamicRouterService::get_route(&base_path) {
95 debug!(
96 "✅ Find the registered route: base_path={}, path={}",
97 base_path, path
98 );
99
100 let mcp_id_for_check = McpRouterPath::from_url(&path);
102 if let Some(router_path) = mcp_id_for_check {
103 let proxy_manager = get_proxy_manager();
104
105 if let Some(service_status) =
109 proxy_manager.get_mcp_service_status(&router_path.mcp_id)
110 {
111 match &service_status.check_mcp_status_response_status {
112 CheckMcpStatusResponseStatus::Pending => {
113 debug!(
114 "[MCP status check] mcp_id={} The status is Pending, the service is being initialized, and 503 is returned.",
115 router_path.mcp_id
116 );
117 let message = format!(
118 "服务 {} 正在初始化中,请稍后再试",
119 router_path.mcp_id
120 );
121 let http_result: HttpResult<String> =
122 HttpResult::error("0003", &message, None);
123 return Ok(http_result.into_response());
124 }
125 CheckMcpStatusResponseStatus::Error(err) => {
126 warn!(
129 "[MCP status check] mcp_id={} status is Error: {}, clean up resources and return error",
130 router_path.mcp_id, err
131 );
132 if let Err(e) = proxy_manager
134 .cleanup_resources(&router_path.mcp_id)
135 .await
136 {
137 error!(
138 "[MCP status check] mcp_id={} Failed to clean up resources: {}",
139 router_path.mcp_id, e
140 );
141 }
142 let message = format!(
144 "服务 {} 启动失败: {}",
145 router_path.mcp_id, err
146 );
147 let http_result: HttpResult<String> =
148 HttpResult::error("0005", &message, None);
149 return Ok(http_result.into_response());
150 }
151 CheckMcpStatusResponseStatus::Ready => {
152 debug!(
153 "[MCP status check] mcp_id={} status is Ready, continue to check the backend health status",
154 router_path.mcp_id
155 );
156 }
157 }
158 }
159
160 let startup_guard = GLOBAL_RESTART_TRACKER
163 .try_acquire_startup_lock(&router_path.mcp_id);
164
165 if startup_guard.is_none() {
166 debug!(
168 "[Startup lock check] mcp_id={} The startup lock is occupied, the service is starting, and 503 is returned.",
169 router_path.mcp_id
170 );
171 span.record("mcp.startup_in_progress", true);
172 let message =
173 format!("服务 {} 正在启动中,请稍后再试", router_path.mcp_id);
174 let http_result: HttpResult<String> =
175 HttpResult::error("0003", &message, None);
176 span.record("http.response.status_code", 503u16);
177 return Ok(http_result.into_response());
178 }
179
180 let _startup_guard = startup_guard.unwrap();
182 debug!(
183 "[Start lock check] mcp_id={} Successfully obtained the startup lock and started health check",
184 router_path.mcp_id
185 );
186
187 if let Some(handler) =
188 proxy_manager.get_proxy_handler(&router_path.mcp_id)
189 {
190 let is_healthy = if let Some(cached) = GLOBAL_RESTART_TRACKER
192 .get_cached_health_status(&router_path.mcp_id)
193 {
194 debug!(
195 "[Health Check] mcp_id={} Use cache status: is_healthy={}",
196 router_path.mcp_id, cached
197 );
198 cached
199 } else {
200 debug!(
201 "[Health Check] mcp_id={} Cache miss, start actual health check...",
202 router_path.mcp_id
203 );
204 let status = handler.is_mcp_server_ready().await;
205 GLOBAL_RESTART_TRACKER
206 .update_health_status(&router_path.mcp_id, status);
207 debug!(
208 "[Health Check] mcp_id={} Actual health check result: is_healthy={}",
209 router_path.mcp_id, status
210 );
211 status
212 };
213
214 if is_healthy {
215 debug!(
216 "[Health Check] mcp_id={} The backend service is normal, release the lock and use routing",
217 router_path.mcp_id
218 );
219 drop(_startup_guard);
221 debug!("=== Route search ended (successful) ===");
222 return handle_request_with_router(req, router_entry, &path)
223 .await;
224 }
225
226 let mcp_type = proxy_manager
228 .get_mcp_service_status(&router_path.mcp_id)
229 .map(|s| s.mcp_type.clone());
230
231 warn!(
233 "[Health check] mcp_id={} The backend service is unhealthy, clean up resources.",
234 router_path.mcp_id
235 );
236 if let Err(e) =
237 proxy_manager.cleanup_resources(&router_path.mcp_id).await
238 {
239 error!(
240 "[Clean up resources] mcp_id={} Failed to clean up resources: error={}",
241 router_path.mcp_id, e
242 );
243 } else {
244 debug!(
245 "[Clean up resources] mcp_id={} Clean up resources successfully",
246 router_path.mcp_id
247 );
248 }
249
250 if matches!(mcp_type, Some(McpType::OneShot)) {
254 debug!(
255 "[Health Check] mcp_id={} is a OneShot type, does not automatically restart, and returns that the service has ended",
256 router_path.mcp_id
257 );
258 let message = format!(
259 "OneShot 服务 {} 已结束,请重新启动",
260 router_path.mcp_id
261 );
262 let http_result: HttpResult<String> =
263 HttpResult::error("0006", &message, None);
264 return Ok(http_result.into_response());
265 }
266
267 info!(
269 "[Restart process] mcp_id={} is Persistent type, start to restart the service",
270 router_path.mcp_id
271 );
272
273 if let Some(mcp_config) =
276 req.extensions().get::<McpConfig>().cloned()
277 && mcp_config.mcp_json_config.is_some()
278 {
279 info!(
280 "[Restart process] mcp_id={} Use the request header to configure the restart service",
281 mcp_config.mcp_id
282 );
283 proxy_manager
284 .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
285 .await;
286 return start_mcp_and_handle_request(req, mcp_config).await;
287 }
288
289 if let Some(mcp_config) = proxy_manager
291 .get_mcp_config_from_cache(&router_path.mcp_id)
292 .await
293 {
294 info!(
295 "[Restart process] mcp_id={} Restart the service using cache configuration",
296 router_path.mcp_id
297 );
298 return start_mcp_and_handle_request(req, mcp_config).await;
299 }
300
301 warn!(
303 "[Restart Process] mcp_id={} Unable to obtain the configuration and unable to restart the service",
304 router_path.mcp_id
305 );
306 let message =
307 format!("服务 {} 不健康且无法获取配置", router_path.mcp_id);
308 let http_result: HttpResult<String> =
309 HttpResult::error("0004", &message, None);
310 return Ok(http_result.into_response());
311 } else {
312 let mcp_type = proxy_manager
315 .get_mcp_service_status(&router_path.mcp_id)
316 .map(|s| s.mcp_type.clone());
317
318 if matches!(mcp_type, Some(McpType::OneShot)) {
319 debug!(
320 "[Service Check] mcp_id={} is OneShot type and the handler does not exist, so it will not restart automatically.",
321 router_path.mcp_id
322 );
323 if let Err(e) =
325 proxy_manager.cleanup_resources(&router_path.mcp_id).await
326 {
327 error!(
328 "[Clean up resources] mcp_id={} Failed to clean up resources: {}",
329 router_path.mcp_id, e
330 );
331 }
332 let message = format!(
333 "OneShot 服务 {} 已结束,请重新启动",
334 router_path.mcp_id
335 );
336 let http_result: HttpResult<String> =
337 HttpResult::error("0006", &message, None);
338 return Ok(http_result.into_response());
339 }
340
341 warn!(
343 "The route exists but the handler does not exist. Enter the restart process: base_path={}",
344 base_path
345 );
346 }
347 } else {
348 debug!("=== Route search ended (successful) ===");
350 return handle_request_with_router(req, router_entry, &path).await;
351 }
352 } else {
353 debug!(
354 "❌ No registered route found: base_path='{}', path='{}'",
355 base_path, path
356 );
357
358 let all_routes = DynamicRouterService::get_all_routes();
360 debug!("Currently registered route: {:?}", all_routes);
361 debug!("=== Route search ended (failed) ===");
362 }
363
364 warn!(
366 "No matching path found, try to start the service: base_path={base_path}, path={path}"
367 );
368 span.record("error.route_not_found", true);
369
370 let mcp_router_path_for_config = McpRouterPath::from_url(&path);
372
373 let proxy_manager = get_proxy_manager();
375
376 if let Some(mcp_config) = req.extensions().get::<McpConfig>().cloned()
378 && mcp_config.mcp_json_config.is_some()
379 {
380 if !GLOBAL_RESTART_TRACKER.can_restart(&mcp_config.mcp_id) {
382 warn!(
383 "Service {} skips startup during restart cooldown period",
384 mcp_config.mcp_id
385 );
386 span.record("error.restart_in_cooldown", true);
387 let message =
388 format!("服务 {} 在重启冷却期内,请稍后再试", mcp_config.mcp_id);
389 let http_result: HttpResult<String> =
390 HttpResult::error("0002", &message, None);
391 span.record("http.response.status_code", 429u16); return Ok(http_result.into_response());
393 }
394
395 let _startup_guard = match GLOBAL_RESTART_TRACKER
397 .try_acquire_startup_lock(&mcp_config.mcp_id)
398 {
399 Some(guard) => guard,
400 None => {
401 warn!(
402 "Service {} is starting, skip this startup",
403 mcp_config.mcp_id
404 );
405 span.record("error.startup_in_progress", true);
406 let message =
407 format!("服务 {} 正在启动中,请稍后再试", mcp_config.mcp_id);
408 let http_result: HttpResult<String> =
409 HttpResult::error("0003", &message, None);
410 span.record("http.response.status_code", 503u16); return Ok(http_result.into_response());
412 }
413 };
414
415 info!(
416 "Use request header configuration to start the service: {}",
417 mcp_config.mcp_id
418 );
419 proxy_manager
421 .register_mcp_config(&mcp_config.mcp_id, mcp_config.clone())
422 .await;
423
424 return start_mcp_and_handle_request(req, mcp_config).await;
426 }
427
428 if let Some(mcp_id_for_cache) =
431 mcp_router_path_for_config.as_ref().map(|p| &p.mcp_id)
432 && let Some(mcp_config) = proxy_manager
433 .get_mcp_config_from_cache(mcp_id_for_cache)
434 .await
435 {
436 if matches!(mcp_config.mcp_type, McpType::OneShot) {
439 info!(
440 "[Startup check] mcp_id={} is a OneShot type, does not automatically start from the cache, and requires an explicit request from the user",
441 mcp_id_for_cache
442 );
443 let message = format!(
444 "OneShot 服务 {} 需要通过 check_status 接口启动",
445 mcp_id_for_cache
446 );
447 let http_result: HttpResult<String> =
448 HttpResult::error("0007", &message, None);
449 return Ok(http_result.into_response());
450 }
451
452 if !GLOBAL_RESTART_TRACKER.can_restart(mcp_id_for_cache) {
454 warn!(
455 "Service {} skips startup during restart cooldown period",
456 mcp_id_for_cache
457 );
458 span.record("error.restart_in_cooldown", true);
459 let message =
460 format!("服务 {} 在重启冷却期内,请稍后再试", mcp_id_for_cache);
461 let http_result: HttpResult<String> =
462 HttpResult::error("0002", &message, None);
463 span.record("http.response.status_code", 429u16); return Ok(http_result.into_response());
465 }
466
467 let _startup_guard = match GLOBAL_RESTART_TRACKER
469 .try_acquire_startup_lock(mcp_id_for_cache)
470 {
471 Some(guard) => guard,
472 None => {
473 warn!(
474 "Service {} is starting, skip this startup",
475 mcp_id_for_cache
476 );
477 span.record("error.startup_in_progress", true);
478 let message =
479 format!("服务 {} 正在启动中,请稍后再试", mcp_id_for_cache);
480 let http_result: HttpResult<String> =
481 HttpResult::error("0003", &message, None);
482 span.record("http.response.status_code", 503u16); return Ok(http_result.into_response());
484 }
485 };
486
487 info!(
488 "Start the service using cached configuration: {}",
489 mcp_id_for_cache
490 );
491 return start_mcp_and_handle_request(req, mcp_config).await;
493 }
494
495 warn!(
497 "No matching path was found, and the configuration was not obtained, so the MCP service could not be started: {path}"
498 );
499 span.record("error.mcp_config_missing", true);
500
501 let message =
502 format!("未找到匹配的路径,且未获取到配置,无法启动MCP服务: {path}");
503 let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
504 span.record("http.response.status_code", 404u16);
505 span.record("error.message", &message);
506 Ok(http_result.into_response())
507 })
508 }
509 None => {
510 warn!("Request path resolution failed: {path}");
511 span.record("error.path_parse_failed", true);
512
513 let message = format!("请求路径解析失败: {path}");
514 let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
515 Box::pin(async move {
516 let _guard = span.enter();
517 span.record("http.response.status_code", 400u16);
518 span.record("error.message", &message);
519 Ok(http_result.into_response())
520 })
521 }
522 }
523 }
524}
525
526async fn handle_request_with_router(
528 req: Request<Body>,
529 router_entry: axum::Router,
530 path: &str,
531) -> Result<Response, Infallible> {
532 let trace_id = extract_trace_id();
534
535 let method = req.method().clone();
536 let uri = req.uri().clone();
537
538 info!(
539 "[handle_request_with_router] Handle request: {} {}",
540 method, path
541 );
542
543 if let Some(content_type) = req.headers().get("content-type")
545 && let Ok(content_type_str) = content_type.to_str()
546 {
547 debug!(
548 "[handle_request_with_router] Content-Type: {}",
549 content_type_str
550 );
551 }
552
553 if let Some(content_length) = req.headers().get("content-length")
554 && let Ok(content_length_str) = content_length.to_str()
555 {
556 debug!(
557 "[handle_request_with_router] Content-Length: {}",
558 content_length_str
559 );
560 }
561
562 if let Some(mcp_json) = req.headers().get("x-mcp-json")
564 && let Ok(mcp_json_str) = mcp_json.to_str()
565 {
566 debug!(
567 "[handle_request_with_router] MCP-JSON Header: {}",
568 mcp_json_str
569 );
570 }
571
572 if let Some(query) = uri.query() {
574 debug!("[handle_request_with_router] Query: {}", query);
575 }
576
577 let span = tracing::debug_span!(
580 "handle_request_with_router",
581 otel.name = "Handle Request with Router",
582 component = "router",
583 trace_id = %trace_id,
584 http.method = %method,
585 http.path = %path,
586 );
587
588 let _guard = span.enter();
589
590 let mut service = router_entry.into_service();
591 match service.call(req).await {
592 Ok(response) => {
593 let status = response.status();
594
595 debug!(
597 "[handle_request_with_router]Response status: {}, response header: {response:?}",
598 status
599 );
600
601 span.record("http.response.status_code", status.as_u16());
602 Ok(response)
603 }
604 Err(error) => {
605 span.record("error.router_service_error", true);
606 span.record("error.message", format!("{:?}", error));
607 error!("[handle_request_with_router] error: {error:?}");
608 Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
609 }
610 }
611}
612
613async fn start_mcp_and_handle_request(
615 req: Request<Body>,
616 mcp_config: McpConfig,
617) -> Result<Response, Infallible> {
618 let request_path = req.uri().path().to_string();
619 let trace_id = extract_trace_id();
620 debug!("Request path: {request_path}");
621
622 let span = tracing::debug_span!(
624 "start_mcp_and_handle_request",
625 otel.name = "Start MCP and Handle Request",
626 component = "mcp_startup",
627 mcp.id = %mcp_config.mcp_id,
628 mcp.type = ?mcp_config.mcp_type,
629 mcp.config.has_config = mcp_config.mcp_json_config.is_some(),
630 trace_id = %trace_id,
631 );
632
633 let _guard = span.enter();
634
635 let ret = mcp_start_task(mcp_config).await;
636
637 if let Ok((router, _)) = ret {
638 span.record("mcp.startup.success", true);
639 handle_request_with_router(req, router, &request_path).await
640 } else {
641 span.record("mcp.startup.failed", true);
642 span.record("error.mcp_startup_failed", true);
643 span.record("error.message", format!("{:?}", ret));
644 warn!("MCP service startup failed: {ret:?}");
645 Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
646 }
647}