Skip to main content

mcp_streamable_proxy/
server.rs

1//! Streamable HTTP server implementation
2//!
3//! This module provides the HTTP server that uses ProxyAwareSessionManager
4//! for stateful session management with backend version control.
5
6use anyhow::{Result, bail};
7use mcp_common::{McpServiceConfig, check_windows_command, wrap_process_v9};
8use rmcp::{
9    ServiceExt,
10    model::{ClientCapabilities, ClientInfo},
11    transport::{
12        TokioChildProcess,
13        streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14    },
15};
16use std::process::Stdio;
17use std::sync::Arc;
18use tracing::{error, info, warn};
19
20// 进程组管理(跨平台子进程清理)
21// process-wrap 9.0 使用 CommandWrap 而不是 TokioCommandWrap
22use process_wrap::tokio::{CommandWrap, KillOnDrop};
23
24use crate::{ProxyAwareSessionManager, ProxyHandler};
25
26/// 从配置启动 Streamable HTTP 服务器
27///
28/// # Features
29///
30/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
31/// - **Version Control**: 自动检测后端重连,使旧 session 失效
32/// - **Full Lifecycle**: 自动创建子进程、连接、handler、服务器
33///
34/// # Arguments
35///
36/// * `config` - MCP 服务配置
37/// * `std_listener` - 预先绑定的 TCP 监听器(端口在重试循环前绑定,保证端口占用)
38/// * `quiet` - 静默模式,不输出启动信息
39pub async fn run_stream_server_from_config(
40    config: McpServiceConfig,
41    std_listener: &std::net::TcpListener,
42    quiet: bool,
43) -> Result<()> {
44    // 1. 使用 process-wrap 创建子进程命令(跨平台进程清理)
45    // process-wrap 会自动处理进程组(Unix)或 Job Object(Windows)
46    // 并且在 Drop 时自动清理子进程树
47    // 子进程默认继承父进程的所有环境变量
48
49    // 🔧 Windows 特殊处理:检测并转换 .cmd/.bat 文件避免弹窗
50    // 如果用户配置了 npm 全局安装的 MCP 服务(如 npx some-server 或 some-server.cmd),
51    // 直接运行会弹 CMD 窗口。这里尝试转换
52    check_windows_command(&config.command);
53
54    info!(
55        "[子进程][{}] 命令: {} {:?}",
56        config.name,
57        config.command,
58        config.args.as_ref().unwrap_or(&vec![])
59    );
60
61    let mut wrapped_cmd = CommandWrap::with_new(&config.command, |command| {
62        if let Some(ref cmd_args) = config.args {
63            command.args(cmd_args);
64        }
65        // 子进程默认继承父进程的所有环境变量
66        // 设置 MCP JSON 配置中的环境变量(会覆盖继承的同名变量)
67        if let Some(ref env_vars) = config.env {
68            for (k, v) in env_vars {
69                command.env(k, v);
70            }
71        }
72    });
73
74    // 应用平台特定的进程包装(Unix: ProcessGroup, Windows: CREATE_NO_WINDOW + JobObject)
75    wrap_process_v9!(wrapped_cmd);
76
77    // 所有平台: Drop 时自动清理进程
78    wrapped_cmd.wrap(KillOnDrop);
79
80    // 2. 启动子进程(rmcp 的 TokioChildProcess 已经支持 process-wrap)
81    //    使用 builder 模式捕获 stderr,便于诊断子 MCP 服务初始化失败
82    let (tokio_process, child_stderr) = TokioChildProcess::builder(wrapped_cmd)
83        .stderr(Stdio::piped())
84        .spawn()?;
85
86    // 启动 stderr 日志读取任务
87    if let Some(stderr_pipe) = child_stderr {
88        mcp_common::spawn_stderr_reader(stderr_pipe, config.name.clone());
89    }
90
91    // 3. 创建客户端信息
92    let capabilities = ClientCapabilities::builder()
93        .enable_experimental()
94        .enable_roots()
95        .enable_roots_list_changed()
96        .enable_sampling()
97        .build();
98    let client_info = ClientInfo::new(
99        capabilities,
100        rmcp::model::Implementation::new("mcp-streamable-proxy-server", env!("CARGO_PKG_VERSION")),
101    );
102
103    // 4. 连接到子进程
104    let client = client_info.serve(tokio_process).await?;
105
106    // 记录子进程启动到日志文件
107    info!(
108        "[子进程启动] Streamable HTTP - 服务名: {}, 命令: {} {:?}",
109        config.name,
110        config.command,
111        config.args.as_ref().unwrap_or(&vec![])
112    );
113
114    if !quiet {
115        eprintln!("✅ 子进程已启动");
116
117        // 获取并打印工具列表
118        match client.list_tools(None).await {
119            Ok(tools_result) => {
120                let tools = &tools_result.tools;
121                if tools.is_empty() {
122                    warn!("[工具列表] 工具列表为空 - 服务名: {}", config.name);
123                    eprintln!("⚠️  工具列表为空");
124                } else {
125                    info!(
126                        "[工具列表] 服务名: {}, 工具数量: {}",
127                        config.name,
128                        tools.len()
129                    );
130                    eprintln!("🔧 可用工具 ({} 个):", tools.len());
131                    for tool in tools.iter().take(10) {
132                        let desc = tool.description.as_deref().unwrap_or("无描述");
133                        let desc_short = if desc.len() > 50 {
134                            format!("{}...", &desc[..50])
135                        } else {
136                            desc.to_string()
137                        };
138                        eprintln!("   - {} : {}", tool.name, desc_short);
139                    }
140                    if tools.len() > 10 {
141                        eprintln!("   ... 和 {} 个其他工具", tools.len() - 10);
142                    }
143                }
144            }
145            Err(e) => {
146                error!(
147                    "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
148                    config.name, e
149                );
150                eprintln!("⚠️  获取工具列表失败: {}", e);
151            }
152        }
153    } else {
154        // 即使静默模式也记录日志
155        match client.list_tools(None).await {
156            Ok(tools_result) => {
157                info!(
158                    "[工具列表] 服务名: {}, 工具数量: {}",
159                    config.name,
160                    tools_result.tools.len()
161                );
162            }
163            Err(e) => {
164                error!(
165                    "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
166                    config.name, e
167                );
168            }
169        }
170    }
171
172    // 5. 创建 ProxyHandler
173    let proxy_handler = if let Some(tool_filter) = config.tool_filter {
174        ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
175    } else {
176        ProxyHandler::with_mcp_id(client, config.name.clone())
177    };
178
179    // 6. 启动服务器(使用预绑定的 listener)
180    let listener = tokio::net::TcpListener::from_std(std_listener.try_clone()?)?;
181    run_stream_server(proxy_handler, listener, quiet).await
182}
183
184/// Run Streamable HTTP server with ProxyAwareSessionManager
185///
186/// # Features
187///
188/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
189/// - **Version Control**: 自动检测后端重连,使旧 session 失效
190/// - **Hot Swap**: 支持后端连接热替换
191///
192/// # Arguments
193///
194/// * `proxy_handler` - ProxyHandler 实例(包含后端版本控制)
195/// * `listener` - 已绑定的 tokio TcpListener
196/// * `quiet` - 静默模式,不输出启动信息
197pub async fn run_stream_server(
198    proxy_handler: ProxyHandler,
199    listener: tokio::net::TcpListener,
200    quiet: bool,
201) -> Result<()> {
202    let bind_addr = listener
203        .local_addr()
204        .map(|a| a.to_string())
205        .unwrap_or_else(|_| "<unknown>".to_string());
206    let mcp_id = proxy_handler.mcp_id().to_string();
207
208    // 记录服务启动到日志文件
209    info!(
210        "[HTTP服务启动] Streamable HTTP 服务启动 - 地址: {}, MCP ID: {}",
211        bind_addr, mcp_id
212    );
213
214    if !quiet {
215        eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
216        eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
217        eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
218        eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
219        eprintln!("💡 按 Ctrl+C 停止服务");
220    }
221
222    // 包装 handler 为 Arc,供 SessionManager 和 service factory 共享
223    let handler = Arc::new(proxy_handler);
224
225    // 创建自定义 SessionManager(带版本控制)
226    let session_manager = ProxyAwareSessionManager::new(handler.clone());
227
228    // 创建 Streamable HTTP 服务
229    // service factory 每次请求都会调用,返回 handler 的克隆
230    let handler_for_service = handler.clone();
231    let service = StreamableHttpService::new(
232        move || Ok((*handler_for_service).clone()),
233        session_manager.into(), // 转换为 Arc<dyn SessionManager>
234        StreamableHttpServerConfig {
235            stateful_mode: true, // 关键:启用有状态模式
236            ..Default::default()
237        },
238    );
239
240    // Streamable HTTP 直接在根路径提供服务
241    let router = axum::Router::new().fallback_service(service);
242
243    // 使用传入的 listener 启动 HTTP 服务器
244
245    // 使用 select 处理 Ctrl+C 和服务器
246    tokio::select! {
247        result = axum::serve(listener, router) => {
248            if let Err(e) = result {
249                error!(
250                    "[HTTP服务错误] Streamable HTTP 服务器错误 - MCP ID: {}, 错误: {}",
251                    mcp_id, e
252                );
253                bail!("服务器错误: {}", e);
254            }
255        }
256        _ = tokio::signal::ctrl_c() => {
257            info!(
258                "[HTTP服务关闭] 收到退出信号,正在关闭 Streamable HTTP 服务 - MCP ID: {}",
259                mcp_id
260            );
261            if !quiet {
262                eprintln!("\n🛑 收到退出信号,正在关闭...");
263            }
264        }
265    }
266
267    Ok(())
268}