Skip to main content

mcp_streamable_proxy/
server.rs

1//! Streamable HTTP server implementation
2//!
3//! This module provides the HTTP server that uses ProxyAwareSessionManager
4//! for stateful session management with backend version control.
5
6use anyhow::{Result, bail};
7use mcp_common::{McpServiceConfig, check_windows_command, wrap_process_v9};
8use rmcp::{
9    ServiceExt,
10    model::{ClientCapabilities, ClientInfo},
11    transport::{
12        TokioChildProcess,
13        streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14    },
15};
16use std::process::Stdio;
17use std::sync::Arc;
18use tracing::{error, info, warn};
19
20// 进程组管理(跨平台子进程清理)
21// process-wrap 9.0 使用 CommandWrap 而不是 TokioCommandWrap
22use process_wrap::tokio::{CommandWrap, KillOnDrop};
23
24use crate::{ProxyAwareSessionManager, ProxyHandler};
25
26/// 从配置启动 Streamable HTTP 服务器
27///
28/// # Features
29///
30/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
31/// - **Version Control**: 自动检测后端重连,使旧 session 失效
32/// - **Full Lifecycle**: 自动创建子进程、连接、handler、服务器
33///
34/// # Arguments
35///
36/// * `config` - MCP 服务配置
37/// * `std_listener` - 预先绑定的 TCP 监听器(端口在重试循环前绑定,保证端口占用)
38/// * `quiet` - 静默模式,不输出启动信息
39pub async fn run_stream_server_from_config(
40    config: McpServiceConfig,
41    std_listener: &std::net::TcpListener,
42    quiet: bool,
43) -> Result<()> {
44    // 1. 使用 process-wrap 创建子进程命令(跨平台进程清理)
45    // process-wrap 会自动处理进程组(Unix)或 Job Object(Windows)
46    // 并且在 Drop 时自动清理子进程树
47    // 子进程默认继承父进程的所有环境变量
48
49    // 🔧 Windows 特殊处理:检测并转换 .cmd/.bat 文件避免弹窗
50    // 如果用户配置了 npm 全局安装的 MCP 服务(如 npx some-server 或 some-server.cmd),
51    // 直接运行会弹 CMD 窗口。这里尝试转换
52    check_windows_command(&config.command);
53
54    info!(
55        "[Subprocess][{}] Command: {} {:?}",
56        config.name,
57        config.command,
58        config.args.as_ref().unwrap_or(&vec![])
59    );
60
61    let mut wrapped_cmd = CommandWrap::with_new(&config.command, |command| {
62        if let Some(ref cmd_args) = config.args {
63            command.args(cmd_args);
64        }
65        // 子进程默认继承父进程的所有环境变量
66        // 设置 MCP JSON 配置中的环境变量(会覆盖继承的同名变量)
67        if let Some(ref env_vars) = config.env {
68            for (k, v) in env_vars {
69                command.env(k, v);
70            }
71        }
72    });
73
74    // 应用平台特定的进程包装(Unix: ProcessGroup, Windows: CREATE_NO_WINDOW + JobObject)
75    wrap_process_v9!(wrapped_cmd);
76
77    // 所有平台: Drop 时自动清理进程
78    wrapped_cmd.wrap(KillOnDrop);
79
80    // 2. 启动子进程(rmcp 的 TokioChildProcess 已经支持 process-wrap)
81    //    使用 builder 模式捕获 stderr,便于诊断子 MCP 服务初始化失败
82    let (tokio_process, child_stderr) = TokioChildProcess::builder(wrapped_cmd)
83        .stderr(Stdio::piped())
84        .spawn()?;
85
86    // 启动 stderr 日志读取任务
87    if let Some(stderr_pipe) = child_stderr {
88        mcp_common::spawn_stderr_reader(stderr_pipe, config.name.clone());
89    }
90
91    // 3. 创建客户端信息
92    let capabilities = ClientCapabilities::builder()
93        .enable_experimental()
94        .enable_roots()
95        .enable_roots_list_changed()
96        .enable_sampling()
97        .build();
98    let client_info = ClientInfo::new(
99        capabilities,
100        rmcp::model::Implementation::new("mcp-streamable-proxy-server", env!("CARGO_PKG_VERSION")),
101    );
102
103    // 4. 连接到子进程
104    let client = client_info.serve(tokio_process).await?;
105
106    // 记录子进程启动到日志文件
107    info!(
108        "[Subprocess startup] Streamable HTTP - Service name: {}, Command: {} {:?}",
109        config.name,
110        config.command,
111        config.args.as_ref().unwrap_or(&vec![])
112    );
113
114    if !quiet {
115        eprintln!("✅ The child process has been started");
116
117        // 获取并打印工具列表
118        match client.list_tools(None).await {
119            Ok(tools_result) => {
120                let tools = &tools_result.tools;
121                if tools.is_empty() {
122                    warn!(
123                        "[Tool list] Tool list is empty - Service name: {}",
124                        config.name
125                    );
126                    eprintln!("⚠️Tool list is empty");
127                } else {
128                    info!(
129                        "[Tool list] Service name: {}, Number of tools: {}",
130                        config.name,
131                        tools.len()
132                    );
133                    eprintln!("🔧 Available tools ({}):", tools.len());
134                    for tool in tools.iter().take(10) {
135                        let desc = tool.description.as_deref().unwrap_or("无描述");
136                        let desc_short = if desc.len() > 50 {
137                            format!("{}...", &desc[..50])
138                        } else {
139                            desc.to_string()
140                        };
141                        eprintln!("   - {} : {}", tool.name, desc_short);
142                    }
143                    if tools.len() > 10 {
144                        eprintln!("... and {} other tools", tools.len() - 10);
145                    }
146                }
147            }
148            Err(e) => {
149                error!(
150                    "[Tool List] Failed to obtain tool list - Service name: {}, Error: {}",
151                    config.name, e
152                );
153                eprintln!("⚠️ Failed to obtain tool list: {}", e);
154            }
155        }
156    } else {
157        // 即使静默模式也记录日志
158        match client.list_tools(None).await {
159            Ok(tools_result) => {
160                info!(
161                    "[Tool list] Service name: {}, Number of tools: {}",
162                    config.name,
163                    tools_result.tools.len()
164                );
165            }
166            Err(e) => {
167                error!(
168                    "[Tool List] Failed to obtain tool list - Service name: {}, Error: {}",
169                    config.name, e
170                );
171            }
172        }
173    }
174
175    // 5. 创建 ProxyHandler
176    let proxy_handler = if let Some(tool_filter) = config.tool_filter {
177        ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
178    } else {
179        ProxyHandler::with_mcp_id(client, config.name.clone())
180    };
181
182    // 6. 启动服务器(使用预绑定的 listener)
183    let listener = tokio::net::TcpListener::from_std(std_listener.try_clone()?)?;
184    run_stream_server(proxy_handler, listener, quiet).await
185}
186
187/// Run Streamable HTTP server with ProxyAwareSessionManager
188///
189/// # Features
190///
191/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
192/// - **Version Control**: 自动检测后端重连,使旧 session 失效
193/// - **Hot Swap**: 支持后端连接热替换
194///
195/// # Arguments
196///
197/// * `proxy_handler` - ProxyHandler 实例(包含后端版本控制)
198/// * `listener` - 已绑定的 tokio TcpListener
199/// * `quiet` - 静默模式,不输出启动信息
200pub async fn run_stream_server(
201    proxy_handler: ProxyHandler,
202    listener: tokio::net::TcpListener,
203    quiet: bool,
204) -> Result<()> {
205    let bind_addr = listener
206        .local_addr()
207        .map(|a| a.to_string())
208        .unwrap_or_else(|_| "<unknown>".to_string());
209    let mcp_id = proxy_handler.mcp_id().to_string();
210
211    // 记录服务启动到日志文件
212    info!(
213        "[HTTP service startup] Streamable HTTP service startup - Address: {}, MCP ID: {}",
214        bind_addr, mcp_id
215    );
216
217    if !quiet {
218        eprintln!("📡 Streamable HTTP service startup: http://{}", bind_addr);
219        eprintln!("💡 MCP client can be used directly: http://{}", bind_addr);
220        eprintln!("✨ Feature: stateful_mode (session management + server push)");
221        eprintln!("🔄 Backend version control: Enable (automatically handles reconnections)");
222        eprintln!("💡 Press Ctrl+C to stop the service");
223    }
224
225    // 包装 handler 为 Arc,供 SessionManager 和 service factory 共享
226    let handler = Arc::new(proxy_handler);
227
228    // 创建自定义 SessionManager(带版本控制)
229    let session_manager = ProxyAwareSessionManager::new(handler.clone());
230
231    // 创建 Streamable HTTP 服务
232    // service factory 每次请求都会调用,返回 handler 的克隆
233    let handler_for_service = handler.clone();
234    let service = StreamableHttpService::new(
235        move || Ok((*handler_for_service).clone()),
236        session_manager.into(), // 转换为 Arc<dyn SessionManager>
237        StreamableHttpServerConfig {
238            stateful_mode: true, // 关键:启用有状态模式
239            ..Default::default()
240        },
241    );
242
243    // Streamable HTTP 直接在根路径提供服务
244    let router = axum::Router::new().fallback_service(service);
245
246    // 使用传入的 listener 启动 HTTP 服务器
247
248    // 使用 select 处理 Ctrl+C 和服务器
249    tokio::select! {
250        result = axum::serve(listener, router) => {
251            if let Err(e) = result {
252                error!(
253                    "[HTTP Service Error] Streamable HTTP Server Error - MCP ID: {}, Error: {}",
254                    mcp_id, e
255                );
256                bail!("服务器错误: {}", e);
257            }
258        }
259        _ = tokio::signal::ctrl_c() => {
260            info!(
261                "[HTTP service shutdown] Received exit signal, closing Streamable HTTP service - MCP ID: {}",
262                mcp_id
263            );
264            if !quiet {
265                eprintln!("\\n🛑 Received exit signal, closing...");
266            }
267        }
268    }
269
270    Ok(())
271}