mcp_streamable_proxy/
server.rs1use anyhow::{Result, bail};
7pub use mcp_common::McpServiceConfig;
8use rmcp::{
9 ServiceExt,
10 model::{ClientCapabilities, ClientInfo},
11 transport::{
12 TokioChildProcess,
13 streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14 },
15};
16use std::sync::Arc;
17use tokio::process::Command;
18use tracing::{error, info, warn};
19
20use crate::{ProxyAwareSessionManager, ProxyHandler};
21
22pub async fn run_stream_server_from_config(
36 config: McpServiceConfig,
37 bind_addr: &str,
38 quiet: bool,
39) -> Result<()> {
40 let mut command = Command::new(&config.command);
42
43 if let Some(ref cmd_args) = config.args {
44 command.args(cmd_args);
45 }
46
47 if let Some(ref env_vars) = config.env {
48 for (k, v) in env_vars {
49 command.env(k, v);
50 }
51 }
52
53 let tokio_process = TokioChildProcess::new(command)?;
55
56 let client_info = ClientInfo {
58 protocol_version: Default::default(),
59 capabilities: ClientCapabilities::builder()
60 .enable_experimental()
61 .enable_roots()
62 .enable_roots_list_changed()
63 .enable_sampling()
64 .build(),
65 ..Default::default()
66 };
67
68 let client = client_info.serve(tokio_process).await?;
70
71 info!(
73 "[子进程启动] Streamable HTTP - 服务名: {}, 命令: {} {:?}",
74 config.name,
75 config.command,
76 config.args.as_ref().unwrap_or(&vec![])
77 );
78
79 if !quiet {
80 eprintln!("✅ 子进程已启动");
81
82 match client.list_tools(None).await {
84 Ok(tools_result) => {
85 let tools = &tools_result.tools;
86 if tools.is_empty() {
87 warn!(
88 "[工具列表] 工具列表为空 - 服务名: {}",
89 config.name
90 );
91 eprintln!("⚠️ 工具列表为空");
92 } else {
93 info!(
94 "[工具列表] 服务名: {}, 工具数量: {}",
95 config.name,
96 tools.len()
97 );
98 eprintln!("🔧 可用工具 ({} 个):", tools.len());
99 for tool in tools.iter().take(10) {
100 let desc = tool.description.as_deref().unwrap_or("无描述");
101 let desc_short = if desc.len() > 50 {
102 format!("{}...", &desc[..50])
103 } else {
104 desc.to_string()
105 };
106 eprintln!(" - {} : {}", tool.name, desc_short);
107 }
108 if tools.len() > 10 {
109 eprintln!(" ... 和 {} 个其他工具", tools.len() - 10);
110 }
111 }
112 }
113 Err(e) => {
114 error!(
115 "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
116 config.name, e
117 );
118 eprintln!("⚠️ 获取工具列表失败: {}", e);
119 }
120 }
121 } else {
122 match client.list_tools(None).await {
124 Ok(tools_result) => {
125 info!(
126 "[工具列表] 服务名: {}, 工具数量: {}",
127 config.name,
128 tools_result.tools.len()
129 );
130 }
131 Err(e) => {
132 error!(
133 "[工具列表] 获取工具列表失败 - 服务名: {}, 错误: {}",
134 config.name, e
135 );
136 }
137 }
138 }
139
140 let proxy_handler = if let Some(tool_filter) = config.tool_filter {
142 ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
143 } else {
144 ProxyHandler::with_mcp_id(client, config.name.clone())
145 };
146
147 run_stream_server(proxy_handler, bind_addr, quiet).await
149}
150
151pub async fn run_stream_server(
183 proxy_handler: ProxyHandler,
184 bind_addr: &str,
185 quiet: bool,
186) -> Result<()> {
187 let mcp_id = proxy_handler.mcp_id().to_string();
188
189 info!(
191 "[HTTP服务启动] Streamable HTTP 服务启动 - 地址: {}, MCP ID: {}",
192 bind_addr, mcp_id
193 );
194
195 if !quiet {
196 eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
197 eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
198 eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
199 eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
200 eprintln!("💡 按 Ctrl+C 停止服务");
201 }
202
203 let handler = Arc::new(proxy_handler);
205
206 let session_manager = ProxyAwareSessionManager::new(handler.clone());
208
209 let handler_for_service = handler.clone();
212 let service = StreamableHttpService::new(
213 move || Ok((*handler_for_service).clone()),
214 session_manager.into(), StreamableHttpServerConfig {
216 stateful_mode: true, ..Default::default()
218 },
219 );
220
221 let router = axum::Router::new().fallback_service(service);
223
224 let listener = tokio::net::TcpListener::bind(bind_addr).await?;
226
227 tokio::select! {
229 result = axum::serve(listener, router) => {
230 if let Err(e) = result {
231 error!(
232 "[HTTP服务错误] Streamable HTTP 服务器错误 - MCP ID: {}, 错误: {}",
233 mcp_id, e
234 );
235 bail!("服务器错误: {}", e);
236 }
237 }
238 _ = tokio::signal::ctrl_c() => {
239 info!(
240 "[HTTP服务关闭] 收到退出信号,正在关闭 Streamable HTTP 服务 - MCP ID: {}",
241 mcp_id
242 );
243 if !quiet {
244 eprintln!("\n🛑 收到退出信号,正在关闭...");
245 }
246 }
247 }
248
249 Ok(())
250}