mcp_streamable_proxy/
server.rs1use std::sync::Arc;
7use anyhow::{Result, bail};
8use rmcp::{
9 ServiceExt,
10 model::{ClientCapabilities, ClientInfo},
11 transport::{
12 TokioChildProcess,
13 streamable_http_server::{StreamableHttpService, StreamableHttpServerConfig},
14 },
15};
16use tokio::process::Command;
17pub use mcp_common::McpServiceConfig;
18
19use crate::{ProxyHandler, ProxyAwareSessionManager};
20
21pub async fn run_stream_server_from_config(
35 config: McpServiceConfig,
36 bind_addr: &str,
37 quiet: bool,
38) -> Result<()> {
39 let mut command = Command::new(&config.command);
41
42 if let Some(ref cmd_args) = config.args {
43 command.args(cmd_args);
44 }
45
46 if let Some(ref env_vars) = config.env {
47 for (k, v) in env_vars {
48 command.env(k, v);
49 }
50 }
51
52 let tokio_process = TokioChildProcess::new(command)?;
54
55 let client_info = ClientInfo {
57 protocol_version: Default::default(),
58 capabilities: ClientCapabilities::builder()
59 .enable_experimental()
60 .enable_roots()
61 .enable_roots_list_changed()
62 .enable_sampling()
63 .build(),
64 ..Default::default()
65 };
66
67 let client = client_info.serve(tokio_process).await?;
69
70 if !quiet {
71 eprintln!("✅ 子进程已启动");
72
73 match client.list_tools(None).await {
75 Ok(tools_result) => {
76 let tools = &tools_result.tools;
77 if tools.is_empty() {
78 eprintln!("⚠️ 工具列表为空");
79 } else {
80 eprintln!("🔧 可用工具 ({} 个):", tools.len());
81 for tool in tools.iter().take(10) {
82 let desc = tool.description.as_deref().unwrap_or("无描述");
83 let desc_short = if desc.len() > 50 {
84 format!("{}...", &desc[..50])
85 } else {
86 desc.to_string()
87 };
88 eprintln!(" - {} : {}", tool.name, desc_short);
89 }
90 if tools.len() > 10 {
91 eprintln!(" ... 和 {} 个其他工具", tools.len() - 10);
92 }
93 }
94 }
95 Err(e) => {
96 eprintln!("⚠️ 获取工具列表失败: {}", e);
97 }
98 }
99 }
100
101 let proxy_handler = if let Some(tool_filter) = config.tool_filter {
103 ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
104 } else {
105 ProxyHandler::with_mcp_id(client, config.name.clone())
106 };
107
108 run_stream_server(proxy_handler, bind_addr, quiet).await
110}
111
112pub async fn run_stream_server(
144 proxy_handler: ProxyHandler,
145 bind_addr: &str,
146 quiet: bool,
147) -> Result<()> {
148 if !quiet {
149 eprintln!("📡 Streamable HTTP 服务启动: http://{}", bind_addr);
150 eprintln!("💡 MCP 客户端可直接使用: http://{}", bind_addr);
151 eprintln!("✨ 特性: stateful_mode (会话管理 + 服务端推送)");
152 eprintln!("🔄 后端版本控制: 启用 (自动处理重连)");
153 eprintln!("💡 按 Ctrl+C 停止服务");
154 }
155
156 let handler = Arc::new(proxy_handler);
158
159 let session_manager = ProxyAwareSessionManager::new(handler.clone());
161
162 let handler_for_service = handler.clone();
165 let service = StreamableHttpService::new(
166 move || Ok((*handler_for_service).clone()),
167 session_manager.into(), StreamableHttpServerConfig {
169 stateful_mode: true, ..Default::default()
171 },
172 );
173
174 let router = axum::Router::new().fallback_service(service);
176
177 let listener = tokio::net::TcpListener::bind(bind_addr).await?;
179
180 tokio::select! {
182 result = axum::serve(listener, router) => {
183 if let Err(e) = result {
184 bail!("服务器错误: {}", e);
185 }
186 }
187 _ = tokio::signal::ctrl_c() => {
188 if !quiet {
189 eprintln!("\n🛑 收到退出信号,正在关闭...");
190 }
191 }
192 }
193
194 Ok(())
195}