1use std::sync::Arc;
12use std::convert::Infallible;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tracing::{debug, warn, error};
16
17use hyper::{Request, Response, Method, StatusCode};
18use bytes::Bytes;
19use hyper::header::{CONTENT_TYPE, ACCEPT};
20use http_body_util::{BodyExt, Full};
21use http_body::{Body, Frame};
22use futures::Stream;
23
24use turul_mcp_json_rpc_server::{
25 JsonRpcDispatcher,
26 r#async::SessionContext,
27 dispatch::{parse_json_rpc_message, JsonRpcMessage, JsonRpcMessageResult},
28 error::{JsonRpcError, JsonRpcErrorObject}
29};
30use turul_mcp_session_storage::InMemorySessionStorage;
31use turul_mcp_protocol::ServerCapabilities;
32use chrono;
33use uuid::Uuid;
34
35use crate::{
36 Result, ServerConfig, StreamConfig,
37 protocol::{extract_protocol_version, extract_session_id, extract_last_event_id},
38 json_rpc_responses::*,
39 StreamManager,
40 notification_bridge::{StreamManagerNotificationBroadcaster, SharedNotificationBroadcaster}
41};
42
43pub struct SessionSseStream {
45 stream: Pin<Box<dyn Stream<Item = std::result::Result<Bytes, Infallible>> + Send>>,
46}
47
48impl SessionSseStream {
49 pub fn new<S>(stream: S) -> Self
50 where
51 S: Stream<Item = std::result::Result<Bytes, Infallible>> + Send + 'static,
52 {
53 Self {
54 stream: Box::pin(stream),
55 }
56 }
57}
58
59impl Drop for SessionSseStream {
60 fn drop(&mut self) {
61 debug!("🔥 DROP: SessionSseStream - HTTP response body being cleaned up");
62 debug!("🔥 This may indicate early cleanup of SSE response stream");
63 }
64}
65
66impl Body for SessionSseStream {
67 type Data = Bytes;
68 type Error = Infallible;
69
70 fn poll_frame(
71 mut self: Pin<&mut Self>,
72 cx: &mut Context<'_>,
73 ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
74 match self.stream.as_mut().poll_next(cx) {
75 Poll::Ready(Some(Ok(data))) => {
76 Poll::Ready(Some(Ok(Frame::data(data))))
77 }
78 Poll::Ready(Some(Err(never))) => match never {},
79 Poll::Ready(None) => Poll::Ready(None),
80 Poll::Pending => Poll::Pending,
81 }
82 }
83}
84
85type JsonRpcBody = Full<Bytes>;
87
88type UnifiedMcpBody = http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>;
90
91fn convert_to_unified_body(full_body: Full<Bytes>) -> UnifiedMcpBody {
93 full_body.map_err(|never| match never {}).boxed_unsync()
94}
95
96fn jsonrpc_error_to_unified_body(error: JsonRpcError) -> Result<Response<UnifiedMcpBody>> {
98 let error_json = serde_json::to_string(&error)?;
99 Ok(Response::builder()
100 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
102 .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
103 .unwrap())
104}
105
106pub struct SessionMcpHandler {
110 pub(crate) config: ServerConfig,
111 pub(crate) dispatcher: Arc<JsonRpcDispatcher>,
112 pub(crate) session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
113 pub(crate) stream_config: StreamConfig,
114 pub(crate) stream_manager: Arc<StreamManager>,
116}
117
118impl Clone for SessionMcpHandler {
119 fn clone(&self) -> Self {
120 Self {
121 config: self.config.clone(),
122 dispatcher: Arc::clone(&self.dispatcher),
123 session_storage: Arc::clone(&self.session_storage),
124 stream_config: self.stream_config.clone(),
125 stream_manager: Arc::clone(&self.stream_manager),
126 }
127 }
128}
129
130impl SessionMcpHandler {
131 pub fn new(
133 config: ServerConfig,
134 dispatcher: Arc<JsonRpcDispatcher>,
135 stream_config: StreamConfig,
136 ) -> Self {
137 let storage: Arc<turul_mcp_session_storage::BoxedSessionStorage> = Arc::new(InMemorySessionStorage::new());
138 Self::with_storage(config, dispatcher, storage, stream_config)
139 }
140
141 pub fn with_shared_stream_manager(
143 config: ServerConfig,
144 dispatcher: Arc<JsonRpcDispatcher>,
145 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
146 stream_config: StreamConfig,
147 stream_manager: Arc<StreamManager>,
148 ) -> Self {
149 Self {
150 config,
151 dispatcher,
152 session_storage,
153 stream_config,
154 stream_manager,
155 }
156 }
157
158 pub fn with_storage(
161 config: ServerConfig,
162 dispatcher: Arc<JsonRpcDispatcher>,
163 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
164 stream_config: StreamConfig,
165 ) -> Self {
166 let stream_manager = Arc::new(StreamManager::with_config(
168 Arc::clone(&session_storage),
169 stream_config.clone()
170 ));
171
172 Self {
173 config,
174 dispatcher,
175 session_storage,
176 stream_config,
177 stream_manager,
178 }
179 }
180
181 pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
183 &self.stream_manager
184 }
185
186
187 pub async fn handle_mcp_request(
189 &self,
190 req: Request<hyper::body::Incoming>,
191 ) -> Result<Response<UnifiedMcpBody>> {
192 match req.method() {
193 &Method::POST => {
194 let response = self.handle_json_rpc_request(req).await?;
195 Ok(response.map(convert_to_unified_body))
196 },
197 &Method::GET => self.handle_sse_request(req).await,
198 &Method::DELETE => {
199 let response = self.handle_delete_request(req).await?;
200 Ok(response.map(convert_to_unified_body))
201 },
202 &Method::OPTIONS => {
203 let response = self.handle_preflight();
204 Ok(response.map(convert_to_unified_body))
205 },
206 _ => {
207 let response = self.method_not_allowed();
208 Ok(response.map(convert_to_unified_body))
209 }
210 }
211 }
212
213 async fn handle_json_rpc_request(
215 &self,
216 req: Request<hyper::body::Incoming>,
217 ) -> Result<Response<Full<Bytes>>> {
218 let protocol_version = extract_protocol_version(req.headers());
220 let session_id = extract_session_id(req.headers());
221
222 debug!("POST request - Protocol: {}, Session: {:?}", protocol_version, session_id);
223
224 let content_type = req.headers()
226 .get(CONTENT_TYPE)
227 .and_then(|ct| ct.to_str().ok())
228 .unwrap_or("");
229
230 if !content_type.starts_with("application/json") {
231 warn!("Invalid content type: {}", content_type);
232 return Ok(bad_request_response("Content-Type must be application/json"));
233 }
234
235 let accept_header = req.headers()
237 .get(ACCEPT)
238 .and_then(|accept| accept.to_str().ok())
239 .unwrap_or("application/json");
240
241 let accepts_sse = accept_header.contains("text/event-stream");
242 debug!("POST request Accept header: '{}', will use SSE for tool calls: {}", accept_header, accepts_sse);
243
244 let body = req.into_body();
246 let body_bytes = match body.collect().await {
247 Ok(collected) => collected.to_bytes(),
248 Err(err) => {
249 error!("Failed to read request body: {}", err);
250 return Ok(bad_request_response("Failed to read request body"));
251 }
252 };
253
254 if body_bytes.len() > self.config.max_body_size {
256 warn!("Request body too large: {} bytes", body_bytes.len());
257 return Ok(Response::builder()
258 .status(StatusCode::PAYLOAD_TOO_LARGE)
259 .header(CONTENT_TYPE, "application/json")
260 .body(Full::new(Bytes::from("Request body too large")))
261 .unwrap());
262 }
263
264 let body_str = match std::str::from_utf8(&body_bytes) {
266 Ok(s) => s,
267 Err(err) => {
268 error!("Invalid UTF-8 in request body: {}", err);
269 return Ok(bad_request_response("Request body must be valid UTF-8"));
270 }
271 };
272
273 debug!("Received JSON-RPC request: {}", body_str);
274
275 let message = match parse_json_rpc_message(body_str) {
277 Ok(msg) => msg,
278 Err(rpc_err) => {
279 error!("JSON-RPC parse error: {}", rpc_err);
280 let error_response = serde_json::to_string(&rpc_err)
282 .unwrap_or_else(|_| "{}".to_string());
283 return Ok(Response::builder()
284 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
286 .body(Full::new(Bytes::from(error_response)))
287 .unwrap());
288 }
289 };
290
291 let (message_result, response_session_id, method_name) = match message {
293 JsonRpcMessage::Request(request) => {
294 debug!("Processing JSON-RPC request: method={}", request.method);
295 let method_name = request.method.clone();
296
297 let (response, response_session_id) = if request.method == "initialize" {
299 debug!("Handling initialize request - creating new session via session storage");
300
301 let capabilities = ServerCapabilities::default();
303 match self.session_storage.create_session(capabilities).await {
304 Ok(session_info) => {
305 debug!("Created new session via session storage: {}", session_info.session_id);
306
307 let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
309 Arc::clone(&self.stream_manager)
310 ));
311 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
312
313 let session_context = SessionContext {
314 session_id: session_info.session_id.clone(),
315 metadata: std::collections::HashMap::new(),
316 broadcaster: Some(broadcaster_any),
317 timestamp: chrono::Utc::now().timestamp_millis() as u64,
318 };
319
320 let response = self.dispatcher.handle_request_with_context(request, session_context).await;
321
322 (response, Some(session_info.session_id))
324 }
325 Err(err) => {
326 error!("Failed to create session during initialize: {}", err);
327 let error_msg = format!("Session creation failed: {}", err);
329 let error_response = turul_mcp_json_rpc_server::JsonRpcResponse::success(
330 request.id,
331 serde_json::json!({"error": error_msg})
332 );
333 (error_response, None)
334 }
335 }
336 } else {
337 let session_id_str = session_id.clone().unwrap_or("unknown".to_string());
339 let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
340 Arc::clone(&self.stream_manager)
341 ));
342 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
343
344 let session_context = SessionContext {
345 session_id: session_id_str,
346 metadata: std::collections::HashMap::new(),
347 broadcaster: Some(broadcaster_any),
348 timestamp: chrono::Utc::now().timestamp_millis() as u64,
349 };
350
351 let response = self.dispatcher.handle_request_with_context(request, session_context).await;
352 (response, session_id)
353 };
354
355 (JsonRpcMessageResult::Response(response), response_session_id, Some(method_name))
356 }
357 JsonRpcMessage::Notification(notification) => {
358 debug!("Processing JSON-RPC notification: method={}", notification.method);
359 let method_name = notification.method.clone();
360
361 let session_context = if let Some(ref session_id) = session_id {
363 let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
364 Arc::clone(&self.stream_manager)
365 ));
366 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
367
368 Some(SessionContext {
369 session_id: session_id.clone(),
370 metadata: std::collections::HashMap::new(),
371 broadcaster: Some(broadcaster_any),
372 timestamp: chrono::Utc::now().timestamp_millis() as u64,
373 })
374 } else {
375 None
376 };
377
378 if let Err(err) = self.dispatcher.handle_notification_with_context(notification, session_context).await {
379 error!("Notification handling error: {}", err);
380 }
381 (JsonRpcMessageResult::NoResponse, session_id.clone(), Some(method_name))
382 }
383 };
384
385 match message_result {
387 JsonRpcMessageResult::Response(response) => {
388 let is_tool_call = method_name.as_ref().map_or(false, |m| m == "tools/call");
391
392 debug!("Decision point: method={:?}, accepts_sse={}, session_id={:?}, is_tool_call={}",
393 method_name, accepts_sse, response_session_id, is_tool_call);
394
395 debug!("🔧 COMPATIBILITY MODE: Always returning JSON response for method: {:?} (SSE disabled for tool calls)", method_name);
398 Ok(jsonrpc_response_with_session(response, response_session_id)?)
399 }
400 JsonRpcMessageResult::Error(error) => {
401 warn!("Sending JSON-RPC error response");
402 let error_json = serde_json::to_string(&error)?;
404 Ok(Response::builder()
405 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
407 .body(Full::new(Bytes::from(error_json)))
408 .unwrap())
409 }
410 JsonRpcMessageResult::NoResponse => {
411 Ok(jsonrpc_notification_response()?)
413 }
414 }
415 }
416
417 async fn handle_sse_request(
422 &self,
423 req: Request<hyper::body::Incoming>,
424 ) -> Result<Response<UnifiedMcpBody>> {
425 let headers = req.headers();
427 let accept = headers
428 .get(ACCEPT)
429 .and_then(|accept| accept.to_str().ok())
430 .unwrap_or("");
431
432 if !accept.contains("text/event-stream") {
433 warn!("GET request received without SSE support - header does not contain 'text/event-stream'");
434 let error = JsonRpcError::new(
435 None,
436 JsonRpcErrorObject::server_error(
437 -32001,
438 "SSE not accepted - missing 'text/event-stream' in Accept header",
439 None
440 )
441 );
442 return jsonrpc_error_to_unified_body(error);
443 }
444
445 let protocol_version = extract_protocol_version(headers);
447 let session_id = extract_session_id(headers);
448
449 debug!("GET SSE request - Protocol: {}, Session: {:?}", protocol_version, session_id);
450
451 let session_id = match session_id {
453 Some(id) => id,
454 None => {
455 warn!("Missing Mcp-Session-Id header for SSE request");
456 let error = JsonRpcError::new(
457 None,
458 JsonRpcErrorObject::server_error(
459 -32002,
460 "Missing Mcp-Session-Id header",
461 None
462 )
463 );
464 return jsonrpc_error_to_unified_body(error);
465 }
466 };
467
468 if let Err(err) = self.validate_session_exists(&session_id).await {
470 error!("Session validation failed for Session ID {}: {}", session_id, err);
471 let error = JsonRpcError::new(
472 None,
473 JsonRpcErrorObject::server_error(
474 -32003,
475 &format!("Session validation failed: {}", err),
476 None
477 )
478 );
479 return jsonrpc_error_to_unified_body(error);
480 }
481
482 let last_event_id = extract_last_event_id(headers);
484
485 let connection_id = Uuid::now_v7().to_string();
487
488 debug!("Creating SSE stream for session: {} with connection: {}, last_event_id: {:?}",
489 session_id, connection_id, last_event_id);
490
491 match self.stream_manager.handle_sse_connection(
493 session_id,
494 connection_id,
495 last_event_id,
496 ).await {
497 Ok(response) => Ok(response),
498 Err(err) => {
499 error!("Failed to create SSE connection: {}", err);
500 let error = JsonRpcError::new(
501 None,
502 JsonRpcErrorObject::internal_error(
503 Some(format!("SSE connection failed: {}", err))
504 )
505 );
506 jsonrpc_error_to_unified_body(error)
507 }
508 }
509 }
510
511 async fn handle_delete_request(
513 &self,
514 req: Request<hyper::body::Incoming>,
515 ) -> Result<Response<JsonRpcBody>> {
516 let session_id = extract_session_id(req.headers());
517
518 debug!("DELETE request - Session: {:?}", session_id);
519
520 if let Some(session_id) = session_id {
521 match self.session_storage.delete_session(&session_id).await {
522 Ok(true) => {
523 debug!("Session {} removed via DELETE", session_id);
524 Ok(Response::builder()
525 .status(StatusCode::OK)
526 .body(Full::new(Bytes::from("Session removed")))
527 .unwrap())
528 }
529 Ok(false) => {
530 Ok(Response::builder()
531 .status(StatusCode::NOT_FOUND)
532 .body(Full::new(Bytes::from("Session not found")))
533 .unwrap())
534 }
535 Err(err) => {
536 error!("Error deleting session {}: {}", session_id, err);
537 Ok(Response::builder()
538 .status(StatusCode::INTERNAL_SERVER_ERROR)
539 .body(Full::new(Bytes::from("Session deletion error")))
540 .unwrap())
541 }
542 }
543 } else {
544 Ok(Response::builder()
545 .status(StatusCode::BAD_REQUEST)
546 .body(Full::new(Bytes::from("Missing Mcp-Session-Id header")))
547 .unwrap())
548 }
549 }
550
551 fn handle_preflight(&self) -> Response<JsonRpcBody> {
553 options_response()
554 }
555
556 fn method_not_allowed(&self) -> Response<JsonRpcBody> {
558 method_not_allowed_response()
559 }
560
561 async fn validate_session_exists(&self, session_id: &str) -> Result<()> {
563 match self.session_storage.get_session(session_id).await {
565 Ok(Some(_)) => {
566 debug!("Session validation successful: {}", session_id);
567 Ok(())
568 }
569 Ok(None) => {
570 error!("Session not found: {}", session_id);
571 Err(crate::HttpMcpError::InvalidRequest(
572 format!("Session '{}' not found. Sessions must be created via initialize request first.", session_id)
573 ))
574 }
575 Err(err) => {
576 error!("Failed to validate session {}: {}", session_id, err);
577 Err(crate::HttpMcpError::InvalidRequest(format!("Session validation failed: {}", err)))
578 }
579 }
580 }
581}