use std::sync::Arc;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use tokio::sync::watch;
use turbomcp_core::error::{McpError, McpResult};
use turbomcp_core::handler::McpHandler;
use super::line::LineTransportRunner;
use crate::config::{ConnectionCounter, ServerConfig};
use crate::context::RequestContext;
use crate::router;
pub async fn run<H: McpHandler>(handler: &H, addr: &str) -> McpResult<()> {
run_with_config(handler, addr, &ServerConfig::default()).await
}
pub async fn run_with_config<H: McpHandler>(
handler: &H,
addr: &str,
config: &ServerConfig,
) -> McpResult<()> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let signal_task = tokio::spawn(async move {
if let Ok(()) = tokio::signal::ctrl_c().await {
tracing::info!("Received shutdown signal, stopping TCP server...");
let _ = shutdown_tx.send(true);
}
});
let result = run_with_shutdown(handler, addr, config, shutdown_rx).await;
signal_task.abort();
result
}
pub async fn run_with_shutdown<H: McpHandler>(
handler: &H,
addr: &str,
config: &ServerConfig,
mut shutdown: watch::Receiver<bool>,
) -> McpResult<()> {
handler.on_initialize().await?;
let max_connections = config.connection_limits.max_tcp_connections;
let connection_counter = Arc::new(ConnectionCounter::new(max_connections));
let listener = TcpListener::bind(addr)
.await
.map_err(|e| McpError::internal(format!("Failed to bind to {}: {}", addr, e)))?;
tracing::info!(
"MCP server listening on tcp://{} (max {} connections)",
addr,
max_connections
);
loop {
tokio::select! {
_ = shutdown.changed() => {
if *shutdown.borrow() {
tracing::info!("TCP server shutting down...");
break;
}
}
accept_result = listener.accept() => {
let (stream, peer_addr) = accept_result
.map_err(|e| McpError::internal(format!("Accept error: {}", e)))?;
let guard = match connection_counter.try_acquire_arc() {
Some(guard) => guard,
None => {
tracing::warn!(
"Connection from {} rejected: at capacity ({}/{})",
peer_addr,
connection_counter.current(),
connection_counter.max()
);
reject_connection(stream).await;
continue;
}
};
tracing::debug!(
"New TCP connection from {} ({}/{})",
peer_addr,
connection_counter.current(),
connection_counter.max()
);
let handler = handler.clone();
tokio::spawn(async move {
let _guard = guard;
let (reader, writer) = stream.into_split();
let reader = BufReader::new(reader);
let runner = LineTransportRunner::new(handler);
if let Err(e) = runner.run(reader, writer, RequestContext::tcp).await {
tracing::error!("TCP connection error from {}: {}", peer_addr, e);
}
tracing::debug!("TCP connection from {} closed", peer_addr);
});
}
}
}
handler.on_shutdown().await?;
Ok(())
}
async fn reject_connection(stream: tokio::net::TcpStream) {
use tokio::io::AsyncWriteExt;
let mut stream = stream;
let error_response =
router::JsonRpcOutgoing::error(None, McpError::internal("Server at maximum capacity"));
if let Ok(response_str) = router::serialize_response(&error_response) {
let _ = stream.write_all(response_str.as_bytes()).await;
let _ = stream.write_all(b"\n").await;
let _ = stream.flush().await;
}
}
#[cfg(test)]
mod tests {
}