mcp_stdio_proxy/server/
mcp_dynamic_router_service.rs

1use std::{
2    convert::Infallible,
3    task::{Context, Poll},
4};
5
6use axum::{
7    body::Body,
8    extract::Request,
9    response::{IntoResponse, Response},
10};
11use futures::future::BoxFuture;
12use log::{debug, error, info, warn};
13use tower::Service;
14
15use crate::{
16    DynamicRouterService, mcp_start_task,
17    model::{HttpResult, McpConfig, McpRouterPath},
18    server::middlewares::extract_trace_id,
19};
20
21impl Service<Request<Body>> for DynamicRouterService {
22    type Response = Response;
23    type Error = Infallible;
24    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
25
26    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
27        Poll::Ready(Ok(()))
28    }
29
30    fn call(&mut self, req: Request<Body>) -> Self::Future {
31        let path = req.uri().path().to_string();
32        let method = req.method().clone();
33        let headers = req.headers().clone();
34
35        // DEBUG: 详细路径解析日志
36        debug!("=== 路径解析开始 ===");
37        debug!("原始请求路径: {}", path);
38        debug!("路径包含的通配符参数: {:?}", req.extensions());
39
40        // 提取 trace_id
41        let trace_id = extract_trace_id();
42
43        // 创建根 span (使用 debug_span 减少日志量)
44        let span = tracing::debug_span!(
45            "DynamicRouterService",
46            otel.name = "HTTP Request",
47            http.method = %method,
48            http.route = %path,
49            http.url = %req.uri(),
50            mcp.protocol = format!("{:?}", self.0),
51            trace_id = %trace_id,
52        );
53
54        // 记录请求头信息
55        if let Some(content_type) = headers.get("content-type") {
56            span.record("http.request.content_type", format!("{:?}", content_type));
57        }
58        if let Some(content_length) = headers.get("content-length") {
59            span.record(
60                "http.request.content_length",
61                format!("{:?}", content_length),
62            );
63        }
64
65        debug!("请求路径: {path}");
66
67        // 解析路由路径
68        let mcp_router_path = McpRouterPath::from_url(&path);
69
70        match mcp_router_path {
71            Some(mcp_router_path) => {
72                let mcp_id = mcp_router_path.mcp_id.clone();
73                let base_path = mcp_router_path.base_path.clone();
74
75                span.record("mcp.id", &mcp_id);
76                span.record("mcp.base_path", &base_path);
77
78                debug!("=== 路径解析结果 ===");
79                debug!("解析出的mcp_id: {}", mcp_id);
80                debug!("解析出的base_path: {}", base_path);
81                debug!("请求路径: {} vs base_path: {}", path, base_path);
82                debug!("=== 路径解析结束 ===");
83
84                Box::pin(async move {
85                    let _guard = span.enter();
86
87                    // 先尝试查找已注册的路由
88                    debug!("=== 路由查找过程 ===");
89                    debug!("查找base_path: '{}'", base_path);
90
91                    if let Some(router_entry) = DynamicRouterService::get_route(&base_path) {
92                        debug!(
93                            "✅ 找到已注册的路由: base_path={}, path={}",
94                            base_path, path
95                        );
96                        debug!("=== 路由查找结束(成功) ===");
97                        return handle_request_with_router(req, router_entry).await;
98                    } else {
99                        debug!(
100                            "❌ 未找到已注册的路由: base_path='{}', path='{}'",
101                            base_path, path
102                        );
103
104                        // 显示所有已注册的路由
105                        let all_routes = DynamicRouterService::get_all_routes();
106                        debug!("当前已注册的路由: {:?}", all_routes);
107                        debug!("=== 路由查找结束(失败) ===");
108                    }
109
110                    // 未找到路由,尝试启动服务
111                    warn!("未找到匹配的路径,尝试启动服务:base_path={base_path},path={path}");
112                    span.record("error.route_not_found", true);
113
114                    // 从请求扩展中获取MCP配置
115                    if let Some(mcp_config) = req.extensions().get::<McpConfig>().cloned() {
116                        //mcp_config.mcp_json_config 非空判断
117                        if mcp_config.mcp_json_config.is_some() {
118                            return start_mcp_and_handle_request(req, mcp_config).await;
119                        }
120                    }
121
122                    // 没有配置,无法启动服务
123                    warn!(
124                        "未找到匹配的路径,且未获取到header[x-mcp-json]配置,无法启动MCP服务: {path}"
125                    );
126                    span.record("error.mcp_config_missing", true);
127
128                    let message = format!(
129                        "未找到匹配的路径,且未获取到header[x-mcp-json]配置,无法启动MCP服务: {path}"
130                    );
131                    let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
132                    span.record("http.response.status_code", 404u16);
133                    span.record("error.message", &message);
134                    Ok(http_result.into_response())
135                })
136            }
137            None => {
138                warn!("请求路径解析失败: {path}");
139                span.record("error.path_parse_failed", true);
140
141                let message = format!("请求路径解析失败: {path}");
142                let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
143                Box::pin(async move {
144                    let _guard = span.enter();
145                    span.record("http.response.status_code", 400u16);
146                    span.record("error.message", &message);
147                    Ok(http_result.into_response())
148                })
149            }
150        }
151    }
152}
153
154/// 使用给定的路由处理请求
155async fn handle_request_with_router(
156    req: Request<Body>,
157    router_entry: axum::Router,
158) -> Result<Response, Infallible> {
159    // 获取匹配路径的Router,并处理请求
160    let trace_id = extract_trace_id();
161
162    let method = req.method().clone();
163    let uri = req.uri().clone();
164    let path = uri.path();
165
166    info!("[handle_request_with_router]处理请求: {} {}", method, path);
167
168    // 记录请求头中的关键信息
169    if let Some(content_type) = req.headers().get("content-type") {
170        if let Ok(content_type_str) = content_type.to_str() {
171            debug!(
172                "[handle_request_with_router] Content-Type: {}",
173                content_type_str
174            );
175        }
176    }
177
178    if let Some(content_length) = req.headers().get("content-length") {
179        if let Ok(content_length_str) = content_length.to_str() {
180            debug!(
181                "[handle_request_with_router] Content-Length: {}",
182                content_length_str
183            );
184        }
185    }
186
187    // 记录 x-mcp-json 头信息(如果存在)
188    if let Some(mcp_json) = req.headers().get("x-mcp-json") {
189        if let Ok(mcp_json_str) = mcp_json.to_str() {
190            debug!(
191                "[handle_request_with_router] MCP-JSON Header: {}",
192                mcp_json_str
193            );
194        }
195    }
196
197    // 记录查询参数
198    if let Some(query) = uri.query() {
199        debug!("[handle_request_with_router] Query: {}", query);
200    }
201
202    // 使用 debug_span 减少日志量,因为 DynamicRouterService 已经记录了请求信息
203    // 移除 #[tracing::instrument] 避免 span 嵌套导致的日志膨胀问题
204    let span = tracing::debug_span!(
205        "handle_request_with_router",
206        otel.name = "Handle Request with Router",
207        component = "router",
208        trace_id = %trace_id,
209        http.method = %method,
210        http.path = %path,
211    );
212
213    let _guard = span.enter();
214
215    let mut service = router_entry.into_service();
216    match service.call(req).await {
217        Ok(response) => {
218            let status = response.status();
219            span.record("http.response.status_code", status.as_u16());
220
221            // 记录响应头信息
222            debug!(
223                "[handle_request_with_router]响应状态: {}, 响应头: {response:?}",
224                status
225            );
226
227            Ok(response)
228        }
229        Err(error) => {
230            span.record("error.router_service_error", true);
231            span.record("error.message", format!("{:?}", error));
232            error!("[handle_request_with_router]错误: {error:?}");
233            Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
234        }
235    }
236}
237
238/// 启动MCP服务并处理请求
239async fn start_mcp_and_handle_request(
240    req: Request<Body>,
241    mcp_config: McpConfig,
242) -> Result<Response, Infallible> {
243    let request_path = req.uri().path().to_string();
244    let trace_id = extract_trace_id();
245    debug!("请求路径: {request_path}");
246
247    // 使用 debug_span 减少日志量,移除 #[tracing::instrument] 避免 span 嵌套
248    let span = tracing::debug_span!(
249        "start_mcp_and_handle_request",
250        otel.name = "Start MCP and Handle Request",
251        component = "mcp_startup",
252        mcp.id = %mcp_config.mcp_id,
253        mcp.type = ?mcp_config.mcp_type,
254        mcp.config.has_config = mcp_config.mcp_json_config.is_some(),
255        trace_id = %trace_id,
256    );
257
258    let _guard = span.enter();
259
260    let ret = mcp_start_task(mcp_config).await;
261
262    if let Ok((router, _)) = ret {
263        span.record("mcp.startup.success", true);
264        handle_request_with_router(req, router).await
265    } else {
266        span.record("mcp.startup.failed", true);
267        span.record("error.mcp_startup_failed", true);
268        span.record("error.message", format!("{:?}", ret));
269        warn!("MCP服务启动失败: {ret:?}");
270        Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
271    }
272}