use actix_web::{App, HttpResponse, HttpServer, Result, middleware, web};
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
#[cfg(feature = "transport-sse")]
#[allow(deprecated)]
use rmcp_actix_web::transport::SseService;
use rmcp_actix_web::transport::StreamableHttpService;
use std::{sync::Arc, time::Duration};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod common;
use common::calculator::Calculator;
async fn service_discovery() -> Result<HttpResponse> {
#[allow(unused_mut)]
let mut services = serde_json::json!({
"calculator_http": {
"transport": "streamable-http",
"version": "1.0.0",
"endpoints": {
"base": "/api/v1/http/calculator/"
},
"description": "Calculator service using StreamableHttp with sessions",
"capabilities": ["tools/list", "tools/call"],
"tools": ["add", "subtract", "multiply", "divide"],
"features": ["stateful_sessions", "session_management"]
}
});
#[cfg(feature = "transport-sse")]
{
services["calculator_sse"] = serde_json::json!({
"transport": "sse",
"version": "1.0.0",
"endpoints": {
"sse": "/api/v1/sse/calculator/sse",
"post": "/api/v1/sse/calculator/message"
},
"description": "Calculator service using Server-Sent Events",
"capabilities": ["tools/list", "tools/call"],
"tools": ["add", "subtract", "multiply", "divide"]
});
}
#[allow(unused_mut)]
let mut transport_types = vec!["streamable-http"];
#[cfg(feature = "transport-sse")]
transport_types.push("sse");
let total_services = if cfg!(feature = "transport-sse") {
2
} else {
1
};
#[allow(unused_mut)]
let mut usage = serde_json::json!({
"streamable_http": "POST initialize request to create session, then use Mcp-Session-Id header"
});
#[cfg(feature = "transport-sse")]
{
usage["sse"] = serde_json::json!(
"Connect to SSE endpoint for real-time streaming, POST messages to post endpoint"
);
}
Ok(HttpResponse::Ok().json(serde_json::json!({
"services": services,
"meta": {
"total_services": total_services,
"transport_types": transport_types,
"api_version": "v1",
"protocol": "Model Context Protocol (MCP)"
},
"usage": usage
})))
}
async fn health_check() -> Result<HttpResponse> {
#[allow(unused_mut)]
let mut services = serde_json::json!({
"calculator_http": "running"
});
#[cfg(feature = "transport-sse")]
{
services["calculator_sse"] = serde_json::json!("running");
}
Ok(HttpResponse::Ok().json(serde_json::json!({
"status": "healthy",
"timestamp": chrono::Utc::now().to_rfc3339(),
"services": services,
"version": "1.0.0"
})))
}
async fn root() -> Result<HttpResponse> {
#[allow(unused_mut)]
let mut endpoints = serde_json::json!({
"health": "/health",
"services": "/api/services",
"calculator_http": "/api/v1/http/calculator/"
});
#[cfg(feature = "transport-sse")]
{
endpoints["calculator_sse"] = serde_json::json!("/api/v1/sse/calculator/");
}
#[allow(unused_mut)]
let mut transports = vec!["streamable-http"];
#[cfg(feature = "transport-sse")]
transports.push("sse");
Ok(HttpResponse::Ok().json(serde_json::json!({
"message": "Multi-Service MCP Server",
"description": "Demonstrates composition of multiple MCP services with different transports",
"endpoints": endpoints,
"transports": transports,
"documentation": "https://modelcontextprotocol.io/"
})))
}
#[actix_web::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,rmcp_actix_web=debug".to_string().into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let bind_addr = "127.0.0.1:8080";
tracing::info!("🚀 Starting Multi-Service MCP server on {}", bind_addr);
let http_service = StreamableHttpService::builder()
.service_factory(Arc::new(|| {
tracing::debug!("Creating new Calculator for StreamableHttp transport");
Ok(Calculator::new())
}))
.session_manager(Arc::new(LocalSessionManager::default())) .stateful_mode(true) .sse_keep_alive(Duration::from_secs(30)) .build();
let server = HttpServer::new(move || {
let mut app = App::new()
.wrap(middleware::Logger::default())
.wrap(middleware::NormalizePath::trim())
.wrap(
middleware::DefaultHeaders::new()
.add(("Access-Control-Allow-Origin", "*"))
.add(("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS"))
.add((
"Access-Control-Allow-Headers",
"Content-Type, Accept, Mcp-Session-Id, Last-Event-ID",
))
.add(("X-Service-Type", "multi-mcp")),
)
.route("/", web::get().to(root))
.route("/health", web::get().to(health_check));
app = app.service(
web::scope("/api")
.route("/services", web::get().to(service_discovery))
.service({
let mut v1_scope = web::scope("/v1");
#[cfg(feature = "transport-sse")]
#[allow(deprecated)]
{
let sse_service = SseService::builder()
.service_factory(Arc::new(|| {
tracing::debug!("Creating new Calculator for SSE transport");
Ok(Calculator::new())
}))
.sse_path("/sse".to_string()) .post_path("/message".to_string()) .sse_keep_alive(Duration::from_secs(30)) .build();
v1_scope = v1_scope.service(
web::scope("/sse")
.service(web::scope("/calculator").service(sse_service.scope())),
);
}
v1_scope =
v1_scope.service(web::scope("/http").service(
web::scope("/calculator").service(http_service.clone().scope()),
));
v1_scope
}),
);
app
})
.bind(bind_addr)?
.run();
tracing::info!("✅ Multi-Service MCP Server started successfully!");
tracing::info!("");
tracing::info!("📊 Service Discovery: http://{}/api/services", bind_addr);
tracing::info!("🏥 Health Check: http://{}/health", bind_addr);
tracing::info!("");
#[cfg(feature = "transport-sse")]
{
tracing::info!("🔥 SSE Calculator:");
tracing::info!(
" • SSE Stream: http://{}/api/v1/sse/calculator/sse",
bind_addr
);
tracing::info!(
" • POST Endpoint: http://{}/api/v1/sse/calculator/message",
bind_addr
);
tracing::info!("");
}
tracing::info!("💻 StreamableHttp Calculator:");
tracing::info!(
" • Base URL: http://{}/api/v1/http/calculator/",
bind_addr
);
tracing::info!(" • Supports: Sessions, Streaming, Request/Response");
tracing::info!("");
tracing::info!("💡 Tip: Check /api/services for detailed usage instructions");
tracing::info!("🛑 Press Ctrl+C to stop all services");
tokio::select! {
result = server => {
if let Err(e) = result {
tracing::error!("HTTP server error: {}", e);
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("Received Ctrl+C, shutting down all services gracefully");
}
}
tracing::info!("🔚 All services stopped successfully");
Ok(())
}