mcp_streamable_proxy/
server.rs1use anyhow::{Result, bail};
7pub use mcp_common::McpServiceConfig;
8use rmcp::{
9 ServiceExt,
10 model::{ClientCapabilities, ClientInfo},
11 transport::{
12 TokioChildProcess,
13 streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14 },
15};
16use std::sync::Arc;
17use tracing::{error, info, warn};
18
19use process_wrap::tokio::{CommandWrap, KillOnDrop};
22
23#[cfg(unix)]
24use process_wrap::tokio::ProcessGroup;
25
26#[cfg(windows)]
27use process_wrap::tokio::JobObject;
28
29use crate::{ProxyAwareSessionManager, ProxyHandler};
30
31pub async fn run_stream_server_from_config(
45 config: McpServiceConfig,
46 bind_addr: &str,
47 quiet: bool,
48) -> Result<()> {
49 let mut wrapped_cmd = CommandWrap::with_new(&config.command, |command| {
53 if let Some(ref cmd_args) = config.args {
54 command.args(cmd_args);
55 }
56 if let Some(ref env_vars) = config.env {
57 for (k, v) in env_vars {
58 command.env(k, v);
59 }
60 }
61 });
62 #[cfg(unix)]
64 wrapped_cmd.wrap(ProcessGroup::leader());
65 #[cfg(windows)]
67 wrapped_cmd.wrap(JobObject);
68 wrapped_cmd.wrap(KillOnDrop);
70
71 let tokio_process = TokioChildProcess::new(wrapped_cmd)?;
73
74 let client_info = ClientInfo {
76 protocol_version: Default::default(),
77 capabilities: ClientCapabilities::builder()
78 .enable_experimental()
79 .enable_roots()
80 .enable_roots_list_changed()
81 .enable_sampling()
82 .build(),
83 ..Default::default()
84 };
85
86 let client = client_info.serve(tokio_process).await?;
88
89 info!(
91 "[子进程启动] Streamable HTTP - 服务名: {}, 命令: {} {:?}",
92 config.name,
93 config.command,
94 config.args.as_ref().unwrap_or(&vec![])
95 );
96
97 if !quiet {
98 eprintln!("✅ 子进程已启动");
99
100 match client.list_tools(None).await {
102 Ok(tools_result) => {
103 let tools = &tools_result.tools;
104 if tools.is_empty() {
105 warn!("[工具列表] 工具列表为空 - 服务名: {}", config.name);
106 eprintln!("⚠️ 工具列表为空");
107 } else {
108 info!(
109 "[工具列表] 服务名: {}, 工具数量: {}",
110 config.name,
111 tools.len()
112 );
113 eprintln!("🔧 可用工具 ({} 个):", tools.len());
114 for tool in tools.iter().take(10) {
115 let desc = tool.description.as_deref().unwrap_or("无描述");
116 let desc_short = if desc.len() > 50 {
117 format!("{}...", &desc[..50])
118 } else {
119 desc.to_string()
120 };
121 eprintln!(" - {} : {}", tool.name, desc_short);
122 }
123 if tools.len() > 10 {
124 eprintln!(" ... 和 {} 个其他工具", tools.len() - 10);
125 }
126 }
127 }
128 Err(e) => {
129 error!(
130 "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
131 config.name, e
132 );
133 eprintln!("⚠️ 获取工具列表失败: {}", e);
134 }
135 }
136 } else {
137 match client.list_tools(None).await {
139 Ok(tools_result) => {
140 info!(
141 "[工具列表] 服务名: {}, 工具数量: {}",
142 config.name,
143 tools_result.tools.len()
144 );
145 }
146 Err(e) => {
147 error!(
148 "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
149 config.name, e
150 );
151 }
152 }
153 }
154
155 let proxy_handler = if let Some(tool_filter) = config.tool_filter {
157 ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
158 } else {
159 ProxyHandler::with_mcp_id(client, config.name.clone())
160 };
161
162 run_stream_server(proxy_handler, bind_addr, quiet).await
164}
165
166pub async fn run_stream_server(
198 proxy_handler: ProxyHandler,
199 bind_addr: &str,
200 quiet: bool,
201) -> Result<()> {
202 let mcp_id = proxy_handler.mcp_id().to_string();
203
204 info!(
206 "[HTTP服务启动] Streamable HTTP 服务启动 - 地址: {}, MCP ID: {}",
207 bind_addr, mcp_id
208 );
209
210 if !quiet {
211 eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
212 eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
213 eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
214 eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
215 eprintln!("💡 按 Ctrl+C 停止服务");
216 }
217
218 let handler = Arc::new(proxy_handler);
220
221 let session_manager = ProxyAwareSessionManager::new(handler.clone());
223
224 let handler_for_service = handler.clone();
227 let service = StreamableHttpService::new(
228 move || Ok((*handler_for_service).clone()),
229 session_manager.into(), StreamableHttpServerConfig {
231 stateful_mode: true, ..Default::default()
233 },
234 );
235
236 let router = axum::Router::new().fallback_service(service);
238
239 let listener = tokio::net::TcpListener::bind(bind_addr).await?;
241
242 tokio::select! {
244 result = axum::serve(listener, router) => {
245 if let Err(e) = result {
246 error!(
247 "[HTTP服务错误] Streamable HTTP 服务器错误 - MCP ID: {}, 错误: {}",
248 mcp_id, e
249 );
250 bail!("服务器错误: {}", e);
251 }
252 }
253 _ = tokio::signal::ctrl_c() => {
254 info!(
255 "[HTTP服务关闭] 收到退出信号,正在关闭 Streamable HTTP 服务 - MCP ID: {}",
256 mcp_id
257 );
258 if !quiet {
259 eprintln!("\n🛑 收到退出信号,正在关闭...");
260 }
261 }
262 }
263
264 Ok(())
265}