use anyhow::{Result, bail};
use mcp_common::{McpServiceConfig, check_windows_command, wrap_process_v9};
use rmcp::{
ServiceExt,
model::{ClientCapabilities, ClientInfo},
transport::{
TokioChildProcess,
streamable_http_server::{StreamableHttpServerConfig, StreamableHttpService},
},
};
use std::process::Stdio;
use std::sync::Arc;
use tracing::{error, info, warn};
use process_wrap::tokio::{CommandWrap, KillOnDrop};
use crate::{ProxyAwareSessionManager, ProxyHandler};
/// Spawns the MCP backend described by `config` as a child process, connects to
/// it as an MCP client, and exposes it through the Streamable HTTP proxy server.
///
/// Steps, in order:
/// 1. Build the child command from `config.command`, `config.args` and
///    `config.env`, wrap it with `KillOnDrop`, and spawn it with stderr piped.
/// 2. Hand the child's stderr (when available) to
///    `mcp_common::spawn_stderr_reader`, tagged with the service name.
/// 3. Perform the client handshake (`serve`) against the child process.
/// 4. Query the tool list once for diagnostics. A failure here is logged but is
///    not fatal.
/// 5. Build a [`ProxyHandler`] (with the tool filter when one is configured) and
///    delegate to [`run_stream_server`] on a clone of `std_listener`.
///
/// # Parameters
/// * `config` - service name, command line, environment and optional tool filter.
/// * `std_listener` - an already bound listener; it is cloned, not consumed.
///   NOTE(review): `tokio::net::TcpListener::from_std` expects the socket to be
///   in non-blocking mode - confirm the caller sets that.
/// * `quiet` - when `true`, only `tracing` output is produced; when `false`, a
///   human-readable summary is also written to stderr.
///
/// # Errors
/// Returns an error when the child cannot be spawned, the client handshake
/// fails, the listener cannot be cloned/converted, or [`run_stream_server`]
/// itself fails.
pub async fn run_stream_server_from_config(
    config: McpServiceConfig,
    std_listener: &std::net::TcpListener,
    quiet: bool,
) -> Result<()> {
    check_windows_command(&config.command);
    info!(
        "[Subprocess][{}] Command: {} {:?}",
        config.name,
        config.command,
        config.args.as_ref().unwrap_or(&vec![])
    );

    // Assemble the child command: arguments first, then environment overrides.
    let mut wrapped_cmd = CommandWrap::with_new(&config.command, |command| {
        if let Some(ref cmd_args) = config.args {
            command.args(cmd_args);
        }
        if let Some(ref env_vars) = config.env {
            for (k, v) in env_vars {
                command.env(k, v);
            }
        }
    });
    // Project-specific process wrapping; see `mcp_common::wrap_process_v9!`.
    wrap_process_v9!(wrapped_cmd);
    wrapped_cmd.wrap(KillOnDrop);

    let (tokio_process, child_stderr) = TokioChildProcess::builder(wrapped_cmd)
        .stderr(Stdio::piped())
        .spawn()?;
    if let Some(stderr_pipe) = child_stderr {
        mcp_common::spawn_stderr_reader(stderr_pipe, config.name.clone());
    }

    let capabilities = ClientCapabilities::builder()
        .enable_experimental()
        .enable_roots()
        .enable_roots_list_changed()
        .enable_sampling()
        .build();
    let client_info = ClientInfo::new(
        capabilities,
        rmcp::model::Implementation::new("mcp-streamable-proxy-server", env!("CARGO_PKG_VERSION")),
    );
    let client = client_info.serve(tokio_process).await?;
    info!(
        "[Subprocess startup] Streamable HTTP - Service name: {}, Command: {} {:?}",
        config.name,
        config.command,
        config.args.as_ref().unwrap_or(&vec![])
    );

    if !quiet {
        eprintln!("✅ The child process has been started");
    }

    // The tool list is queried in both modes; `quiet` only controls whether the
    // result is also echoed to stderr.
    match client.list_tools(None).await {
        Ok(tools_result) => {
            let tools = &tools_result.tools;
            if tools.is_empty() && !quiet {
                warn!(
                    "[Tool list] Tool list is empty - Service name: {}",
                    config.name
                );
                eprintln!("⚠️Tool list is empty");
            } else {
                info!(
                    "[Tool list] Service name: {}, Number of tools: {}",
                    config.name,
                    tools.len()
                );
                if !quiet {
                    eprintln!("🔧 Available tools ({}):", tools.len());
                    for tool in tools.iter().take(STARTUP_TOOL_PREVIEW_LIMIT) {
                        let desc = tool.description.as_deref().unwrap_or("无描述");
                        eprintln!(
                            " - {} : {}",
                            tool.name,
                            shorten_description(desc, STARTUP_TOOL_DESC_MAX_CHARS)
                        );
                    }
                    if tools.len() > STARTUP_TOOL_PREVIEW_LIMIT {
                        eprintln!(
                            "... and {} other tools",
                            tools.len() - STARTUP_TOOL_PREVIEW_LIMIT
                        );
                    }
                }
            }
        }
        Err(e) => {
            error!(
                "[Tool List] Failed to obtain tool list - Service name: {}, Error: {}",
                config.name, e
            );
            if !quiet {
                eprintln!("⚠️ Failed to obtain tool list: {}", e);
            }
        }
    }

    let proxy_handler = if let Some(tool_filter) = config.tool_filter {
        ProxyHandler::with_tool_filter(client, config.name.clone(), tool_filter)
    } else {
        ProxyHandler::with_mcp_id(client, config.name.clone())
    };
    let listener = tokio::net::TcpListener::from_std(std_listener.try_clone()?)?;
    run_stream_server(proxy_handler, listener, quiet).await
}

/// Maximum number of tools echoed to stderr in the startup summary.
const STARTUP_TOOL_PREVIEW_LIMIT: usize = 10;

/// Maximum number of characters of a tool description shown in the startup summary.
const STARTUP_TOOL_DESC_MAX_CHARS: usize = 50;

/// Returns `desc` unchanged when it holds at most `max_chars` characters;
/// otherwise returns its first `max_chars` characters followed by `...`.
///
/// The cut is made at a character boundary, so multi-byte UTF-8 text never
/// causes a slicing panic.
fn shorten_description(desc: &str, max_chars: usize) -> String {
    match desc.char_indices().nth(max_chars) {
        // `cut` is the byte offset of the first character past the limit.
        Some((cut, _)) => format!("{}...", &desc[..cut]),
        None => desc.to_string(),
    }
}
/// Serves `proxy_handler` over Streamable HTTP on `listener` until the server
/// stops or Ctrl+C is received.
///
/// The handler is shared through an `Arc`: one reference feeds the
/// [`ProxyAwareSessionManager`], the other is cloned into a fresh handler for
/// each service instance the HTTP layer requests. The HTTP service is
/// configured with `stateful_mode = true` and mounted as the router's fallback,
/// so every path is routed to it.
///
/// # Parameters
/// * `proxy_handler` - the proxy that forwards requests to the backend.
/// * `listener` - an already bound tokio listener to accept connections on.
/// * `quiet` - when `false`, startup and shutdown hints are written to stderr in
///   addition to the `tracing` output.
///
/// # Errors
/// Returns an error when `axum::serve` terminates with an error. Receiving
/// Ctrl+C is a normal shutdown and yields `Ok(())`.
pub async fn run_stream_server(
    proxy_handler: ProxyHandler,
    listener: tokio::net::TcpListener,
    quiet: bool,
) -> Result<()> {
    // Only used for display; fall back to a placeholder if the address is unavailable.
    let bind_addr = listener
        .local_addr()
        .map(|a| a.to_string())
        .unwrap_or_else(|_| "<unknown>".to_string());
    let mcp_id = proxy_handler.mcp_id().to_string();
    info!(
        "[HTTP service startup] Streamable HTTP service startup - Address: {}, MCP ID: {}",
        bind_addr, mcp_id
    );
    if !quiet {
        eprintln!("📡 Streamable HTTP service startup: http://{}", bind_addr);
        eprintln!("💡 MCP client can be used directly: http://{}", bind_addr);
        eprintln!("✨ Feature: stateful_mode (session management + server push)");
        eprintln!("🔄 Backend version control: Enable (automatically handles reconnections)");
        eprintln!("💡 Press Ctrl+C to stop the service");
    }

    let handler = Arc::new(proxy_handler);
    let session_manager = ProxyAwareSessionManager::new(handler.clone());
    let handler_for_service = handler.clone();

    let mut server_config = StreamableHttpServerConfig::default();
    server_config.stateful_mode = true;

    let service = StreamableHttpService::new(
        // Factory: each call hands out its own clone of the shared handler.
        move || Ok((*handler_for_service).clone()),
        session_manager.into(),
        server_config,
    );
    let router = axum::Router::new().fallback_service(service);

    // Whichever finishes first wins: the server ending or the exit signal.
    tokio::select! {
        result = axum::serve(listener, router) => {
            if let Err(e) = result {
                error!(
                    "[HTTP Service Error] Streamable HTTP Server Error - MCP ID: {}, Error: {}",
                    mcp_id, e
                );
                bail!("服务器错误: {}", e);
            }
        }
        _ = tokio::signal::ctrl_c() => {
            info!(
                "[HTTP service shutdown] Received exit signal, closing Streamable HTTP service - MCP ID: {}",
                mcp_id
            );
            if !quiet {
                // Leading newline separates the message from the terminal's `^C` echo.
                eprintln!("\n🛑 Received exit signal, closing...");
            }
        }
    }
    Ok(())
}