mcp_streamable_proxy/
server.rs

1//! Streamable HTTP server implementation
2//!
3//! This module provides the HTTP server that uses ProxyAwareSessionManager
4//! for stateful session management with backend version control.
5
6use std::sync::Arc;
7use anyhow::{Result, bail};
8use rmcp::{
9    ServiceExt,
10    model::{ClientCapabilities, ClientInfo},
11    transport::{
12        TokioChildProcess,
13        streamable_http_server::{StreamableHttpService, StreamableHttpServerConfig},
14    },
15};
16use tokio::process::Command;
17pub use mcp_common::McpServiceConfig;
18
19use crate::{ProxyHandler, ProxyAwareSessionManager};
20
21/// 从配置启动 Streamable HTTP 服务器
22///
23/// # Features
24///
25/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
26/// - **Version Control**: 自动检测后端重连,使旧 session 失效
27/// - **Full Lifecycle**: 自动创建子进程、连接、handler、服务器
28///
29/// # Arguments
30///
31/// * `config` - MCP 服务配置
32/// * `bind_addr` - 绑定地址,例如 "127.0.0.1:3000"
33/// * `quiet` - 静默模式,不输出启动信息
34pub async fn run_stream_server_from_config(
35    config: McpServiceConfig,
36    bind_addr: &str,
37    quiet: bool,
38) -> Result<()> {
39    // 1. 创建子进程命令
40    let mut command = Command::new(&config.command);
41
42    if let Some(ref cmd_args) = config.args {
43        command.args(cmd_args);
44    }
45
46    if let Some(ref env_vars) = config.env {
47        for (k, v) in env_vars {
48            command.env(k, v);
49        }
50    }
51
52    // 2. 启动子进程
53    let tokio_process = TokioChildProcess::new(command)?;
54
55    // 3. 创建客户端信息
56    let client_info = ClientInfo {
57        protocol_version: Default::default(),
58        capabilities: ClientCapabilities::builder()
59            .enable_experimental()
60            .enable_roots()
61            .enable_roots_list_changed()
62            .enable_sampling()
63            .build(),
64        ..Default::default()
65    };
66
67    // 4. 连接到子进程
68    let client = client_info.serve(tokio_process).await?;
69
70    if !quiet {
71        eprintln!("✅ 子进程已启动");
72
73        // 获取并打印工具列表
74        match client.list_tools(None).await {
75            Ok(tools_result) => {
76                let tools = &tools_result.tools;
77                if tools.is_empty() {
78                    eprintln!("⚠️  工具列表为空");
79                } else {
80                    eprintln!("🔧 可用工具 ({} 个):", tools.len());
81                    for tool in tools.iter().take(10) {
82                        let desc = tool.description.as_deref().unwrap_or("无描述");
83                        let desc_short = if desc.len() > 50 {
84                            format!("{}...", &desc[..50])
85                        } else {
86                            desc.to_string()
87                        };
88                        eprintln!("   - {} : {}", tool.name, desc_short);
89                    }
90                    if tools.len() > 10 {
91                        eprintln!("   ... 和 {} 个其他工具", tools.len() - 10);
92                    }
93                }
94            }
95            Err(e) => {
96                eprintln!("⚠️  获取工具列表失败: {}", e);
97            }
98        }
99    }
100
101    // 5. 创建 ProxyHandler
102    let proxy_handler = if let Some(tool_filter) = config.tool_filter {
103        ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
104    } else {
105        ProxyHandler::with_mcp_id(client, config.name.clone())
106    };
107
108    // 6. 启动服务器
109    run_stream_server(proxy_handler, bind_addr, quiet).await
110}
111
112/// Run Streamable HTTP server with ProxyAwareSessionManager
113///
114/// # Features
115///
116/// - **Stateful Mode**: `stateful_mode: true` 支持 session 管理和服务端推送
117/// - **Version Control**: 自动检测后端重连,使旧 session 失效
118/// - **Hot Swap**: 支持后端连接热替换
119///
120/// # Arguments
121///
122/// * `proxy_handler` - ProxyHandler 实例(包含后端版本控制)
123/// * `bind_addr` - 绑定地址,例如 "127.0.0.1:3000"
124/// * `quiet` - 静默模式,不输出启动信息
125///
126/// # Example
127///
128/// ```no_run
129/// use mcp_streamable_proxy::{ProxyHandler, run_stream_server};
130///
131/// # async fn example() -> anyhow::Result<()> {
132/// let handler = ProxyHandler::new_disconnected(
133///     Default::default(),
134///     "test-mcp".to_string(),
135/// );
136///
137/// run_stream_server(handler, "127.0.0.1:3000", false).await?;
138/// # Ok(())
139/// # }
140/// ```
141pub async fn run_stream_server(
142    proxy_handler: ProxyHandler,
143    bind_addr: &str,
144    quiet: bool,
145) -> Result<()> {
146    if !quiet {
147        eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
148        eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
149        eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
150        eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
151        eprintln!("💡 按 Ctrl+C 停止服务");
152    }
153
154    // 包装 handler 为 Arc,供 SessionManager 和 service factory 共享
155    let handler = Arc::new(proxy_handler);
156
157    // 创建自定义 SessionManager(带版本控制)
158    let session_manager = ProxyAwareSessionManager::new(handler.clone());
159
160    // 创建 Streamable HTTP 服务
161    // service factory 每次请求都会调用,返回 handler 的克隆
162    let handler_for_service = handler.clone();
163    let service = StreamableHttpService::new(
164        move || Ok((*handler_for_service).clone()),
165        session_manager.into(), // 转换为 Arc<dyn SessionManager>
166        StreamableHttpServerConfig {
167            stateful_mode: true, // 关键:启用有状态模式
168            ..Default::default()
169        },
170    );
171
172    // Streamable HTTP 直接在根路径提供服务
173    let router = axum::Router::new().fallback_service(service);
174
175    // 启动 HTTP 服务器
176    let listener = tokio::net::TcpListener::bind(bind_addr).await?;
177
178    // 使用 select 处理 Ctrl+C 和服务器
179    tokio::select! {
180        result = axum::serve(listener, router) => {
181            if let Err(e) = result {
182                bail!("服务器错误: {}", e);
183            }
184        }
185        _ = tokio::signal::ctrl_c() => {
186            if !quiet {
187                eprintln!("\n🛑 收到退出信号,正在关闭...");
188            }
189        }
190    }
191
192    Ok(())
193}