use std::sync::Arc;
use actix_web::{App, HttpResponse, HttpServer, Result, web};
use rmcp::transport::{
StreamableHttpServerConfig, streamable_http_server::session::local::LocalSessionManager,
};
use rmcp_actix_web::StreamableHttpService;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod common;
use common::calculator::Calculator;
/// Liveness endpoint.
///
/// Always answers `200 OK` with a fixed JSON document naming the service,
/// its version and the transport it speaks.
async fn health_check() -> Result<HttpResponse> {
    let body = serde_json::json!({
        "status": "healthy",
        "service": "mcp-calculator-streamable",
        "version": "1.0.0",
        "transport": "streamable-http"
    });

    Ok(HttpResponse::Ok().json(body))
}
/// Describes the mounted MCP services and how a client is expected to use them.
///
/// Always answers `200 OK` with a static JSON document listing the calculator
/// service (path, transport, HTTP methods) and a short usage guide.
///
/// The usage guide names the `Mcp-Session-Id` header: that is the header the
/// MCP streamable HTTP transport uses to carry the session id, so advertising
/// any other name would send clients down a path the server does not honor.
async fn api_info() -> Result<HttpResponse> {
    let body = serde_json::json!({
        "api_version": "v1",
        "services": {
            "calculator": {
                "path": "/api/v1/calculator/",
                "transport": "streamable-http",
                "methods": ["GET", "POST", "DELETE"],
                "description": "MCP Calculator service with session management"
            }
        },
        "usage": {
            "initialize": "POST with initialize method to create session",
            "requests": "POST with Mcp-Session-Id header for subsequent requests",
            "streaming": "GET with Mcp-Session-Id header to receive streaming responses"
        }
    });

    Ok(HttpResponse::Ok().json(body))
}
/// Landing endpoint for `/`.
///
/// Always answers `200 OK` with a fixed JSON index of the endpoints this
/// server exposes plus a link to the MCP documentation.
async fn root() -> Result<HttpResponse> {
    let index = serde_json::json!({
        "message": "MCP Calculator Service (StreamableHttp)",
        "endpoints": {
            "health": "/health",
            "api_info": "/api/info",
            "calculator": "/api/v1/calculator/"
        },
        "documentation": "https://modelcontextprotocol.io/"
    });

    let response = HttpResponse::Ok().json(index);
    Ok(response)
}
#[actix_web::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".to_string().into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let bind_addr = "127.0.0.1:8080";
tracing::info!(
"Starting StreamableHttp composition example server on {}",
bind_addr
);
let server = HttpServer::new(move || {
let calculator_service = Arc::new(StreamableHttpService::new(
|| {
tracing::debug!("Creating new Calculator instance for session");
Ok(Calculator::new())
},
LocalSessionManager::default().into(),
StreamableHttpServerConfig {
stateful_mode: true, sse_keep_alive: Some(std::time::Duration::from_secs(30)),
},
));
let calculator_scope = StreamableHttpService::scope(calculator_service);
App::new()
.wrap(actix_web::middleware::Logger::default())
.wrap(
actix_web::middleware::DefaultHeaders::new()
.add(("Access-Control-Allow-Origin", "*"))
.add(("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS"))
.add((
"Access-Control-Allow-Headers",
"Content-Type, Accept, X-Session-Id",
)),
)
.route("/", web::get().to(root))
.route("/health", web::get().to(health_check))
.route("/api/info", web::get().to(api_info))
.service(web::scope("/api").service(
web::scope("/v1").service(web::scope("/calculator").service(calculator_scope)),
))
})
.bind(bind_addr)?
.run();
tracing::info!("🚀 Server started successfully!");
tracing::info!("📊 Health check: http://{}/health", bind_addr);
tracing::info!("📋 API info: http://{}/api/info", bind_addr);
tracing::info!(
"🧮 Calculator service: http://{}/api/v1/calculator/",
bind_addr
);
tracing::info!(
"💡 Tip: Use stateful mode - create session with initialize, then use X-Session-Id header"
);
tracing::info!("Press Ctrl+C to stop the server");
tokio::select! {
result = server => {
if let Err(e) = result {
tracing::error!("HTTP server error: {}", e);
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("Received Ctrl+C, shutting down gracefully");
}
}
Ok(())
}