1use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
24#[derive(Debug, Clone)]
26pub struct ServerConfig {
27 pub bind_address: SocketAddr,
29 pub mcp_path: String,
31 pub enable_cors: bool,
33 pub max_body_size: usize,
35 pub enable_get_sse: bool,
37 pub enable_post_sse: bool,
39 pub session_expiry_minutes: u64,
41 pub allow_unauthenticated_ping: bool,
49}
50
51impl Default for ServerConfig {
52 fn default() -> Self {
53 Self {
54 bind_address: "127.0.0.1:8000".parse().unwrap(),
55 mcp_path: "/mcp".to_string(),
56 enable_cors: true,
57 max_body_size: 1024 * 1024, enable_get_sse: cfg!(feature = "sse"), enable_post_sse: false, session_expiry_minutes: 30, allow_unauthenticated_ping: true, }
63 }
64}
65
66pub struct HttpMcpServerBuilder {
68 config: ServerConfig,
69 dispatcher: JsonRpcDispatcher<McpError>,
70 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
71 stream_config: StreamConfig,
72 server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
73 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
74}
75
76impl HttpMcpServerBuilder {
77 pub fn new() -> Self {
79 Self {
80 config: ServerConfig::default(),
81 dispatcher: JsonRpcDispatcher::<McpError>::new(),
82 session_storage: Some(Arc::new(InMemorySessionStorage::new())),
83 stream_config: StreamConfig::default(),
84 server_capabilities: None,
85 middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
86 }
87 }
88}
89
90impl HttpMcpServerBuilder {
91 pub fn with_storage(
93 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
94 ) -> Self {
95 Self {
96 config: ServerConfig::default(),
97 dispatcher: JsonRpcDispatcher::<McpError>::new(),
98 session_storage: Some(session_storage),
99 stream_config: StreamConfig::default(),
100 server_capabilities: None,
101 middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
102 }
103 }
104
105 pub fn with_middleware_stack(
107 mut self,
108 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
109 ) -> Self {
110 self.middleware_stack = middleware_stack;
111 self
112 }
113
114 pub fn bind_address(mut self, addr: SocketAddr) -> Self {
116 self.config.bind_address = addr;
117 self
118 }
119
120 pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
122 self.config.mcp_path = path.into();
123 self
124 }
125
126 pub fn cors(mut self, enable: bool) -> Self {
128 self.config.enable_cors = enable;
129 self
130 }
131
132 pub fn max_body_size(mut self, size: usize) -> Self {
134 self.config.max_body_size = size;
135 self
136 }
137
138 pub fn get_sse(mut self, enable: bool) -> Self {
140 self.config.enable_get_sse = enable;
141 self
142 }
143
144 pub fn post_sse(mut self, enable: bool) -> Self {
146 self.config.enable_post_sse = enable;
147 self
148 }
149
150 pub fn sse(mut self, enable: bool) -> Self {
152 self.config.enable_get_sse = enable;
153 self.config.enable_post_sse = enable;
154 self
155 }
156
157 pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
159 self.config.session_expiry_minutes = minutes;
160 self
161 }
162
163 pub fn allow_unauthenticated_ping(mut self, allow: bool) -> Self {
168 self.config.allow_unauthenticated_ping = allow;
169 self
170 }
171
172 pub fn stream_config(mut self, config: StreamConfig) -> Self {
174 self.stream_config = config;
175 self
176 }
177
178 pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
180 where
181 H: JsonRpcHandler<Error = McpError> + 'static,
182 {
183 self.dispatcher.register_methods(methods, handler);
184 self
185 }
186
187 pub fn default_handler<H>(mut self, handler: H) -> Self
189 where
190 H: JsonRpcHandler<Error = McpError> + 'static,
191 {
192 self.dispatcher.set_default_handler(handler);
193 self
194 }
195
196 pub fn server_capabilities(
198 mut self,
199 capabilities: turul_mcp_protocol::ServerCapabilities,
200 ) -> Self {
201 self.server_capabilities = Some(capabilities);
202 self
203 }
204
205 pub fn build(self) -> HttpMcpServer {
207 let session_storage = self
208 .session_storage
209 .expect("Session storage must be provided");
210
211 let stream_manager = Arc::new(StreamManager::with_config(
213 Arc::clone(&session_storage),
214 self.stream_config.clone(),
215 ));
216
217 let dispatcher = Arc::new(self.dispatcher);
219
220 let middleware_stack = self.middleware_stack;
222
223 let streamable_handler = StreamableHttpHandler::new(
225 Arc::new(self.config.clone()),
226 Arc::clone(&dispatcher),
227 Arc::clone(&session_storage),
228 Arc::clone(&stream_manager),
229 self.server_capabilities.unwrap_or_default(),
230 Arc::clone(&middleware_stack),
231 );
232
233 HttpMcpServer {
234 config: self.config,
235 dispatcher,
236 session_storage,
237 stream_config: self.stream_config,
238 stream_manager,
239 streamable_handler,
240 }
241 }
242}
243
244impl Default for HttpMcpServerBuilder {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250#[derive(Clone)]
252pub struct HttpMcpServer {
253 config: ServerConfig,
254 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
255 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
256 stream_config: StreamConfig,
257 stream_manager: Arc<StreamManager>,
259 streamable_handler: StreamableHttpHandler,
261}
262
263impl HttpMcpServer {
264 pub fn builder() -> HttpMcpServerBuilder {
266 HttpMcpServerBuilder::new()
267 }
268}
269
270impl HttpMcpServer {
271 pub fn builder_with_storage(
273 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
274 ) -> HttpMcpServerBuilder {
275 HttpMcpServerBuilder::with_storage(session_storage)
276 }
277
278 pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
281 Arc::clone(&self.stream_manager)
282 }
283
284 pub async fn run(&self) -> Result<()> {
286 self.start_session_cleanup().await;
288
289 let listener = TcpListener::bind(&self.config.bind_address).await?;
290 info!("HTTP MCP server listening on {}", self.config.bind_address);
291 info!("MCP endpoint available at: {}", self.config.mcp_path);
292 info!("Session storage: {}", self.session_storage.backend_name());
293
294 let session_handler = SessionMcpHandler::with_shared_stream_manager(
297 self.config.clone(),
298 Arc::clone(&self.dispatcher),
299 Arc::clone(&self.session_storage),
300 self.stream_config.clone(),
301 Arc::clone(&self.stream_manager),
302 Arc::clone(&self.streamable_handler.middleware_stack),
303 );
304
305 let handler = McpRequestHandler {
307 session_handler,
308 streamable_handler: self.streamable_handler.clone(),
309 };
310
311 loop {
312 let (stream, peer_addr) = listener.accept().await?;
313 debug!("New connection from {}", peer_addr);
314
315 let handler_clone = handler.clone();
316 tokio::spawn(async move {
317 let io = TokioIo::new(stream);
318 let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
319
320 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
321 let err_str = err.to_string();
323 if err_str.contains("connection closed before message completed") {
324 debug!("Client disconnected (normal): {}", err);
325 } else {
326 error!("Error serving connection: {}", err);
327 }
328 }
329 });
330 }
331 }
332
333 async fn start_session_cleanup(&self) {
335 let storage = Arc::clone(&self.session_storage);
336 let session_expiry_minutes = self.config.session_expiry_minutes;
337 tokio::spawn(async move {
338 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
339 loop {
340 interval.tick().await;
341
342 let expire_time = std::time::SystemTime::now()
343 - std::time::Duration::from_secs(session_expiry_minutes * 60);
344 match storage.expire_sessions(expire_time).await {
345 Ok(expired) => {
346 if !expired.is_empty() {
347 info!("Expired {} sessions", expired.len());
348 for session_id in expired {
349 debug!("Expired session: {}", session_id);
350 }
351 }
352 }
353 Err(err) => {
354 error!("Session cleanup error: {}", err);
355 }
356 }
357 }
358 });
359 }
360
361 pub async fn get_stats(&self) -> ServerStats {
363 let session_count = self.session_storage.session_count().await.unwrap_or(0);
364 let event_count = self.session_storage.event_count().await.unwrap_or(0);
365
366 ServerStats {
367 sessions: session_count,
368 events: event_count,
369 storage_type: self.session_storage.backend_name().to_string(),
370 }
371 }
372}
373
374#[derive(Clone)]
377struct McpRequestHandler {
378 session_handler: SessionMcpHandler,
379 streamable_handler: StreamableHttpHandler,
380}
381
382async fn handle_request(
383 req: Request<hyper::body::Incoming>,
384 handler: McpRequestHandler,
385) -> std::result::Result<
386 Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
387 hyper::Error,
388> {
389 let method = req.method().clone();
390 let uri = req.uri().clone();
391 let path = uri.path();
392
393 debug!("Handling {} {}", method, path);
394
395 debug!(
397 "HTTP server dispatch: path={}, expected_mcp_path={}",
398 path, handler.session_handler.config.mcp_path
399 );
400 let response = if path == handler.session_handler.config.mcp_path {
401 debug!("Path match: Request routed to MCP handler");
402 let protocol_version_str = req
404 .headers()
405 .get("MCP-Protocol-Version")
406 .and_then(|h| h.to_str().ok())
407 .unwrap_or("2025-11-25"); debug!("Protocol version: {}", protocol_version_str);
409
410 let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
411 .unwrap_or(McpProtocolVersion::V2025_11_25);
412
413 debug!(
414 "MCP request: protocol_version={}, method={}",
415 protocol_version.as_str(),
416 method
417 );
418
419 debug!(
421 "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
422 protocol_version.as_str(),
423 method,
424 protocol_version.supports_streamable_http(),
425 if protocol_version.supports_streamable_http() {
426 "StreamableHttpHandler"
427 } else {
428 "SessionMcpHandler"
429 }
430 );
431
432 if protocol_version.supports_streamable_http() {
433 debug!(
435 "Calling streamable handler for protocol {}",
436 protocol_version.as_str()
437 );
438 let streamable_response = handler.streamable_handler.handle_request(req).await;
439 debug!("Streamable handler completed");
440 Ok(streamable_response)
441 } else {
442 match handler.session_handler.handle_mcp_request(req).await {
444 Ok(mcp_response) => Ok(mcp_response),
445 Err(err) => {
446 error!("Request handling error: {}", err);
447 Ok(Response::builder()
448 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
449 .body(
450 Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
451 .map_err(|never| match never {})
452 .boxed_unsync(),
453 )
454 .unwrap())
455 }
456 }
457 }
458 } else {
459 Ok(Response::builder()
461 .status(hyper::StatusCode::NOT_FOUND)
462 .body(
463 Full::new(Bytes::from("Not Found"))
464 .map_err(|never| match never {})
465 .boxed_unsync(),
466 )
467 .unwrap())
468 };
469
470 match response {
472 Ok(mut final_response) => {
473 if handler.session_handler.config.enable_cors {
474 CorsLayer::apply_cors_headers(final_response.headers_mut());
475 }
476 Ok(final_response)
477 }
478 Err(e) => Err(e),
479 }
480}
481
482#[derive(Debug, Clone)]
484pub struct ServerStats {
485 pub sessions: usize,
486 pub events: usize,
487 pub storage_type: String,
488}
489
490#[cfg(test)]
491mod tests {
492 use super::*;
493 use std::net::{IpAddr, Ipv4Addr};
494 use std::sync::Arc;
495 use turul_mcp_session_storage::InMemorySessionStorage;
496
497 #[test]
498 fn test_server_config_default() {
499 let config = ServerConfig::default();
500 assert_eq!(config.mcp_path, "/mcp");
501 assert!(config.enable_cors);
502 assert_eq!(config.max_body_size, 1024 * 1024);
503 }
504
505 #[test]
506 fn test_builder() {
507 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
508 let session_storage = Arc::new(InMemorySessionStorage::new());
509 let server = HttpMcpServer::builder_with_storage(session_storage)
510 .bind_address(addr)
511 .mcp_path("/api/mcp")
512 .cors(false)
513 .max_body_size(2048)
514 .build();
515
516 assert_eq!(server.config.bind_address, addr);
517 assert_eq!(server.config.mcp_path, "/api/mcp");
518 assert!(!server.config.enable_cors);
519 assert_eq!(server.config.max_body_size, 2048);
520 }
521
522 #[tokio::test]
523 async fn test_server_stats() {
524 let session_storage = Arc::new(InMemorySessionStorage::new());
525 let server = HttpMcpServer::builder_with_storage(session_storage).build();
526
527 let stats = server.get_stats().await;
528 assert_eq!(stats.sessions, 0);
529 assert_eq!(stats.events, 0);
530 assert_eq!(stats.storage_type, "InMemory");
531 }
532}