1use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
/// Settings for the HTTP MCP server.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Socket address the TCP listener binds to.
    pub bind_address: SocketAddr,
    /// Request path that is routed to the MCP handlers; other paths get a 404.
    pub mcp_path: String,
    /// When `true`, CORS headers are added to every response.
    pub enable_cors: bool,
    /// Presumably the largest accepted request body in bytes; enforcement is
    /// not visible in this file. TODO confirm against the handlers.
    pub max_body_size: usize,
    /// Flag for SSE on GET requests; consumed by the handlers (not visible here).
    pub enable_get_sse: bool,
    /// Flag for SSE on POST requests; consumed by the handlers (not visible here).
    pub enable_post_sse: bool,
    /// Age, in minutes, used to compute the cutoff passed to the session
    /// storage's expiry sweep.
    pub session_expiry_minutes: u64,
}
42
43impl Default for ServerConfig {
44 fn default() -> Self {
45 Self {
46 bind_address: "127.0.0.1:8000".parse().unwrap(),
47 mcp_path: "/mcp".to_string(),
48 enable_cors: true,
49 max_body_size: 1024 * 1024, enable_get_sse: cfg!(feature = "sse"), enable_post_sse: false, session_expiry_minutes: 30, }
54 }
55}
56
57pub struct HttpMcpServerBuilder {
59 config: ServerConfig,
60 dispatcher: JsonRpcDispatcher<McpError>,
61 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
62 stream_config: StreamConfig,
63 server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
64}
65
66impl HttpMcpServerBuilder {
67 pub fn new() -> Self {
69 Self {
70 config: ServerConfig::default(),
71 dispatcher: JsonRpcDispatcher::<McpError>::new(),
72 session_storage: Some(Arc::new(InMemorySessionStorage::new())),
73 stream_config: StreamConfig::default(),
74 server_capabilities: None,
75 }
76 }
77}
78
79impl HttpMcpServerBuilder {
80 pub fn with_storage(
82 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
83 ) -> Self {
84 Self {
85 config: ServerConfig::default(),
86 dispatcher: JsonRpcDispatcher::<McpError>::new(),
87 session_storage: Some(session_storage),
88 stream_config: StreamConfig::default(),
89 server_capabilities: None,
90 }
91 }
92
93 pub fn bind_address(mut self, addr: SocketAddr) -> Self {
95 self.config.bind_address = addr;
96 self
97 }
98
99 pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
101 self.config.mcp_path = path.into();
102 self
103 }
104
105 pub fn cors(mut self, enable: bool) -> Self {
107 self.config.enable_cors = enable;
108 self
109 }
110
111 pub fn max_body_size(mut self, size: usize) -> Self {
113 self.config.max_body_size = size;
114 self
115 }
116
117 pub fn get_sse(mut self, enable: bool) -> Self {
119 self.config.enable_get_sse = enable;
120 self
121 }
122
123 pub fn post_sse(mut self, enable: bool) -> Self {
125 self.config.enable_post_sse = enable;
126 self
127 }
128
129 pub fn sse(mut self, enable: bool) -> Self {
131 self.config.enable_get_sse = enable;
132 self.config.enable_post_sse = enable;
133 self
134 }
135
136 pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
138 self.config.session_expiry_minutes = minutes;
139 self
140 }
141
142 pub fn stream_config(mut self, config: StreamConfig) -> Self {
144 self.stream_config = config;
145 self
146 }
147
148 pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
150 where
151 H: JsonRpcHandler<Error = McpError> + 'static,
152 {
153 self.dispatcher.register_methods(methods, handler);
154 self
155 }
156
157 pub fn default_handler<H>(mut self, handler: H) -> Self
159 where
160 H: JsonRpcHandler<Error = McpError> + 'static,
161 {
162 self.dispatcher.set_default_handler(handler);
163 self
164 }
165
166 pub fn server_capabilities(
168 mut self,
169 capabilities: turul_mcp_protocol::ServerCapabilities,
170 ) -> Self {
171 self.server_capabilities = Some(capabilities);
172 self
173 }
174
175 pub fn build(self) -> HttpMcpServer {
177 let session_storage = self
178 .session_storage
179 .expect("Session storage must be provided");
180
181 let stream_manager = Arc::new(StreamManager::with_config(
183 Arc::clone(&session_storage),
184 self.stream_config.clone(),
185 ));
186
187 let dispatcher = Arc::new(self.dispatcher);
189
190 let streamable_handler = StreamableHttpHandler::new(
192 Arc::new(self.config.clone()),
193 Arc::clone(&dispatcher),
194 Arc::clone(&session_storage),
195 Arc::clone(&stream_manager),
196 self.server_capabilities
197 .unwrap_or_else(|| turul_mcp_protocol::ServerCapabilities::default()),
198 );
199
200 HttpMcpServer {
201 config: self.config,
202 dispatcher,
203 session_storage,
204 stream_config: self.stream_config,
205 stream_manager,
206 streamable_handler,
207 }
208 }
209}
210
211impl Default for HttpMcpServerBuilder {
212 fn default() -> Self {
213 Self::new()
214 }
215}
216
217#[derive(Clone)]
219pub struct HttpMcpServer {
220 config: ServerConfig,
221 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
222 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
223 stream_config: StreamConfig,
224 stream_manager: Arc<StreamManager>,
226 streamable_handler: StreamableHttpHandler,
228}
229
230impl HttpMcpServer {
231 pub fn builder() -> HttpMcpServerBuilder {
233 HttpMcpServerBuilder::new()
234 }
235}
236
237impl HttpMcpServer {
238 pub fn builder_with_storage(
240 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
241 ) -> HttpMcpServerBuilder {
242 HttpMcpServerBuilder::with_storage(session_storage)
243 }
244
245 pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
248 Arc::clone(&self.stream_manager)
249 }
250
251 pub async fn run(&self) -> Result<()> {
253 self.start_session_cleanup().await;
255
256 let listener = TcpListener::bind(&self.config.bind_address).await?;
257 info!("HTTP MCP server listening on {}", self.config.bind_address);
258 info!("MCP endpoint available at: {}", self.config.mcp_path);
259 info!("Session storage: {}", self.session_storage.backend_name());
260
261 let session_handler = SessionMcpHandler::with_shared_stream_manager(
263 self.config.clone(),
264 Arc::clone(&self.dispatcher),
265 Arc::clone(&self.session_storage),
266 self.stream_config.clone(),
267 Arc::clone(&self.stream_manager),
268 );
269
270 let handler = McpRequestHandler {
272 session_handler,
273 streamable_handler: self.streamable_handler.clone(),
274 };
275
276 loop {
277 let (stream, peer_addr) = listener.accept().await?;
278 debug!("New connection from {}", peer_addr);
279
280 let handler_clone = handler.clone();
281 tokio::spawn(async move {
282 let io = TokioIo::new(stream);
283 let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
284
285 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
286 let err_str = err.to_string();
288 if err_str.contains("connection closed before message completed") {
289 debug!("Client disconnected (normal): {}", err);
290 } else {
291 error!("Error serving connection: {}", err);
292 }
293 }
294 });
295 }
296 }
297
298 async fn start_session_cleanup(&self) {
300 let storage = Arc::clone(&self.session_storage);
301 let session_expiry_minutes = self.config.session_expiry_minutes;
302 tokio::spawn(async move {
303 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
304 loop {
305 interval.tick().await;
306
307 let expire_time = std::time::SystemTime::now()
308 - std::time::Duration::from_secs(session_expiry_minutes * 60);
309 match storage.expire_sessions(expire_time).await {
310 Ok(expired) => {
311 if !expired.is_empty() {
312 info!("Expired {} sessions", expired.len());
313 for session_id in expired {
314 debug!("Expired session: {}", session_id);
315 }
316 }
317 }
318 Err(err) => {
319 error!("Session cleanup error: {}", err);
320 }
321 }
322 }
323 });
324 }
325
326 pub async fn get_stats(&self) -> ServerStats {
328 let session_count = self.session_storage.session_count().await.unwrap_or(0);
329 let event_count = self.session_storage.event_count().await.unwrap_or(0);
330
331 ServerStats {
332 sessions: session_count,
333 events: event_count,
334 storage_type: self.session_storage.backend_name().to_string(),
335 }
336 }
337}
338
339#[derive(Clone)]
342struct McpRequestHandler {
343 session_handler: SessionMcpHandler,
344 streamable_handler: StreamableHttpHandler,
345}
346
347async fn handle_request(
348 req: Request<hyper::body::Incoming>,
349 handler: McpRequestHandler,
350) -> std::result::Result<
351 Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
352 hyper::Error,
353> {
354 let method = req.method().clone();
355 let uri = req.uri().clone();
356 let path = uri.path();
357
358 debug!("Handling {} {}", method, path);
359
360 debug!(
362 "HTTP server dispatch: path={}, expected_mcp_path={}",
363 path, handler.session_handler.config.mcp_path
364 );
365 let response = if path == handler.session_handler.config.mcp_path {
366 debug!("Path match: Request routed to MCP handler");
367 let protocol_version_str = req
369 .headers()
370 .get("MCP-Protocol-Version")
371 .and_then(|h| h.to_str().ok())
372 .unwrap_or("2025-06-18"); debug!("Protocol version: {}", protocol_version_str);
374
375 let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
376 .unwrap_or(McpProtocolVersion::V2025_06_18);
377
378 debug!(
379 "MCP request: protocol_version={}, method={}",
380 protocol_version.as_str(),
381 method
382 );
383
384 debug!(
386 "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
387 protocol_version.as_str(),
388 method,
389 protocol_version.supports_streamable_http(),
390 if protocol_version.supports_streamable_http() {
391 "StreamableHttpHandler"
392 } else {
393 "SessionMcpHandler"
394 }
395 );
396
397 if protocol_version.supports_streamable_http() {
398 debug!(
400 "Calling streamable handler for protocol {}",
401 protocol_version.as_str()
402 );
403 let streamable_response = handler.streamable_handler.handle_request(req).await;
404 debug!("Streamable handler completed");
405 Ok(streamable_response)
406 } else {
407 match handler.session_handler.handle_mcp_request(req).await {
409 Ok(mcp_response) => Ok(mcp_response),
410 Err(err) => {
411 error!("Request handling error: {}", err);
412 Ok(Response::builder()
413 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
414 .body(
415 Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
416 .map_err(|never| match never {})
417 .boxed_unsync(),
418 )
419 .unwrap())
420 }
421 }
422 }
423 } else {
424 Ok(Response::builder()
426 .status(hyper::StatusCode::NOT_FOUND)
427 .body(
428 Full::new(Bytes::from("Not Found"))
429 .map_err(|never| match never {})
430 .boxed_unsync(),
431 )
432 .unwrap())
433 };
434
435 match response {
437 Ok(mut final_response) => {
438 if handler.session_handler.config.enable_cors {
439 CorsLayer::apply_cors_headers(final_response.headers_mut());
440 }
441 Ok(final_response)
442 }
443 Err(e) => Err(e),
444 }
445}
446
/// Snapshot of server statistics as reported by the session storage.
#[derive(Debug, Clone)]
pub struct ServerStats {
    /// Number of sessions reported by the storage backend.
    pub sessions: usize,
    /// Number of events reported by the storage backend.
    pub events: usize,
    /// Name of the storage backend in use.
    pub storage_type: String,
}
454
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Arc;
    use turul_mcp_session_storage::InMemorySessionStorage;

    /// The default configuration serves `/mcp`, enables CORS and uses a
    /// 1 MiB body limit.
    #[test]
    fn test_server_config_default() {
        let ServerConfig {
            mcp_path,
            enable_cors,
            max_body_size,
            ..
        } = ServerConfig::default();

        assert_eq!(mcp_path, "/mcp");
        assert!(enable_cors);
        assert_eq!(max_body_size, 1024 * 1024);
    }

    /// Values set through the builder end up in the built server's config.
    #[test]
    fn test_builder() {
        let expected_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000);

        let built = HttpMcpServer::builder_with_storage(Arc::new(InMemorySessionStorage::new()))
            .bind_address(expected_addr)
            .mcp_path("/api/mcp")
            .cors(false)
            .max_body_size(2048)
            .build();

        let config = &built.config;
        assert_eq!(config.bind_address, expected_addr);
        assert_eq!(config.mcp_path, "/api/mcp");
        assert!(!config.enable_cors);
        assert_eq!(config.max_body_size, 2048);
    }

    /// A freshly built in-memory server reports no sessions and no events.
    #[tokio::test]
    async fn test_server_stats() {
        let storage = Arc::new(InMemorySessionStorage::new());
        let ServerStats {
            sessions,
            events,
            storage_type,
        } = HttpMcpServer::builder_with_storage(storage)
            .build()
            .get_stats()
            .await;

        assert_eq!(sessions, 0);
        assert_eq!(events, 0);
        assert_eq!(storage_type, "InMemory");
    }
}