use anyhow::{Result, bail};
use mcp_common::{McpServiceConfig, check_windows_command, wrap_process_v8};
use rmcp::{
ServiceExt,
model::{ClientCapabilities, ClientInfo, ProtocolVersion},
transport::{
TokioChildProcess,
sse_server::{SseServer, SseServerConfig},
},
};
use std::process::Stdio;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use process_wrap::tokio::{KillOnDrop, TokioCommandWrap};
use crate::SseHandler;
pub async fn run_sse_server_from_config(
config: McpServiceConfig,
std_listener: &std::net::TcpListener,
quiet: bool,
) -> Result<()> {
check_windows_command(&config.command);
info!(
"[Subprocess][{}] Command: {} {:?}",
config.name,
config.command,
config.args.as_ref().unwrap_or(&vec![])
);
let mut wrapped_cmd = TokioCommandWrap::with_new(&config.command, |command| {
if let Some(ref cmd_args) = config.args {
command.args(cmd_args);
}
if let Some(ref env_vars) = config.env {
for (k, v) in env_vars {
command.env(k, v);
}
}
});
wrap_process_v8!(wrapped_cmd);
wrapped_cmd.wrap(KillOnDrop);
let (tokio_process, child_stderr) = TokioChildProcess::builder(wrapped_cmd)
.stderr(Stdio::piped())
.spawn()?;
if let Some(stderr_pipe) = child_stderr {
mcp_common::spawn_stderr_reader(stderr_pipe, config.name.clone());
}
let client_info = ClientInfo {
protocol_version: ProtocolVersion::V_2024_11_05,
capabilities: ClientCapabilities::builder()
.enable_experimental()
.enable_roots()
.enable_roots_list_changed()
.enable_sampling()
.build(),
..Default::default()
};
let client = client_info.serve(tokio_process).await?;
info!(
"[Subprocess startup] SSE - Service name: {}, Command: {} {:?}",
config.name,
config.command,
config.args.as_ref().unwrap_or(&vec![])
);
if !quiet {
eprintln!("✅ The child process has been started");
match client.list_tools(None).await {
Ok(tools_result) => {
let tools = &tools_result.tools;
if tools.is_empty() {
info!(
"[Tool list] Tool list is empty - Service name: {}",
config.name
);
eprintln!("⚠️Tool list is empty");
} else {
info!(
"[Tool list] Service name: {}, Number of tools: {}",
config.name,
tools.len()
);
eprintln!("🔧 Available tools ({}):", tools.len());
for tool in tools.iter().take(10) {
let desc = tool.description.as_deref().unwrap_or("无描述");
let desc_short = if desc.len() > 50 {
format!("{}...", &desc[..50])
} else {
desc.to_string()
};
eprintln!(" - {} : {}", tool.name, desc_short);
}
if tools.len() > 10 {
eprintln!("... and {} other tools", tools.len() - 10);
}
}
}
Err(e) => {
error!(
"[Tool List] Failed to obtain tool list - Service name: {}, Error: {}",
config.name, e
);
eprintln!("⚠️ Failed to obtain tool list: {}", e);
}
}
} else {
match client.list_tools(None).await {
Ok(tools_result) => {
info!(
"[Tool list] Service name: {}, Number of tools: {}",
config.name,
tools_result.tools.len()
);
}
Err(e) => {
error!(
"[Tool List] Failed to obtain tool list - Service name: {}, Error: {}",
config.name, e
);
}
}
}
let sse_handler = if let Some(tool_filter) = config.tool_filter {
SseHandler::with_tool_filter(client, config.name.clone(), tool_filter)
} else {
SseHandler::with_mcp_id(client, config.name.clone())
};
let listener = tokio::net::TcpListener::from_std(std_listener.try_clone()?)?;
run_sse_server(sse_handler, listener, quiet).await
}
pub async fn run_sse_server(
sse_handler: SseHandler,
listener: tokio::net::TcpListener,
quiet: bool,
) -> Result<()> {
let bind_addr = listener.local_addr()?;
let bind_addr_str = bind_addr.to_string();
let sse_path = "/sse".to_string();
let message_path = "/message".to_string();
let mcp_id = sse_handler.mcp_id().to_string();
info!(
"[HTTP service startup] SSE service startup - Address: {}, MCP ID: {}, SSE endpoint: {}, Message endpoint: {}",
bind_addr_str, mcp_id, sse_path, message_path
);
if !quiet {
eprintln!("📡 SSE service startup: http://{}", bind_addr_str);
eprintln!("SSE endpoint: http://{}{}", bind_addr_str, sse_path);
eprintln!("Message endpoint: http://{}{}", bind_addr_str, message_path);
eprintln!(
"💡 MCP client can be used directly: http://{} (automatic redirection)",
bind_addr_str
);
eprintln!("🔄 Backend hot replacement: enabled");
eprintln!("💡 Press Ctrl+C to stop the service");
}
let config = SseServerConfig {
bind: bind_addr,
sse_path: sse_path.clone(),
post_path: message_path.clone(),
ct: CancellationToken::new(),
sse_keep_alive: Some(std::time::Duration::from_secs(15)),
};
let (sse_server, sse_router) = SseServer::new(config);
let ct = sse_server.with_service(move || sse_handler.clone());
let sse_path_for_fallback = sse_path.clone();
let message_path_for_fallback = message_path.clone();
let fallback_handler = move |method: axum::http::Method, headers: axum::http::HeaderMap| {
let sse_path = sse_path_for_fallback.clone();
let message_path = message_path_for_fallback.clone();
async move {
match method {
axum::http::Method::GET => {
let accept = headers
.get("accept")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept.contains("text/event-stream") {
(
axum::http::StatusCode::TEMPORARY_REDIRECT,
[("Location", sse_path)],
"Redirecting to SSE endpoint".to_string(),
)
} else {
(
axum::http::StatusCode::OK,
[("Content-Type", "application/json".to_string())],
serde_json::json!({
"status": "running",
"protocol": "SSE",
"endpoints": {
"sse": sse_path,
"message": message_path
},
"usage": "Connect your MCP client to this URL or the SSE endpoint directly"
}).to_string(),
)
}
}
axum::http::Method::POST => {
(
axum::http::StatusCode::TEMPORARY_REDIRECT,
[("Location", message_path)],
"Redirecting to message endpoint".to_string(),
)
}
_ => (
axum::http::StatusCode::METHOD_NOT_ALLOWED,
[("Allow", "GET, POST".to_string())],
"Method not allowed".to_string(),
),
}
}
};
let router = sse_router.fallback(fallback_handler);
tokio::select! {
result = axum::serve(listener, router) => {
if let Err(e) = result {
error!(
"[HTTP Service Error] SSE Server Error - MCP ID: {}, Error: {}",
mcp_id, e
);
bail!("服务器错误: {}", e);
}
}
_ = tokio::signal::ctrl_c() => {
info!(
"[HTTP service shutdown] Received exit signal, closing SSE service - MCP ID: {}",
mcp_id
);
if !quiet {
eprintln!("\\n🛑 Received exit signal, closing...");
}
ct.cancel();
}
}
Ok(())
}