1use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info, warn};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
24#[derive(Debug, Clone)]
26pub struct ServerConfig {
27 pub bind_address: SocketAddr,
29 pub mcp_path: String,
31 pub enable_cors: bool,
33 pub max_body_size: usize,
35 pub enable_get_sse: bool,
37 pub enable_post_sse: bool,
39 pub session_expiry_minutes: u64,
41 pub allow_unauthenticated_ping: bool,
49}
50
51impl Default for ServerConfig {
52 fn default() -> Self {
53 Self {
54 bind_address: "127.0.0.1:8000".parse().unwrap(),
55 mcp_path: "/mcp".to_string(),
56 enable_cors: true,
57 max_body_size: 1024 * 1024, enable_get_sse: cfg!(feature = "sse"), enable_post_sse: false, session_expiry_minutes: 30, allow_unauthenticated_ping: true, }
63 }
64}
65
66pub struct HttpMcpServerBuilder {
68 config: ServerConfig,
69 dispatcher: JsonRpcDispatcher<McpError>,
70 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
71 stream_config: StreamConfig,
72 server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
73 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
74 route_registry: Arc<crate::routes::RouteRegistry>,
75}
76
77impl HttpMcpServerBuilder {
78 pub fn new() -> Self {
80 Self {
81 config: ServerConfig::default(),
82 dispatcher: JsonRpcDispatcher::<McpError>::new(),
83 session_storage: Some(Arc::new(InMemorySessionStorage::new())),
84 stream_config: StreamConfig::default(),
85 server_capabilities: None,
86 middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
87 route_registry: Arc::new(crate::routes::RouteRegistry::new()),
88 }
89 }
90}
91
92impl HttpMcpServerBuilder {
93 pub fn with_storage(
95 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
96 ) -> Self {
97 Self {
98 config: ServerConfig::default(),
99 dispatcher: JsonRpcDispatcher::<McpError>::new(),
100 session_storage: Some(session_storage),
101 stream_config: StreamConfig::default(),
102 server_capabilities: None,
103 middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
104 route_registry: Arc::new(crate::routes::RouteRegistry::new()),
105 }
106 }
107
108 pub fn with_middleware_stack(
110 mut self,
111 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
112 ) -> Self {
113 self.middleware_stack = middleware_stack;
114 self
115 }
116
117 pub fn route_registry(mut self, registry: Arc<crate::routes::RouteRegistry>) -> Self {
119 self.route_registry = registry;
120 self
121 }
122
123 pub fn bind_address(mut self, addr: SocketAddr) -> Self {
125 self.config.bind_address = addr;
126 self
127 }
128
129 pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
131 self.config.mcp_path = path.into();
132 self
133 }
134
135 pub fn cors(mut self, enable: bool) -> Self {
137 self.config.enable_cors = enable;
138 self
139 }
140
141 pub fn max_body_size(mut self, size: usize) -> Self {
143 self.config.max_body_size = size;
144 self
145 }
146
147 pub fn get_sse(mut self, enable: bool) -> Self {
149 self.config.enable_get_sse = enable;
150 self
151 }
152
153 pub fn post_sse(mut self, enable: bool) -> Self {
155 self.config.enable_post_sse = enable;
156 self
157 }
158
159 pub fn sse(mut self, enable: bool) -> Self {
161 self.config.enable_get_sse = enable;
162 self.config.enable_post_sse = enable;
163 self
164 }
165
166 pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
168 self.config.session_expiry_minutes = minutes;
169 self
170 }
171
172 pub fn allow_unauthenticated_ping(mut self, allow: bool) -> Self {
177 self.config.allow_unauthenticated_ping = allow;
178 self
179 }
180
181 pub fn stream_config(mut self, config: StreamConfig) -> Self {
183 self.stream_config = config;
184 self
185 }
186
187 pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
189 where
190 H: JsonRpcHandler<Error = McpError> + 'static,
191 {
192 self.dispatcher.register_methods(methods, handler);
193 self
194 }
195
196 pub fn default_handler<H>(mut self, handler: H) -> Self
198 where
199 H: JsonRpcHandler<Error = McpError> + 'static,
200 {
201 self.dispatcher.set_default_handler(handler);
202 self
203 }
204
205 pub fn server_capabilities(
207 mut self,
208 capabilities: turul_mcp_protocol::ServerCapabilities,
209 ) -> Self {
210 self.server_capabilities = Some(capabilities);
211 self
212 }
213
214 pub fn build(self) -> HttpMcpServer {
216 let session_storage = self
217 .session_storage
218 .expect("Session storage must be provided");
219
220 let stream_manager = Arc::new(StreamManager::with_config(
222 Arc::clone(&session_storage),
223 self.stream_config.clone(),
224 ));
225
226 let dispatcher = Arc::new(self.dispatcher);
228
229 let middleware_stack = self.middleware_stack;
231
232 let streamable_handler = StreamableHttpHandler::new(
234 Arc::new(self.config.clone()),
235 Arc::clone(&dispatcher),
236 Arc::clone(&session_storage),
237 Arc::clone(&stream_manager),
238 self.server_capabilities.unwrap_or_default(),
239 Arc::clone(&middleware_stack),
240 );
241
242 HttpMcpServer {
243 config: self.config,
244 dispatcher,
245 session_storage,
246 stream_config: self.stream_config,
247 stream_manager,
248 streamable_handler,
249 route_registry: self.route_registry,
250 }
251 }
252}
253
254impl Default for HttpMcpServerBuilder {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
260#[derive(Clone)]
262pub struct HttpMcpServer {
263 config: ServerConfig,
264 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
265 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
266 stream_config: StreamConfig,
267 stream_manager: Arc<StreamManager>,
269 streamable_handler: StreamableHttpHandler,
271 route_registry: Arc<crate::routes::RouteRegistry>,
273}
274
275impl HttpMcpServer {
276 pub fn builder() -> HttpMcpServerBuilder {
278 HttpMcpServerBuilder::new()
279 }
280}
281
282impl HttpMcpServer {
283 pub fn builder_with_storage(
285 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
286 ) -> HttpMcpServerBuilder {
287 HttpMcpServerBuilder::with_storage(session_storage)
288 }
289
290 pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
293 Arc::clone(&self.stream_manager)
294 }
295
296 pub async fn run(&self) -> Result<()> {
298 self.start_session_cleanup().await;
300
301 let listener = TcpListener::bind(&self.config.bind_address).await?;
302 info!("HTTP MCP server listening on {}", self.config.bind_address);
303 info!("MCP endpoint available at: {}", self.config.mcp_path);
304 info!("Session storage: {}", self.session_storage.backend_name());
305
306 let session_handler = SessionMcpHandler::with_shared_stream_manager(
309 self.config.clone(),
310 Arc::clone(&self.dispatcher),
311 Arc::clone(&self.session_storage),
312 self.stream_config.clone(),
313 Arc::clone(&self.stream_manager),
314 Arc::clone(&self.streamable_handler.middleware_stack),
315 );
316
317 let handler = McpRequestHandler {
319 session_handler,
320 streamable_handler: self.streamable_handler.clone(),
321 route_registry: Arc::clone(&self.route_registry),
322 };
323
324 loop {
325 let (stream, peer_addr) = listener.accept().await?;
326 debug!("New connection from {}", peer_addr);
327
328 let handler_clone = handler.clone();
329 tokio::spawn(async move {
330 let io = TokioIo::new(stream);
331 let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
332
333 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
334 let err_str = err.to_string();
336 if err_str.contains("connection closed before message completed") {
337 debug!("Client disconnected (normal): {}", err);
338 } else {
339 error!("Error serving connection: {}", err);
340 }
341 }
342 });
343 }
344 }
345
346 async fn start_session_cleanup(&self) {
348 let storage = Arc::clone(&self.session_storage);
349 let session_expiry_minutes = self.config.session_expiry_minutes;
350 tokio::spawn(async move {
351 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
352 loop {
353 interval.tick().await;
354
355 let expire_time = std::time::SystemTime::now()
356 - std::time::Duration::from_secs(session_expiry_minutes * 60);
357 match storage.expire_sessions(expire_time).await {
358 Ok(expired) => {
359 if !expired.is_empty() {
360 info!("Expired {} sessions", expired.len());
361 for session_id in expired {
362 debug!("Expired session: {}", session_id);
363 }
364 }
365 }
366 Err(err) => {
367 error!("Session cleanup error: {}", err);
368 }
369 }
370 }
371 });
372 }
373
374 pub async fn get_stats(&self) -> ServerStats {
376 let session_count = self.session_storage.session_count().await.unwrap_or(0);
377 let event_count = self.session_storage.event_count().await.unwrap_or(0);
378
379 ServerStats {
380 sessions: session_count,
381 events: event_count,
382 storage_type: self.session_storage.backend_name().to_string(),
383 }
384 }
385}
386
387#[derive(Clone)]
390struct McpRequestHandler {
391 session_handler: SessionMcpHandler,
392 streamable_handler: StreamableHttpHandler,
393 route_registry: Arc<crate::routes::RouteRegistry>,
394}
395
396async fn handle_request(
397 req: Request<hyper::body::Incoming>,
398 handler: McpRequestHandler,
399) -> std::result::Result<
400 Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
401 hyper::Error,
402> {
403 let method = req.method().clone();
404 let uri = req.uri().clone();
405 let path = uri.path();
406
407 debug!("Handling {} {}", method, path);
408
409 debug!(
411 "HTTP server dispatch: path={}, expected_mcp_path={}",
412 path, handler.session_handler.config.mcp_path
413 );
414 let response = if path == handler.session_handler.config.mcp_path {
415 debug!("Path match: Request routed to MCP handler");
416 let protocol_version_str = req
418 .headers()
419 .get("MCP-Protocol-Version")
420 .and_then(|h| h.to_str().ok())
421 .unwrap_or("2025-11-25"); debug!("Protocol version: {}", protocol_version_str);
423
424 let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
425 .unwrap_or(McpProtocolVersion::V2025_11_25);
426
427 debug!(
428 "MCP request: protocol_version={}, method={}",
429 protocol_version.as_str(),
430 method
431 );
432
433 debug!(
435 "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
436 protocol_version.as_str(),
437 method,
438 protocol_version.supports_streamable_http(),
439 if protocol_version.supports_streamable_http() {
440 "StreamableHttpHandler"
441 } else {
442 "SessionMcpHandler"
443 }
444 );
445
446 if protocol_version.supports_streamable_http() {
447 debug!(
449 "Calling streamable handler for protocol {}",
450 protocol_version.as_str()
451 );
452 let streamable_response = handler.streamable_handler.handle_request(req).await;
453 debug!("Streamable handler completed");
454 Ok(streamable_response)
455 } else {
456 match handler.session_handler.handle_mcp_request(req).await {
458 Ok(mcp_response) => Ok(mcp_response),
459 Err(err) => {
460 error!("Request handling error: {}", err);
461 Ok(Response::builder()
462 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
463 .body(
464 Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
465 .map_err(|never| match never {})
466 .boxed_unsync(),
467 )
468 .unwrap())
469 }
470 }
471 }
472 } else {
473 match handler.route_registry.match_route(path) {
475 Ok(Some(route_handler)) => {
476 debug!("Custom route matched: {}", path);
477 let (parts, body) = req.into_parts();
479 let boxed_req = Request::from_parts(parts, body.boxed_unsync());
480 Ok(route_handler.handle(boxed_req).await)
481 }
482 Ok(None) => {
483 Ok(Response::builder()
485 .status(hyper::StatusCode::NOT_FOUND)
486 .body(
487 Full::new(Bytes::from("Not Found"))
488 .map_err(|never| match never {})
489 .boxed_unsync(),
490 )
491 .unwrap())
492 }
493 Err(validation_err) => {
494 warn!(
496 "Route validation failed for path '{}': {}",
497 path, validation_err
498 );
499 Ok(validation_err.into_response())
500 }
501 }
502 };
503
504 match response {
506 Ok(mut final_response) => {
507 if handler.session_handler.config.enable_cors {
508 CorsLayer::apply_cors_headers(final_response.headers_mut());
509 }
510 Ok(final_response)
511 }
512 Err(e) => Err(e),
513 }
514}
515
516#[derive(Debug, Clone)]
518pub struct ServerStats {
519 pub sessions: usize,
520 pub events: usize,
521 pub storage_type: String,
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527 use std::net::{IpAddr, Ipv4Addr};
528 use std::sync::Arc;
529 use turul_mcp_session_storage::InMemorySessionStorage;
530
531 #[test]
532 fn test_server_config_default() {
533 let config = ServerConfig::default();
534 assert_eq!(config.mcp_path, "/mcp");
535 assert!(config.enable_cors);
536 assert_eq!(config.max_body_size, 1024 * 1024);
537 }
538
539 #[test]
540 fn test_builder() {
541 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
542 let session_storage = Arc::new(InMemorySessionStorage::new());
543 let server = HttpMcpServer::builder_with_storage(session_storage)
544 .bind_address(addr)
545 .mcp_path("/api/mcp")
546 .cors(false)
547 .max_body_size(2048)
548 .build();
549
550 assert_eq!(server.config.bind_address, addr);
551 assert_eq!(server.config.mcp_path, "/api/mcp");
552 assert!(!server.config.enable_cors);
553 assert_eq!(server.config.max_body_size, 2048);
554 }
555
556 #[tokio::test]
557 async fn test_server_stats() {
558 let session_storage = Arc::new(InMemorySessionStorage::new());
559 let server = HttpMcpServer::builder_with_storage(session_storage).build();
560
561 let stats = server.get_stats().await;
562 assert_eq!(stats.sessions, 0);
563 assert_eq!(stats.events, 0);
564 assert_eq!(stats.storage_type, "InMemory");
565 }
566}