turul_http_mcp_server/
server.rs

1//! HTTP MCP Server with SessionStorage integration
2//!
3//! This server provides MCP 2025-06-18 compliant HTTP transport with
4//! pluggable session storage backends and proper SSE resumability.
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use http_body_util::{Full, BodyExt};
12use bytes::Bytes;
13use hyper_util::rt::TokioIo;
14use tokio::net::TcpListener;
15use tracing::{info, error, debug};
16
17use turul_mcp_json_rpc_server::{JsonRpcHandler, JsonRpcDispatcher};
18use turul_mcp_session_storage::InMemorySessionStorage;
19
20use crate::{
21    Result, SessionMcpHandler, StreamConfig, StreamManager,
22    CorsLayer
23};
24
25/// Configuration for the HTTP MCP server
26#[derive(Debug, Clone)]
27pub struct ServerConfig {
28    /// Address to bind to
29    pub bind_address: SocketAddr,
30    /// Path for MCP endpoint
31    pub mcp_path: String,
32    /// Enable CORS
33    pub enable_cors: bool,
34    /// Maximum request body size
35    pub max_body_size: usize,
36    /// Enable SSE support
37    pub enable_sse: bool,
38}
39
40impl Default for ServerConfig {
41    fn default() -> Self {
42        Self {
43            bind_address: "127.0.0.1:8000".parse().unwrap(),
44            mcp_path: "/mcp".to_string(),
45            enable_cors: true,
46            max_body_size: 1024 * 1024, // 1MB
47            enable_sse: cfg!(feature = "sse"),
48        }
49    }
50}
51
52/// Builder for HTTP MCP server with pluggable storage
53pub struct HttpMcpServerBuilder {
54    config: ServerConfig,
55    dispatcher: JsonRpcDispatcher,
56    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
57    stream_config: StreamConfig,
58}
59
60impl HttpMcpServerBuilder {
61    /// Create a new builder with in-memory storage (zero-configuration)
62    pub fn new() -> Self {
63        Self {
64            config: ServerConfig::default(),
65            dispatcher: JsonRpcDispatcher::new(),
66            session_storage: Some(Arc::new(InMemorySessionStorage::new())),
67            stream_config: StreamConfig::default(),
68        }
69    }
70}
71
72impl HttpMcpServerBuilder {
73    /// Create a new builder with specific session storage
74    pub fn with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> Self {
75        Self {
76            config: ServerConfig::default(),
77            dispatcher: JsonRpcDispatcher::new(),
78            session_storage: Some(session_storage),
79            stream_config: StreamConfig::default(),
80        }
81    }
82
83    /// Set the bind address
84    pub fn bind_address(mut self, addr: SocketAddr) -> Self {
85        self.config.bind_address = addr;
86        self
87    }
88
89    /// Set the MCP endpoint path
90    pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
91        self.config.mcp_path = path.into();
92        self
93    }
94
95    /// Enable or disable CORS
96    pub fn cors(mut self, enable: bool) -> Self {
97        self.config.enable_cors = enable;
98        self
99    }
100
101    /// Set maximum request body size
102    pub fn max_body_size(mut self, size: usize) -> Self {
103        self.config.max_body_size = size;
104        self
105    }
106
107    /// Enable or disable SSE
108    pub fn sse(mut self, enable: bool) -> Self {
109        self.config.enable_sse = enable;
110        self
111    }
112
113    /// Configure SSE streaming settings
114    pub fn stream_config(mut self, config: StreamConfig) -> Self {
115        self.stream_config = config;
116        self
117    }
118
119    /// Register a JSON-RPC handler for specific methods
120    pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
121    where
122        H: JsonRpcHandler + 'static,
123    {
124        self.dispatcher.register_methods(methods, handler);
125        self
126    }
127
128    /// Register a default handler for unhandled methods
129    pub fn default_handler<H>(mut self, handler: H) -> Self
130    where
131        H: JsonRpcHandler + 'static,
132    {
133        self.dispatcher.set_default_handler(handler);
134        self
135    }
136
137    /// Build the HTTP MCP server
138    pub fn build(self) -> HttpMcpServer {
139        let session_storage = self.session_storage.expect("Session storage must be provided");
140
141        // ✅ CORRECTED ARCHITECTURE: Create single shared StreamManager instance
142        let stream_manager = Arc::new(StreamManager::with_config(
143            Arc::clone(&session_storage),
144            self.stream_config.clone()
145        ));
146
147        HttpMcpServer {
148            config: self.config,
149            dispatcher: Arc::new(self.dispatcher),
150            session_storage,
151            stream_config: self.stream_config,
152            stream_manager,
153        }
154    }
155}
156
157impl Default for HttpMcpServerBuilder {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163/// HTTP MCP Server with SessionStorage integration
164#[derive(Clone)]
165pub struct HttpMcpServer {
166    config: ServerConfig,
167    dispatcher: Arc<JsonRpcDispatcher>,
168    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
169    stream_config: StreamConfig,
170    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance
171    stream_manager: Arc<StreamManager>,
172}
173
174impl HttpMcpServer {
175    /// Create a new builder with default in-memory storage
176    pub fn builder() -> HttpMcpServerBuilder {
177        HttpMcpServerBuilder::new()
178    }
179}
180
181impl HttpMcpServer {
182    /// Create a new builder with specific session storage
183    pub fn builder_with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> HttpMcpServerBuilder {
184        HttpMcpServerBuilder::with_storage(session_storage)
185    }
186
187    /// Get the shared StreamManager instance for event forwarding bridge
188    /// Returns reference to the same StreamManager used by HTTP server
189    pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
190        Arc::clone(&self.stream_manager)
191    }
192
193    /// Run the server with session management
194    pub async fn run(&self) -> Result<()> {
195        // Start session cleanup task
196        self.start_session_cleanup().await;
197
198        let listener = TcpListener::bind(&self.config.bind_address).await?;
199        info!("HTTP MCP server listening on {}", self.config.bind_address);
200        info!("MCP endpoint available at: {}", self.config.mcp_path);
201        info!("Session storage: turul_mcp_session_storage::BoxedSessionStorage");
202
203        // ✅ CORRECTED ARCHITECTURE: Create single SessionMcpHandler instance outside the loop
204        let handler = SessionMcpHandler::with_shared_stream_manager(
205            self.config.clone(),
206            Arc::clone(&self.dispatcher),
207            Arc::clone(&self.session_storage),
208            self.stream_config.clone(),
209            Arc::clone(&self.stream_manager),
210        );
211
212        loop {
213            let (stream, peer_addr) = listener.accept().await?;
214            debug!("New connection from {}", peer_addr);
215
216            let handler_clone = handler.clone();
217            tokio::spawn(async move {
218                let io = TokioIo::new(stream);
219                let service = service_fn(move |req| {
220                    handle_request(req, handler_clone.clone())
221                });
222
223                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
224                    // Filter out common client disconnection errors that aren't actual problems
225                    let err_str = err.to_string();
226                    if err_str.contains("connection closed before message completed") {
227                        debug!("Client disconnected (normal): {}", err);
228                    } else {
229                        error!("Error serving connection: {}", err);
230                    }
231                }
232            });
233        }
234    }
235
236    /// Start background session cleanup task
237    async fn start_session_cleanup(&self) {
238        let storage = Arc::clone(&self.session_storage);
239        tokio::spawn(async move {
240            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
241            loop {
242                interval.tick().await;
243
244                let expire_time = std::time::SystemTime::now() - std::time::Duration::from_secs(30 * 60); // 30 minutes
245                match storage.expire_sessions(expire_time).await {
246                    Ok(expired) => {
247                        if !expired.is_empty() {
248                            info!("Expired {} sessions", expired.len());
249                            for session_id in expired {
250                                debug!("Expired session: {}", session_id);
251                            }
252                        }
253                    }
254                    Err(err) => {
255                        error!("Session cleanup error: {}", err);
256                    }
257                }
258            }
259        });
260    }
261
262    /// Get server statistics
263    pub async fn get_stats(&self) -> ServerStats {
264        let session_count = self.session_storage.session_count().await.unwrap_or(0);
265        let event_count = self.session_storage.event_count().await.unwrap_or(0);
266
267        ServerStats {
268            sessions: session_count,
269            events: event_count,
270            storage_type: "turul_mcp_session_storage::BoxedSessionStorage".to_string(),
271        }
272    }
273}
274
275/// Handle requests with MCP 2025-06-18 compliance
276async fn handle_request(
277    req: Request<hyper::body::Incoming>,
278    handler: SessionMcpHandler,
279) -> std::result::Result<Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>, hyper::Error> {
280    let method = req.method().clone();
281    let uri = req.uri().clone();
282    let path = uri.path();
283
284    debug!("Handling {} {}", method, path);
285
286    // Route the request
287    let response = if path == &handler.config.mcp_path {
288        match handler.handle_mcp_request(req).await {
289            Ok(mcp_response) => mcp_response,
290            Err(err) => {
291                error!("Request handling error: {}", err);
292                Response::builder()
293                    .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
294                    .body(Full::new(Bytes::from(format!("Internal Server Error: {}", err))).map_err(|never| match never {}).boxed_unsync())
295                    .unwrap()
296            }
297        }
298    } else {
299        // 404 for other paths
300        Response::builder()
301            .status(hyper::StatusCode::NOT_FOUND)
302            .body(Full::new(Bytes::from("Not Found")).map_err(|never| match never {}).boxed_unsync())
303            .unwrap()
304    };
305
306    // Apply CORS if enabled
307    let mut final_response = response;
308    if handler.config.enable_cors {
309        CorsLayer::apply_cors_headers(final_response.headers_mut());
310    }
311
312    Ok(final_response)
313}
314
315/// Server statistics
316#[derive(Debug, Clone)]
317pub struct ServerStats {
318    pub sessions: usize,
319    pub events: usize,
320    pub storage_type: String,
321}
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326    use std::net::{IpAddr, Ipv4Addr};
327    use std::sync::Arc;
328    use turul_mcp_session_storage::InMemorySessionStorage;
329
330    #[test]
331    fn test_server_config_default() {
332        let config = ServerConfig::default();
333        assert_eq!(config.mcp_path, "/mcp");
334        assert!(config.enable_cors);
335        assert_eq!(config.max_body_size, 1024 * 1024);
336    }
337
338    #[test]
339    fn test_builder() {
340        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
341        let session_storage = Arc::new(InMemorySessionStorage::new());
342        let server = HttpMcpServer::builder_with_storage(session_storage)
343            .bind_address(addr)
344            .mcp_path("/api/mcp")
345            .cors(false)
346            .max_body_size(2048)
347            .build();
348
349        assert_eq!(server.config.bind_address, addr);
350        assert_eq!(server.config.mcp_path, "/api/mcp");
351        assert!(!server.config.enable_cors);
352        assert_eq!(server.config.max_body_size, 2048);
353    }
354
355    #[tokio::test]
356    async fn test_server_stats() {
357        let session_storage = Arc::new(InMemorySessionStorage::new());
358        let server = HttpMcpServer::builder_with_storage(session_storage).build();
359
360        let stats = server.get_stats().await;
361        assert_eq!(stats.sessions, 0);
362        assert_eq!(stats.events, 0);
363        assert!(stats.storage_type.contains("InMemorySessionStorage"));
364    }
365}