1use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
24#[derive(Debug, Clone)]
26pub struct ServerConfig {
27 pub bind_address: SocketAddr,
29 pub mcp_path: String,
31 pub enable_cors: bool,
33 pub max_body_size: usize,
35 pub enable_get_sse: bool,
37 pub enable_post_sse: bool,
39 pub session_expiry_minutes: u64,
41}
42
43impl Default for ServerConfig {
44 fn default() -> Self {
45 Self {
46 bind_address: "127.0.0.1:8000".parse().unwrap(),
47 mcp_path: "/mcp".to_string(),
48 enable_cors: true,
49 max_body_size: 1024 * 1024, enable_get_sse: cfg!(feature = "sse"), enable_post_sse: false, session_expiry_minutes: 30, }
54 }
55}
56
57pub struct HttpMcpServerBuilder {
59 config: ServerConfig,
60 dispatcher: JsonRpcDispatcher<McpError>,
61 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
62 stream_config: StreamConfig,
63 server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
64 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
65}
66
67impl HttpMcpServerBuilder {
68 pub fn new() -> Self {
70 Self {
71 config: ServerConfig::default(),
72 dispatcher: JsonRpcDispatcher::<McpError>::new(),
73 session_storage: Some(Arc::new(InMemorySessionStorage::new())),
74 stream_config: StreamConfig::default(),
75 server_capabilities: None,
76 middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
77 }
78 }
79}
80
81impl HttpMcpServerBuilder {
82 pub fn with_storage(
84 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
85 ) -> Self {
86 Self {
87 config: ServerConfig::default(),
88 dispatcher: JsonRpcDispatcher::<McpError>::new(),
89 session_storage: Some(session_storage),
90 stream_config: StreamConfig::default(),
91 server_capabilities: None,
92 middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
93 }
94 }
95
96 pub fn with_middleware_stack(
98 mut self,
99 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
100 ) -> Self {
101 self.middleware_stack = middleware_stack;
102 self
103 }
104
105 pub fn bind_address(mut self, addr: SocketAddr) -> Self {
107 self.config.bind_address = addr;
108 self
109 }
110
111 pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
113 self.config.mcp_path = path.into();
114 self
115 }
116
117 pub fn cors(mut self, enable: bool) -> Self {
119 self.config.enable_cors = enable;
120 self
121 }
122
123 pub fn max_body_size(mut self, size: usize) -> Self {
125 self.config.max_body_size = size;
126 self
127 }
128
129 pub fn get_sse(mut self, enable: bool) -> Self {
131 self.config.enable_get_sse = enable;
132 self
133 }
134
135 pub fn post_sse(mut self, enable: bool) -> Self {
137 self.config.enable_post_sse = enable;
138 self
139 }
140
141 pub fn sse(mut self, enable: bool) -> Self {
143 self.config.enable_get_sse = enable;
144 self.config.enable_post_sse = enable;
145 self
146 }
147
148 pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
150 self.config.session_expiry_minutes = minutes;
151 self
152 }
153
154 pub fn stream_config(mut self, config: StreamConfig) -> Self {
156 self.stream_config = config;
157 self
158 }
159
160 pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
162 where
163 H: JsonRpcHandler<Error = McpError> + 'static,
164 {
165 self.dispatcher.register_methods(methods, handler);
166 self
167 }
168
169 pub fn default_handler<H>(mut self, handler: H) -> Self
171 where
172 H: JsonRpcHandler<Error = McpError> + 'static,
173 {
174 self.dispatcher.set_default_handler(handler);
175 self
176 }
177
178 pub fn server_capabilities(
180 mut self,
181 capabilities: turul_mcp_protocol::ServerCapabilities,
182 ) -> Self {
183 self.server_capabilities = Some(capabilities);
184 self
185 }
186
187 pub fn build(self) -> HttpMcpServer {
189 let session_storage = self
190 .session_storage
191 .expect("Session storage must be provided");
192
193 let stream_manager = Arc::new(StreamManager::with_config(
195 Arc::clone(&session_storage),
196 self.stream_config.clone(),
197 ));
198
199 let dispatcher = Arc::new(self.dispatcher);
201
202 let middleware_stack = self.middleware_stack;
204
205 let streamable_handler = StreamableHttpHandler::new(
207 Arc::new(self.config.clone()),
208 Arc::clone(&dispatcher),
209 Arc::clone(&session_storage),
210 Arc::clone(&stream_manager),
211 self.server_capabilities.unwrap_or_default(),
212 Arc::clone(&middleware_stack),
213 );
214
215 HttpMcpServer {
216 config: self.config,
217 dispatcher,
218 session_storage,
219 stream_config: self.stream_config,
220 stream_manager,
221 streamable_handler,
222 }
223 }
224}
225
226impl Default for HttpMcpServerBuilder {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
232#[derive(Clone)]
234pub struct HttpMcpServer {
235 config: ServerConfig,
236 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
237 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
238 stream_config: StreamConfig,
239 stream_manager: Arc<StreamManager>,
241 streamable_handler: StreamableHttpHandler,
243}
244
245impl HttpMcpServer {
246 pub fn builder() -> HttpMcpServerBuilder {
248 HttpMcpServerBuilder::new()
249 }
250}
251
252impl HttpMcpServer {
253 pub fn builder_with_storage(
255 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
256 ) -> HttpMcpServerBuilder {
257 HttpMcpServerBuilder::with_storage(session_storage)
258 }
259
260 pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
263 Arc::clone(&self.stream_manager)
264 }
265
266 pub async fn run(&self) -> Result<()> {
268 self.start_session_cleanup().await;
270
271 let listener = TcpListener::bind(&self.config.bind_address).await?;
272 info!("HTTP MCP server listening on {}", self.config.bind_address);
273 info!("MCP endpoint available at: {}", self.config.mcp_path);
274 info!("Session storage: {}", self.session_storage.backend_name());
275
276 let session_handler = SessionMcpHandler::with_shared_stream_manager(
279 self.config.clone(),
280 Arc::clone(&self.dispatcher),
281 Arc::clone(&self.session_storage),
282 self.stream_config.clone(),
283 Arc::clone(&self.stream_manager),
284 Arc::clone(&self.streamable_handler.middleware_stack),
285 );
286
287 let handler = McpRequestHandler {
289 session_handler,
290 streamable_handler: self.streamable_handler.clone(),
291 };
292
293 loop {
294 let (stream, peer_addr) = listener.accept().await?;
295 debug!("New connection from {}", peer_addr);
296
297 let handler_clone = handler.clone();
298 tokio::spawn(async move {
299 let io = TokioIo::new(stream);
300 let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
301
302 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
303 let err_str = err.to_string();
305 if err_str.contains("connection closed before message completed") {
306 debug!("Client disconnected (normal): {}", err);
307 } else {
308 error!("Error serving connection: {}", err);
309 }
310 }
311 });
312 }
313 }
314
315 async fn start_session_cleanup(&self) {
317 let storage = Arc::clone(&self.session_storage);
318 let session_expiry_minutes = self.config.session_expiry_minutes;
319 tokio::spawn(async move {
320 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
321 loop {
322 interval.tick().await;
323
324 let expire_time = std::time::SystemTime::now()
325 - std::time::Duration::from_secs(session_expiry_minutes * 60);
326 match storage.expire_sessions(expire_time).await {
327 Ok(expired) => {
328 if !expired.is_empty() {
329 info!("Expired {} sessions", expired.len());
330 for session_id in expired {
331 debug!("Expired session: {}", session_id);
332 }
333 }
334 }
335 Err(err) => {
336 error!("Session cleanup error: {}", err);
337 }
338 }
339 }
340 });
341 }
342
343 pub async fn get_stats(&self) -> ServerStats {
345 let session_count = self.session_storage.session_count().await.unwrap_or(0);
346 let event_count = self.session_storage.event_count().await.unwrap_or(0);
347
348 ServerStats {
349 sessions: session_count,
350 events: event_count,
351 storage_type: self.session_storage.backend_name().to_string(),
352 }
353 }
354}
355
356#[derive(Clone)]
359struct McpRequestHandler {
360 session_handler: SessionMcpHandler,
361 streamable_handler: StreamableHttpHandler,
362}
363
364async fn handle_request(
365 req: Request<hyper::body::Incoming>,
366 handler: McpRequestHandler,
367) -> std::result::Result<
368 Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
369 hyper::Error,
370> {
371 let method = req.method().clone();
372 let uri = req.uri().clone();
373 let path = uri.path();
374
375 debug!("Handling {} {}", method, path);
376
377 debug!(
379 "HTTP server dispatch: path={}, expected_mcp_path={}",
380 path, handler.session_handler.config.mcp_path
381 );
382 let response = if path == handler.session_handler.config.mcp_path {
383 debug!("Path match: Request routed to MCP handler");
384 let protocol_version_str = req
386 .headers()
387 .get("MCP-Protocol-Version")
388 .and_then(|h| h.to_str().ok())
389 .unwrap_or("2025-06-18"); debug!("Protocol version: {}", protocol_version_str);
391
392 let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
393 .unwrap_or(McpProtocolVersion::V2025_06_18);
394
395 debug!(
396 "MCP request: protocol_version={}, method={}",
397 protocol_version.as_str(),
398 method
399 );
400
401 debug!(
403 "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
404 protocol_version.as_str(),
405 method,
406 protocol_version.supports_streamable_http(),
407 if protocol_version.supports_streamable_http() {
408 "StreamableHttpHandler"
409 } else {
410 "SessionMcpHandler"
411 }
412 );
413
414 if protocol_version.supports_streamable_http() {
415 debug!(
417 "Calling streamable handler for protocol {}",
418 protocol_version.as_str()
419 );
420 let streamable_response = handler.streamable_handler.handle_request(req).await;
421 debug!("Streamable handler completed");
422 Ok(streamable_response)
423 } else {
424 match handler.session_handler.handle_mcp_request(req).await {
426 Ok(mcp_response) => Ok(mcp_response),
427 Err(err) => {
428 error!("Request handling error: {}", err);
429 Ok(Response::builder()
430 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
431 .body(
432 Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
433 .map_err(|never| match never {})
434 .boxed_unsync(),
435 )
436 .unwrap())
437 }
438 }
439 }
440 } else {
441 Ok(Response::builder()
443 .status(hyper::StatusCode::NOT_FOUND)
444 .body(
445 Full::new(Bytes::from("Not Found"))
446 .map_err(|never| match never {})
447 .boxed_unsync(),
448 )
449 .unwrap())
450 };
451
452 match response {
454 Ok(mut final_response) => {
455 if handler.session_handler.config.enable_cors {
456 CorsLayer::apply_cors_headers(final_response.headers_mut());
457 }
458 Ok(final_response)
459 }
460 Err(e) => Err(e),
461 }
462}
463
464#[derive(Debug, Clone)]
466pub struct ServerStats {
467 pub sessions: usize,
468 pub events: usize,
469 pub storage_type: String,
470}
471
472#[cfg(test)]
473mod tests {
474 use super::*;
475 use std::net::{IpAddr, Ipv4Addr};
476 use std::sync::Arc;
477 use turul_mcp_session_storage::InMemorySessionStorage;
478
479 #[test]
480 fn test_server_config_default() {
481 let config = ServerConfig::default();
482 assert_eq!(config.mcp_path, "/mcp");
483 assert!(config.enable_cors);
484 assert_eq!(config.max_body_size, 1024 * 1024);
485 }
486
487 #[test]
488 fn test_builder() {
489 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
490 let session_storage = Arc::new(InMemorySessionStorage::new());
491 let server = HttpMcpServer::builder_with_storage(session_storage)
492 .bind_address(addr)
493 .mcp_path("/api/mcp")
494 .cors(false)
495 .max_body_size(2048)
496 .build();
497
498 assert_eq!(server.config.bind_address, addr);
499 assert_eq!(server.config.mcp_path, "/api/mcp");
500 assert!(!server.config.enable_cors);
501 assert_eq!(server.config.max_body_size, 2048);
502 }
503
504 #[tokio::test]
505 async fn test_server_stats() {
506 let session_storage = Arc::new(InMemorySessionStorage::new());
507 let server = HttpMcpServer::builder_with_storage(session_storage).build();
508
509 let stats = server.get_stats().await;
510 assert_eq!(stats.sessions, 0);
511 assert_eq!(stats.events, 0);
512 assert_eq!(stats.storage_type, "InMemory");
513 }
514}