use std::sync::Arc;
use tokio::io::BufReader;
use tokio::net::UnixListener;
use tokio::sync::watch;
use turbomcp_core::error::{McpError, McpResult};
use turbomcp_core::handler::McpHandler;
use super::line::LineTransportRunner;
use crate::config::{ConnectionCounter, ServerConfig};
use crate::context::RequestContext;
use crate::router;
/// Serves `handler` over a Unix domain socket bound at `path`, using
/// [`ServerConfig::default`] for all configuration.
///
/// This is a convenience wrapper around [`run_with_config`].
///
/// # Errors
///
/// Propagates whatever error [`run_with_config`] returns.
pub async fn run<H: McpHandler>(handler: &H, path: &str) -> McpResult<()> {
    let default_config = ServerConfig::default();
    run_with_config(handler, path, &default_config).await
}
/// Serves `handler` over a Unix domain socket at `path` with the supplied
/// `config`, stopping when the process receives Ctrl+C.
///
/// A background task waits for [`tokio::signal::ctrl_c`] and, when it fires,
/// publishes `true` on a watch channel whose receiver is handed to
/// [`run_with_shutdown`]. The task is aborted once the server returns.
///
/// # Errors
///
/// Propagates whatever error [`run_with_shutdown`] returns.
pub async fn run_with_config<H: McpHandler>(
    handler: &H,
    path: &str,
    config: &ServerConfig,
) -> McpResult<()> {
    let (shutdown_tx, shutdown_rx) = watch::channel(false);

    let signal_task = tokio::spawn(async move {
        match tokio::signal::ctrl_c().await {
            Ok(()) => {
                tracing::info!("Received shutdown signal, stopping Unix socket server...");
                let _ = shutdown_tx.send(true);
            }
            Err(e) => {
                tracing::warn!(
                    "Failed to listen for shutdown signal, Ctrl+C will not stop the server: {}",
                    e
                );
                // Park instead of returning: finishing this task would drop
                // `shutdown_tx`, closing the watch channel and making the
                // receiver's `changed()` resolve immediately with an error on
                // every poll. The task is aborted below once the server exits.
                std::future::pending::<()>().await;
            }
        }
    });

    let result = run_with_shutdown(handler, path, config, shutdown_rx).await;

    // The signal listener is no longer needed once the server has stopped.
    signal_task.abort();
    result
}
/// Serves `handler` over a Unix domain socket at `path` until `shutdown`
/// carries `true`.
///
/// Sequence:
/// 1. Calls `handler.on_initialize()`.
/// 2. Removes any existing filesystem entry at `path` and binds a
///    [`UnixListener`] there.
/// 3. Accepts connections in a loop. Each accepted connection must obtain a
///    guard from a [`ConnectionCounter`] limited to
///    `config.connection_limits.max_unix_connections`; connections that cannot
///    are sent a JSON-RPC error and dropped. Admitted connections are served
///    on their own spawned task by a [`LineTransportRunner`].
/// 4. When the loop ends, removes the socket file (best effort). After a
///    requested shutdown it then calls `handler.on_shutdown()`.
///
/// If the shutdown channel is closed (its sender dropped) without ever
/// carrying `true`, the server keeps running and simply stops watching the
/// channel.
///
/// # Errors
///
/// Returns an error if `on_initialize` or `on_shutdown` fails, if an existing
/// entry at `path` cannot be removed, if binding fails, or if accepting a
/// connection fails. The socket file is removed before an accept error is
/// returned.
pub async fn run_with_shutdown<H: McpHandler>(
    handler: &H,
    path: &str,
    config: &ServerConfig,
    mut shutdown: watch::Receiver<bool>,
) -> McpResult<()> {
    handler.on_initialize().await?;

    let max_connections = config.connection_limits.max_unix_connections;
    let connection_counter = Arc::new(ConnectionCounter::new(max_connections));

    // Remove a leftover socket from a previous run. Attempting the removal
    // directly (instead of checking `exists()` first) avoids a check-then-act
    // race and also clears dangling symlinks, which `exists()` reports as absent.
    match std::fs::remove_file(path) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => {
            return Err(McpError::internal(format!(
                "Failed to remove existing socket {}: {}",
                path, e
            )));
        }
    }

    let listener = UnixListener::bind(path)
        .map_err(|e| McpError::internal(format!("Failed to bind to {}: {}", path, e)))?;

    tracing::info!(
        "MCP server listening on unix://{} (max {} connections)",
        path,
        max_connections
    );

    // Cleared once the shutdown channel is observed closed. A closed watch
    // channel makes `changed()` complete immediately with an error, so polling
    // it again would turn this loop into a busy spin.
    let mut shutdown_open = true;

    let serve_result: McpResult<()> = loop {
        tokio::select! {
            changed = shutdown.changed(), if shutdown_open => {
                if *shutdown.borrow() {
                    tracing::info!("Unix socket server shutting down...");
                    break Ok(());
                }
                if changed.is_err() {
                    shutdown_open = false;
                    tracing::warn!(
                        "Shutdown channel closed without a shutdown request; continuing to serve"
                    );
                }
            }
            accept_result = listener.accept() => {
                let stream = match accept_result {
                    Ok((stream, _)) => stream,
                    // Leave the loop rather than returning so the socket file
                    // is still cleaned up below.
                    Err(e) => break Err(McpError::internal(format!("Accept error: {}", e))),
                };

                let guard = match connection_counter.try_acquire_arc() {
                    Some(guard) => guard,
                    None => {
                        tracing::warn!(
                            "Unix socket connection rejected: at capacity ({}/{})",
                            connection_counter.current(),
                            connection_counter.max()
                        );
                        reject_connection(stream).await;
                        continue;
                    }
                };

                tracing::debug!(
                    "New Unix socket connection ({}/{})",
                    connection_counter.current(),
                    connection_counter.max()
                );

                let handler = handler.clone();
                let conn_config = config.clone();
                tokio::spawn(async move {
                    // Hold the guard for the lifetime of the connection task.
                    let _guard = guard;
                    let (reader, writer) = stream.into_split();
                    let reader = BufReader::new(reader);
                    let runner = LineTransportRunner::with_config(handler, conn_config);
                    if let Err(e) = runner.run(reader, writer, RequestContext::unix).await {
                        tracing::error!("Unix socket connection error: {}", e);
                    }
                    tracing::debug!("Unix socket connection closed");
                });
            }
        }
    };

    // Best-effort cleanup on every exit from the loop; a failure here (for
    // example, the file is already gone) is deliberately ignored.
    let _ = std::fs::remove_file(path);

    // An accept failure is reported as-is; `on_shutdown` runs only after a
    // requested shutdown.
    serve_result?;

    handler.on_shutdown().await?;
    Ok(())
}
async fn reject_connection(stream: tokio::net::UnixStream) {
use tokio::io::AsyncWriteExt;
let mut stream = stream;
let error_response =
router::JsonRpcOutgoing::error(None, McpError::internal("Server at maximum capacity"));
if let Ok(response_str) = router::serialize_response(&error_response) {
let _ = stream.write_all(response_str.as_bytes()).await;
let _ = stream.write_all(b"\n").await;
let _ = stream.flush().await;
}
}
// Unit-test module for the Unix socket transport, compiled only for test
// builds. It currently contains no tests.
#[cfg(test)]
mod tests {
}