turul_http_mcp_server/
server.rs

1//! HTTP MCP Server with SessionStorage integration
2//!
3//! This server provides MCP 2025-06-18 compliant HTTP transport with
4//! pluggable session storage backends and proper SSE resumability.
5
6use bytes::Bytes;
7use http_body_util::{BodyExt, Full};
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use hyper_util::rt::TokioIo;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use tokio::net::TcpListener;
15use tracing::{debug, error, info};
16
17use turul_mcp_json_rpc_server::{JsonRpcDispatcher, JsonRpcHandler};
18use turul_mcp_protocol::McpError;
19use turul_mcp_session_storage::InMemorySessionStorage;
20
21use crate::streamable_http::{McpProtocolVersion, StreamableHttpHandler};
22use crate::{CorsLayer, Result, SessionMcpHandler, StreamConfig, StreamManager};
23
24/// Configuration for the HTTP MCP server
25#[derive(Debug, Clone)]
26pub struct ServerConfig {
27    /// Address to bind to
28    pub bind_address: SocketAddr,
29    /// Path for MCP endpoint
30    pub mcp_path: String,
31    /// Enable CORS
32    pub enable_cors: bool,
33    /// Maximum request body size
34    pub max_body_size: usize,
35    /// Enable GET SSE support (persistent event streams)
36    pub enable_get_sse: bool,
37    /// Enable POST SSE support (streaming tool call responses) - disabled by default for compatibility
38    pub enable_post_sse: bool,
39    /// Session expiry time in minutes (default: 30 minutes)
40    pub session_expiry_minutes: u64,
41}
42
43impl Default for ServerConfig {
44    fn default() -> Self {
45        Self {
46            bind_address: "127.0.0.1:8000".parse().unwrap(),
47            mcp_path: "/mcp".to_string(),
48            enable_cors: true,
49            max_body_size: 1024 * 1024,            // 1MB
50            enable_get_sse: cfg!(feature = "sse"), // GET SSE enabled if "sse" feature is compiled
51            enable_post_sse: false, // Disabled by default for better client compatibility (e.g., MCP Inspector)
52            session_expiry_minutes: 30, // 30 minutes default
53        }
54    }
55}
56
57/// Builder for HTTP MCP server with pluggable storage
58pub struct HttpMcpServerBuilder {
59    config: ServerConfig,
60    dispatcher: JsonRpcDispatcher<McpError>,
61    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
62    stream_config: StreamConfig,
63    server_capabilities: Option<turul_mcp_protocol::ServerCapabilities>,
64}
65
66impl HttpMcpServerBuilder {
67    /// Create a new builder with in-memory storage (zero-configuration)
68    pub fn new() -> Self {
69        Self {
70            config: ServerConfig::default(),
71            dispatcher: JsonRpcDispatcher::<McpError>::new(),
72            session_storage: Some(Arc::new(InMemorySessionStorage::new())),
73            stream_config: StreamConfig::default(),
74            server_capabilities: None,
75        }
76    }
77}
78
79impl HttpMcpServerBuilder {
80    /// Create a new builder with specific session storage
81    pub fn with_storage(
82        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
83    ) -> Self {
84        Self {
85            config: ServerConfig::default(),
86            dispatcher: JsonRpcDispatcher::<McpError>::new(),
87            session_storage: Some(session_storage),
88            stream_config: StreamConfig::default(),
89            server_capabilities: None,
90        }
91    }
92
93    /// Set the bind address
94    pub fn bind_address(mut self, addr: SocketAddr) -> Self {
95        self.config.bind_address = addr;
96        self
97    }
98
99    /// Set the MCP endpoint path
100    pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
101        self.config.mcp_path = path.into();
102        self
103    }
104
105    /// Enable or disable CORS
106    pub fn cors(mut self, enable: bool) -> Self {
107        self.config.enable_cors = enable;
108        self
109    }
110
111    /// Set maximum request body size
112    pub fn max_body_size(mut self, size: usize) -> Self {
113        self.config.max_body_size = size;
114        self
115    }
116
117    /// Enable or disable GET SSE for persistent event streams
118    pub fn get_sse(mut self, enable: bool) -> Self {
119        self.config.enable_get_sse = enable;
120        self
121    }
122
123    /// Enable or disable POST SSE for streaming tool call responses (disabled by default for compatibility)
124    pub fn post_sse(mut self, enable: bool) -> Self {
125        self.config.enable_post_sse = enable;
126        self
127    }
128
129    /// Enable or disable both GET and POST SSE (convenience method)
130    pub fn sse(mut self, enable: bool) -> Self {
131        self.config.enable_get_sse = enable;
132        self.config.enable_post_sse = enable;
133        self
134    }
135
136    /// Set session expiry time in minutes
137    pub fn session_expiry_minutes(mut self, minutes: u64) -> Self {
138        self.config.session_expiry_minutes = minutes;
139        self
140    }
141
142    /// Configure SSE streaming settings
143    pub fn stream_config(mut self, config: StreamConfig) -> Self {
144        self.stream_config = config;
145        self
146    }
147
148    /// Register a JSON-RPC handler for specific methods
149    pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
150    where
151        H: JsonRpcHandler<Error = McpError> + 'static,
152    {
153        self.dispatcher.register_methods(methods, handler);
154        self
155    }
156
157    /// Register a default handler for unhandled methods
158    pub fn default_handler<H>(mut self, handler: H) -> Self
159    where
160        H: JsonRpcHandler<Error = McpError> + 'static,
161    {
162        self.dispatcher.set_default_handler(handler);
163        self
164    }
165
166    /// Set server capabilities
167    pub fn server_capabilities(
168        mut self,
169        capabilities: turul_mcp_protocol::ServerCapabilities,
170    ) -> Self {
171        self.server_capabilities = Some(capabilities);
172        self
173    }
174
175    /// Build the HTTP MCP server
176    pub fn build(self) -> HttpMcpServer {
177        let session_storage = self
178            .session_storage
179            .expect("Session storage must be provided");
180
181        // ✅ CORRECTED ARCHITECTURE: Create single shared StreamManager instance
182        let stream_manager = Arc::new(StreamManager::with_config(
183            Arc::clone(&session_storage),
184            self.stream_config.clone(),
185        ));
186
187        // Create shared dispatcher Arc
188        let dispatcher = Arc::new(self.dispatcher);
189
190        // Create StreamableHttpHandler for MCP 2025-06-18 support
191        let streamable_handler = StreamableHttpHandler::new(
192            Arc::new(self.config.clone()),
193            Arc::clone(&dispatcher),
194            Arc::clone(&session_storage),
195            Arc::clone(&stream_manager),
196            self.server_capabilities
197                .unwrap_or_else(|| turul_mcp_protocol::ServerCapabilities::default()),
198        );
199
200        HttpMcpServer {
201            config: self.config,
202            dispatcher,
203            session_storage,
204            stream_config: self.stream_config,
205            stream_manager,
206            streamable_handler,
207        }
208    }
209}
210
211impl Default for HttpMcpServerBuilder {
212    fn default() -> Self {
213        Self::new()
214    }
215}
216
217/// HTTP MCP Server with SessionStorage integration
218#[derive(Clone)]
219pub struct HttpMcpServer {
220    config: ServerConfig,
221    dispatcher: Arc<JsonRpcDispatcher<McpError>>,
222    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
223    stream_config: StreamConfig,
224    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance
225    stream_manager: Arc<StreamManager>,
226    // StreamableHttpHandler for MCP 2025-06-18 clients
227    streamable_handler: StreamableHttpHandler,
228}
229
230impl HttpMcpServer {
231    /// Create a new builder with default in-memory storage
232    pub fn builder() -> HttpMcpServerBuilder {
233        HttpMcpServerBuilder::new()
234    }
235}
236
237impl HttpMcpServer {
238    /// Create a new builder with specific session storage
239    pub fn builder_with_storage(
240        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
241    ) -> HttpMcpServerBuilder {
242        HttpMcpServerBuilder::with_storage(session_storage)
243    }
244
245    /// Get the shared StreamManager instance for event forwarding bridge
246    /// Returns reference to the same StreamManager used by HTTP server
247    pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
248        Arc::clone(&self.stream_manager)
249    }
250
251    /// Run the server with session management
252    pub async fn run(&self) -> Result<()> {
253        // Start session cleanup task
254        self.start_session_cleanup().await;
255
256        let listener = TcpListener::bind(&self.config.bind_address).await?;
257        info!("HTTP MCP server listening on {}", self.config.bind_address);
258        info!("MCP endpoint available at: {}", self.config.mcp_path);
259        info!("Session storage: {}", self.session_storage.backend_name());
260
261        // ✅ CORRECTED ARCHITECTURE: Create single SessionMcpHandler instance outside the loop
262        let session_handler = SessionMcpHandler::with_shared_stream_manager(
263            self.config.clone(),
264            Arc::clone(&self.dispatcher),
265            Arc::clone(&self.session_storage),
266            self.stream_config.clone(),
267            Arc::clone(&self.stream_manager),
268        );
269
270        // Create combined handler that routes based on protocol version
271        let handler = McpRequestHandler {
272            session_handler,
273            streamable_handler: self.streamable_handler.clone(),
274        };
275
276        loop {
277            let (stream, peer_addr) = listener.accept().await?;
278            debug!("New connection from {}", peer_addr);
279
280            let handler_clone = handler.clone();
281            tokio::spawn(async move {
282                let io = TokioIo::new(stream);
283                let service = service_fn(move |req| handle_request(req, handler_clone.clone()));
284
285                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
286                    // Filter out common client disconnection errors that aren't actual problems
287                    let err_str = err.to_string();
288                    if err_str.contains("connection closed before message completed") {
289                        debug!("Client disconnected (normal): {}", err);
290                    } else {
291                        error!("Error serving connection: {}", err);
292                    }
293                }
294            });
295        }
296    }
297
298    /// Start background session cleanup task
299    async fn start_session_cleanup(&self) {
300        let storage = Arc::clone(&self.session_storage);
301        let session_expiry_minutes = self.config.session_expiry_minutes;
302        tokio::spawn(async move {
303            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
304            loop {
305                interval.tick().await;
306
307                let expire_time = std::time::SystemTime::now()
308                    - std::time::Duration::from_secs(session_expiry_minutes * 60);
309                match storage.expire_sessions(expire_time).await {
310                    Ok(expired) => {
311                        if !expired.is_empty() {
312                            info!("Expired {} sessions", expired.len());
313                            for session_id in expired {
314                                debug!("Expired session: {}", session_id);
315                            }
316                        }
317                    }
318                    Err(err) => {
319                        error!("Session cleanup error: {}", err);
320                    }
321                }
322            }
323        });
324    }
325
326    /// Get server statistics
327    pub async fn get_stats(&self) -> ServerStats {
328        let session_count = self.session_storage.session_count().await.unwrap_or(0);
329        let event_count = self.session_storage.event_count().await.unwrap_or(0);
330
331        ServerStats {
332            sessions: session_count,
333            events: event_count,
334            storage_type: self.session_storage.backend_name().to_string(),
335        }
336    }
337}
338
339/// Handle requests with MCP 2025-06-18 compliance
340/// Combined handler that routes based on MCP protocol version
341#[derive(Clone)]
342struct McpRequestHandler {
343    session_handler: SessionMcpHandler,
344    streamable_handler: StreamableHttpHandler,
345}
346
347async fn handle_request(
348    req: Request<hyper::body::Incoming>,
349    handler: McpRequestHandler,
350) -> std::result::Result<
351    Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
352    hyper::Error,
353> {
354    let method = req.method().clone();
355    let uri = req.uri().clone();
356    let path = uri.path();
357
358    debug!("Handling {} {}", method, path);
359
360    // Route the request
361    debug!(
362        "HTTP server dispatch: path={}, expected_mcp_path={}",
363        path, handler.session_handler.config.mcp_path
364    );
365    let response = if path == handler.session_handler.config.mcp_path {
366        debug!("Path match: Request routed to MCP handler");
367        // Extract MCP protocol version from headers
368        let protocol_version_str = req
369            .headers()
370            .get("MCP-Protocol-Version")
371            .and_then(|h| h.to_str().ok())
372            .unwrap_or("2025-06-18"); // Default to latest version (we only support the latest protocol)
373        debug!("Protocol version: {}", protocol_version_str);
374
375        let protocol_version = McpProtocolVersion::parse_version(protocol_version_str)
376            .unwrap_or(McpProtocolVersion::V2025_06_18);
377
378        debug!(
379            "MCP request: protocol_version={}, method={}",
380            protocol_version.as_str(),
381            method
382        );
383
384        // Route based on protocol version - MCP 2025-06-18 uses Streamable HTTP, older versions use SessionMcpHandler
385        debug!(
386            "Routing decision: protocol_version={}, method={}, supports_streamable={}, handler={}",
387            protocol_version.as_str(),
388            method,
389            protocol_version.supports_streamable_http(),
390            if protocol_version.supports_streamable_http() {
391                "StreamableHttpHandler"
392            } else {
393                "SessionMcpHandler"
394            }
395        );
396
397        if protocol_version.supports_streamable_http() {
398            // Use StreamableHttpHandler for MCP 2025-06-18 clients
399            debug!(
400                "Calling streamable handler for protocol {}",
401                protocol_version.as_str()
402            );
403            let streamable_response = handler.streamable_handler.handle_request(req).await;
404            debug!("Streamable handler completed");
405            Ok(streamable_response)
406        } else {
407            // Use SessionMcpHandler for legacy clients (MCP 2024-11-05 and earlier)
408            match handler.session_handler.handle_mcp_request(req).await {
409                Ok(mcp_response) => Ok(mcp_response),
410                Err(err) => {
411                    error!("Request handling error: {}", err);
412                    Ok(Response::builder()
413                        .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
414                        .body(
415                            Full::new(Bytes::from(format!("Internal Server Error: {}", err)))
416                                .map_err(|never| match never {})
417                                .boxed_unsync(),
418                        )
419                        .unwrap())
420                }
421            }
422        }
423    } else {
424        // 404 for other paths
425        Ok(Response::builder()
426            .status(hyper::StatusCode::NOT_FOUND)
427            .body(
428                Full::new(Bytes::from("Not Found"))
429                    .map_err(|never| match never {})
430                    .boxed_unsync(),
431            )
432            .unwrap())
433    };
434
435    // Apply CORS if enabled
436    match response {
437        Ok(mut final_response) => {
438            if handler.session_handler.config.enable_cors {
439                CorsLayer::apply_cors_headers(final_response.headers_mut());
440            }
441            Ok(final_response)
442        }
443        Err(e) => Err(e),
444    }
445}
446
447/// Server statistics
448#[derive(Debug, Clone)]
449pub struct ServerStats {
450    pub sessions: usize,
451    pub events: usize,
452    pub storage_type: String,
453}
454
455#[cfg(test)]
456mod tests {
457    use super::*;
458    use std::net::{IpAddr, Ipv4Addr};
459    use std::sync::Arc;
460    use turul_mcp_session_storage::InMemorySessionStorage;
461
462    #[test]
463    fn test_server_config_default() {
464        let config = ServerConfig::default();
465        assert_eq!(config.mcp_path, "/mcp");
466        assert!(config.enable_cors);
467        assert_eq!(config.max_body_size, 1024 * 1024);
468    }
469
470    #[test]
471    fn test_builder() {
472        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
473        let session_storage = Arc::new(InMemorySessionStorage::new());
474        let server = HttpMcpServer::builder_with_storage(session_storage)
475            .bind_address(addr)
476            .mcp_path("/api/mcp")
477            .cors(false)
478            .max_body_size(2048)
479            .build();
480
481        assert_eq!(server.config.bind_address, addr);
482        assert_eq!(server.config.mcp_path, "/api/mcp");
483        assert!(!server.config.enable_cors);
484        assert_eq!(server.config.max_body_size, 2048);
485    }
486
487    #[tokio::test]
488    async fn test_server_stats() {
489        let session_storage = Arc::new(InMemorySessionStorage::new());
490        let server = HttpMcpServer::builder_with_storage(session_storage).build();
491
492        let stats = server.get_stats().await;
493        assert_eq!(stats.sessions, 0);
494        assert_eq!(stats.events, 0);
495        assert_eq!(stats.storage_type, "InMemory");
496    }
497}