1use std::net::SocketAddr;
7use std::sync::Arc;
8use hyper::server::conn::http1;
9use hyper::service::service_fn;
10use hyper::{Request, Response};
11use http_body_util::{Full, BodyExt};
12use bytes::Bytes;
13use hyper_util::rt::TokioIo;
14use tokio::net::TcpListener;
15use tracing::{info, error, debug};
16
17use turul_mcp_json_rpc_server::{JsonRpcHandler, JsonRpcDispatcher};
18use turul_mcp_session_storage::InMemorySessionStorage;
19
20use crate::{
21 Result, SessionMcpHandler, StreamConfig, StreamManager,
22 CorsLayer
23};
24
/// Configuration for the HTTP MCP server.
///
/// `PartialEq`/`Eq` are derived so two configurations can be compared
/// directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Socket address the TCP listener binds to.
    pub bind_address: SocketAddr,
    /// Request path that is routed to the MCP handler; requests to any other
    /// path are answered with `404 Not Found`.
    pub mcp_path: String,
    /// When `true`, CORS headers are applied to every response.
    pub enable_cors: bool,
    /// Maximum accepted request body size.
    /// NOTE(review): presumably in bytes and enforced by the request handler — confirm.
    pub max_body_size: usize,
    /// Server-Sent Events flag. Defaults to whether the `sse` cargo feature is
    /// compiled in.
    pub enable_sse: bool,
}
39
40impl Default for ServerConfig {
41 fn default() -> Self {
42 Self {
43 bind_address: "127.0.0.1:8000".parse().unwrap(),
44 mcp_path: "/mcp".to_string(),
45 enable_cors: true,
46 max_body_size: 1024 * 1024, enable_sse: cfg!(feature = "sse"),
48 }
49 }
50}
51
52pub struct HttpMcpServerBuilder {
54 config: ServerConfig,
55 dispatcher: JsonRpcDispatcher,
56 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
57 stream_config: StreamConfig,
58}
59
60impl HttpMcpServerBuilder {
61 pub fn new() -> Self {
63 Self {
64 config: ServerConfig::default(),
65 dispatcher: JsonRpcDispatcher::new(),
66 session_storage: Some(Arc::new(InMemorySessionStorage::new())),
67 stream_config: StreamConfig::default(),
68 }
69 }
70}
71
72impl HttpMcpServerBuilder {
73 pub fn with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> Self {
75 Self {
76 config: ServerConfig::default(),
77 dispatcher: JsonRpcDispatcher::new(),
78 session_storage: Some(session_storage),
79 stream_config: StreamConfig::default(),
80 }
81 }
82
83 pub fn bind_address(mut self, addr: SocketAddr) -> Self {
85 self.config.bind_address = addr;
86 self
87 }
88
89 pub fn mcp_path(mut self, path: impl Into<String>) -> Self {
91 self.config.mcp_path = path.into();
92 self
93 }
94
95 pub fn cors(mut self, enable: bool) -> Self {
97 self.config.enable_cors = enable;
98 self
99 }
100
101 pub fn max_body_size(mut self, size: usize) -> Self {
103 self.config.max_body_size = size;
104 self
105 }
106
107 pub fn sse(mut self, enable: bool) -> Self {
109 self.config.enable_sse = enable;
110 self
111 }
112
113 pub fn stream_config(mut self, config: StreamConfig) -> Self {
115 self.stream_config = config;
116 self
117 }
118
119 pub fn register_handler<H>(mut self, methods: Vec<String>, handler: H) -> Self
121 where
122 H: JsonRpcHandler + 'static,
123 {
124 self.dispatcher.register_methods(methods, handler);
125 self
126 }
127
128 pub fn default_handler<H>(mut self, handler: H) -> Self
130 where
131 H: JsonRpcHandler + 'static,
132 {
133 self.dispatcher.set_default_handler(handler);
134 self
135 }
136
137 pub fn build(self) -> HttpMcpServer {
139 let session_storage = self.session_storage.expect("Session storage must be provided");
140
141 let stream_manager = Arc::new(StreamManager::with_config(
143 Arc::clone(&session_storage),
144 self.stream_config.clone()
145 ));
146
147 HttpMcpServer {
148 config: self.config,
149 dispatcher: Arc::new(self.dispatcher),
150 session_storage,
151 stream_config: self.stream_config,
152 stream_manager,
153 }
154 }
155}
156
157impl Default for HttpMcpServerBuilder {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163#[derive(Clone)]
165pub struct HttpMcpServer {
166 config: ServerConfig,
167 dispatcher: Arc<JsonRpcDispatcher>,
168 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
169 stream_config: StreamConfig,
170 stream_manager: Arc<StreamManager>,
172}
173
174impl HttpMcpServer {
175 pub fn builder() -> HttpMcpServerBuilder {
177 HttpMcpServerBuilder::new()
178 }
179}
180
181impl HttpMcpServer {
182 pub fn builder_with_storage(session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> HttpMcpServerBuilder {
184 HttpMcpServerBuilder::with_storage(session_storage)
185 }
186
187 pub fn get_stream_manager(&self) -> Arc<crate::StreamManager> {
190 Arc::clone(&self.stream_manager)
191 }
192
193 pub async fn run(&self) -> Result<()> {
195 self.start_session_cleanup().await;
197
198 let listener = TcpListener::bind(&self.config.bind_address).await?;
199 info!("HTTP MCP server listening on {}", self.config.bind_address);
200 info!("MCP endpoint available at: {}", self.config.mcp_path);
201 info!("Session storage: turul_mcp_session_storage::BoxedSessionStorage");
202
203 let handler = SessionMcpHandler::with_shared_stream_manager(
205 self.config.clone(),
206 Arc::clone(&self.dispatcher),
207 Arc::clone(&self.session_storage),
208 self.stream_config.clone(),
209 Arc::clone(&self.stream_manager),
210 );
211
212 loop {
213 let (stream, peer_addr) = listener.accept().await?;
214 debug!("New connection from {}", peer_addr);
215
216 let handler_clone = handler.clone();
217 tokio::spawn(async move {
218 let io = TokioIo::new(stream);
219 let service = service_fn(move |req| {
220 handle_request(req, handler_clone.clone())
221 });
222
223 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
224 let err_str = err.to_string();
226 if err_str.contains("connection closed before message completed") {
227 debug!("Client disconnected (normal): {}", err);
228 } else {
229 error!("Error serving connection: {}", err);
230 }
231 }
232 });
233 }
234 }
235
236 async fn start_session_cleanup(&self) {
238 let storage = Arc::clone(&self.session_storage);
239 tokio::spawn(async move {
240 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
241 loop {
242 interval.tick().await;
243
244 let expire_time = std::time::SystemTime::now() - std::time::Duration::from_secs(30 * 60); match storage.expire_sessions(expire_time).await {
246 Ok(expired) => {
247 if !expired.is_empty() {
248 info!("Expired {} sessions", expired.len());
249 for session_id in expired {
250 debug!("Expired session: {}", session_id);
251 }
252 }
253 }
254 Err(err) => {
255 error!("Session cleanup error: {}", err);
256 }
257 }
258 }
259 });
260 }
261
262 pub async fn get_stats(&self) -> ServerStats {
264 let session_count = self.session_storage.session_count().await.unwrap_or(0);
265 let event_count = self.session_storage.event_count().await.unwrap_or(0);
266
267 ServerStats {
268 sessions: session_count,
269 events: event_count,
270 storage_type: "turul_mcp_session_storage::BoxedSessionStorage".to_string(),
271 }
272 }
273}
274
275async fn handle_request(
277 req: Request<hyper::body::Incoming>,
278 handler: SessionMcpHandler,
279) -> std::result::Result<Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>, hyper::Error> {
280 let method = req.method().clone();
281 let uri = req.uri().clone();
282 let path = uri.path();
283
284 debug!("Handling {} {}", method, path);
285
286 let response = if path == &handler.config.mcp_path {
288 match handler.handle_mcp_request(req).await {
289 Ok(mcp_response) => mcp_response,
290 Err(err) => {
291 error!("Request handling error: {}", err);
292 Response::builder()
293 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
294 .body(Full::new(Bytes::from(format!("Internal Server Error: {}", err))).map_err(|never| match never {}).boxed_unsync())
295 .unwrap()
296 }
297 }
298 } else {
299 Response::builder()
301 .status(hyper::StatusCode::NOT_FOUND)
302 .body(Full::new(Bytes::from("Not Found")).map_err(|never| match never {}).boxed_unsync())
303 .unwrap()
304 };
305
306 let mut final_response = response;
308 if handler.config.enable_cors {
309 CorsLayer::apply_cors_headers(final_response.headers_mut());
310 }
311
312 Ok(final_response)
313}
314
/// Snapshot of server statistics.
///
/// `PartialEq`/`Eq` are derived so snapshots can be compared directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerStats {
    /// Number of sessions reported by the session storage.
    pub sessions: usize,
    /// Number of events reported by the session storage.
    pub events: usize,
    /// Label describing the session storage type.
    pub storage_type: String,
}
322
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Arc;
    use turul_mcp_session_storage::InMemorySessionStorage;

    /// The default configuration exposes `/mcp` with CORS on and a 1 MiB limit.
    #[test]
    fn test_server_config_default() {
        let config = ServerConfig::default();
        assert_eq!(config.mcp_path, "/mcp");
        assert!(config.enable_cors);
        assert_eq!(config.max_body_size, 1024 * 1024);
    }

    /// Builder setters end up in the built server's configuration.
    #[test]
    fn test_builder() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 3000);
        let session_storage = Arc::new(InMemorySessionStorage::new());
        let server = HttpMcpServer::builder_with_storage(session_storage)
            .bind_address(addr)
            .mcp_path("/api/mcp")
            .cors(false)
            .max_body_size(2048)
            .build();

        assert_eq!(server.config.bind_address, addr);
        assert_eq!(server.config.mcp_path, "/api/mcp");
        assert!(!server.config.enable_cors);
        assert_eq!(server.config.max_body_size, 2048);
    }

    /// A freshly built server reports no sessions and no events.
    #[tokio::test]
    async fn test_server_stats() {
        let session_storage = Arc::new(InMemorySessionStorage::new());
        let server = HttpMcpServer::builder_with_storage(session_storage).build();

        let stats = server.get_stats().await;
        assert_eq!(stats.sessions, 0);
        assert_eq!(stats.events, 0);
        // The storage is type-erased, so `get_stats` reports the boxed storage
        // label rather than the concrete backend's type name.
        assert!(stats.storage_type.contains("BoxedSessionStorage"));
    }
}