Skip to main content

mcp_stdio_proxy/server/task/
mcp_start_task.rs

1//! MCP Service Start Task
2//!
3//! This module handles starting MCP services using the Builder APIs from
4//! mcp-sse-proxy and mcp-streamable-proxy libraries.
5//!
6//! The refactored implementation removes direct rmcp dependency by delegating
7//! protocol-specific logic to the proxy libraries.
8
9use crate::{
10    AppError, DynamicRouterService, get_proxy_manager,
11    model::GLOBAL_RESTART_TRACKER,
12    model::{
13        CheckMcpStatusResponseStatus, McpConfig, McpProtocol, McpProtocolPath, McpRouterPath,
14        McpServerCommandConfig, McpServerConfig, McpServiceStatus, McpType,
15    },
16    proxy::{
17        McpHandler, SseBackendConfig, SseServerBuilder, StreamBackendConfig, StreamServerBuilder,
18    },
19};
20
21use anyhow::{Context, Result};
22use log::{debug, info};
23
24/// Start an MCP service based on configuration
25///
26/// This function creates and configures an MCP proxy service based on the
27/// provided configuration. It supports both SSE and Streamable HTTP client
28/// protocols, with automatic backend protocol detection for URL-based services.
29pub async fn mcp_start_task(
30    mcp_config: McpConfig,
31) -> Result<(axum::Router, tokio_util::sync::CancellationToken)> {
32    let mcp_id = mcp_config.mcp_id.clone();
33    let client_protocol = mcp_config.client_protocol.clone();
34
35    // Create router path based on client protocol (determines exposed API interface)
36    let mcp_router_path: McpRouterPath = McpRouterPath::new(mcp_id, client_protocol)
37        .map_err(|e| AppError::mcp_server_error(e.to_string()))?;
38
39    let mcp_json_config = mcp_config
40        .mcp_json_config
41        .clone()
42        .expect("mcp_json_config is required");
43
44    let mcp_server_config = McpServerConfig::try_from(mcp_json_config)?;
45
46    // Use the integrated method to create the server
47    integrate_server_with_axum(
48        mcp_server_config.clone(),
49        mcp_router_path.clone(),
50        mcp_config.clone(),
51    )
52    .await
53}
54
55/// Integrate MCP server with axum router
56///
57/// This function:
58/// 1. Determines backend protocol (stdio, SSE, or Streamable HTTP)
59/// 2. Creates the appropriate server using Builder APIs
60/// 3. Registers the handler with ProxyManager
61/// 4. Sets up dynamic routing
62pub async fn integrate_server_with_axum(
63    mcp_config: McpServerConfig,
64    mcp_router_path: McpRouterPath,
65    full_mcp_config: McpConfig,
66) -> Result<(axum::Router, tokio_util::sync::CancellationToken)> {
67    let mcp_type = full_mcp_config.mcp_type.clone();
68    let base_path = mcp_router_path.base_path.clone();
69    let mcp_id = mcp_router_path.mcp_id.clone();
70
71    // Determine backend protocol from configuration
72    let backend_protocol = match &mcp_config {
73        // Command-line config: use stdio protocol
74        McpServerConfig::Command(_) => McpProtocol::Stdio,
75        // URL config: parse type field or auto-detect
76        McpServerConfig::Url(url_config) => {
77            // Merge headers + auth_token for protocol detection
78            let mut detection_headers = url_config.headers.clone().unwrap_or_default();
79            if let Some(auth_token) = &url_config.auth_token {
80                detection_headers.insert("Authorization".to_string(), auth_token.clone());
81            }
82            let detection_headers_ref = if detection_headers.is_empty() {
83                None
84            } else {
85                Some(&detection_headers)
86            };
87
88            // Check type field first
89            if let Some(type_str) = &url_config.r#type {
90                match type_str.parse::<McpProtocol>() {
91                    Ok(protocol) => {
92                        debug!(
93                            "Using configured protocol type: {} -> {:?}",
94                            type_str, protocol
95                        );
96                        protocol
97                    }
98                    Err(_) => {
99                        // If parsing fails, auto-detect
100                        debug!("Protocol type '{}' unrecognized, auto-detecting", type_str);
101                        let detected_protocol = crate::server::detect_mcp_protocol_with_headers(
102                            url_config.get_url(),
103                            detection_headers_ref,
104                        )
105                        .await
106                        .map_err(|e| {
107                            anyhow::anyhow!(
108                                "Protocol type '{}' unrecognized and auto-detection failed: {}",
109                                type_str,
110                                e
111                            )
112                        })?;
113                        debug!(
114                            "Auto-detected protocol: {:?} (original config: '{}')",
115                            detected_protocol, type_str
116                        );
117                        detected_protocol
118                    }
119                }
120            } else {
121                // No type field, auto-detect
122                debug!("No type field specified, auto-detecting protocol");
123
124                crate::server::detect_mcp_protocol_with_headers(
125                    url_config.get_url(),
126                    detection_headers_ref,
127                )
128                .await
129                .map_err(|e| anyhow::anyhow!("Auto-detection failed: {}", e))?
130            }
131        }
132    };
133
134    debug!(
135        "MCP ID: {}, client protocol: {:?}, backend protocol: {:?}",
136        mcp_id, mcp_router_path.mcp_protocol, backend_protocol
137    );
138
139    // Create server based on client protocol using Builder APIs
140    let (router, ct, handler) = match mcp_router_path.mcp_protocol.clone() {
141        // ================ Client uses SSE protocol ================
142        McpProtocol::Sse => {
143            let sse_path = match &mcp_router_path.mcp_protocol_path {
144                McpProtocolPath::SsePath(sse_path) => sse_path,
145                _ => unreachable!(),
146            };
147
148            // Build backend config for SSE
149            let backend_config = build_sse_backend_config(&mcp_config, backend_protocol)?;
150
151            debug!(
152                "Creating SSE server, sse_path={}, post_path={}",
153                sse_path.sse_path, sse_path.message_path
154            );
155
156            // 对于 OneShot 服务,使用更短的 keep_alive 间隔(5秒)来保持后端活跃
157            // 防止后端进程因空闲超时而退出
158            let keep_alive_secs = if matches!(mcp_type, McpType::OneShot) {
159                5
160            } else {
161                15
162            };
163
164            // 对于 OneShot 服务,禁用 stateful 模式以加快响应速度
165            // stateful=false 会跳过 MCP 初始化步骤,直接处理请求
166            let stateful = !matches!(mcp_type, McpType::OneShot);
167
168            let (router, ct, handler) = SseServerBuilder::new(backend_config)
169                .mcp_id(mcp_id.clone())
170                .sse_path(sse_path.sse_path.clone())
171                .post_path(sse_path.message_path.clone())
172                .keep_alive(keep_alive_secs)
173                .stateful(stateful)
174                .build()
175                .await
176                .with_context(|| {
177                    format!(
178                        "SSE server build failed - MCP ID: {}, type: {:?}",
179                        mcp_id, mcp_type
180                    )
181                })?;
182
183            info!(
184                "SSE server started - MCP ID: {}, type: {:?}",
185                mcp_router_path.mcp_id, mcp_type
186            );
187
188            (router, ct, McpHandler::Sse(Box::new(handler)))
189        }
190
191        // ================ Client uses Streamable HTTP protocol ================
192        McpProtocol::Stream => {
193            // Build backend config for Stream
194            let backend_config = build_stream_backend_config(&mcp_config, backend_protocol)?;
195
196            let (router, ct, handler) = StreamServerBuilder::new(backend_config)
197                .mcp_id(mcp_id.clone())
198                .stateful(false)
199                .build()
200                .await
201                .with_context(|| {
202                    format!(
203                        "Stream server build failed - MCP ID: {}, type: {:?}",
204                        mcp_id, mcp_type
205                    )
206                })?;
207
208            info!(
209                "Streamable HTTP server started - MCP ID: {}, type: {:?}",
210                mcp_router_path.mcp_id, mcp_type
211            );
212
213            (router, ct, McpHandler::Stream(Box::new(handler)))
214        }
215
216        // Client stdio protocol is not supported in server mode
217        McpProtocol::Stdio => {
218            return Err(anyhow::anyhow!(
219                "Client protocol cannot be Stdio. McpRouterPath::new does not support creating Stdio protocol router paths"
220            ));
221        }
222    };
223
224    // Clone cancellation token for monitoring
225    let ct_clone = ct.clone();
226    let mcp_id_clone = mcp_id.clone();
227
228    // Store MCP service status with full mcp_config for auto-restart
229    let mcp_service_status = McpServiceStatus::new(
230        mcp_id_clone.clone(),
231        mcp_type.clone(),
232        mcp_router_path.clone(),
233        ct_clone.clone(),
234        CheckMcpStatusResponseStatus::Ready,
235    )
236    .with_mcp_config(full_mcp_config.clone());
237
238    // Add MCP service status and proxy handler to global manager
239    let proxy_manager = get_proxy_manager();
240    proxy_manager.add_mcp_service_status_and_proxy(mcp_service_status, Some(handler));
241
242    // ===== 新增:注册配置到缓存 =====
243    proxy_manager
244        .register_mcp_config(&mcp_id, full_mcp_config.clone())
245        .await;
246
247    // Add base path fallback handler for SSE protocol
248    let router = if matches!(mcp_router_path.mcp_protocol, McpProtocol::Sse) {
249        let modified_router = router.fallback(base_path_fallback_handler);
250        info!("SSE base path handler added, base_path: {}", base_path);
251        modified_router
252    } else {
253        router
254    };
255
256    // Register route to global route table
257    info!(
258        "Registering route: base_path={}, mcp_id={}",
259        base_path, mcp_id
260    );
261    info!(
262        "SSE path config: sse_path={}, post_path={}",
263        match &mcp_router_path.mcp_protocol_path {
264            McpProtocolPath::SsePath(sse_path) => &sse_path.sse_path,
265            _ => "N/A",
266        },
267        match &mcp_router_path.mcp_protocol_path {
268            McpProtocolPath::SsePath(sse_path) => &sse_path.message_path,
269            _ => "N/A",
270        }
271    );
272    DynamicRouterService::register_route(&base_path, router.clone());
273    info!("Route registration complete: base_path={}", base_path);
274
275    // 记录重启时间戳(仅在服务成功启动后)
276    GLOBAL_RESTART_TRACKER.record_restart(&mcp_id);
277
278    Ok((router, ct))
279}
280
281/// Build SSE backend configuration from MCP server config
282fn build_sse_backend_config(
283    mcp_config: &McpServerConfig,
284    backend_protocol: McpProtocol,
285) -> Result<SseBackendConfig> {
286    match mcp_config {
287        McpServerConfig::Command(cmd_config) => {
288            log_command_details(cmd_config);
289            Ok(SseBackendConfig::Stdio {
290                command: cmd_config.command.clone(),
291                args: cmd_config.args.clone(),
292                env: cmd_config.env.clone(),
293            })
294        }
295        McpServerConfig::Url(url_config) => match backend_protocol {
296            McpProtocol::Stdio => Err(anyhow::anyhow!(
297                "URL-based MCP service cannot use Stdio protocol"
298            )),
299            McpProtocol::Sse => {
300                info!("Connecting to SSE backend: {}", url_config.get_url());
301                Ok(SseBackendConfig::SseUrl {
302                    url: url_config.get_url().to_string(),
303                    headers: url_config.headers.clone(),
304                })
305            }
306            McpProtocol::Stream => {
307                info!(
308                    "Connecting to Streamable HTTP backend (SSE frontend): {}",
309                    url_config.get_url()
310                );
311                Ok(SseBackendConfig::StreamUrl {
312                    url: url_config.get_url().to_string(),
313                    headers: url_config.headers.clone(),
314                })
315            }
316        },
317    }
318}
319
320/// Build Stream backend configuration from MCP server config
321fn build_stream_backend_config(
322    mcp_config: &McpServerConfig,
323    backend_protocol: McpProtocol,
324) -> Result<StreamBackendConfig> {
325    match mcp_config {
326        McpServerConfig::Command(cmd_config) => {
327            log_command_details(cmd_config);
328            Ok(StreamBackendConfig::Stdio {
329                command: cmd_config.command.clone(),
330                args: cmd_config.args.clone(),
331                env: cmd_config.env.clone(),
332            })
333        }
334        McpServerConfig::Url(url_config) => {
335            match backend_protocol {
336                McpProtocol::Stdio => Err(anyhow::anyhow!(
337                    "URL-based MCP service cannot use Stdio protocol"
338                )),
339                McpProtocol::Sse => {
340                    // Note: StreamServerBuilder currently only supports Streamable HTTP URL backend
341                    // SSE backend with Stream frontend would require protocol conversion
342                    // For now, we return an error for this combination
343                    Err(anyhow::anyhow!(
344                        "SSE backend with Streamable HTTP frontend is not yet supported. \
345                         Please use SSE frontend or configure a Streamable HTTP backend."
346                    ))
347                }
348                McpProtocol::Stream => {
349                    info!(
350                        "Connecting to Streamable HTTP backend: {}",
351                        url_config.get_url()
352                    );
353                    Ok(StreamBackendConfig::Url {
354                        url: url_config.get_url().to_string(),
355                        headers: url_config.headers.clone(),
356                    })
357                }
358            }
359        }
360    }
361}
362
363/// Log command execution details for debugging
364fn log_command_details(mcp_config: &McpServerCommandConfig) {
365    let args_str = mcp_config
366        .args
367        .as_ref()
368        .map_or(String::new(), |args| args.join(" "));
369
370    info!("Executing command: {} {}", mcp_config.command, args_str);
371
372    // 只输出 env 变量的 key 列表,避免泄露敏感 value
373    if let Some(env_vars) = &mcp_config.env {
374        let keys: Vec<&String> = env_vars.keys().collect();
375        if !keys.is_empty() {
376            debug!("Config env keys: {:?}", keys);
377        }
378    }
379
380    // 输出进程级关键环境变量(PATH 摘要 + 镜像变量)
381    debug!(
382        "Process PATH: {}",
383        mcp_common::diagnostic::format_path_summary(3)
384    );
385    for (key, val) in mcp_common::diagnostic::collect_mirror_env_vars() {
386        debug!("Process env: {}={}", key, val);
387    }
388}
389
390/// Base path fallback handler - supports direct access to base path with automatic redirection
391#[axum::debug_handler]
392async fn base_path_fallback_handler(
393    method: axum::http::Method,
394    uri: axum::http::Uri,
395    headers: axum::http::HeaderMap,
396) -> impl axum::response::IntoResponse {
397    let path = uri.path();
398    info!("Base path handler: {} {}", method, path);
399
400    // Determine if SSE or Stream protocol
401    if path.contains("/sse/proxy/") {
402        // SSE protocol handling
403        match method {
404            axum::http::Method::GET => {
405                // Extract MCP ID from path
406                let mcp_id = path.split("/sse/proxy/").nth(1);
407
408                if let Some(mcp_id) = mcp_id {
409                    // Check if MCP service exists
410                    let proxy_manager = get_proxy_manager();
411                    if proxy_manager.get_mcp_service_status(mcp_id).is_none() {
412                        // MCP service not found
413                        (
414                            axum::http::StatusCode::NOT_FOUND,
415                            [("Content-Type", "text/plain".to_string())],
416                            format!("MCP service '{}' not found", mcp_id).to_string(),
417                        )
418                    } else {
419                        // MCP service exists, check Accept header
420                        let accept_header = headers.get("accept");
421                        if let Some(accept) = accept_header {
422                            let accept_str = accept.to_str().unwrap_or("");
423                            if accept_str.contains("text/event-stream") {
424                                // Correct Accept header, redirect to /sse
425                                let redirect_uri = format!("{}/sse", path);
426                                info!("SSE redirect to: {}", redirect_uri);
427                                (
428                                    axum::http::StatusCode::FOUND,
429                                    [("Location", redirect_uri.to_string())],
430                                    "Redirecting to SSE endpoint".to_string(),
431                                )
432                            } else {
433                                // Incorrect Accept header
434                                (
435                                    axum::http::StatusCode::BAD_REQUEST,
436                                    [("Content-Type", "text/plain".to_string())],
437                                    "SSE error: Invalid Accept header, expected 'text/event-stream'".to_string(),
438                                )
439                            }
440                        } else {
441                            // No Accept header
442                            (
443                                axum::http::StatusCode::BAD_REQUEST,
444                                [("Content-Type", "text/plain".to_string())],
445                                "SSE error: Missing Accept header, expected 'text/event-stream'"
446                                    .to_string(),
447                            )
448                        }
449                    }
450                } else {
451                    // Cannot extract MCP ID from path
452                    (
453                        axum::http::StatusCode::BAD_REQUEST,
454                        [("Content-Type", "text/plain".to_string())],
455                        "SSE error: Invalid SSE path".to_string(),
456                    )
457                }
458            }
459            axum::http::Method::POST => {
460                // POST request redirect to /message
461                let redirect_uri = format!("{}/message", path);
462                info!("SSE redirect to: {}", redirect_uri);
463                (
464                    axum::http::StatusCode::FOUND,
465                    [("Location", redirect_uri.to_string())],
466                    "Redirecting to message endpoint".to_string(),
467                )
468            }
469            _ => {
470                // Other methods return 405 Method Not Allowed
471                (
472                    axum::http::StatusCode::METHOD_NOT_ALLOWED,
473                    [("Allow", "GET, POST".to_string())],
474                    "Only GET and POST methods are allowed".to_string(),
475                )
476            }
477        }
478    } else if path.contains("/stream/proxy/") {
479        // Stream protocol handling - return success directly without redirect
480        match method {
481            axum::http::Method::GET => {
482                // GET request returns server info
483                (
484                    axum::http::StatusCode::OK,
485                    [("Content-Type", "application/json".to_string())],
486                    r#"{"jsonrpc":"2.0","result":{"info":"Streamable MCP Server","version":"1.0"}}"#.to_string(),
487                )
488            }
489            axum::http::Method::POST => {
490                // POST request returns success, let StreamableHttpService handle
491                (
492                    axum::http::StatusCode::OK,
493                    [("Content-Type", "application/json".to_string())],
494                    r#"{"jsonrpc":"2.0","result":{"message":"Stream request received","protocol":"streamable-http"}}"#.to_string(),
495                )
496            }
497            _ => {
498                // Other methods return 405 Method Not Allowed
499                (
500                    axum::http::StatusCode::METHOD_NOT_ALLOWED,
501                    [("Allow", "GET, POST".to_string())],
502                    "Only GET and POST methods are allowed".to_string(),
503                )
504            }
505        }
506    } else {
507        // Unknown protocol
508        (
509            axum::http::StatusCode::BAD_REQUEST,
510            [("Content-Type", "text/plain".to_string())],
511            "Unknown protocol or path".to_string(),
512        )
513    }
514}