mcp_streamable_proxy/
server.rs1use std::sync::Arc;
7use anyhow::{Result, bail};
8use rmcp::{
9 ServiceExt,
10 model::{ClientCapabilities, ClientInfo},
11 transport::{
12 TokioChildProcess,
13 streamable_http_server::{StreamableHttpService, StreamableHttpServerConfig},
14 },
15};
16use tokio::process::Command;
17pub use mcp_common::McpServiceConfig;
18
19use crate::{ProxyHandler, ProxyAwareSessionManager};
20
21pub async fn run_stream_server_from_config(
35 config: McpServiceConfig,
36 bind_addr: &str,
37 quiet: bool,
38) -> Result<()> {
39 let mut command = Command::new(&config.command);
41
42 if let Some(ref cmd_args) = config.args {
43 command.args(cmd_args);
44 }
45
46 if let Some(ref env_vars) = config.env {
47 for (k, v) in env_vars {
48 command.env(k, v);
49 }
50 }
51
52 let tokio_process = TokioChildProcess::new(command)?;
54
55 let client_info = ClientInfo {
57 protocol_version: Default::default(),
58 capabilities: ClientCapabilities::builder()
59 .enable_experimental()
60 .enable_roots()
61 .enable_roots_list_changed()
62 .enable_sampling()
63 .build(),
64 ..Default::default()
65 };
66
67 let client = client_info.serve(tokio_process).await?;
69
70 if !quiet {
71 eprintln!("✅ 子进程已启动");
72
73 match client.list_tools(None).await {
75 Ok(tools_result) => {
76 let tools = &tools_result.tools;
77 if tools.is_empty() {
78 eprintln!("⚠️ 工具列表为空");
79 } else {
80 eprintln!("🔧 可用工具 ({} 个):", tools.len());
81 for tool in tools.iter().take(10) {
82 let desc = tool.description.as_deref().unwrap_or("无描述");
83 let desc_short = if desc.len() > 50 {
84 format!("{}...", &desc[..50])
85 } else {
86 desc.to_string()
87 };
88 eprintln!(" - {} : {}", tool.name, desc_short);
89 }
90 if tools.len() > 10 {
91 eprintln!(" ... 和 {} 个其他工具", tools.len() - 10);
92 }
93 }
94 }
95 Err(e) => {
96 eprintln!("⚠️ 获取工具列表失败: {}", e);
97 }
98 }
99 }
100
101 let proxy_handler = if let Some(tool_filter) = config.tool_filter {
103 ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
104 } else {
105 ProxyHandler::with_mcp_id(client, config.name.clone())
106 };
107
108 run_stream_server(proxy_handler, bind_addr, quiet).await
110}
111
112pub async fn run_stream_server(
142 proxy_handler: ProxyHandler,
143 bind_addr: &str,
144 quiet: bool,
145) -> Result<()> {
146 if !quiet {
147 eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
148 eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
149 eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
150 eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
151 eprintln!("💡 按 Ctrl+C 停止服务");
152 }
153
154 let handler = Arc::new(proxy_handler);
156
157 let session_manager = ProxyAwareSessionManager::new(handler.clone());
159
160 let handler_for_service = handler.clone();
163 let service = StreamableHttpService::new(
164 move || Ok((*handler_for_service).clone()),
165 session_manager.into(), StreamableHttpServerConfig {
167 stateful_mode: true, ..Default::default()
169 },
170 );
171
172 let router = axum::Router::new().fallback_service(service);
174
175 let listener = tokio::net::TcpListener::bind(bind_addr).await?;
177
178 tokio::select! {
180 result = axum::serve(listener, router) => {
181 if let Err(e) = result {
182 bail!("服务器错误: {}", e);
183 }
184 }
185 _ = tokio::signal::ctrl_c() => {
186 if !quiet {
187 eprintln!("\n🛑 收到退出信号,正在关闭...");
188 }
189 }
190 }
191
192 Ok(())
193}