use std::sync::Arc;
use actix_web::{App, HttpResponse, HttpServer, Result, web};
use rmcp::transport::{
StreamableHttpServerConfig, streamable_http_server::session::local::LocalSessionManager,
};
use rmcp_actix_web::{SseServer, SseServerConfig, StreamableHttpService};
use tokio_util::sync::CancellationToken;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod common;
use common::calculator::Calculator;
async fn service_discovery() -> Result<HttpResponse> {
Ok(HttpResponse::Ok().json(serde_json::json!({
"services": {
"calculator_sse": {
"transport": "sse",
"version": "1.0.0",
"endpoints": {
"sse": "/api/v1/sse/calculator/sse",
"post": "/api/v1/sse/calculator/message"
},
"description": "Calculator service using Server-Sent Events",
"capabilities": ["tools/list", "tools/call"],
"tools": ["add", "subtract", "multiply", "divide"]
},
"calculator_http": {
"transport": "streamable-http",
"version": "1.0.0",
"endpoints": {
"base": "/api/v1/http/calculator/"
},
"description": "Calculator service using StreamableHttp with sessions",
"capabilities": ["tools/list", "tools/call"],
"tools": ["add", "subtract", "multiply", "divide"],
"features": ["stateful_sessions", "session_management"]
}
},
"meta": {
"total_services": 2,
"transport_types": ["sse", "streamable-http"],
"api_version": "v1",
"protocol": "Model Context Protocol (MCP)"
},
"usage": {
"sse": "Connect to SSE endpoint for real-time streaming, POST messages to post endpoint",
"streamable_http": "POST initialize request to create session, then use Mcp-Session-Id header"
}
})))
}
async fn health_check() -> Result<HttpResponse> {
Ok(HttpResponse::Ok().json(serde_json::json!({
"status": "healthy",
"timestamp": chrono::Utc::now().to_rfc3339(),
"services": {
"calculator_sse": "running",
"calculator_http": "running"
},
"version": "1.0.0"
})))
}
async fn root() -> Result<HttpResponse> {
Ok(HttpResponse::Ok().json(serde_json::json!({
"message": "Multi-Service MCP Server",
"description": "Demonstrates composition of multiple MCP services with different transports",
"endpoints": {
"health": "/health",
"services": "/api/services",
"calculator_sse": "/api/v1/sse/calculator/",
"calculator_http": "/api/v1/http/calculator/"
},
"transports": ["sse", "streamable-http"],
"documentation": "https://modelcontextprotocol.io/"
})))
}
#[actix_web::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,rmcp_actix_web=debug".to_string().into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let bind_addr = "127.0.0.1:8080";
tracing::info!("🚀 Starting Multi-Service MCP server on {}", bind_addr);
let http_service = Arc::new(StreamableHttpService::new(
|| {
tracing::debug!("Creating new Calculator for StreamableHttp transport");
Ok(Calculator::new())
},
LocalSessionManager::default().into(),
StreamableHttpServerConfig {
stateful_mode: true,
sse_keep_alive: Some(std::time::Duration::from_secs(30)),
},
));
let server = HttpServer::new(move || {
let sse_config = SseServerConfig {
bind: "127.0.0.1:0".parse().unwrap(), sse_path: "/sse".to_string(),
post_path: "/message".to_string(),
ct: CancellationToken::new(),
sse_keep_alive: Some(std::time::Duration::from_secs(30)),
};
let (sse_server, sse_scope) = SseServer::new(sse_config);
let _sse_ct = sse_server.with_service(|| {
tracing::debug!("Creating new Calculator for SSE transport");
Calculator::new()
});
let http_scope = StreamableHttpService::scope(http_service.clone());
App::new()
.wrap(actix_web::middleware::Logger::default())
.wrap(
actix_web::middleware::DefaultHeaders::new()
.add(("Access-Control-Allow-Origin", "*"))
.add(("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS"))
.add((
"Access-Control-Allow-Headers",
"Content-Type, Accept, Mcp-Session-Id, Last-Event-ID",
))
.add(("X-Service-Type", "multi-mcp")),
)
.route("/", web::get().to(root))
.route("/health", web::get().to(health_check))
.service(
web::scope("/api")
.route("/services", web::get().to(service_discovery))
.service(
web::scope("/v1")
.service(
web::scope("/sse")
.service(web::scope("/calculator").service(sse_scope)),
)
.service(
web::scope("/http")
.service(web::scope("/calculator").service(http_scope)),
),
), )
})
.bind(bind_addr)?
.run();
tracing::info!("✅ Multi-Service MCP Server started successfully!");
tracing::info!("");
tracing::info!("📊 Service Discovery: http://{}/api/services", bind_addr);
tracing::info!("🏥 Health Check: http://{}/health", bind_addr);
tracing::info!("");
tracing::info!("🔥 SSE Calculator:");
tracing::info!(
" • SSE Stream: http://{}/api/v1/sse/calculator/sse",
bind_addr
);
tracing::info!(
" • POST Endpoint: http://{}/api/v1/sse/calculator/message",
bind_addr
);
tracing::info!("");
tracing::info!("💻 StreamableHttp Calculator:");
tracing::info!(
" • Base URL: http://{}/api/v1/http/calculator/",
bind_addr
);
tracing::info!(" • Supports: Sessions, Streaming, Request/Response");
tracing::info!("");
tracing::info!("💡 Tip: Check /api/services for detailed usage instructions");
tracing::info!("🛑 Press Ctrl+C to stop all services");
tokio::select! {
result = server => {
if let Err(e) = result {
tracing::error!("HTTP server error: {}", e);
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("Received Ctrl+C, shutting down all services gracefully");
}
}
tracing::info!("🔚 All services stopped successfully");
Ok(())
}