turul_http_mcp_server/
server.rs

1//! HTTP MCP Server with SessionStorage integration
2//!
3//! This server provides MCP 2025-06-18 compliant HTTP transport with
4//! pluggable session storage backends and proper SSE resumability.
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use http_body_util::{Full, BodyExt};
12use bytes::Bytes;
13use hyper_util::rt::TokioIo;
14use tokio::net::TcpListener;
15use tracing::{info, error, debug};
16
17use turul_mcp_json_rpc_server::{JsonRpcHandler, JsonRpcDispatcher};
18use turul_mcp_session_storage::InMemorySessionStorage;
19
20use crate::{
21    Result, SessionMcpHandler, StreamConfig, StreamManager,
22    CorsLayer
23};
24
25/// Configuration for the HTTP MCP server
26#[derive(Debug, Clone)]
27pub struct ServerConfig {
28    /// Address to bind to
29    pub bind_address: SocketAddr,
30    /// Path for MCP endpoint
31    pub mcp_path: String,
32    /// Enable CORS
33    pub enable_cors: bool,
34    /// Maximum request body size
35    pub max_body_size: usize,
36    /// Enable GET SSE support (persistent event streams)
37    pub enable_get_sse: bool,
38    /// Enable POST SSE support (streaming tool call responses) - disabled by default for compatibility
39    pub enable_post_sse: bool,
40}
41
42impl Default for ServerConfig {
43    fn default() -> Self {
44        Self {
45            bind_address: "127.0.0.1:8000".parse().unwrap(),
46            mcp_path: "/mcp".to_string(),
47            enable_cors: true,
48            max_body_size: 1024 * 1024, // 1MB
49            enable_get_sse: cfg!(feature = "sse"), // GET SSE enabled if "sse" feature is compiled
50            enable_post_sse: false, // Disabled by default for better client compatibility (e.g., MCP Inspector)
51        }
52    }
53}
54
55/// Builder for HTTP MCP server with pluggable storage
56pub struct HttpMcpServerBuilder {
57    config: ServerConfig,
58    dispatcher: JsonRpcDispatcher,
59    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
60    stream_config: StreamConfig,
61}
62
63impl HttpMcpServerBuilder {
64    /// Create a new builder with in-memory storage (zero-configuration)
65    pub fn new() -> Self {
66        Self {
67            config: ServerConfig::default(),
68            dispatcher: JsonRpcDispatcher::new(),
69            session_storage: Some(Arc::new(InMemorySessionStorage::new())),
70            stream_config: StreamConfig::default(),
71        }
72    }
73}
74
75impl HttpMcpServerBuilder {
76    /// Create a new builder with specific session storage
77    pub fn with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> Self {
78        Self {
79            config: ServerConfig::default(),
80            dispatcher: JsonRpcDispatcher::new(),
81            session_storage: Some(session_storage),
82            stream_config: StreamConfig::default(),
83        }
84    }
85
86    /// Set the bind address
87    pub fn bind_address(mut self, addr: SocketAddr) -> Self {
88        self.config.bind_address = addr;
89        self
90    }
91
92    /// Set the MCP endpoint path
93    pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
94        self.config.mcp_path = path.into();
95        self
96    }
97
98    /// Enable or disable CORS
99    pub fn cors(mut self, enable: bool) -> Self {
100        self.config.enable_cors = enable;
101        self
102    }
103
104    /// Set maximum request body size
105    pub fn max_body_size(mut self, size: usize) -> Self {
106        self.config.max_body_size = size;
107        self
108    }
109
110    /// Enable or disable GET SSE for persistent event streams
111    pub fn get_sse(mut self, enable: bool) -> Self {
112        self.config.enable_get_sse = enable;
113        self
114    }
115
116    /// Enable or disable POST SSE for streaming tool call responses (disabled by default for compatibility)
117    pub fn post_sse(mut self, enable: bool) -> Self {
118        self.config.enable_post_sse = enable;
119        self
120    }
121
122    /// Enable or disable both GET and POST SSE (convenience method)
123    pub fn sse(mut self, enable: bool) -> Self {
124        self.config.enable_get_sse = enable;
125        self.config.enable_post_sse = enable;
126        self
127    }
128
129    /// Configure SSE streaming settings
130    pub fn stream_config(mut self, config: StreamConfig) -> Self {
131        self.stream_config = config;
132        self
133    }
134
135    /// Register a JSON-RPC handler for specific methods
136    pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
137    where
138        H: JsonRpcHandler + 'static,
139    {
140        self.dispatcher.register_methods(methods, handler);
141        self
142    }
143
144    /// Register a default handler for unhandled methods
145    pub fn default_handler<H>(mut self, handler: H) -> Self
146    where
147        H: JsonRpcHandler + 'static,
148    {
149        self.dispatcher.set_default_handler(handler);
150        self
151    }
152
153    /// Build the HTTP MCP server
154    pub fn build(self) -> HttpMcpServer {
155        let session_storage = self.session_storage.expect("Session storage must be provided");
156
157        // ✅ CORRECTED ARCHITECTURE: Create single shared StreamManager instance
158        let stream_manager = Arc::new(StreamManager::with_config(
159            Arc::clone(&session_storage),
160            self.stream_config.clone()
161        ));
162
163        HttpMcpServer {
164            config: self.config,
165            dispatcher: Arc::new(self.dispatcher),
166            session_storage,
167            stream_config: self.stream_config,
168            stream_manager,
169        }
170    }
171}
172
173impl Default for HttpMcpServerBuilder {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179/// HTTP MCP Server with SessionStorage integration
180#[derive(Clone)]
181pub struct HttpMcpServer {
182    config: ServerConfig,
183    dispatcher: Arc<JsonRpcDispatcher>,
184    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
185    stream_config: StreamConfig,
186    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance
187    stream_manager: Arc<StreamManager>,
188}
189
190impl HttpMcpServer {
191    /// Create a new builder with default in-memory storage
192    pub fn builder() -> HttpMcpServerBuilder {
193        HttpMcpServerBuilder::new()
194    }
195}
196
197impl HttpMcpServer {
198    /// Create a new builder with specific session storage
199    pub fn builder_with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> HttpMcpServerBuilder {
200        HttpMcpServerBuilder::with_storage(session_storage)
201    }
202
203    /// Get the shared StreamManager instance for event forwarding bridge
204    /// Returns reference to the same StreamManager used by HTTP server
205    pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
206        Arc::clone(&self.stream_manager)
207    }
208
209    /// Run the server with session management
210    pub async fn run(&self) -> Result<()> {
211        // Start session cleanup task
212        self.start_session_cleanup().await;
213
214        let listener = TcpListener::bind(&self.config.bind_address).await?;
215        info!("HTTP MCP server listening on {}", self.config.bind_address);
216        info!("MCP endpoint available at: {}", self.config.mcp_path);
217        info!("Session storage: turul_mcp_session_storage::BoxedSessionStorage");
218
219        // ✅ CORRECTED ARCHITECTURE: Create single SessionMcpHandler instance outside the loop
220        let handler = SessionMcpHandler::with_shared_stream_manager(
221            self.config.clone(),
222            Arc::clone(&self.dispatcher),
223            Arc::clone(&self.session_storage),
224            self.stream_config.clone(),
225            Arc::clone(&self.stream_manager),
226        );
227
228        loop {
229            let (stream, peer_addr) = listener.accept().await?;
230            debug!("New connection from {}", peer_addr);
231
232            let handler_clone = handler.clone();
233            tokio::spawn(async move {
234                let io = TokioIo::new(stream);
235                let service = service_fn(move |req| {
236                    handle_request(req, handler_clone.clone())
237                });
238
239                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
240                    // Filter out common client disconnection errors that aren't actual problems
241                    let err_str = err.to_string();
242                    if err_str.contains("connection closed before message completed") {
243                        debug!("Client disconnected (normal): {}", err);
244                    } else {
245                        error!("Error serving connection: {}", err);
246                    }
247                }
248            });
249        }
250    }
251
252    /// Start background session cleanup task
253    async fn start_session_cleanup(&self) {
254        let storage = Arc::clone(&self.session_storage);
255        tokio::spawn(async move {
256            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
257            loop {
258                interval.tick().await;
259
260                let expire_time = std::time::SystemTime::now() - std::time::Duration::from_secs(30 * 60); // 30 minutes
261                match storage.expire_sessions(expire_time).await {
262                    Ok(expired) => {
263                        if !expired.is_empty() {
264                            info!("Expired {} sessions", expired.len());
265                            for session_id in expired {
266                                debug!("Expired session: {}", session_id);
267                            }
268                        }
269                    }
270                    Err(err) => {
271                        error!("Session cleanup error: {}", err);
272                    }
273                }
274            }
275        });
276    }
277
278    /// Get server statistics
279    pub async fn get_stats(&self) -> ServerStats {
280        let session_count = self.session_storage.session_count().await.unwrap_or(0);
281        let event_count = self.session_storage.event_count().await.unwrap_or(0);
282
283        ServerStats {
284            sessions: session_count,
285            events: event_count,
286            storage_type: "turul_mcp_session_storage::BoxedSessionStorage".to_string(),
287        }
288    }
289}
290
291/// Handle requests with MCP 2025-06-18 compliance
292async fn handle_request(
293    req: Request<hyper::body::Incoming>,
294    handler: SessionMcpHandler,
295) -> std::result::Result<Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>, hyper::Error> {
296    let method = req.method().clone();
297    let uri = req.uri().clone();
298    let path = uri.path();
299
300    debug!("Handling {} {}", method, path);
301
302    // Route the request
303    let response = if path == &handler.config.mcp_path {
304        match handler.handle_mcp_request(req).await {
305            Ok(mcp_response) => mcp_response,
306            Err(err) => {
307                error!("Request handling error: {}", err);
308                Response::builder()
309                    .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
310                    .body(Full::new(Bytes::from(format!("Internal Server Error: {}", err))).map_err(|never| match never {}).boxed_unsync())
311                    .unwrap()
312            }
313        }
314    } else {
315        // 404 for other paths
316        Response::builder()
317            .status(hyper::StatusCode::NOT_FOUND)
318            .body(Full::new(Bytes::from("Not Found")).map_err(|never| match never {}).boxed_unsync())
319            .unwrap()
320    };
321
322    // Apply CORS if enabled
323    let mut final_response = response;
324    if handler.config.enable_cors {
325        CorsLayer::apply_cors_headers(final_response.headers_mut());
326    }
327
328    Ok(final_response)
329}
330
331/// Server statistics
332#[derive(Debug, Clone)]
333pub struct ServerStats {
334    pub sessions: usize,
335    pub events: usize,
336    pub storage_type: String,
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342    use std::net::{IpAddr, Ipv4Addr};
343    use std::sync::Arc;
344    use turul_mcp_session_storage::InMemorySessionStorage;
345
346    #[test]
347    fn test_server_config_default() {
348        let config = ServerConfig::default();
349        assert_eq!(config.mcp_path, "/mcp");
350        assert!(config.enable_cors);
351        assert_eq!(config.max_body_size, 1024 * 1024);
352    }
353
354    #[test]
355    fn test_builder() {
356        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
357        let session_storage = Arc::new(InMemorySessionStorage::new());
358        let server = HttpMcpServer::builder_with_storage(session_storage)
359            .bind_address(addr)
360            .mcp_path("/api/mcp")
361            .cors(false)
362            .max_body_size(2048)
363            .build();
364
365        assert_eq!(server.config.bind_address, addr);
366        assert_eq!(server.config.mcp_path, "/api/mcp");
367        assert!(!server.config.enable_cors);
368        assert_eq!(server.config.max_body_size, 2048);
369    }
370
371    #[tokio::test]
372    async fn test_server_stats() {
373        let session_storage = Arc::new(InMemorySessionStorage::new());
374        let server = HttpMcpServer::builder_with_storage(session_storage).build();
375
376        let stats = server.get_stats().await;
377        assert_eq!(stats.sessions, 0);
378        assert_eq!(stats.events, 0);
379        assert!(stats.storage_type.contains("InMemorySessionStorage"));
380    }
381}