1use crate::{
10 AppError, DynamicRouterService, get_proxy_manager,
11 model::GLOBAL_RESTART_TRACKER,
12 model::{
13 CheckMcpStatusResponseStatus, McpConfig, McpProtocol, McpProtocolPath, McpRouterPath,
14 McpServerCommandConfig, McpServerConfig, McpServiceStatus, McpType,
15 },
16 proxy::{
17 McpHandler, SseBackendConfig, SseServerBuilder, StreamBackendConfig, StreamServerBuilder,
18 },
19};
20
21use anyhow::{Context, Result};
22use log::{debug, info};
23
24pub async fn mcp_start_task(
30 mcp_config: McpConfig,
31) -> Result<(axum::Router, tokio_util::sync::CancellationToken)> {
32 let mcp_id = mcp_config.mcp_id.clone();
33 let client_protocol = mcp_config.client_protocol.clone();
34
35 let mcp_router_path: McpRouterPath = McpRouterPath::new(mcp_id, client_protocol)
37 .map_err(|e| AppError::mcp_server_error(e.to_string()))?;
38
39 let mcp_json_config = mcp_config
40 .mcp_json_config
41 .clone()
42 .expect("mcp_json_config is required");
43
44 let mcp_server_config = McpServerConfig::try_from(mcp_json_config)?;
45
46 integrate_server_with_axum(
48 mcp_server_config.clone(),
49 mcp_router_path.clone(),
50 mcp_config.clone(),
51 )
52 .await
53}
54
55pub async fn integrate_server_with_axum(
63 mcp_config: McpServerConfig,
64 mcp_router_path: McpRouterPath,
65 full_mcp_config: McpConfig,
66) -> Result<(axum::Router, tokio_util::sync::CancellationToken)> {
67 let mcp_type = full_mcp_config.mcp_type.clone();
68 let base_path = mcp_router_path.base_path.clone();
69 let mcp_id = mcp_router_path.mcp_id.clone();
70
71 let backend_protocol = match &mcp_config {
73 McpServerConfig::Command(_) => McpProtocol::Stdio,
75 McpServerConfig::Url(url_config) => {
77 let mut detection_headers = url_config.headers.clone().unwrap_or_default();
79 if let Some(auth_token) = &url_config.auth_token {
80 detection_headers.insert("Authorization".to_string(), auth_token.clone());
81 }
82 let detection_headers_ref = if detection_headers.is_empty() {
83 None
84 } else {
85 Some(&detection_headers)
86 };
87
88 if let Some(type_str) = &url_config.r#type {
90 match type_str.parse::<McpProtocol>() {
91 Ok(protocol) => {
92 debug!(
93 "Using configured protocol type: {} -> {:?}",
94 type_str, protocol
95 );
96 protocol
97 }
98 Err(_) => {
99 debug!("Protocol type '{}' unrecognized, auto-detecting", type_str);
101 let detected_protocol = crate::server::detect_mcp_protocol_with_headers(
102 url_config.get_url(),
103 detection_headers_ref,
104 )
105 .await
106 .map_err(|e| {
107 anyhow::anyhow!(
108 "Protocol type '{}' unrecognized and auto-detection failed: {}",
109 type_str,
110 e
111 )
112 })?;
113 debug!(
114 "Auto-detected protocol: {:?} (original config: '{}')",
115 detected_protocol, type_str
116 );
117 detected_protocol
118 }
119 }
120 } else {
121 debug!("No type field specified, auto-detecting protocol");
123
124 crate::server::detect_mcp_protocol_with_headers(
125 url_config.get_url(),
126 detection_headers_ref,
127 )
128 .await
129 .map_err(|e| anyhow::anyhow!("Auto-detection failed: {}", e))?
130 }
131 }
132 };
133
134 debug!(
135 "MCP ID: {}, client protocol: {:?}, backend protocol: {:?}",
136 mcp_id, mcp_router_path.mcp_protocol, backend_protocol
137 );
138
139 let (router, ct, handler) = match mcp_router_path.mcp_protocol.clone() {
141 McpProtocol::Sse => {
143 let sse_path = match &mcp_router_path.mcp_protocol_path {
144 McpProtocolPath::SsePath(sse_path) => sse_path,
145 _ => unreachable!(),
146 };
147
148 let backend_config = build_sse_backend_config(&mcp_config, backend_protocol)?;
150
151 debug!(
152 "Creating SSE server, sse_path={}, post_path={}",
153 sse_path.sse_path, sse_path.message_path
154 );
155
156 let keep_alive_secs = if matches!(mcp_type, McpType::OneShot) {
159 5
160 } else {
161 15
162 };
163
164 let stateful = !matches!(mcp_type, McpType::OneShot);
167
168 let (router, ct, handler) = SseServerBuilder::new(backend_config)
169 .mcp_id(mcp_id.clone())
170 .sse_path(sse_path.sse_path.clone())
171 .post_path(sse_path.message_path.clone())
172 .keep_alive(keep_alive_secs)
173 .stateful(stateful)
174 .build()
175 .await
176 .with_context(|| {
177 format!(
178 "SSE server build failed - MCP ID: {}, type: {:?}",
179 mcp_id, mcp_type
180 )
181 })?;
182
183 info!(
184 "SSE server started - MCP ID: {}, type: {:?}",
185 mcp_router_path.mcp_id, mcp_type
186 );
187
188 (router, ct, McpHandler::Sse(Box::new(handler)))
189 }
190
191 McpProtocol::Stream => {
193 let backend_config = build_stream_backend_config(&mcp_config, backend_protocol)?;
195
196 let (router, ct, handler) = StreamServerBuilder::new(backend_config)
197 .mcp_id(mcp_id.clone())
198 .stateful(false)
199 .build()
200 .await
201 .with_context(|| {
202 format!(
203 "Stream server build failed - MCP ID: {}, type: {:?}",
204 mcp_id, mcp_type
205 )
206 })?;
207
208 info!(
209 "Streamable HTTP server started - MCP ID: {}, type: {:?}",
210 mcp_router_path.mcp_id, mcp_type
211 );
212
213 (router, ct, McpHandler::Stream(Box::new(handler)))
214 }
215
216 McpProtocol::Stdio => {
218 return Err(anyhow::anyhow!(
219 "Client protocol cannot be Stdio. McpRouterPath::new does not support creating Stdio protocol router paths"
220 ));
221 }
222 };
223
224 let ct_clone = ct.clone();
226 let mcp_id_clone = mcp_id.clone();
227
228 let mcp_service_status = McpServiceStatus::new(
230 mcp_id_clone.clone(),
231 mcp_type.clone(),
232 mcp_router_path.clone(),
233 ct_clone.clone(),
234 CheckMcpStatusResponseStatus::Ready,
235 )
236 .with_mcp_config(full_mcp_config.clone());
237
238 let proxy_manager = get_proxy_manager();
240 proxy_manager.add_mcp_service_status_and_proxy(mcp_service_status, Some(handler));
241
242 proxy_manager
244 .register_mcp_config(&mcp_id, full_mcp_config.clone())
245 .await;
246
247 let router = if matches!(mcp_router_path.mcp_protocol, McpProtocol::Sse) {
249 let modified_router = router.fallback(base_path_fallback_handler);
250 info!("SSE base path handler added, base_path: {}", base_path);
251 modified_router
252 } else {
253 router
254 };
255
256 info!(
258 "Registering route: base_path={}, mcp_id={}",
259 base_path, mcp_id
260 );
261 info!(
262 "SSE path config: sse_path={}, post_path={}",
263 match &mcp_router_path.mcp_protocol_path {
264 McpProtocolPath::SsePath(sse_path) => &sse_path.sse_path,
265 _ => "N/A",
266 },
267 match &mcp_router_path.mcp_protocol_path {
268 McpProtocolPath::SsePath(sse_path) => &sse_path.message_path,
269 _ => "N/A",
270 }
271 );
272 DynamicRouterService::register_route(&base_path, router.clone());
273 info!("Route registration complete: base_path={}", base_path);
274
275 GLOBAL_RESTART_TRACKER.record_restart(&mcp_id);
277
278 Ok((router, ct))
279}
280
281fn build_sse_backend_config(
283 mcp_config: &McpServerConfig,
284 backend_protocol: McpProtocol,
285) -> Result<SseBackendConfig> {
286 match mcp_config {
287 McpServerConfig::Command(cmd_config) => {
288 log_command_details(cmd_config);
289 Ok(SseBackendConfig::Stdio {
290 command: cmd_config.command.clone(),
291 args: cmd_config.args.clone(),
292 env: cmd_config.env.clone(),
293 })
294 }
295 McpServerConfig::Url(url_config) => match backend_protocol {
296 McpProtocol::Stdio => Err(anyhow::anyhow!(
297 "URL-based MCP service cannot use Stdio protocol"
298 )),
299 McpProtocol::Sse => {
300 info!("Connecting to SSE backend: {}", url_config.get_url());
301 Ok(SseBackendConfig::SseUrl {
302 url: url_config.get_url().to_string(),
303 headers: url_config.headers.clone(),
304 })
305 }
306 McpProtocol::Stream => {
307 info!(
308 "Connecting to Streamable HTTP backend (SSE frontend): {}",
309 url_config.get_url()
310 );
311 Ok(SseBackendConfig::StreamUrl {
312 url: url_config.get_url().to_string(),
313 headers: url_config.headers.clone(),
314 })
315 }
316 },
317 }
318}
319
320fn build_stream_backend_config(
322 mcp_config: &McpServerConfig,
323 backend_protocol: McpProtocol,
324) -> Result<StreamBackendConfig> {
325 match mcp_config {
326 McpServerConfig::Command(cmd_config) => {
327 log_command_details(cmd_config);
328 Ok(StreamBackendConfig::Stdio {
329 command: cmd_config.command.clone(),
330 args: cmd_config.args.clone(),
331 env: cmd_config.env.clone(),
332 })
333 }
334 McpServerConfig::Url(url_config) => {
335 match backend_protocol {
336 McpProtocol::Stdio => Err(anyhow::anyhow!(
337 "URL-based MCP service cannot use Stdio protocol"
338 )),
339 McpProtocol::Sse => {
340 Err(anyhow::anyhow!(
344 "SSE backend with Streamable HTTP frontend is not yet supported. \
345 Please use SSE frontend or configure a Streamable HTTP backend."
346 ))
347 }
348 McpProtocol::Stream => {
349 info!(
350 "Connecting to Streamable HTTP backend: {}",
351 url_config.get_url()
352 );
353 Ok(StreamBackendConfig::Url {
354 url: url_config.get_url().to_string(),
355 headers: url_config.headers.clone(),
356 })
357 }
358 }
359 }
360 }
361}
362
363fn log_command_details(mcp_config: &McpServerCommandConfig) {
365 let args_str = mcp_config
366 .args
367 .as_ref()
368 .map_or(String::new(), |args| args.join(" "));
369
370 info!("Executing command: {} {}", mcp_config.command, args_str);
371
372 if let Some(env_vars) = &mcp_config.env {
374 let keys: Vec<&String> = env_vars.keys().collect();
375 if !keys.is_empty() {
376 debug!("Config env keys: {:?}", keys);
377 }
378 }
379
380 debug!(
382 "Process PATH: {}",
383 mcp_common::diagnostic::format_path_summary(3)
384 );
385 for (key, val) in mcp_common::diagnostic::collect_mirror_env_vars() {
386 debug!("Process env: {}={}", key, val);
387 }
388}
389
390#[axum::debug_handler]
392async fn base_path_fallback_handler(
393 method: axum::http::Method,
394 uri: axum::http::Uri,
395 headers: axum::http::HeaderMap,
396) -> impl axum::response::IntoResponse {
397 let path = uri.path();
398 info!("Base path handler: {} {}", method, path);
399
400 if path.contains("/sse/proxy/") {
402 match method {
404 axum::http::Method::GET => {
405 let mcp_id = path.split("/sse/proxy/").nth(1);
407
408 if let Some(mcp_id) = mcp_id {
409 let proxy_manager = get_proxy_manager();
411 if proxy_manager.get_mcp_service_status(mcp_id).is_none() {
412 (
414 axum::http::StatusCode::NOT_FOUND,
415 [("Content-Type", "text/plain".to_string())],
416 format!("MCP service '{}' not found", mcp_id).to_string(),
417 )
418 } else {
419 let accept_header = headers.get("accept");
421 if let Some(accept) = accept_header {
422 let accept_str = accept.to_str().unwrap_or("");
423 if accept_str.contains("text/event-stream") {
424 let redirect_uri = format!("{}/sse", path);
426 info!("SSE redirect to: {}", redirect_uri);
427 (
428 axum::http::StatusCode::FOUND,
429 [("Location", redirect_uri.to_string())],
430 "Redirecting to SSE endpoint".to_string(),
431 )
432 } else {
433 (
435 axum::http::StatusCode::BAD_REQUEST,
436 [("Content-Type", "text/plain".to_string())],
437 "SSE error: Invalid Accept header, expected 'text/event-stream'".to_string(),
438 )
439 }
440 } else {
441 (
443 axum::http::StatusCode::BAD_REQUEST,
444 [("Content-Type", "text/plain".to_string())],
445 "SSE error: Missing Accept header, expected 'text/event-stream'"
446 .to_string(),
447 )
448 }
449 }
450 } else {
451 (
453 axum::http::StatusCode::BAD_REQUEST,
454 [("Content-Type", "text/plain".to_string())],
455 "SSE error: Invalid SSE path".to_string(),
456 )
457 }
458 }
459 axum::http::Method::POST => {
460 let redirect_uri = format!("{}/message", path);
462 info!("SSE redirect to: {}", redirect_uri);
463 (
464 axum::http::StatusCode::FOUND,
465 [("Location", redirect_uri.to_string())],
466 "Redirecting to message endpoint".to_string(),
467 )
468 }
469 _ => {
470 (
472 axum::http::StatusCode::METHOD_NOT_ALLOWED,
473 [("Allow", "GET, POST".to_string())],
474 "Only GET and POST methods are allowed".to_string(),
475 )
476 }
477 }
478 } else if path.contains("/stream/proxy/") {
479 match method {
481 axum::http::Method::GET => {
482 (
484 axum::http::StatusCode::OK,
485 [("Content-Type", "application/json".to_string())],
486 r#"{"jsonrpc":"2.0","result":{"info":"Streamable MCP Server","version":"1.0"}}"#.to_string(),
487 )
488 }
489 axum::http::Method::POST => {
490 (
492 axum::http::StatusCode::OK,
493 [("Content-Type", "application/json".to_string())],
494 r#"{"jsonrpc":"2.0","result":{"message":"Stream request received","protocol":"streamable-http"}}"#.to_string(),
495 )
496 }
497 _ => {
498 (
500 axum::http::StatusCode::METHOD_NOT_ALLOWED,
501 [("Allow", "GET, POST".to_string())],
502 "Only GET and POST methods are allowed".to_string(),
503 )
504 }
505 }
506 } else {
507 (
509 axum::http::StatusCode::BAD_REQUEST,
510 [("Content-Type", "text/plain".to_string())],
511 "Unknown protocol or path".to_string(),
512 )
513 }
514}