use actix_web::{App, HttpResponse, HttpServer, Result, middleware, web};
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use rmcp_actix_web::transport::StreamableHttpService;
use std::{sync::Arc, time::Duration};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod common;
use common::calculator::Calculator;
/// Liveness probe handler.
///
/// Responds with HTTP 200 and a static JSON document naming the service,
/// its version, and the transport it exposes.
async fn health_check() -> Result<HttpResponse> {
    let payload = serde_json::json!({
        "status": "healthy",
        "service": "mcp-calculator-streamable",
        "version": "1.0.0",
        "transport": "streamable-http"
    });

    Ok(HttpResponse::Ok().json(payload))
}
/// API discovery handler.
///
/// Responds with HTTP 200 and a static JSON document that lists the
/// calculator service (path, transport, accepted HTTP methods) together
/// with short usage hints for the session-based workflow.
async fn api_info() -> Result<HttpResponse> {
    let payload = serde_json::json!({
        "api_version": "v1",
        "services": {
            "calculator": {
                "path": "/api/v1/calculator/",
                "transport": "streamable-http",
                "methods": ["GET", "POST", "DELETE"],
                "description": "MCP Calculator service with session management"
            }
        },
        "usage": {
            "initialize": "POST with initialize method to create session",
            "requests": "POST with Mcp-Session-Id header for subsequent requests",
            "streaming": "GET with Mcp-Session-Id header to receive streaming responses"
        }
    });

    Ok(HttpResponse::Ok().json(payload))
}
/// Landing page handler for `/`.
///
/// Responds with HTTP 200 and a static JSON index of the endpoints this
/// server registers, plus a link to the MCP documentation site.
async fn root() -> Result<HttpResponse> {
    let payload = serde_json::json!({
        "message": "MCP Calculator Service (StreamableHttp)",
        "endpoints": {
            "health": "/health",
            "api_info": "/api/info",
            "calculator": "/api/v1/calculator/"
        },
        "documentation": "https://modelcontextprotocol.io/"
    });

    Ok(HttpResponse::Ok().json(payload))
}
/// Starts the example server that composes the MCP calculator service with
/// ordinary actix-web routes.
///
/// Steps performed:
/// 1. Installs a `tracing` subscriber whose filter comes from the default
///    environment variable, falling back to `info`.
/// 2. Builds a stateful `StreamableHttpService` backed by a
///    `LocalSessionManager`, with a 30 second SSE keep-alive.
/// 3. Binds an `HttpServer` to `127.0.0.1:8080` serving `/`, `/health`,
///    `/api/info`, and the calculator service under `/api/v1/calculator`.
/// 4. Runs until the server future finishes or Ctrl+C is received.
///
/// # Errors
///
/// Returns an error if the address cannot be bound, if the running server
/// reports an I/O error, or if listening for Ctrl+C fails.
#[actix_web::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "info".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let bind_addr = "127.0.0.1:8080";
    tracing::info!(
        "Starting StreamableHttp composition example server on {}",
        bind_addr
    );

    // The factory closure is what produces a `Calculator`; the debug message
    // indicates it is expected to run once per session.
    let calculator_service = StreamableHttpService::builder()
        .service_factory(Arc::new(|| {
            tracing::debug!("Creating new Calculator instance for session");
            Ok(Calculator::new())
        }))
        .session_manager(Arc::new(LocalSessionManager::default()))
        .stateful_mode(true)
        .sse_keep_alive(Duration::from_secs(30))
        .build();

    let server = HttpServer::new(move || {
        App::new()
            .wrap(middleware::Logger::default())
            .wrap(middleware::NormalizePath::trim())
            // Permissive CORS headers added to responses by default.
            .wrap(
                middleware::DefaultHeaders::new()
                    .add(("Access-Control-Allow-Origin", "*"))
                    .add(("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS"))
                    .add((
                        "Access-Control-Allow-Headers",
                        "Content-Type, Accept, Mcp-Session-Id",
                    )),
            )
            .route("/", web::get().to(root))
            .route("/health", web::get().to(health_check))
            .route("/api/info", web::get().to(api_info))
            // Nested scopes give the calculator the /api/v1/calculator prefix.
            // The service is cloned because this closure captures it by move
            // and hands out a fresh scope on every invocation.
            .service(web::scope("/api").service(web::scope("/v1").service(
                web::scope("/calculator").service(calculator_service.clone().scope()),
            )))
    })
    .bind(bind_addr)?
    .run();

    tracing::info!("🚀 Server started successfully!");
    tracing::info!("📊 Health check: http://{}/health", bind_addr);
    tracing::info!("📋 API info: http://{}/api/info", bind_addr);
    tracing::info!(
        "🧮 Calculator service: http://{}/api/v1/calculator/",
        bind_addr
    );
    tracing::info!(
        "💡 Tip: Use stateful mode - create session with initialize, then use Mcp-Session-Id header"
    );
    tracing::info!("Press Ctrl+C to stop the server");

    tokio::select! {
        result = server => {
            if let Err(e) = result {
                tracing::error!("HTTP server error: {}", e);
                // Propagate so the process exits with a failure status instead
                // of reporting success after the server died.
                return Err(e.into());
            }
        }
        signal = tokio::signal::ctrl_c() => {
            // A failure to listen for the signal is not a Ctrl+C; surface it
            // rather than logging a misleading shutdown message.
            signal?;
            // NOTE(review): leaving this branch drops the server future; no
            // explicit `ServerHandle::stop` is issued here - confirm whether
            // in-flight requests are drained as the message suggests.
            tracing::info!("Received Ctrl+C, shutting down gracefully");
        }
    }

    Ok(())
}