use std::net::SocketAddr;
use std::sync::Arc;
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as HyperBuilder;
use hyper_util::service::TowerToHyperService;
use rmcp::handler::server::router::tool::ToolRouter;
use rmcp::model::{
CallToolResult, Content, Implementation, ProtocolVersion, ServerCapabilities, ServerInfo,
};
use rmcp::transport::streamable_http_server::StreamableHttpService;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use rmcp::{ErrorData as McpError, ServerHandler, tool, tool_handler, tool_router};
use tokio::net::TcpListener;
pub use tonin_mcp_macros::mcp_expose;
#[doc(hidden)]
pub use rmcp as __rmcp_reexport;
pub use rmcp::handler::server::wrapper::Parameters;
pub use rmcp::model::CallToolResult as McpCallToolResult;
pub use rmcp::model::Content as McpContent;
pub use rmcp::{ErrorData as McpErrorData, ServerHandler as McpServerHandler};
#[derive(Clone, Debug)]
pub struct McpConfig {
pub addr: SocketAddr,
}
impl Default for McpConfig {
fn default() -> Self {
Self {
addr: "0.0.0.0:50052".parse().expect("valid default mcp addr"),
}
}
}
#[derive(Clone)]
pub struct McpServer {
#[allow(dead_code)]
tool_router: ToolRouter<McpServer>,
}
#[tool_router]
impl McpServer {
pub fn new() -> Self {
Self {
tool_router: Self::tool_router(),
}
}
#[tool(description = "Liveness probe. Returns 'ok' if the service is running.")]
async fn health(&self) -> Result<CallToolResult, McpError> {
Ok(CallToolResult::success(vec![Content::text("ok")]))
}
}
impl Default for McpServer {
fn default() -> Self {
Self::new()
}
}
#[tool_handler]
impl ServerHandler for McpServer {
fn get_info(&self) -> ServerInfo {
ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
.with_server_info(Implementation::from_build_env())
.with_protocol_version(ProtocolVersion::V_2024_11_05)
.with_instructions(
"tonin service MCP endpoint. Tools available via tools/list.".to_string(),
)
}
}
pub async fn spawn(
cfg: McpConfig,
) -> Result<(SocketAddr, tokio::task::JoinHandle<()>), std::io::Error> {
spawn_with(cfg, || Ok::<McpServer, std::io::Error>(McpServer::new())).await
}
pub async fn spawn_with<H, F>(
cfg: McpConfig,
factory: F,
) -> Result<(SocketAddr, tokio::task::JoinHandle<()>), std::io::Error>
where
H: ServerHandler + Clone + Send + Sync + 'static,
F: Fn() -> Result<H, std::io::Error> + Send + Sync + 'static + Clone,
{
let listener = TcpListener::bind(cfg.addr).await?;
let bound = listener.local_addr()?;
tracing::info!(target: "tonin::mcp", addr = %bound, "mcp listener bound");
let session_mgr = Arc::new(LocalSessionManager::default());
let streamable = StreamableHttpService::new(factory, session_mgr, Default::default());
let tower_svc = TowerToHyperService::new(streamable);
let handle = tokio::spawn(async move {
loop {
let (stream, peer) = match listener.accept().await {
Ok(p) => p,
Err(e) => {
tracing::warn!(target: "tonin::mcp", error = %e, "accept failed");
continue;
}
};
let io = TokioIo::new(stream);
let svc = tower_svc.clone();
tokio::spawn(async move {
if let Err(e) = HyperBuilder::new(TokioExecutor::new())
.serve_connection(io, svc)
.await
{
tracing::debug!(target: "tonin::mcp", %peer, error = %e, "connection ended");
}
});
}
});
Ok((bound, handle))
}