mcp_streamable_proxy/
server.rs

1//! Streamable HTTP server implementation
2//!
3//! This module provides the HTTP server that uses ProxyAwareSessionManager
4//! for stateful session management with backend version control.
5
6use anyhow::{Result, bail};
7pub use mcp_common::McpServiceConfig;
8use rmcp::{
9    ServiceExt,
10    model::{ClientCapabilities, ClientInfo},
11    transport::{
12        TokioChildProcess,
13        streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14    },
15};
16use std::sync::Arc;
17use tokio::process::Command;
18use tracing::{error, info, warn};
19
20use crate::{ProxyAwareSessionManager, ProxyHandler};
21
22/// 从配置启动 Streamable HTTP 服务器
23///
24/// # Features
25///
26/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
27/// - **Version Control**: 自动检测后端重连,使旧 session 失效
28/// - **Full Lifecycle**: 自动创建子进程、连接、handler、服务器
29///
30/// # Arguments
31///
32/// * `config` - MCP 服务配置
33/// * `bind_addr` - 绑定地址,例如 "127.0.0.1:3000"
34/// * `quiet` - 静默模式,不输出启动信息
35pub async fn run_stream_server_from_config(
36    config: McpServiceConfig,
37    bind_addr: &str,
38    quiet: bool,
39) -> Result<()> {
40    // 1. 创建子进程命令
41    let mut command = Command::new(&config.command);
42
43    if let Some(ref cmd_args) = config.args {
44        command.args(cmd_args);
45    }
46
47    if let Some(ref env_vars) = config.env {
48        for (k, v) in env_vars {
49            command.env(k, v);
50        }
51    }
52
53    // 2. 启动子进程
54    let tokio_process = TokioChildProcess::new(command)?;
55
56    // 3. 创建客户端信息
57    let client_info = ClientInfo {
58        protocol_version: Default::default(),
59        capabilities: ClientCapabilities::builder()
60            .enable_experimental()
61            .enable_roots()
62            .enable_roots_list_changed()
63            .enable_sampling()
64            .build(),
65        ..Default::default()
66    };
67
68    // 4. 连接到子进程
69    let client = client_info.serve(tokio_process).await?;
70
71    // 记录子进程启动到日志文件
72    info!(
73        "[子进程启动] Streamable HTTP - 服务名: {}, 命令: {} {:?}",
74        config.name,
75        config.command,
76        config.args.as_ref().unwrap_or(&vec![])
77    );
78
79    if !quiet {
80        eprintln!("✅ 子进程已启动");
81
82        // 获取并打印工具列表
83        match client.list_tools(None).await {
84            Ok(tools_result) => {
85                let tools = &tools_result.tools;
86                if tools.is_empty() {
87                    warn!(
88                        "[工具列表] 工具列表为空 - 服务名: {}",
89                        config.name
90                    );
91                    eprintln!("⚠️  工具列表为空");
92                } else {
93                    info!(
94                        "[工具列表] 服务名: {}, 工具数量: {}",
95                        config.name,
96                        tools.len()
97                    );
98                    eprintln!("🔧 可用工具 ({} 个):", tools.len());
99                    for tool in tools.iter().take(10) {
100                        let desc = tool.description.as_deref().unwrap_or("无描述");
101                        let desc_short = if desc.len() > 50 {
102                            format!("{}...", &desc[..50])
103                        } else {
104                            desc.to_string()
105                        };
106                        eprintln!("   - {} : {}", tool.name, desc_short);
107                    }
108                    if tools.len() > 10 {
109                        eprintln!("   ... 和 {} 个其他工具", tools.len() - 10);
110                    }
111                }
112            }
113            Err(e) => {
114                error!(
115                    "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
116                    config.name, e
117                );
118                eprintln!("⚠️  获取工具列表失败: {}", e);
119            }
120        }
121    } else {
122        // 即使静默模式也记录日志
123        match client.list_tools(None).await {
124            Ok(tools_result) => {
125                info!(
126                    "[工具列表] 服务名: {}, 工具数量: {}",
127                    config.name,
128                    tools_result.tools.len()
129                );
130            }
131            Err(e) => {
132                error!(
133                    "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
134                    config.name, e
135                );
136            }
137        }
138    }
139
140    // 5. 创建 ProxyHandler
141    let proxy_handler = if let Some(tool_filter) = config.tool_filter {
142        ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
143    } else {
144        ProxyHandler::with_mcp_id(client, config.name.clone())
145    };
146
147    // 6. 启动服务器
148    run_stream_server(proxy_handler, bind_addr, quiet).await
149}
150
151/// Run Streamable HTTP server with ProxyAwareSessionManager
152///
153/// # Features
154///
155/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
156/// - **Version Control**: 自动检测后端重连,使旧 session 失效
157/// - **Hot Swap**: 支持后端连接热替换
158///
159/// # Arguments
160///
161/// * `proxy_handler` - ProxyHandler 实例(包含后端版本控制)
162/// * `bind_addr` - 绑定地址,例如 "127.0.0.1:3000"
163/// * `quiet` - 静默模式,不输出启动信息
164///
165/// # Example
166///
167/// ```no_run
168/// use mcp_streamable_proxy::{ProxyHandler, run_stream_server};
169/// use mcp_common::ToolFilter;
170///
171/// # async fn example() -> anyhow::Result<()> {
172/// let handler = ProxyHandler::new_disconnected(
173///     "test-mcp".to_string(),
174///     ToolFilter::default(),
175///     Default::default(),
176/// );
177///
178/// run_stream_server(handler, "127.0.0.1:3000", false).await?;
179/// # Ok(())
180/// # }
181/// ```
182pub async fn run_stream_server(
183    proxy_handler: ProxyHandler,
184    bind_addr: &str,
185    quiet: bool,
186) -> Result<()> {
187    let mcp_id = proxy_handler.mcp_id().to_string();
188
189    // 记录服务启动到日志文件
190    info!(
191        "[HTTP服务启动] Streamable HTTP 服务启动 - 地址: {}, MCP ID: {}",
192        bind_addr, mcp_id
193    );
194
195    if !quiet {
196        eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
197        eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
198        eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
199        eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
200        eprintln!("💡 按 Ctrl+C 停止服务");
201    }
202
203    // 包装 handler 为 Arc,供 SessionManager 和 service factory 共享
204    let handler = Arc::new(proxy_handler);
205
206    // 创建自定义 SessionManager(带版本控制)
207    let session_manager = ProxyAwareSessionManager::new(handler.clone());
208
209    // 创建 Streamable HTTP 服务
210    // service factory 每次请求都会调用,返回 handler 的克隆
211    let handler_for_service = handler.clone();
212    let service = StreamableHttpService::new(
213        move || Ok((*handler_for_service).clone()),
214        session_manager.into(), // 转换为 Arc<dyn SessionManager>
215        StreamableHttpServerConfig {
216            stateful_mode: true, // 关键:启用有状态模式
217            ..Default::default()
218        },
219    );
220
221    // Streamable HTTP 直接在根路径提供服务
222    let router = axum::Router::new().fallback_service(service);
223
224    // 启动 HTTP 服务器
225    let listener = tokio::net::TcpListener::bind(bind_addr).await?;
226
227    // 使用 select 处理 Ctrl+C 和服务器
228    tokio::select! {
229        result = axum::serve(listener, router) => {
230            if let Err(e) = result {
231                error!(
232                    "[HTTP服务错误] Streamable HTTP 服务器错误 - MCP ID: {}, 错误: {}",
233                    mcp_id, e
234                );
235                bail!("服务器错误: {}", e);
236            }
237        }
238        _ = tokio::signal::ctrl_c() => {
239            info!(
240                "[HTTP服务关闭] 收到退出信号,正在关闭 Streamable HTTP 服务 - MCP ID: {}",
241                mcp_id
242            );
243            if !quiet {
244                eprintln!("\n🛑 收到退出信号,正在关闭...");
245            }
246        }
247    }
248
249    Ok(())
250}