Skip to main content

turul_http_mcp_server/
server.rs

1//! HTTP MCP Server with SessionStorage integration
2//!
3//! This server provides MCP 2025-11-25 compliant HTTP transport with
4//! pluggable session storage backends and proper SSE resumability.
5
6use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info, warn};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
/// Settings that control how the HTTP MCP server listens and which features it exposes.
///
/// Use [`ServerConfig::default`] for a ready-to-run baseline and override individual fields.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Socket address the TCP listener binds to.
    pub bind_address: SocketAddr,
    /// Request path that is routed to the MCP handlers.
    pub mcp_path: String,
    /// Whether CORS headers are applied to outgoing responses.
    pub enable_cors: bool,
    /// Upper bound on the size of a request body, in bytes.
    pub max_body_size: usize,
    /// Whether GET requests may open persistent SSE event streams.
    pub enable_get_sse: bool,
    /// Whether POST requests may stream tool call responses over SSE.
    ///
    /// Off by default for compatibility with clients that do not expect it.
    pub enable_post_sse: bool,
    /// How long a session may live before it is expired, in minutes (default: 30).
    pub session_expiry_minutes: u64,
    /// Whether `ping` requests are accepted without an `Mcp-Session-Id` header
    /// (default: true).
    ///
    /// With this enabled, a `ping` sent before initialization is served without a
    /// session. The complete middleware stack still executes, seeing `session=None`,
    /// which means rate-limiting middleware keeps the ability to reject
    /// unauthenticated pings.
    ///
    /// Hardened deployments that demand a session for every method should set this
    /// to false.
    pub allow_unauthenticated_ping: bool,
}
50
51impl Default for ServerConfig {
52    fn default() -> Self {
53        Self {
54            bind_address: "127.0.0.1:8000".parse().unwrap(),
55            mcp_path: "/mcp".to_string(),
56            enable_cors: true,
57            max_body_size: 1024 * 1024,            // 1MB
58            enable_get_sse: cfg!(feature = "sse"), // GET SSE enabled if "sse" feature is compiled
59            enable_post_sse: false, // Disabled by default for better client compatibility (e.g., MCP Inspector)
60            session_expiry_minutes: 30, // 30 minutes default
61            allow_unauthenticated_ping: true, // Allow pre-init pings per MCP spec
62        }
63    }
64}
65
66/// Builder for HTTP MCP server with pluggable storage
67pub struct HttpMcpServerBuilder {
68    config: ServerConfig,
69    dispatcher: JsonRpcDispatcher<McpError>,
70    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
71    stream_config: StreamConfig,
72    server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
73    middleware_stack: Arc<crate::middleware::MiddlewareStack>,
74    route_registry: Arc<crate::routes::RouteRegistry>,
75}
76
77impl HttpMcpServerBuilder {
78    /// Create a new builder with in-memory storage (zero-configuration)
79    pub fn new() -> Self {
80        Self {
81            config: ServerConfig::default(),
82            dispatcher: JsonRpcDispatcher::<McpError>::new(),
83            session_storage: Some(Arc::new(InMemorySessionStorage::new())),
84            stream_config: StreamConfig::default(),
85            server_capabilities: None,
86            middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
87            route_registry: Arc::new(crate::routes::RouteRegistry::new()),
88        }
89    }
90}
91
92impl HttpMcpServerBuilder {
93    /// Create a new builder with specific session storage
94    pub fn with_storage(
95        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
96    ) -> Self {
97        Self {
98            config: ServerConfig::default(),
99            dispatcher: JsonRpcDispatcher::<McpError>::new(),
100            session_storage: Some(session_storage),
101            stream_config: StreamConfig::default(),
102            server_capabilities: None,
103            middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
104            route_registry: Arc::new(crate::routes::RouteRegistry::new()),
105        }
106    }
107
108    /// Set the middleware stack (for HTTP transport middleware support)
109    pub fn with_middleware_stack(
110        mut self,
111        middleware_stack: Arc<crate::middleware::MiddlewareStack>,
112    ) -> Self {
113        self.middleware_stack = middleware_stack;
114        self
115    }
116
117    /// Set the route registry for custom HTTP paths (e.g., `.well-known`)
118    pub fn route_registry(mut self, registry: Arc<crate::routes::RouteRegistry>) -> Self {
119        self.route_registry = registry;
120        self
121    }
122
123    /// Set the bind address
124    pub fn bind_address(mut self, addr: SocketAddr) -> Self {
125        self.config.bind_address = addr;
126        self
127    }
128
129    /// Set the MCP endpoint path
130    pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
131        self.config.mcp_path = path.into();
132        self
133    }
134
135    /// Enable or disable CORS
136    pub fn cors(mut self, enable: bool) -> Self {
137        self.config.enable_cors = enable;
138        self
139    }
140
141    /// Set maximum request body size
142    pub fn max_body_size(mut self, size: usize) -> Self {
143        self.config.max_body_size = size;
144        self
145    }
146
147    /// Enable or disable GET SSE for persistent event streams
148    pub fn get_sse(mut self, enable: bool) -> Self {
149        self.config.enable_get_sse = enable;
150        self
151    }
152
153    /// Enable or disable POST SSE for streaming tool call responses (disabled by default for compatibility)
154    pub fn post_sse(mut self, enable: bool) -> Self {
155        self.config.enable_post_sse = enable;
156        self
157    }
158
159    /// Enable or disable both GET and POST SSE (convenience method)
160    pub fn sse(mut self, enable: bool) -> Self {
161        self.config.enable_get_sse = enable;
162        self.config.enable_post_sse = enable;
163        self
164    }
165
166    /// Set session expiry time in minutes
167    pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
168        self.config.session_expiry_minutes = minutes;
169        self
170    }
171
172    /// Allow or disallow ping requests without Mcp-Session-Id header
173    ///
174    /// Default: `true` (sessionless pings allowed per MCP spec).
175    /// Set to `false` for hardened deployments requiring session for all methods.
176    pub fn allow_unauthenticated_ping(mut self, allow: bool) -> Self {
177        self.config.allow_unauthenticated_ping = allow;
178        self
179    }
180
181    /// Configure SSE streaming settings
182    pub fn stream_config(mut self, config: StreamConfig) -> Self {
183        self.stream_config = config;
184        self
185    }
186
187    /// Register a JSON-RPC handler for specific methods
188    pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
189    where
190        H: JsonRpcHandler<Error = McpError> + 'static,
191    {
192        self.dispatcher.register_methods(methods, handler);
193        self
194    }
195
196    /// Register a default handler for unhandled methods
197    pub fn default_handler<H>(mut self, handler: H) -> Self
198    where
199        H: JsonRpcHandler<Error = McpError> + 'static,
200    {
201        self.dispatcher.set_default_handler(handler);
202        self
203    }
204
205    /// Set server capabilities
206    pub fn server_capabilities(
207        mut self,
208        capabilities: turul_mcp_protocol::ServerCapabilities,
209    ) -> Self {
210        self.server_capabilities = Some(capabilities);
211        self
212    }
213
214    /// Build the HTTP MCP server
215    pub fn build(self) -> HttpMcpServer {
216        let session_storage = self
217            .session_storage
218            .expect("Session storage must be provided");
219
220        // ✅ CORRECTED ARCHITECTURE: Create single shared StreamManager instance
221        let stream_manager = Arc::new(StreamManager::with_config(
222            Arc::clone(&session_storage),
223            self.stream_config.clone(),
224        ));
225
226        // Create shared dispatcher Arc
227        let dispatcher = Arc::new(self.dispatcher);
228
229        // Use middleware stack from builder
230        let middleware_stack = self.middleware_stack;
231
232        // Create StreamableHttpHandler for MCP 2025-11-25 support
233        let streamable_handler = StreamableHttpHandler::new(
234            Arc::new(self.config.clone()),
235            Arc::clone(&dispatcher),
236            Arc::clone(&session_storage),
237            Arc::clone(&stream_manager),
238            self.server_capabilities.unwrap_or_default(),
239            Arc::clone(&middleware_stack),
240        );
241
242        HttpMcpServer {
243            config: self.config,
244            dispatcher,
245            session_storage,
246            stream_config: self.stream_config,
247            stream_manager,
248            streamable_handler,
249            route_registry: self.route_registry,
250        }
251    }
252}
253
254impl Default for HttpMcpServerBuilder {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
260/// HTTP MCP Server with SessionStorage integration
261#[derive(Clone)]
262pub struct HttpMcpServer {
263    config: ServerConfig,
264    dispatcher: Arc<JsonRpcDispatcher<McpError>>,
265    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
266    stream_config: StreamConfig,
267    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance
268    stream_manager: Arc<StreamManager>,
269    // StreamableHttpHandler for MCP 2025-11-25 clients
270    streamable_handler: StreamableHttpHandler,
271    // Custom route registry for paths like .well-known
272    route_registry: Arc<crate::routes::RouteRegistry>,
273}
274
275impl HttpMcpServer {
276    /// Create a new builder with default in-memory storage
277    pub fn builder() -> HttpMcpServerBuilder {
278        HttpMcpServerBuilder::new()
279    }
280}
281
282impl HttpMcpServer {
283    /// Create a new builder with specific session storage
284    pub fn builder_with_storage(
285        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
286    ) -> HttpMcpServerBuilder {
287        HttpMcpServerBuilder::with_storage(session_storage)
288    }
289
290    /// Get the shared StreamManager instance for event forwarding bridge
291    /// Returns reference to the same StreamManager used by HTTP server
292    pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
293        Arc::clone(&self.stream_manager)
294    }
295
296    /// Run the server with session management
297    pub async fn run(&self) -> Result<()> {
298        // Start session cleanup task
299        self.start_session_cleanup().await;
300
301        let listener = TcpListener::bind(&self.config.bind_address).await?;
302        info!("HTTP MCP server listening on {}", self.config.bind_address);
303        info!("MCP endpoint available at: {}", self.config.mcp_path);
304        info!("Session storage: {}", self.session_storage.backend_name());
305
306        // ✅ CORRECTED ARCHITECTURE: Create single SessionMcpHandler instance outside the loop
307        // Use the same middleware stack as streamable_handler (both handlers share it)
308        let session_handler = SessionMcpHandler::with_shared_stream_manager(
309            self.config.clone(),
310            Arc::clone(&self.dispatcher),
311            Arc::clone(&self.session_storage),
312            self.stream_config.clone(),
313            Arc::clone(&self.stream_manager),
314            Arc::clone(&self.streamable_handler.middleware_stack),
315        );
316
317        // Create combined handler that routes based on protocol version
318        let handler = McpRequestHandler {
319            session_handler,
320            streamable_handler: self.streamable_handler.clone(),
321            route_registry: Arc::clone(&self.route_registry),
322        };
323
324        loop {
325            let (stream, peer_addr) = listener.accept().await?;
326            debug!("New connection from {}", peer_addr);
327
328            let handler_clone = handler.clone();
329            tokio::spawn(async move {
330                let io = TokioIo::new(stream);
331                let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
332
333                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
334                    // Filter out common client disconnection errors that aren't actual problems
335                    let err_str = err.to_string();
336                    if err_str.contains("connection closed before message completed") {
337                        debug!("Client disconnected (normal): {}", err);
338                    } else {
339                        error!("Error serving connection: {}", err);
340                    }
341                }
342            });
343        }
344    }
345
346    /// Start background session cleanup task
347    async fn start_session_cleanup(&self) {
348        let storage = Arc::clone(&self.session_storage);
349        let session_expiry_minutes = self.config.session_expiry_minutes;
350        tokio::spawn(async move {
351            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
352            loop {
353                interval.tick().await;
354
355                let expire_time = std::time::SystemTime::now()
356                    - std::time::Duration::from_secs(session_expiry_minutes * 60);
357                match storage.expire_sessions(expire_time).await {
358                    Ok(expired) => {
359                        if !expired.is_empty() {
360                            info!("Expired {} sessions", expired.len());
361                            for session_id in expired {
362                                debug!("Expired session: {}", session_id);
363                            }
364                        }
365                    }
366                    Err(err) => {
367                        error!("Session cleanup error: {}", err);
368                    }
369                }
370            }
371        });
372    }
373
374    /// Get server statistics
375    pub async fn get_stats(&self) -> ServerStats {
376        let session_count = self.session_storage.session_count().await.unwrap_or(0);
377        let event_count = self.session_storage.event_count().await.unwrap_or(0);
378
379        ServerStats {
380            sessions: session_count,
381            events: event_count,
382            storage_type: self.session_storage.backend_name().to_string(),
383        }
384    }
385}
386
387/// Handle requests with MCP 2025-11-25 compliance
388/// Combined handler that routes based on MCP protocol version
389#[derive(Clone)]
390struct McpRequestHandler {
391    session_handler: SessionMcpHandler,
392    streamable_handler: StreamableHttpHandler,
393    route_registry: Arc<crate::routes::RouteRegistry>,
394}
395
396async fn handle_request(
397    req: Request<hyper::body::Incoming>,
398    handler: McpRequestHandler,
399) -> std::result::Result<
400    Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
401    hyper::Error,
402> {
403    let method = req.method().clone();
404    let uri = req.uri().clone();
405    let path = uri.path();
406
407    debug!("Handling {} {}", method, path);
408
409    // Route the request
410    debug!(
411        "HTTP server dispatch: path={}, expected_mcp_path={}",
412        path, handler.session_handler.config.mcp_path
413    );
414    let response = if path == handler.session_handler.config.mcp_path {
415        debug!("Path match: Request routed to MCP handler");
416        // Extract MCP protocol version from headers
417        let protocol_version_str = req
418            .headers()
419            .get("MCP-Protocol-Version")
420            .and_then(|h| h.to_str().ok())
421            .unwrap_or("2025-11-25"); // Default to latest version (we only support the latest protocol)
422        debug!("Protocol version: {}", protocol_version_str);
423
424        let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
425            .unwrap_or(McpProtocolVersion::V2025_11_25);
426
427        debug!(
428            "MCP request: protocol_version={}, method={}",
429            protocol_version.as_str(),
430            method
431        );
432
433        // Route based on protocol version - MCP 2025-11-25 uses Streamable HTTP, older versions use SessionMcpHandler
434        debug!(
435            "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
436            protocol_version.as_str(),
437            method,
438            protocol_version.supports_streamable_http(),
439            if protocol_version.supports_streamable_http() {
440                "StreamableHttpHandler"
441            } else {
442                "SessionMcpHandler"
443            }
444        );
445
446        if protocol_version.supports_streamable_http() {
447            // Use StreamableHttpHandler for MCP 2025-11-25 clients
448            debug!(
449                "Calling streamable handler for protocol {}",
450                protocol_version.as_str()
451            );
452            let streamable_response = handler.streamable_handler.handle_request(req).await;
453            debug!("Streamable handler completed");
454            Ok(streamable_response)
455        } else {
456            // Use SessionMcpHandler for legacy clients (MCP 2024-11-05 and earlier)
457            match handler.session_handler.handle_mcp_request(req).await {
458                Ok(mcp_response) => Ok(mcp_response),
459                Err(err) => {
460                    error!("Request handling error: {}", err);
461                    Ok(Response::builder()
462                        .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
463                        .body(
464                            Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
465                                .map_err(|never| match never {})
466                                .boxed_unsync(),
467                        )
468                        .unwrap())
469                }
470            }
471        }
472    } else {
473        // Check custom routes (e.g., .well-known)
474        match handler.route_registry.match_route(path) {
475            Ok(Some(route_handler)) => {
476                debug!("Custom route matched: {}", path);
477                // Convert Incoming body to type-erased RouteBody for handler portability
478                let (parts, body) = req.into_parts();
479                let boxed_req = Request::from_parts(parts, body.boxed_unsync());
480                Ok(route_handler.handle(boxed_req).await)
481            }
482            Ok(None) => {
483                // 404 for other paths
484                Ok(Response::builder()
485                    .status(hyper::StatusCode::NOT_FOUND)
486                    .body(
487                        Full::new(Bytes::from("Not Found"))
488                            .map_err(|never| match never {})
489                            .boxed_unsync(),
490                    )
491                    .unwrap())
492            }
493            Err(validation_err) => {
494                // Path failed security validation — 400 Bad Request
495                warn!(
496                    "Route validation failed for path '{}': {}",
497                    path, validation_err
498                );
499                Ok(validation_err.into_response())
500            }
501        }
502    };
503
504    // Apply CORS if enabled
505    match response {
506        Ok(mut final_response) => {
507            if handler.session_handler.config.enable_cors {
508                CorsLayer::apply_cors_headers(final_response.headers_mut());
509            }
510            Ok(final_response)
511        }
512        Err(e) => Err(e),
513    }
514}
515
/// Snapshot of storage-level counters reported by `HttpMcpServer::get_stats`.
#[derive(Debug, Clone)]
pub struct ServerStats {
    /// Number of sessions reported by the session storage.
    pub sessions: usize,
    /// Number of events reported by the session storage.
    pub events: usize,
    /// Name of the session storage backend.
    pub storage_type: String,
}
523
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Arc;
    use turul_mcp_session_storage::InMemorySessionStorage;

    /// The default configuration exposes the expected path, CORS flag, and body limit.
    #[test]
    fn test_server_config_default() {
        let ServerConfig {
            mcp_path,
            enable_cors,
            max_body_size,
            ..
        } = ServerConfig::default();

        assert_eq!(mcp_path, "/mcp");
        assert!(enable_cors);
        assert_eq!(max_body_size, 1024 * 1024);
    }

    /// Builder setters end up in the built server's configuration.
    #[test]
    fn test_builder() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000);
        let storage = Arc::new(InMemorySessionStorage::new());

        let builder = HttpMcpServer::builder_with_storage(storage)
            .bind_address(addr)
            .mcp_path("/api/mcp")
            .cors(false)
            .max_body_size(2048);
        let server = builder.build();

        let config = &server.config;
        assert_eq!(config.bind_address, addr);
        assert_eq!(config.mcp_path, "/api/mcp");
        assert!(!config.enable_cors);
        assert_eq!(config.max_body_size, 2048);
    }

    /// A freshly built server reports empty in-memory storage.
    #[tokio::test]
    async fn test_server_stats() {
        let storage = Arc::new(InMemorySessionStorage::new());
        let server = HttpMcpServer::builder_with_storage(storage).build();

        let ServerStats {
            sessions,
            events,
            storage_type,
        } = server.get_stats().await;

        assert_eq!(sessions, 0);
        assert_eq!(events, 0);
        assert_eq!(storage_type, "InMemory");
    }
}