mcp_streamable_proxy/
server.rs1use anyhow::{Result, bail};
7use mcp_common::{McpServiceConfig, check_windows_command, wrap_process_v9};
8use rmcp::{
9 ServiceExt,
10 model::{ClientCapabilities, ClientInfo},
11 transport::{
12 TokioChildProcess,
13 streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14 },
15};
16use std::process::Stdio;
17use std::sync::Arc;
18use tracing::{error, info, warn};
19
20use process_wrap::tokio::{CommandWrap, KillOnDrop};
23
24use crate::{ProxyAwareSessionManager, ProxyHandler};
25
26pub async fn run_stream_server_from_config(
40 config: McpServiceConfig,
41 std_listener: &std::net::TcpListener,
42 quiet: bool,
43) -> Result<()> {
44 check_windows_command(&config.command);
53
54 info!(
55 "[子进程][{}] 命令: {} {:?}",
56 config.name,
57 config.command,
58 config.args.as_ref().unwrap_or(&vec![])
59 );
60
61 let mut wrapped_cmd = CommandWrap::with_new(&config.command, |command| {
62 if let Some(ref cmd_args) = config.args {
63 command.args(cmd_args);
64 }
65 if let Some(ref env_vars) = config.env {
68 for (k, v) in env_vars {
69 command.env(k, v);
70 }
71 }
72 });
73
74 wrap_process_v9!(wrapped_cmd);
76
77 wrapped_cmd.wrap(KillOnDrop);
79
80 let (tokio_process, child_stderr) = TokioChildProcess::builder(wrapped_cmd)
83 .stderr(Stdio::piped())
84 .spawn()?;
85
86 if let Some(stderr_pipe) = child_stderr {
88 mcp_common::spawn_stderr_reader(stderr_pipe, config.name.clone());
89 }
90
91 let capabilities = ClientCapabilities::builder()
93 .enable_experimental()
94 .enable_roots()
95 .enable_roots_list_changed()
96 .enable_sampling()
97 .build();
98 let client_info = ClientInfo::new(
99 capabilities,
100 rmcp::model::Implementation::new("mcp-streamable-proxy-server", env!("CARGO_PKG_VERSION")),
101 );
102
103 let client = client_info.serve(tokio_process).await?;
105
106 info!(
108 "[子进程启动] Streamable HTTP - 服务名: {}, 命令: {} {:?}",
109 config.name,
110 config.command,
111 config.args.as_ref().unwrap_or(&vec![])
112 );
113
114 if !quiet {
115 eprintln!("✅ 子进程已启动");
116
117 match client.list_tools(None).await {
119 Ok(tools_result) => {
120 let tools = &tools_result.tools;
121 if tools.is_empty() {
122 warn!("[工具列表] 工具列表为空 - 服务名: {}", config.name);
123 eprintln!("⚠️ 工具列表为空");
124 } else {
125 info!(
126 "[工具列表] 服务名: {}, 工具数量: {}",
127 config.name,
128 tools.len()
129 );
130 eprintln!("🔧 可用工具 ({} 个):", tools.len());
131 for tool in tools.iter().take(10) {
132 let desc = tool.description.as_deref().unwrap_or("无描述");
133 let desc_short = if desc.len() > 50 {
134 format!("{}...", &desc[..50])
135 } else {
136 desc.to_string()
137 };
138 eprintln!(" - {} : {}", tool.name, desc_short);
139 }
140 if tools.len() > 10 {
141 eprintln!(" ... 和 {} 个其他工具", tools.len() - 10);
142 }
143 }
144 }
145 Err(e) => {
146 error!(
147 "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
148 config.name, e
149 );
150 eprintln!("⚠️ 获取工具列表失败: {}", e);
151 }
152 }
153 } else {
154 match client.list_tools(None).await {
156 Ok(tools_result) => {
157 info!(
158 "[工具列表] 服务名: {}, 工具数量: {}",
159 config.name,
160 tools_result.tools.len()
161 );
162 }
163 Err(e) => {
164 error!(
165 "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
166 config.name, e
167 );
168 }
169 }
170 }
171
172 let proxy_handler = if let Some(tool_filter) = config.tool_filter {
174 ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
175 } else {
176 ProxyHandler::with_mcp_id(client, config.name.clone())
177 };
178
179 let listener = tokio::net::TcpListener::from_std(std_listener.try_clone()?)?;
181 run_stream_server(proxy_handler, listener, quiet).await
182}
183
184pub async fn run_stream_server(
198 proxy_handler: ProxyHandler,
199 listener: tokio::net::TcpListener,
200 quiet: bool,
201) -> Result<()> {
202 let bind_addr = listener
203 .local_addr()
204 .map(|a| a.to_string())
205 .unwrap_or_else(|_| "<unknown>".to_string());
206 let mcp_id = proxy_handler.mcp_id().to_string();
207
208 info!(
210 "[HTTP服务启动] Streamable HTTP 服务启动 - 地址: {}, MCP ID: {}",
211 bind_addr, mcp_id
212 );
213
214 if !quiet {
215 eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
216 eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
217 eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
218 eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
219 eprintln!("💡 按 Ctrl+C 停止服务");
220 }
221
222 let handler = Arc::new(proxy_handler);
224
225 let session_manager = ProxyAwareSessionManager::new(handler.clone());
227
228 let handler_for_service = handler.clone();
231 let service = StreamableHttpService::new(
232 move || Ok((*handler_for_service).clone()),
233 session_manager.into(), StreamableHttpServerConfig {
235 stateful_mode: true, ..Default::default()
237 },
238 );
239
240 let router = axum::Router::new().fallback_service(service);
242
243 tokio::select! {
247 result = axum::serve(listener, router) => {
248 if let Err(e) = result {
249 error!(
250 "[HTTP服务错误] Streamable HTTP 服务器错误 - MCP ID: {}, 错误: {}",
251 mcp_id, e
252 );
253 bail!("服务器错误: {}", e);
254 }
255 }
256 _ = tokio::signal::ctrl_c() => {
257 info!(
258 "[HTTP服务关闭] 收到退出信号,正在关闭 Streamable HTTP 服务 - MCP ID: {}",
259 mcp_id
260 );
261 if !quiet {
262 eprintln!("\n🛑 收到退出信号,正在关闭...");
263 }
264 }
265 }
266
267 Ok(())
268}