Skip to main content

mcp_streamable_proxy/
server.rs

1//! Streamable HTTP server implementation
2//!
3//! This module provides the HTTP server that uses ProxyAwareSessionManager
4//! for stateful session management with backend version control.
5
6use anyhow::{Result, bail};
7pub use mcp_common::McpServiceConfig;
8use rmcp::{
9    ServiceExt,
10    model::{ClientCapabilities, ClientInfo},
11    transport::{
12        TokioChildProcess,
13        streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14    },
15};
16use std::sync::Arc;
17use tracing::{error, info, warn};
18
19// 进程组管理(跨平台子进程清理)
20// process-wrap 9.0 使用 CommandWrap 而不是 TokioCommandWrap
21use process_wrap::tokio::{CommandWrap, KillOnDrop};
22
23#[cfg(unix)]
24use process_wrap::tokio::ProcessGroup;
25
26#[cfg(windows)]
27use process_wrap::tokio::JobObject;
28
29use crate::{ProxyAwareSessionManager, ProxyHandler};
30
31/// 从配置启动 Streamable HTTP 服务器
32///
33/// # Features
34///
35/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
36/// - **Version Control**: 自动检测后端重连,使旧 session 失效
37/// - **Full Lifecycle**: 自动创建子进程、连接、handler、服务器
38///
39/// # Arguments
40///
41/// * `config` - MCP 服务配置
42/// * `bind_addr` - 绑定地址,例如 "127.0.0.1:3000"
43/// * `quiet` - 静默模式,不输出启动信息
44pub async fn run_stream_server_from_config(
45    config: McpServiceConfig,
46    bind_addr: &str,
47    quiet: bool,
48) -> Result<()> {
49    // 1. 使用 process-wrap 创建子进程命令(跨平台进程清理)
50    // process-wrap 会自动处理进程组(Unix)或 Job Object(Windows)
51    // 并且在 Drop 时自动清理子进程树
52    let mut wrapped_cmd = CommandWrap::with_new(&config.command, |command| {
53        if let Some(ref cmd_args) = config.args {
54            command.args(cmd_args);
55        }
56        if let Some(ref env_vars) = config.env {
57            for (k, v) in env_vars {
58                command.env(k, v);
59            }
60        }
61    });
62    // Unix: 创建进程组,支持 killpg 清理整个进程树
63    #[cfg(unix)]
64    wrapped_cmd.wrap(ProcessGroup::leader());
65    // Windows: 使用 Job Object 管理进程树
66    #[cfg(windows)]
67    wrapped_cmd.wrap(JobObject);
68    // 所有平台: Drop 时自动清理进程
69    wrapped_cmd.wrap(KillOnDrop);
70
71    // 2. 启动子进程(rmcp 的 TokioChildProcess 已经支持 process-wrap)
72    let tokio_process = TokioChildProcess::new(wrapped_cmd)?;
73
74    // 3. 创建客户端信息
75    let client_info = ClientInfo {
76        protocol_version: Default::default(),
77        capabilities: ClientCapabilities::builder()
78            .enable_experimental()
79            .enable_roots()
80            .enable_roots_list_changed()
81            .enable_sampling()
82            .build(),
83        ..Default::default()
84    };
85
86    // 4. 连接到子进程
87    let client = client_info.serve(tokio_process).await?;
88
89    // 记录子进程启动到日志文件
90    info!(
91        "[子进程启动] Streamable HTTP - 服务名: {}, 命令: {} {:?}",
92        config.name,
93        config.command,
94        config.args.as_ref().unwrap_or(&vec![])
95    );
96
97    if !quiet {
98        eprintln!("✅ 子进程已启动");
99
100        // 获取并打印工具列表
101        match client.list_tools(None).await {
102            Ok(tools_result) => {
103                let tools = &tools_result.tools;
104                if tools.is_empty() {
105                    warn!("[工具列表] 工具列表为空 - 服务名: {}", config.name);
106                    eprintln!("⚠️  工具列表为空");
107                } else {
108                    info!(
109                        "[工具列表] 服务名: {}, 工具数量: {}",
110                        config.name,
111                        tools.len()
112                    );
113                    eprintln!("🔧 可用工具 ({} 个):", tools.len());
114                    for tool in tools.iter().take(10) {
115                        let desc = tool.description.as_deref().unwrap_or("无描述");
116                        let desc_short = if desc.len() > 50 {
117                            format!("{}...", &desc[..50])
118                        } else {
119                            desc.to_string()
120                        };
121                        eprintln!("   - {} : {}", tool.name, desc_short);
122                    }
123                    if tools.len() > 10 {
124                        eprintln!("   ... 和 {} 个其他工具", tools.len() - 10);
125                    }
126                }
127            }
128            Err(e) => {
129                error!(
130                    "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
131                    config.name, e
132                );
133                eprintln!("⚠️  获取工具列表失败: {}", e);
134            }
135        }
136    } else {
137        // 即使静默模式也记录日志
138        match client.list_tools(None).await {
139            Ok(tools_result) => {
140                info!(
141                    "[工具列表] 服务名: {}, 工具数量: {}",
142                    config.name,
143                    tools_result.tools.len()
144                );
145            }
146            Err(e) => {
147                error!(
148                    "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
149                    config.name, e
150                );
151            }
152        }
153    }
154
155    // 5. 创建 ProxyHandler
156    let proxy_handler = if let Some(tool_filter) = config.tool_filter {
157        ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
158    } else {
159        ProxyHandler::with_mcp_id(client, config.name.clone())
160    };
161
162    // 6. 启动服务器
163    run_stream_server(proxy_handler, bind_addr, quiet).await
164}
165
166/// Run Streamable HTTP server with ProxyAwareSessionManager
167///
168/// # Features
169///
170/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
171/// - **Version Control**: 自动检测后端重连,使旧 session 失效
172/// - **Hot Swap**: 支持后端连接热替换
173///
174/// # Arguments
175///
176/// * `proxy_handler` - ProxyHandler 实例(包含后端版本控制)
177/// * `bind_addr` - 绑定地址,例如 "127.0.0.1:3000"
178/// * `quiet` - 静默模式,不输出启动信息
179///
180/// # Example
181///
182/// ```no_run
183/// use mcp_streamable_proxy::{ProxyHandler, run_stream_server};
184/// use mcp_common::ToolFilter;
185///
186/// # async fn example() -> anyhow::Result<()> {
187/// let handler = ProxyHandler::new_disconnected(
188///     "test-mcp".to_string(),
189///     ToolFilter::default(),
190///     Default::default(),
191/// );
192///
193/// run_stream_server(handler, "127.0.0.1:3000", false).await?;
194/// # Ok(())
195/// # }
196/// ```
197pub async fn run_stream_server(
198    proxy_handler: ProxyHandler,
199    bind_addr: &str,
200    quiet: bool,
201) -> Result<()> {
202    let mcp_id = proxy_handler.mcp_id().to_string();
203
204    // 记录服务启动到日志文件
205    info!(
206        "[HTTP服务启动] Streamable HTTP 服务启动 - 地址: {}, MCP ID: {}",
207        bind_addr, mcp_id
208    );
209
210    if !quiet {
211        eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
212        eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
213        eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
214        eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
215        eprintln!("💡 按 Ctrl+C 停止服务");
216    }
217
218    // 包装 handler 为 Arc,供 SessionManager 和 service factory 共享
219    let handler = Arc::new(proxy_handler);
220
221    // 创建自定义 SessionManager(带版本控制)
222    let session_manager = ProxyAwareSessionManager::new(handler.clone());
223
224    // 创建 Streamable HTTP 服务
225    // service factory 每次请求都会调用,返回 handler 的克隆
226    let handler_for_service = handler.clone();
227    let service = StreamableHttpService::new(
228        move || Ok((*handler_for_service).clone()),
229        session_manager.into(), // 转换为 Arc<dyn SessionManager>
230        StreamableHttpServerConfig {
231            stateful_mode: true, // 关键:启用有状态模式
232            ..Default::default()
233        },
234    );
235
236    // Streamable HTTP 直接在根路径提供服务
237    let router = axum::Router::new().fallback_service(service);
238
239    // 启动 HTTP 服务器
240    let listener = tokio::net::TcpListener::bind(bind_addr).await?;
241
242    // 使用 select 处理 Ctrl+C 和服务器
243    tokio::select! {
244        result = axum::serve(listener, router) => {
245            if let Err(e) = result {
246                error!(
247                    "[HTTP服务错误] Streamable HTTP 服务器错误 - MCP ID: {}, 错误: {}",
248                    mcp_id, e
249                );
250                bail!("服务器错误: {}", e);
251            }
252        }
253        _ = tokio::signal::ctrl_c() => {
254            info!(
255                "[HTTP服务关闭] 收到退出信号,正在关闭 Streamable HTTP 服务 - MCP ID: {}",
256                mcp_id
257            );
258            if !quiet {
259                eprintln!("\n🛑 收到退出信号,正在关闭...");
260            }
261        }
262    }
263
264    Ok(())
265}