1use std::net::SocketAddr;
7use std::sync::Arc;
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use http_body_util::{Full, BodyExt};
12use bytes::Bytes;
13use hyper_util::rt::TokioIo;
14use tokio::net::TcpListener;
15use tracing::{info, error, debug};
16
17use turul_mcp_json_rpc_server::{JsonRpcHandler, JsonRpcDispatcher};
18use turul_mcp_session_storage::InMemorySessionStorage;
19
20use crate::{
21 Result, SessionMcpHandler, StreamConfig, StreamManager,
22 CorsLayer
23};
24
/// Settings for the HTTP MCP server.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Socket address the listener binds to.
    pub bind_address: SocketAddr,
    /// Request path that is routed to the MCP handler; any other path is
    /// answered with `404 Not Found`.
    pub mcp_path: String,
    /// When `true`, CORS headers are applied to every response.
    pub enable_cors: bool,
    /// Upper bound for request bodies.
    // NOTE(review): presumably a byte count enforced by the request handler;
    // nothing in this file reads it — confirm against `SessionMcpHandler`.
    pub max_body_size: usize,
    /// SSE toggle for GET requests.
    // NOTE(review): consumed outside this file — confirm exact semantics.
    pub enable_get_sse: bool,
    /// SSE toggle for POST requests.
    // NOTE(review): consumed outside this file — confirm exact semantics.
    pub enable_post_sse: bool,
}

impl Default for ServerConfig {
    /// Defaults: listen on `127.0.0.1:8000`, serve `/mcp`, CORS enabled,
    /// `max_body_size` of `1024 * 1024`, GET SSE on only when the crate is
    /// built with the `sse` feature, POST SSE off.
    fn default() -> Self {
        // Built from its parts so no string parsing (and no `unwrap`) is
        // needed at runtime.
        let loopback = SocketAddr::from(([127, 0, 0, 1], 8000));

        Self {
            bind_address: loopback,
            mcp_path: String::from("/mcp"),
            enable_cors: true,
            max_body_size: 1024 * 1024,
            enable_get_sse: cfg!(feature = "sse"),
            enable_post_sse: false,
        }
    }
}
54
55pub struct HttpMcpServerBuilder {
57 config: ServerConfig,
58 dispatcher: JsonRpcDispatcher,
59 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
60 stream_config: StreamConfig,
61}
62
63impl HttpMcpServerBuilder {
64 pub fn new() -> Self {
66 Self {
67 config: ServerConfig::default(),
68 dispatcher: JsonRpcDispatcher::new(),
69 session_storage: Some(Arc::new(InMemorySessionStorage::new())),
70 stream_config: StreamConfig::default(),
71 }
72 }
73}
74
75impl HttpMcpServerBuilder {
76 pub fn with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> Self {
78 Self {
79 config: ServerConfig::default(),
80 dispatcher: JsonRpcDispatcher::new(),
81 session_storage: Some(session_storage),
82 stream_config: StreamConfig::default(),
83 }
84 }
85
86 pub fn bind_address(mut self, addr: SocketAddr) -> Self {
88 self.config.bind_address = addr;
89 self
90 }
91
92 pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
94 self.config.mcp_path = path.into();
95 self
96 }
97
98 pub fn cors(mut self, enable: bool) -> Self {
100 self.config.enable_cors = enable;
101 self
102 }
103
104 pub fn max_body_size(mut self, size: usize) -> Self {
106 self.config.max_body_size = size;
107 self
108 }
109
110 pub fn get_sse(mut self, enable: bool) -> Self {
112 self.config.enable_get_sse = enable;
113 self
114 }
115
116 pub fn post_sse(mut self, enable: bool) -> Self {
118 self.config.enable_post_sse = enable;
119 self
120 }
121
122 pub fn sse(mut self, enable: bool) -> Self {
124 self.config.enable_get_sse = enable;
125 self.config.enable_post_sse = enable;
126 self
127 }
128
129 pub fn stream_config(mut self, config: StreamConfig) -> Self {
131 self.stream_config = config;
132 self
133 }
134
135 pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
137 where
138 H: JsonRpcHandler + 'static,
139 {
140 self.dispatcher.register_methods(methods, handler);
141 self
142 }
143
144 pub fn default_handler<H>(mut self, handler: H) -> Self
146 where
147 H: JsonRpcHandler + 'static,
148 {
149 self.dispatcher.set_default_handler(handler);
150 self
151 }
152
153 pub fn build(self) -> HttpMcpServer {
155 let session_storage = self.session_storage.expect("Session storage must be provided");
156
157 let stream_manager = Arc::new(StreamManager::with_config(
159 Arc::clone(&session_storage),
160 self.stream_config.clone()
161 ));
162
163 HttpMcpServer {
164 config: self.config,
165 dispatcher: Arc::new(self.dispatcher),
166 session_storage,
167 stream_config: self.stream_config,
168 stream_manager,
169 }
170 }
171}
172
173impl Default for HttpMcpServerBuilder {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
179#[derive(Clone)]
181pub struct HttpMcpServer {
182 config: ServerConfig,
183 dispatcher: Arc<JsonRpcDispatcher>,
184 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
185 stream_config: StreamConfig,
186 stream_manager: Arc<StreamManager>,
188}
189
190impl HttpMcpServer {
191 pub fn builder() -> HttpMcpServerBuilder {
193 HttpMcpServerBuilder::new()
194 }
195}
196
197impl HttpMcpServer {
198 pub fn builder_with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> HttpMcpServerBuilder {
200 HttpMcpServerBuilder::with_storage(session_storage)
201 }
202
203 pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
206 Arc::clone(&self.stream_manager)
207 }
208
209 pub async fn run(&self) -> Result<()> {
211 self.start_session_cleanup().await;
213
214 let listener = TcpListener::bind(&self.config.bind_address).await?;
215 info!("HTTP MCP server listening on {}", self.config.bind_address);
216 info!("MCP endpoint available at: {}", self.config.mcp_path);
217 info!("Session storage: turul_mcp_session_storage::BoxedSessionStorage");
218
219 let handler = SessionMcpHandler::with_shared_stream_manager(
221 self.config.clone(),
222 Arc::clone(&self.dispatcher),
223 Arc::clone(&self.session_storage),
224 self.stream_config.clone(),
225 Arc::clone(&self.stream_manager),
226 );
227
228 loop {
229 let (stream, peer_addr) = listener.accept().await?;
230 debug!("New connection from {}", peer_addr);
231
232 let handler_clone = handler.clone();
233 tokio::spawn(async move {
234 let io = TokioIo::new(stream);
235 let service = service_fn(move |req| {
236 handle_request(req, handler_clone.clone())
237 });
238
239 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
240 let err_str = err.to_string();
242 if err_str.contains("connection closed before message completed") {
243 debug!("Client disconnected (normal): {}", err);
244 } else {
245 error!("Error serving connection: {}", err);
246 }
247 }
248 });
249 }
250 }
251
252 async fn start_session_cleanup(&self) {
254 let storage = Arc::clone(&self.session_storage);
255 tokio::spawn(async move {
256 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
257 loop {
258 interval.tick().await;
259
260 let expire_time = std::time::SystemTime::now() - std::time::Duration::from_secs(30 * 60); match storage.expire_sessions(expire_time).await {
262 Ok(expired) => {
263 if !expired.is_empty() {
264 info!("Expired {} sessions", expired.len());
265 for session_id in expired {
266 debug!("Expired session: {}", session_id);
267 }
268 }
269 }
270 Err(err) => {
271 error!("Session cleanup error: {}", err);
272 }
273 }
274 }
275 });
276 }
277
278 pub async fn get_stats(&self) -> ServerStats {
280 let session_count = self.session_storage.session_count().await.unwrap_or(0);
281 let event_count = self.session_storage.event_count().await.unwrap_or(0);
282
283 ServerStats {
284 sessions: session_count,
285 events: event_count,
286 storage_type: "turul_mcp_session_storage::BoxedSessionStorage".to_string(),
287 }
288 }
289}
290
291async fn handle_request(
293 req: Request<hyper::body::Incoming>,
294 handler: SessionMcpHandler,
295) -> std::result::Result<Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>, hyper::Error> {
296 let method = req.method().clone();
297 let uri = req.uri().clone();
298 let path = uri.path();
299
300 debug!("Handling {} {}", method, path);
301
302 let response = if path == &handler.config.mcp_path {
304 match handler.handle_mcp_request(req).await {
305 Ok(mcp_response) => mcp_response,
306 Err(err) => {
307 error!("Request handling error: {}", err);
308 Response::builder()
309 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
310 .body(Full::new(Bytes::from(format!("Internal Server Error: {}", err))).map_err(|never| match never {}).boxed_unsync())
311 .unwrap()
312 }
313 }
314 } else {
315 Response::builder()
317 .status(hyper::StatusCode::NOT_FOUND)
318 .body(Full::new(Bytes::from("Not Found")).map_err(|never| match never {}).boxed_unsync())
319 .unwrap()
320 };
321
322 let mut final_response = response;
324 if handler.config.enable_cors {
325 CorsLayer::apply_cors_headers(final_response.headers_mut());
326 }
327
328 Ok(final_response)
329}
330
/// Snapshot of server statistics as produced by `HttpMcpServer::get_stats`.
#[derive(Debug, Clone)]
pub struct ServerStats {
    /// Number of sessions reported by the session storage.
    pub sessions: usize,
    /// Number of events reported by the session storage.
    pub events: usize,
    /// Label describing the session storage type.
    pub storage_type: String,
}
338
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Arc;
    use turul_mcp_session_storage::InMemorySessionStorage;

    /// The default configuration carries the documented defaults.
    #[test]
    fn test_server_config_default() {
        let config = ServerConfig::default();
        assert_eq!(config.bind_address.to_string(), "127.0.0.1:8000");
        assert_eq!(config.mcp_path, "/mcp");
        assert!(config.enable_cors);
        assert_eq!(config.max_body_size, 1024 * 1024);
        assert!(!config.enable_post_sse);
    }

    /// Builder setters end up in the built server's configuration.
    #[test]
    fn test_builder() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
        let session_storage = Arc::new(InMemorySessionStorage::new());
        let server = HttpMcpServer::builder_with_storage(session_storage)
            .bind_address(addr)
            .mcp_path("/api/mcp")
            .cors(false)
            .max_body_size(2048)
            .build();

        assert_eq!(server.config.bind_address, addr);
        assert_eq!(server.config.mcp_path, "/api/mcp");
        assert!(!server.config.enable_cors);
        assert_eq!(server.config.max_body_size, 2048);
    }

    /// A freshly built server reports no sessions and no events.
    #[tokio::test]
    async fn test_server_stats() {
        let session_storage = Arc::new(InMemorySessionStorage::new());
        let server = HttpMcpServer::builder_with_storage(session_storage).build();

        let stats = server.get_stats().await;
        assert_eq!(stats.sessions, 0);
        assert_eq!(stats.events, 0);
        // `get_stats` reports a fixed label for the type-erased storage, not
        // the concrete backend, so the old "InMemorySessionStorage" check
        // could never pass.
        assert!(stats.storage_type.contains("BoxedSessionStorage"));
    }
}