mcp_streamable_proxy/
server.rs1use anyhow::{Result, bail};
7use mcp_common::{McpServiceConfig, check_windows_command, wrap_process_v9};
8use rmcp::{
9 ServiceExt,
10 model::{ClientCapabilities, ClientInfo},
11 transport::{
12 TokioChildProcess,
13 streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
14 },
15};
16use std::process::Stdio;
17use std::sync::Arc;
18use tracing::{error, info, warn};
19
20use process_wrap::tokio::{CommandWrap, KillOnDrop};
23
24use crate::{ProxyAwareSessionManager, ProxyHandler};
25
26pub async fn run_stream_server_from_config(
40 config: McpServiceConfig,
41 std_listener: &std::net::TcpListener,
42 quiet: bool,
43) -> Result<()> {
44 check_windows_command(&config.command);
53
54 info!(
55 "[Subprocess][{}] Command: {} {:?}",
56 config.name,
57 config.command,
58 config.args.as_ref().unwrap_or(&vec![])
59 );
60
61 let mut wrapped_cmd = CommandWrap::with_new(&config.command, |command| {
62 if let Some(ref cmd_args) = config.args {
63 command.args(cmd_args);
64 }
65 if let Some(ref env_vars) = config.env {
68 for (k, v) in env_vars {
69 command.env(k, v);
70 }
71 }
72 });
73
74 wrap_process_v9!(wrapped_cmd);
76
77 wrapped_cmd.wrap(KillOnDrop);
79
80 let (tokio_process, child_stderr) = TokioChildProcess::builder(wrapped_cmd)
83 .stderr(Stdio::piped())
84 .spawn()?;
85
86 if let Some(stderr_pipe) = child_stderr {
88 mcp_common::spawn_stderr_reader(stderr_pipe, config.name.clone());
89 }
90
91 let capabilities = ClientCapabilities::builder()
93 .enable_experimental()
94 .enable_roots()
95 .enable_roots_list_changed()
96 .enable_sampling()
97 .build();
98 let client_info = ClientInfo::new(
99 capabilities,
100 rmcp::model::Implementation::new("mcp-streamable-proxy-server", env!("CARGO_PKG_VERSION")),
101 );
102
103 let client = client_info.serve(tokio_process).await?;
105
106 info!(
108 "[Subprocess startup] Streamable HTTP - Service name: {}, Command: {} {:?}",
109 config.name,
110 config.command,
111 config.args.as_ref().unwrap_or(&vec![])
112 );
113
114 if !quiet {
115 eprintln!("✅ The child process has been started");
116
117 match client.list_tools(None).await {
119 Ok(tools_result) => {
120 let tools = &tools_result.tools;
121 if tools.is_empty() {
122 warn!(
123 "[Tool list] Tool list is empty - Service name: {}",
124 config.name
125 );
126 eprintln!("⚠️Tool list is empty");
127 } else {
128 info!(
129 "[Tool list] Service name: {}, Number of tools: {}",
130 config.name,
131 tools.len()
132 );
133 eprintln!("🔧 Available tools ({}):", tools.len());
134 for tool in tools.iter().take(10) {
135 let desc = tool.description.as_deref().unwrap_or("无描述");
136 let desc_short = if desc.len() > 50 {
137 format!("{}...", &desc[..50])
138 } else {
139 desc.to_string()
140 };
141 eprintln!(" - {} : {}", tool.name, desc_short);
142 }
143 if tools.len() > 10 {
144 eprintln!("... and {} other tools", tools.len() - 10);
145 }
146 }
147 }
148 Err(e) => {
149 error!(
150 "[Tool List] Failed to obtain tool list - Service name: {}, Error: {}",
151 config.name, e
152 );
153 eprintln!("⚠️ Failed to obtain tool list: {}", e);
154 }
155 }
156 } else {
157 match client.list_tools(None).await {
159 Ok(tools_result) => {
160 info!(
161 "[Tool list] Service name: {}, Number of tools: {}",
162 config.name,
163 tools_result.tools.len()
164 );
165 }
166 Err(e) => {
167 error!(
168 "[Tool List] Failed to obtain tool list - Service name: {}, Error: {}",
169 config.name, e
170 );
171 }
172 }
173 }
174
175 let proxy_handler = if let Some(tool_filter) = config.tool_filter {
177 ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
178 } else {
179 ProxyHandler::with_mcp_id(client, config.name.clone())
180 };
181
182 let listener = tokio::net::TcpListener::from_std(std_listener.try_clone()?)?;
184 run_stream_server(proxy_handler, listener, quiet).await
185}
186
187pub async fn run_stream_server(
201 proxy_handler: ProxyHandler,
202 listener: tokio::net::TcpListener,
203 quiet: bool,
204) -> Result<()> {
205 let bind_addr = listener
206 .local_addr()
207 .map(|a| a.to_string())
208 .unwrap_or_else(|_| "<unknown>".to_string());
209 let mcp_id = proxy_handler.mcp_id().to_string();
210
211 info!(
213 "[HTTP service startup] Streamable HTTP service startup - Address: {}, MCP ID: {}",
214 bind_addr, mcp_id
215 );
216
217 if !quiet {
218 eprintln!("📡 Streamable HTTP service startup: http://{}", bind_addr);
219 eprintln!("💡 MCP client can be used directly: http://{}", bind_addr);
220 eprintln!("✨ Feature: stateful_mode (session management + server push)");
221 eprintln!("🔄 Backend version control: Enable (automatically handles reconnections)");
222 eprintln!("💡 Press Ctrl+C to stop the service");
223 }
224
225 let handler = Arc::new(proxy_handler);
227
228 let session_manager = ProxyAwareSessionManager::new(handler.clone());
230
231 let handler_for_service = handler.clone();
234 let service = StreamableHttpService::new(
235 move || Ok((*handler_for_service).clone()),
236 session_manager.into(), StreamableHttpServerConfig {
238 stateful_mode: true, ..Default::default()
240 },
241 );
242
243 let router = axum::Router::new().fallback_service(service);
245
246 tokio::select! {
250 result = axum::serve(listener, router) => {
251 if let Err(e) = result {
252 error!(
253 "[HTTP Service Error] Streamable HTTP Server Error - MCP ID: {}, Error: {}",
254 mcp_id, e
255 );
256 bail!("服务器错误: {}", e);
257 }
258 }
259 _ = tokio::signal::ctrl_c() => {
260 info!(
261 "[HTTP service shutdown] Received exit signal, closing Streamable HTTP service - MCP ID: {}",
262 mcp_id
263 );
264 if !quiet {
265 eprintln!("\\n🛑 Received exit signal, closing...");
266 }
267 }
268 }
269
270 Ok(())
271}