turul_http_mcp_server/
server.rs

1//! HTTP MCP Server with SessionStorage integration
2//!
3//! This server provides MCP 2025-06-18 compliant HTTP transport with
4//! pluggable session storage backends and proper SSE resumability.
5
6use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
24/// Configuration for the HTTP MCP server
25#[derive(Debug, Clone)]
26pub struct ServerConfig {
27    /// Address to bind to
28    pub bind_address: SocketAddr,
29    /// Path for MCP endpoint
30    pub mcp_path: String,
31    /// Enable CORS
32    pub enable_cors: bool,
33    /// Maximum request body size
34    pub max_body_size: usize,
35    /// Enable GET SSE support (persistent event streams)
36    pub enable_get_sse: bool,
37    /// Enable POST SSE support (streaming tool call responses) - disabled by default for compatibility
38    pub enable_post_sse: bool,
39    /// Session expiry time in minutes (default: 30 minutes)
40    pub session_expiry_minutes: u64,
41}
42
43impl Default for ServerConfig {
44    fn default() -> Self {
45        Self {
46            bind_address: "127.0.0.1:8000".parse().unwrap(),
47            mcp_path: "/mcp".to_string(),
48            enable_cors: true,
49            max_body_size: 1024 * 1024,            // 1MB
50            enable_get_sse: cfg!(feature = "sse"), // GET SSE enabled if "sse" feature is compiled
51            enable_post_sse: false, // Disabled by default for better client compatibility (e.g., MCP Inspector)
52            session_expiry_minutes: 30, // 30 minutes default
53        }
54    }
55}
56
57/// Builder for HTTP MCP server with pluggable storage
58pub struct HttpMcpServerBuilder {
59    config: ServerConfig,
60    dispatcher: JsonRpcDispatcher<McpError>,
61    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
62    stream_config: StreamConfig,
63    server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
64    middleware_stack: Arc<crate::middleware::MiddlewareStack>,
65}
66
67impl HttpMcpServerBuilder {
68    /// Create a new builder with in-memory storage (zero-configuration)
69    pub fn new() -> Self {
70        Self {
71            config: ServerConfig::default(),
72            dispatcher: JsonRpcDispatcher::<McpError>::new(),
73            session_storage: Some(Arc::new(InMemorySessionStorage::new())),
74            stream_config: StreamConfig::default(),
75            server_capabilities: None,
76            middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
77        }
78    }
79}
80
81impl HttpMcpServerBuilder {
82    /// Create a new builder with specific session storage
83    pub fn with_storage(
84        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
85    ) -> Self {
86        Self {
87            config: ServerConfig::default(),
88            dispatcher: JsonRpcDispatcher::<McpError>::new(),
89            session_storage: Some(session_storage),
90            stream_config: StreamConfig::default(),
91            server_capabilities: None,
92            middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
93        }
94    }
95
96    /// Set the middleware stack (for HTTP transport middleware support)
97    pub fn with_middleware_stack(
98        mut self,
99        middleware_stack: Arc<crate::middleware::MiddlewareStack>,
100    ) -> Self {
101        self.middleware_stack = middleware_stack;
102        self
103    }
104
105    /// Set the bind address
106    pub fn bind_address(mut self, addr: SocketAddr) -> Self {
107        self.config.bind_address = addr;
108        self
109    }
110
111    /// Set the MCP endpoint path
112    pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
113        self.config.mcp_path = path.into();
114        self
115    }
116
117    /// Enable or disable CORS
118    pub fn cors(mut self, enable: bool) -> Self {
119        self.config.enable_cors = enable;
120        self
121    }
122
123    /// Set maximum request body size
124    pub fn max_body_size(mut self, size: usize) -> Self {
125        self.config.max_body_size = size;
126        self
127    }
128
129    /// Enable or disable GET SSE for persistent event streams
130    pub fn get_sse(mut self, enable: bool) -> Self {
131        self.config.enable_get_sse = enable;
132        self
133    }
134
135    /// Enable or disable POST SSE for streaming tool call responses (disabled by default for compatibility)
136    pub fn post_sse(mut self, enable: bool) -> Self {
137        self.config.enable_post_sse = enable;
138        self
139    }
140
141    /// Enable or disable both GET and POST SSE (convenience method)
142    pub fn sse(mut self, enable: bool) -> Self {
143        self.config.enable_get_sse = enable;
144        self.config.enable_post_sse = enable;
145        self
146    }
147
148    /// Set session expiry time in minutes
149    pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
150        self.config.session_expiry_minutes = minutes;
151        self
152    }
153
154    /// Configure SSE streaming settings
155    pub fn stream_config(mut self, config: StreamConfig) -> Self {
156        self.stream_config = config;
157        self
158    }
159
160    /// Register a JSON-RPC handler for specific methods
161    pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
162    where
163        H: JsonRpcHandler<Error = McpError> + 'static,
164    {
165        self.dispatcher.register_methods(methods, handler);
166        self
167    }
168
169    /// Register a default handler for unhandled methods
170    pub fn default_handler<H>(mut self, handler: H) -> Self
171    where
172        H: JsonRpcHandler<Error = McpError> + 'static,
173    {
174        self.dispatcher.set_default_handler(handler);
175        self
176    }
177
178    /// Set server capabilities
179    pub fn server_capabilities(
180        mut self,
181        capabilities: turul_mcp_protocol::ServerCapabilities,
182    ) -> Self {
183        self.server_capabilities = Some(capabilities);
184        self
185    }
186
187    /// Build the HTTP MCP server
188    pub fn build(self) -> HttpMcpServer {
189        let session_storage = self
190            .session_storage
191            .expect("Session storage must be provided");
192
193        // ✅ CORRECTED ARCHITECTURE: Create single shared StreamManager instance
194        let stream_manager = Arc::new(StreamManager::with_config(
195            Arc::clone(&session_storage),
196            self.stream_config.clone(),
197        ));
198
199        // Create shared dispatcher Arc
200        let dispatcher = Arc::new(self.dispatcher);
201
202        // Use middleware stack from builder
203        let middleware_stack = self.middleware_stack;
204
205        // Create StreamableHttpHandler for MCP 2025-06-18 support
206        let streamable_handler = StreamableHttpHandler::new(
207            Arc::new(self.config.clone()),
208            Arc::clone(&dispatcher),
209            Arc::clone(&session_storage),
210            Arc::clone(&stream_manager),
211            self.server_capabilities.unwrap_or_default(),
212            Arc::clone(&middleware_stack),
213        );
214
215        HttpMcpServer {
216            config: self.config,
217            dispatcher,
218            session_storage,
219            stream_config: self.stream_config,
220            stream_manager,
221            streamable_handler,
222        }
223    }
224}
225
226impl Default for HttpMcpServerBuilder {
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
232/// HTTP MCP Server with SessionStorage integration
233#[derive(Clone)]
234pub struct HttpMcpServer {
235    config: ServerConfig,
236    dispatcher: Arc<JsonRpcDispatcher<McpError>>,
237    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
238    stream_config: StreamConfig,
239    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance
240    stream_manager: Arc<StreamManager>,
241    // StreamableHttpHandler for MCP 2025-06-18 clients
242    streamable_handler: StreamableHttpHandler,
243}
244
245impl HttpMcpServer {
246    /// Create a new builder with default in-memory storage
247    pub fn builder() -> HttpMcpServerBuilder {
248        HttpMcpServerBuilder::new()
249    }
250}
251
252impl HttpMcpServer {
253    /// Create a new builder with specific session storage
254    pub fn builder_with_storage(
255        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
256    ) -> HttpMcpServerBuilder {
257        HttpMcpServerBuilder::with_storage(session_storage)
258    }
259
260    /// Get the shared StreamManager instance for event forwarding bridge
261    /// Returns reference to the same StreamManager used by HTTP server
262    pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
263        Arc::clone(&self.stream_manager)
264    }
265
266    /// Run the server with session management
267    pub async fn run(&self) -> Result<()> {
268        // Start session cleanup task
269        self.start_session_cleanup().await;
270
271        let listener = TcpListener::bind(&self.config.bind_address).await?;
272        info!("HTTP MCP server listening on {}", self.config.bind_address);
273        info!("MCP endpoint available at: {}", self.config.mcp_path);
274        info!("Session storage: {}", self.session_storage.backend_name());
275
276        // ✅ CORRECTED ARCHITECTURE: Create single SessionMcpHandler instance outside the loop
277        // Use the same middleware stack as streamable_handler (both handlers share it)
278        let session_handler = SessionMcpHandler::with_shared_stream_manager(
279            self.config.clone(),
280            Arc::clone(&self.dispatcher),
281            Arc::clone(&self.session_storage),
282            self.stream_config.clone(),
283            Arc::clone(&self.stream_manager),
284            Arc::clone(&self.streamable_handler.middleware_stack),
285        );
286
287        // Create combined handler that routes based on protocol version
288        let handler = McpRequestHandler {
289            session_handler,
290            streamable_handler: self.streamable_handler.clone(),
291        };
292
293        loop {
294            let (stream, peer_addr) = listener.accept().await?;
295            debug!("New connection from {}", peer_addr);
296
297            let handler_clone = handler.clone();
298            tokio::spawn(async move {
299                let io = TokioIo::new(stream);
300                let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
301
302                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
303                    // Filter out common client disconnection errors that aren't actual problems
304                    let err_str = err.to_string();
305                    if err_str.contains("connection closed before message completed") {
306                        debug!("Client disconnected (normal): {}", err);
307                    } else {
308                        error!("Error serving connection: {}", err);
309                    }
310                }
311            });
312        }
313    }
314
315    /// Start background session cleanup task
316    async fn start_session_cleanup(&self) {
317        let storage = Arc::clone(&self.session_storage);
318        let session_expiry_minutes = self.config.session_expiry_minutes;
319        tokio::spawn(async move {
320            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
321            loop {
322                interval.tick().await;
323
324                let expire_time = std::time::SystemTime::now()
325                    - std::time::Duration::from_secs(session_expiry_minutes * 60);
326                match storage.expire_sessions(expire_time).await {
327                    Ok(expired) => {
328                        if !expired.is_empty() {
329                            info!("Expired {} sessions", expired.len());
330                            for session_id in expired {
331                                debug!("Expired session: {}", session_id);
332                            }
333                        }
334                    }
335                    Err(err) => {
336                        error!("Session cleanup error: {}", err);
337                    }
338                }
339            }
340        });
341    }
342
343    /// Get server statistics
344    pub async fn get_stats(&self) -> ServerStats {
345        let session_count = self.session_storage.session_count().await.unwrap_or(0);
346        let event_count = self.session_storage.event_count().await.unwrap_or(0);
347
348        ServerStats {
349            sessions: session_count,
350            events: event_count,
351            storage_type: self.session_storage.backend_name().to_string(),
352        }
353    }
354}
355
356/// Handle requests with MCP 2025-06-18 compliance
357/// Combined handler that routes based on MCP protocol version
358#[derive(Clone)]
359struct McpRequestHandler {
360    session_handler: SessionMcpHandler,
361    streamable_handler: StreamableHttpHandler,
362}
363
364async fn handle_request(
365    req: Request<hyper::body::Incoming>,
366    handler: McpRequestHandler,
367) -> std::result::Result<
368    Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
369    hyper::Error,
370> {
371    let method = req.method().clone();
372    let uri = req.uri().clone();
373    let path = uri.path();
374
375    debug!("Handling {} {}", method, path);
376
377    // Route the request
378    debug!(
379        "HTTP server dispatch: path={}, expected_mcp_path={}",
380        path, handler.session_handler.config.mcp_path
381    );
382    let response = if path == handler.session_handler.config.mcp_path {
383        debug!("Path match: Request routed to MCP handler");
384        // Extract MCP protocol version from headers
385        let protocol_version_str = req
386            .headers()
387            .get("MCP-Protocol-Version")
388            .and_then(|h| h.to_str().ok())
389            .unwrap_or("2025-06-18"); // Default to latest version (we only support the latest protocol)
390        debug!("Protocol version: {}", protocol_version_str);
391
392        let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
393            .unwrap_or(McpProtocolVersion::V2025_06_18);
394
395        debug!(
396            "MCP request: protocol_version={}, method={}",
397            protocol_version.as_str(),
398            method
399        );
400
401        // Route based on protocol version - MCP 2025-06-18 uses Streamable HTTP, older versions use SessionMcpHandler
402        debug!(
403            "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
404            protocol_version.as_str(),
405            method,
406            protocol_version.supports_streamable_http(),
407            if protocol_version.supports_streamable_http() {
408                "StreamableHttpHandler"
409            } else {
410                "SessionMcpHandler"
411            }
412        );
413
414        if protocol_version.supports_streamable_http() {
415            // Use StreamableHttpHandler for MCP 2025-06-18 clients
416            debug!(
417                "Calling streamable handler for protocol {}",
418                protocol_version.as_str()
419            );
420            let streamable_response = handler.streamable_handler.handle_request(req).await;
421            debug!("Streamable handler completed");
422            Ok(streamable_response)
423        } else {
424            // Use SessionMcpHandler for legacy clients (MCP 2024-11-05 and earlier)
425            match handler.session_handler.handle_mcp_request(req).await {
426                Ok(mcp_response) => Ok(mcp_response),
427                Err(err) => {
428                    error!("Request handling error: {}", err);
429                    Ok(Response::builder()
430                        .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
431                        .body(
432                            Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
433                                .map_err(|never| match never {})
434                                .boxed_unsync(),
435                        )
436                        .unwrap())
437                }
438            }
439        }
440    } else {
441        // 404 for other paths
442        Ok(Response::builder()
443            .status(hyper::StatusCode::NOT_FOUND)
444            .body(
445                Full::new(Bytes::from("Not Found"))
446                    .map_err(|never| match never {})
447                    .boxed_unsync(),
448            )
449            .unwrap())
450    };
451
452    // Apply CORS if enabled
453    match response {
454        Ok(mut final_response) => {
455            if handler.session_handler.config.enable_cors {
456                CorsLayer::apply_cors_headers(final_response.headers_mut());
457            }
458            Ok(final_response)
459        }
460        Err(e) => Err(e),
461    }
462}
463
464/// Server statistics
465#[derive(Debug, Clone)]
466pub struct ServerStats {
467    pub sessions: usize,
468    pub events: usize,
469    pub storage_type: String,
470}
471
472#[cfg(test)]
473mod tests {
474    use super::*;
475    use std::net::{IpAddr, Ipv4Addr};
476    use std::sync::Arc;
477    use turul_mcp_session_storage::InMemorySessionStorage;
478
479    #[test]
480    fn test_server_config_default() {
481        let config = ServerConfig::default();
482        assert_eq!(config.mcp_path, "/mcp");
483        assert!(config.enable_cors);
484        assert_eq!(config.max_body_size, 1024 * 1024);
485    }
486
487    #[test]
488    fn test_builder() {
489        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
490        let session_storage = Arc::new(InMemorySessionStorage::new());
491        let server = HttpMcpServer::builder_with_storage(session_storage)
492            .bind_address(addr)
493            .mcp_path("/api/mcp")
494            .cors(false)
495            .max_body_size(2048)
496            .build();
497
498        assert_eq!(server.config.bind_address, addr);
499        assert_eq!(server.config.mcp_path, "/api/mcp");
500        assert!(!server.config.enable_cors);
501        assert_eq!(server.config.max_body_size, 2048);
502    }
503
504    #[tokio::test]
505    async fn test_server_stats() {
506        let session_storage = Arc::new(InMemorySessionStorage::new());
507        let server = HttpMcpServer::builder_with_storage(session_storage).build();
508
509        let stats = server.get_stats().await;
510        assert_eq!(stats.sessions, 0);
511        assert_eq!(stats.events, 0);
512        assert_eq!(stats.storage_type, "InMemory");
513    }
514}