Skip to main content

turul_http_mcp_server/
server.rs

1//! HTTP MCP Server with SessionStorage integration
2//!
3//! This server provides MCP 2025-11-25 compliant HTTP transport with
4//! pluggable session storage backends and proper SSE resumability.
5
6use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
/// Configuration for the HTTP MCP server.
///
/// Use [`ServerConfig::default`] for the stock settings and override individual
/// fields as needed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Address to bind to (default: `127.0.0.1:8000`)
    pub bind_address: SocketAddr,
    /// Path for MCP endpoint (default: `/mcp`)
    pub mcp_path: String,
    /// Enable CORS (default: true)
    pub enable_cors: bool,
    /// Maximum request body size in bytes (default: 1 MiB)
    pub max_body_size: usize,
    /// Enable GET SSE support (persistent event streams).
    ///
    /// Defaults to `true` only when the crate is compiled with the `sse` feature.
    pub enable_get_sse: bool,
    /// Enable POST SSE support (streaming tool call responses) - disabled by default for compatibility
    pub enable_post_sse: bool,
    /// Session expiry time in minutes (default: 30 minutes)
    pub session_expiry_minutes: u64,
    /// Allow ping requests without Mcp-Session-Id header (default: true)
    ///
    /// When true, the server accepts pre-initialization `ping` requests without
    /// requiring a session. The full middleware stack still runs with `session=None`,
    /// so rate-limiting middleware can still block unauthenticated pings.
    ///
    /// Set to false for hardened deployments that require session for all methods.
    pub allow_unauthenticated_ping: bool,
}

impl Default for ServerConfig {
    /// Stock configuration: loopback on port 8000, `/mcp` endpoint, CORS on,
    /// 1 MiB body limit, POST SSE off, 30 minute sessions, sessionless pings allowed.
    fn default() -> Self {
        Self {
            // Built from its parts so there is no runtime parse (and no panic path).
            bind_address: SocketAddr::from(([127, 0, 0, 1], 8000)),
            mcp_path: "/mcp".to_string(),
            enable_cors: true,
            max_body_size: 1024 * 1024, // 1MB
            // GET SSE is on only if the "sse" feature is compiled in.
            enable_get_sse: cfg!(feature = "sse"),
            // Off by default for better client compatibility (e.g., MCP Inspector).
            enable_post_sse: false,
            session_expiry_minutes: 30,
            // Allow pre-init pings per MCP spec.
            allow_unauthenticated_ping: true,
        }
    }
}
65
66/// Builder for HTTP MCP server with pluggable storage
67pub struct HttpMcpServerBuilder {
68    config: ServerConfig,
69    dispatcher: JsonRpcDispatcher<McpError>,
70    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
71    stream_config: StreamConfig,
72    server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
73    middleware_stack: Arc<crate::middleware::MiddlewareStack>,
74}
75
76impl HttpMcpServerBuilder {
77    /// Create a new builder with in-memory storage (zero-configuration)
78    pub fn new() -> Self {
79        Self {
80            config: ServerConfig::default(),
81            dispatcher: JsonRpcDispatcher::<McpError>::new(),
82            session_storage: Some(Arc::new(InMemorySessionStorage::new())),
83            stream_config: StreamConfig::default(),
84            server_capabilities: None,
85            middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
86        }
87    }
88}
89
90impl HttpMcpServerBuilder {
91    /// Create a new builder with specific session storage
92    pub fn with_storage(
93        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
94    ) -> Self {
95        Self {
96            config: ServerConfig::default(),
97            dispatcher: JsonRpcDispatcher::<McpError>::new(),
98            session_storage: Some(session_storage),
99            stream_config: StreamConfig::default(),
100            server_capabilities: None,
101            middleware_stack: Arc::new(crate::middleware::MiddlewareStack::new()),
102        }
103    }
104
105    /// Set the middleware stack (for HTTP transport middleware support)
106    pub fn with_middleware_stack(
107        mut self,
108        middleware_stack: Arc<crate::middleware::MiddlewareStack>,
109    ) -> Self {
110        self.middleware_stack = middleware_stack;
111        self
112    }
113
114    /// Set the bind address
115    pub fn bind_address(mut self, addr: SocketAddr) -> Self {
116        self.config.bind_address = addr;
117        self
118    }
119
120    /// Set the MCP endpoint path
121    pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
122        self.config.mcp_path = path.into();
123        self
124    }
125
126    /// Enable or disable CORS
127    pub fn cors(mut self, enable: bool) -> Self {
128        self.config.enable_cors = enable;
129        self
130    }
131
132    /// Set maximum request body size
133    pub fn max_body_size(mut self, size: usize) -> Self {
134        self.config.max_body_size = size;
135        self
136    }
137
138    /// Enable or disable GET SSE for persistent event streams
139    pub fn get_sse(mut self, enable: bool) -> Self {
140        self.config.enable_get_sse = enable;
141        self
142    }
143
144    /// Enable or disable POST SSE for streaming tool call responses (disabled by default for compatibility)
145    pub fn post_sse(mut self, enable: bool) -> Self {
146        self.config.enable_post_sse = enable;
147        self
148    }
149
150    /// Enable or disable both GET and POST SSE (convenience method)
151    pub fn sse(mut self, enable: bool) -> Self {
152        self.config.enable_get_sse = enable;
153        self.config.enable_post_sse = enable;
154        self
155    }
156
157    /// Set session expiry time in minutes
158    pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
159        self.config.session_expiry_minutes = minutes;
160        self
161    }
162
163    /// Allow or disallow ping requests without Mcp-Session-Id header
164    ///
165    /// Default: `true` (sessionless pings allowed per MCP spec).
166    /// Set to `false` for hardened deployments requiring session for all methods.
167    pub fn allow_unauthenticated_ping(mut self, allow: bool) -> Self {
168        self.config.allow_unauthenticated_ping = allow;
169        self
170    }
171
172    /// Configure SSE streaming settings
173    pub fn stream_config(mut self, config: StreamConfig) -> Self {
174        self.stream_config = config;
175        self
176    }
177
178    /// Register a JSON-RPC handler for specific methods
179    pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
180    where
181        H: JsonRpcHandler<Error = McpError> + 'static,
182    {
183        self.dispatcher.register_methods(methods, handler);
184        self
185    }
186
187    /// Register a default handler for unhandled methods
188    pub fn default_handler<H>(mut self, handler: H) -> Self
189    where
190        H: JsonRpcHandler<Error = McpError> + 'static,
191    {
192        self.dispatcher.set_default_handler(handler);
193        self
194    }
195
196    /// Set server capabilities
197    pub fn server_capabilities(
198        mut self,
199        capabilities: turul_mcp_protocol::ServerCapabilities,
200    ) -> Self {
201        self.server_capabilities = Some(capabilities);
202        self
203    }
204
205    /// Build the HTTP MCP server
206    pub fn build(self) -> HttpMcpServer {
207        let session_storage = self
208            .session_storage
209            .expect("Session storage must be provided");
210
211        // ✅ CORRECTED ARCHITECTURE: Create single shared StreamManager instance
212        let stream_manager = Arc::new(StreamManager::with_config(
213            Arc::clone(&session_storage),
214            self.stream_config.clone(),
215        ));
216
217        // Create shared dispatcher Arc
218        let dispatcher = Arc::new(self.dispatcher);
219
220        // Use middleware stack from builder
221        let middleware_stack = self.middleware_stack;
222
223        // Create StreamableHttpHandler for MCP 2025-11-25 support
224        let streamable_handler = StreamableHttpHandler::new(
225            Arc::new(self.config.clone()),
226            Arc::clone(&dispatcher),
227            Arc::clone(&session_storage),
228            Arc::clone(&stream_manager),
229            self.server_capabilities.unwrap_or_default(),
230            Arc::clone(&middleware_stack),
231        );
232
233        HttpMcpServer {
234            config: self.config,
235            dispatcher,
236            session_storage,
237            stream_config: self.stream_config,
238            stream_manager,
239            streamable_handler,
240        }
241    }
242}
243
244impl Default for HttpMcpServerBuilder {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250/// HTTP MCP Server with SessionStorage integration
251#[derive(Clone)]
252pub struct HttpMcpServer {
253    config: ServerConfig,
254    dispatcher: Arc<JsonRpcDispatcher<McpError>>,
255    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
256    stream_config: StreamConfig,
257    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance
258    stream_manager: Arc<StreamManager>,
259    // StreamableHttpHandler for MCP 2025-11-25 clients
260    streamable_handler: StreamableHttpHandler,
261}
262
263impl HttpMcpServer {
264    /// Create a new builder with default in-memory storage
265    pub fn builder() -> HttpMcpServerBuilder {
266        HttpMcpServerBuilder::new()
267    }
268}
269
270impl HttpMcpServer {
271    /// Create a new builder with specific session storage
272    pub fn builder_with_storage(
273        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
274    ) -> HttpMcpServerBuilder {
275        HttpMcpServerBuilder::with_storage(session_storage)
276    }
277
278    /// Get the shared StreamManager instance for event forwarding bridge
279    /// Returns reference to the same StreamManager used by HTTP server
280    pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
281        Arc::clone(&self.stream_manager)
282    }
283
284    /// Run the server with session management
285    pub async fn run(&self) -> Result<()> {
286        // Start session cleanup task
287        self.start_session_cleanup().await;
288
289        let listener = TcpListener::bind(&self.config.bind_address).await?;
290        info!("HTTP MCP server listening on {}", self.config.bind_address);
291        info!("MCP endpoint available at: {}", self.config.mcp_path);
292        info!("Session storage: {}", self.session_storage.backend_name());
293
294        // ✅ CORRECTED ARCHITECTURE: Create single SessionMcpHandler instance outside the loop
295        // Use the same middleware stack as streamable_handler (both handlers share it)
296        let session_handler = SessionMcpHandler::with_shared_stream_manager(
297            self.config.clone(),
298            Arc::clone(&self.dispatcher),
299            Arc::clone(&self.session_storage),
300            self.stream_config.clone(),
301            Arc::clone(&self.stream_manager),
302            Arc::clone(&self.streamable_handler.middleware_stack),
303        );
304
305        // Create combined handler that routes based on protocol version
306        let handler = McpRequestHandler {
307            session_handler,
308            streamable_handler: self.streamable_handler.clone(),
309        };
310
311        loop {
312            let (stream, peer_addr) = listener.accept().await?;
313            debug!("New connection from {}", peer_addr);
314
315            let handler_clone = handler.clone();
316            tokio::spawn(async move {
317                let io = TokioIo::new(stream);
318                let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
319
320                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
321                    // Filter out common client disconnection errors that aren't actual problems
322                    let err_str = err.to_string();
323                    if err_str.contains("connection closed before message completed") {
324                        debug!("Client disconnected (normal): {}", err);
325                    } else {
326                        error!("Error serving connection: {}", err);
327                    }
328                }
329            });
330        }
331    }
332
333    /// Start background session cleanup task
334    async fn start_session_cleanup(&self) {
335        let storage = Arc::clone(&self.session_storage);
336        let session_expiry_minutes = self.config.session_expiry_minutes;
337        tokio::spawn(async move {
338            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
339            loop {
340                interval.tick().await;
341
342                let expire_time = std::time::SystemTime::now()
343                    - std::time::Duration::from_secs(session_expiry_minutes * 60);
344                match storage.expire_sessions(expire_time).await {
345                    Ok(expired) => {
346                        if !expired.is_empty() {
347                            info!("Expired {} sessions", expired.len());
348                            for session_id in expired {
349                                debug!("Expired session: {}", session_id);
350                            }
351                        }
352                    }
353                    Err(err) => {
354                        error!("Session cleanup error: {}", err);
355                    }
356                }
357            }
358        });
359    }
360
361    /// Get server statistics
362    pub async fn get_stats(&self) -> ServerStats {
363        let session_count = self.session_storage.session_count().await.unwrap_or(0);
364        let event_count = self.session_storage.event_count().await.unwrap_or(0);
365
366        ServerStats {
367            sessions: session_count,
368            events: event_count,
369            storage_type: self.session_storage.backend_name().to_string(),
370        }
371    }
372}
373
374/// Handle requests with MCP 2025-11-25 compliance
375/// Combined handler that routes based on MCP protocol version
376#[derive(Clone)]
377struct McpRequestHandler {
378    session_handler: SessionMcpHandler,
379    streamable_handler: StreamableHttpHandler,
380}
381
382async fn handle_request(
383    req: Request<hyper::body::Incoming>,
384    handler: McpRequestHandler,
385) -> std::result::Result<
386    Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
387    hyper::Error,
388> {
389    let method = req.method().clone();
390    let uri = req.uri().clone();
391    let path = uri.path();
392
393    debug!("Handling {} {}", method, path);
394
395    // Route the request
396    debug!(
397        "HTTP server dispatch: path={}, expected_mcp_path={}",
398        path, handler.session_handler.config.mcp_path
399    );
400    let response = if path == handler.session_handler.config.mcp_path {
401        debug!("Path match: Request routed to MCP handler");
402        // Extract MCP protocol version from headers
403        let protocol_version_str = req
404            .headers()
405            .get("MCP-Protocol-Version")
406            .and_then(|h| h.to_str().ok())
407            .unwrap_or("2025-11-25"); // Default to latest version (we only support the latest protocol)
408        debug!("Protocol version: {}", protocol_version_str);
409
410        let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
411            .unwrap_or(McpProtocolVersion::V2025_11_25);
412
413        debug!(
414            "MCP request: protocol_version={}, method={}",
415            protocol_version.as_str(),
416            method
417        );
418
419        // Route based on protocol version - MCP 2025-11-25 uses Streamable HTTP, older versions use SessionMcpHandler
420        debug!(
421            "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
422            protocol_version.as_str(),
423            method,
424            protocol_version.supports_streamable_http(),
425            if protocol_version.supports_streamable_http() {
426                "StreamableHttpHandler"
427            } else {
428                "SessionMcpHandler"
429            }
430        );
431
432        if protocol_version.supports_streamable_http() {
433            // Use StreamableHttpHandler for MCP 2025-11-25 clients
434            debug!(
435                "Calling streamable handler for protocol {}",
436                protocol_version.as_str()
437            );
438            let streamable_response = handler.streamable_handler.handle_request(req).await;
439            debug!("Streamable handler completed");
440            Ok(streamable_response)
441        } else {
442            // Use SessionMcpHandler for legacy clients (MCP 2024-11-05 and earlier)
443            match handler.session_handler.handle_mcp_request(req).await {
444                Ok(mcp_response) => Ok(mcp_response),
445                Err(err) => {
446                    error!("Request handling error: {}", err);
447                    Ok(Response::builder()
448                        .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
449                        .body(
450                            Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
451                                .map_err(|never| match never {})
452                                .boxed_unsync(),
453                        )
454                        .unwrap())
455                }
456            }
457        }
458    } else {
459        // 404 for other paths
460        Ok(Response::builder()
461            .status(hyper::StatusCode::NOT_FOUND)
462            .body(
463                Full::new(Bytes::from("Not Found"))
464                    .map_err(|never| match never {})
465                    .boxed_unsync(),
466            )
467            .unwrap())
468    };
469
470    // Apply CORS if enabled
471    match response {
472        Ok(mut final_response) => {
473            if handler.session_handler.config.enable_cors {
474                CorsLayer::apply_cors_headers(final_response.headers_mut());
475            }
476            Ok(final_response)
477        }
478        Err(e) => Err(e),
479    }
480}
481
/// Server statistics, as returned by `HttpMcpServer::get_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerStats {
    /// Session count reported by the session storage backend.
    pub sessions: usize,
    /// Event count reported by the session storage backend.
    pub events: usize,
    /// Name of the session storage backend.
    pub storage_type: String,
}
489
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Arc;
    use turul_mcp_session_storage::InMemorySessionStorage;

    /// The default configuration uses the `/mcp` endpoint, CORS and a 1 MiB body limit.
    #[test]
    fn test_server_config_default() {
        let ServerConfig {
            mcp_path,
            enable_cors,
            max_body_size,
            ..
        } = ServerConfig::default();

        assert_eq!(mcp_path, "/mcp");
        assert!(enable_cors);
        assert_eq!(max_body_size, 1024 * 1024);
    }

    /// Builder setters end up in the built server's configuration.
    #[test]
    fn test_builder() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000);
        let builder = HttpMcpServer::builder_with_storage(Arc::new(InMemorySessionStorage::new()));

        let server = builder
            .bind_address(addr)
            .mcp_path("/api/mcp")
            .cors(false)
            .max_body_size(2048)
            .build();

        let config = &server.config;
        assert_eq!(config.bind_address, addr);
        assert_eq!(config.mcp_path, "/api/mcp");
        assert!(!config.enable_cors);
        assert_eq!(config.max_body_size, 2048);
    }

    /// A freshly built in-memory server reports empty statistics.
    #[tokio::test]
    async fn test_server_stats() {
        let server =
            HttpMcpServer::builder_with_storage(Arc::new(InMemorySessionStorage::new())).build();

        let ServerStats {
            sessions,
            events,
            storage_type,
        } = server.get_stats().await;

        assert_eq!(sessions, 0);
        assert_eq!(events, 0);
        assert_eq!(storage_type, "InMemory");
    }
}