1use std::{
2 convert::Infallible,
3 task::{Context, Poll},
4};
5
6use axum::{
7 body::Body,
8 extract::Request,
9 response::{IntoResponse, Response},
10};
11use futures::future::BoxFuture;
12use log::{debug, error, info, warn};
13use tower::Service;
14
15use crate::{
16 DynamicRouterService, mcp_start_task,
17 model::{HttpResult, McpConfig, McpRouterPath},
18 server::middlewares::extract_trace_id,
19};
20
21impl Service<Request<Body>> for DynamicRouterService {
22 type Response = Response;
23 type Error = Infallible;
24 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
25
26 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
27 Poll::Ready(Ok(()))
28 }
29
30 fn call(&mut self, req: Request<Body>) -> Self::Future {
31 let path = req.uri().path().to_string();
32 let method = req.method().clone();
33 let headers = req.headers().clone();
34
35 debug!("=== 路径解析开始 ===");
37 debug!("原始请求路径: {}", path);
38 debug!("路径包含的通配符参数: {:?}", req.extensions());
39
40 let trace_id = extract_trace_id();
42
43 let span = tracing::debug_span!(
45 "DynamicRouterService",
46 otel.name = "HTTP Request",
47 http.method = %method,
48 http.route = %path,
49 http.url = %req.uri(),
50 mcp.protocol = format!("{:?}", self.0),
51 trace_id = %trace_id,
52 );
53
54 if let Some(content_type) = headers.get("content-type") {
56 span.record("http.request.content_type", format!("{:?}", content_type));
57 }
58 if let Some(content_length) = headers.get("content-length") {
59 span.record(
60 "http.request.content_length",
61 format!("{:?}", content_length),
62 );
63 }
64
65 debug!("请求路径: {path}");
66
67 let mcp_router_path = McpRouterPath::from_url(&path);
69
70 match mcp_router_path {
71 Some(mcp_router_path) => {
72 let mcp_id = mcp_router_path.mcp_id.clone();
73 let base_path = mcp_router_path.base_path.clone();
74
75 span.record("mcp.id", &mcp_id);
76 span.record("mcp.base_path", &base_path);
77
78 debug!("=== 路径解析结果 ===");
79 debug!("解析出的mcp_id: {}", mcp_id);
80 debug!("解析出的base_path: {}", base_path);
81 debug!("请求路径: {} vs base_path: {}", path, base_path);
82 debug!("=== 路径解析结束 ===");
83
84 Box::pin(async move {
85 let _guard = span.enter();
86
87 debug!("=== 路由查找过程 ===");
89 debug!("查找base_path: '{}'", base_path);
90
91 if let Some(router_entry) = DynamicRouterService::get_route(&base_path) {
92 debug!(
93 "✅ 找到已注册的路由: base_path={}, path={}",
94 base_path, path
95 );
96 debug!("=== 路由查找结束(成功) ===");
97 return handle_request_with_router(req, router_entry).await;
98 } else {
99 debug!(
100 "❌ 未找到已注册的路由: base_path='{}', path='{}'",
101 base_path, path
102 );
103
104 let all_routes = DynamicRouterService::get_all_routes();
106 debug!("当前已注册的路由: {:?}", all_routes);
107 debug!("=== 路由查找结束(失败) ===");
108 }
109
110 warn!("未找到匹配的路径,尝试启动服务:base_path={base_path},path={path}");
112 span.record("error.route_not_found", true);
113
114 if let Some(mcp_config) = req.extensions().get::<McpConfig>().cloned() {
116 if mcp_config.mcp_json_config.is_some() {
118 return start_mcp_and_handle_request(req, mcp_config).await;
119 }
120 }
121
122 warn!(
124 "未找到匹配的路径,且未获取到header[x-mcp-json]配置,无法启动MCP服务: {path}"
125 );
126 span.record("error.mcp_config_missing", true);
127
128 let message = format!(
129 "未找到匹配的路径,且未获取到header[x-mcp-json]配置,无法启动MCP服务: {path}"
130 );
131 let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
132 span.record("http.response.status_code", 404u16);
133 span.record("error.message", &message);
134 Ok(http_result.into_response())
135 })
136 }
137 None => {
138 warn!("请求路径解析失败: {path}");
139 span.record("error.path_parse_failed", true);
140
141 let message = format!("请求路径解析失败: {path}");
142 let http_result: HttpResult<String> = HttpResult::error("0001", &message, None);
143 Box::pin(async move {
144 let _guard = span.enter();
145 span.record("http.response.status_code", 400u16);
146 span.record("error.message", &message);
147 Ok(http_result.into_response())
148 })
149 }
150 }
151 }
152}
153
154async fn handle_request_with_router(
156 req: Request<Body>,
157 router_entry: axum::Router,
158) -> Result<Response, Infallible> {
159 let trace_id = extract_trace_id();
161
162 let method = req.method().clone();
163 let uri = req.uri().clone();
164 let path = uri.path();
165
166 info!("[handle_request_with_router]处理请求: {} {}", method, path);
167
168 if let Some(content_type) = req.headers().get("content-type") {
170 if let Ok(content_type_str) = content_type.to_str() {
171 debug!(
172 "[handle_request_with_router] Content-Type: {}",
173 content_type_str
174 );
175 }
176 }
177
178 if let Some(content_length) = req.headers().get("content-length") {
179 if let Ok(content_length_str) = content_length.to_str() {
180 debug!(
181 "[handle_request_with_router] Content-Length: {}",
182 content_length_str
183 );
184 }
185 }
186
187 if let Some(mcp_json) = req.headers().get("x-mcp-json") {
189 if let Ok(mcp_json_str) = mcp_json.to_str() {
190 debug!(
191 "[handle_request_with_router] MCP-JSON Header: {}",
192 mcp_json_str
193 );
194 }
195 }
196
197 if let Some(query) = uri.query() {
199 debug!("[handle_request_with_router] Query: {}", query);
200 }
201
202 let span = tracing::debug_span!(
205 "handle_request_with_router",
206 otel.name = "Handle Request with Router",
207 component = "router",
208 trace_id = %trace_id,
209 http.method = %method,
210 http.path = %path,
211 );
212
213 let _guard = span.enter();
214
215 let mut service = router_entry.into_service();
216 match service.call(req).await {
217 Ok(response) => {
218 let status = response.status();
219 span.record("http.response.status_code", status.as_u16());
220
221 debug!(
223 "[handle_request_with_router]响应状态: {}, 响应头: {response:?}",
224 status
225 );
226
227 Ok(response)
228 }
229 Err(error) => {
230 span.record("error.router_service_error", true);
231 span.record("error.message", format!("{:?}", error));
232 error!("[handle_request_with_router]错误: {error:?}");
233 Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
234 }
235 }
236}
237
238async fn start_mcp_and_handle_request(
240 req: Request<Body>,
241 mcp_config: McpConfig,
242) -> Result<Response, Infallible> {
243 let request_path = req.uri().path().to_string();
244 let trace_id = extract_trace_id();
245 debug!("请求路径: {request_path}");
246
247 let span = tracing::debug_span!(
249 "start_mcp_and_handle_request",
250 otel.name = "Start MCP and Handle Request",
251 component = "mcp_startup",
252 mcp.id = %mcp_config.mcp_id,
253 mcp.type = ?mcp_config.mcp_type,
254 mcp.config.has_config = mcp_config.mcp_json_config.is_some(),
255 trace_id = %trace_id,
256 );
257
258 let _guard = span.enter();
259
260 let ret = mcp_start_task(mcp_config).await;
261
262 if let Ok((router, _)) = ret {
263 span.record("mcp.startup.success", true);
264 handle_request_with_router(req, router).await
265 } else {
266 span.record("mcp.startup.failed", true);
267 span.record("error.mcp_startup_failed", true);
268 span.record("error.message", format!("{:?}", ret));
269 warn!("MCP服务启动失败: {ret:?}");
270 Ok(axum::http::StatusCode::INTERNAL_SERVER_ERROR.into_response())
271 }
272}