1use std::sync::Arc;
12use std::convert::Infallible;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tracing::{debug, warn, error};
16
17use hyper::{Request, Response, Method, StatusCode};
18use bytes::Bytes;
19use hyper::header::{CONTENT_TYPE, ACCEPT};
20use http_body_util::{BodyExt, Full};
21use http_body::{Body, Frame};
22use futures::Stream;
23
24use turul_mcp_json_rpc_server::{
25 JsonRpcDispatcher,
26 r#async::SessionContext,
27 dispatch::{parse_json_rpc_message, JsonRpcMessage, JsonRpcMessageResult},
28 error::{JsonRpcError, JsonRpcErrorObject}
29};
30use turul_mcp_session_storage::InMemorySessionStorage;
31use turul_mcp_protocol::ServerCapabilities;
32use chrono;
33use uuid::Uuid;
34
35use crate::{
36 Result, ServerConfig, StreamConfig,
37 protocol::{extract_protocol_version, extract_session_id, extract_last_event_id},
38 json_rpc_responses::*,
39 StreamManager,
40 notification_bridge::{StreamManagerNotificationBroadcaster, SharedNotificationBroadcaster}
41};
42
/// HTTP response body backed by a stream of SSE byte chunks.
///
/// The wrapped stream's error type is `Infallible`, so polling it can only
/// yield data chunks or end-of-stream.
pub struct SessionSseStream {
    /// Pinned, type-erased stream that produces the body chunks.
    stream: Pin<Box<dyn Stream<Item = std::result::Result<Bytes, Infallible>> + Send>>,
}
47
48impl SessionSseStream {
49 pub fn new<S>(stream: S) -> Self
50 where
51 S: Stream<Item = std::result::Result<Bytes, Infallible>> + Send + 'static,
52 {
53 Self {
54 stream: Box::pin(stream),
55 }
56 }
57}
58
/// Diagnostic-only `Drop`: emits debug logs when the body is dropped.
/// No resources are released here beyond what the field's own drop does.
impl Drop for SessionSseStream {
    fn drop(&mut self) {
        // Two separate debug events: the first marks the drop, the second
        // hints that the SSE response may have been torn down early.
        debug!("🔥 DROP: SessionSseStream - HTTP response body being cleaned up");
        debug!("🔥 This may indicate early cleanup of SSE response stream");
    }
}
65
66impl Body for SessionSseStream {
67 type Data = Bytes;
68 type Error = Infallible;
69
70 fn poll_frame(
71 mut self: Pin<&mut Self>,
72 cx: &mut Context<'_>,
73 ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
74 match self.stream.as_mut().poll_next(cx) {
75 Poll::Ready(Some(Ok(data))) => {
76 Poll::Ready(Some(Ok(Frame::data(data))))
77 }
78 Poll::Ready(Some(Err(never))) => match never {},
79 Poll::Ready(None) => Poll::Ready(None),
80 Poll::Pending => Poll::Pending,
81 }
82 }
83}
84
/// Body type for fully buffered responses (the whole payload is one `Bytes`).
type JsonRpcBody = Full<Bytes>;

/// Single boxed body type returned by `handle_mcp_request`, so buffered
/// responses and streamed SSE responses share one return type.
type UnifiedMcpBody = http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>;
90
/// Classification of a request's `Accept` header, as computed by
/// `parse_mcp_accept_header`.
#[derive(Debug, Clone, PartialEq)]
enum AcceptMode {
    /// Header lists JSON (`application/json` or `*/*`) and `text/event-stream`.
    Compliant,
    /// Header lists JSON (`application/json` or `*/*`) but not `text/event-stream`.
    JsonOnly,
    /// Header lists `text/event-stream` but not JSON.
    SseOnly,
    /// Header lists neither JSON nor `text/event-stream`.
    Invalid,
}
103
104fn parse_mcp_accept_header(accept_header: &str) -> (AcceptMode, bool) {
106 let accepts_json = accept_header.contains("application/json") || accept_header.contains("*/*");
107 let accepts_sse = accept_header.contains("text/event-stream");
108
109 let mode = match (accepts_json, accepts_sse) {
110 (true, true) => AcceptMode::Compliant,
111 (true, false) => AcceptMode::JsonOnly, (false, true) => AcceptMode::SseOnly,
113 (false, false) => AcceptMode::Invalid,
114 };
115
116 let should_use_sse = match mode {
119 AcceptMode::Compliant => true, AcceptMode::JsonOnly => false, AcceptMode::SseOnly => true, AcceptMode::Invalid => false, };
124
125 (mode, should_use_sse)
126}
127
128fn convert_to_unified_body(full_body: Full<Bytes>) -> UnifiedMcpBody {
130 full_body.map_err(|never| match never {}).boxed_unsync()
131}
132
133fn jsonrpc_error_to_unified_body(error: JsonRpcError) -> Result<Response<UnifiedMcpBody>> {
135 let error_json = serde_json::to_string(&error)?;
136 Ok(Response::builder()
137 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
139 .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
140 .unwrap())
141}
142
/// HTTP handler for MCP traffic with session support.
///
/// Routes POST (JSON-RPC), GET (SSE), DELETE (session removal) and OPTIONS
/// requests; see `handle_mcp_request`.
pub struct SessionMcpHandler {
    /// Server configuration; this file reads `max_body_size`,
    /// `enable_post_sse` and `enable_get_sse` from it.
    pub(crate) config: ServerConfig,
    /// Dispatcher that executes parsed JSON-RPC requests and notifications.
    pub(crate) dispatcher: Arc<JsonRpcDispatcher>,
    /// Storage used to create, look up and delete sessions.
    pub(crate) session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
    /// Stream configuration; passed to the `StreamManager` built by `with_storage`.
    pub(crate) stream_config: StreamConfig,
    /// Manager for SSE streams (GET connections and POST SSE responses).
    pub(crate) stream_manager: Arc<StreamManager>,
}
154
155impl Clone for SessionMcpHandler {
156 fn clone(&self) -> Self {
157 Self {
158 config: self.config.clone(),
159 dispatcher: Arc::clone(&self.dispatcher),
160 session_storage: Arc::clone(&self.session_storage),
161 stream_config: self.stream_config.clone(),
162 stream_manager: Arc::clone(&self.stream_manager),
163 }
164 }
165}
166
167impl SessionMcpHandler {
168 pub fn new(
170 config: ServerConfig,
171 dispatcher: Arc<JsonRpcDispatcher>,
172 stream_config: StreamConfig,
173 ) -> Self {
174 let storage: Arc<turul_mcp_session_storage::BoxedSessionStorage> = Arc::new(InMemorySessionStorage::new());
175 Self::with_storage(config, dispatcher, storage, stream_config)
176 }
177
/// Creates a handler from already-constructed parts, including an existing
/// `StreamManager`.
///
/// Unlike `with_storage`, no new `StreamManager` is built: the supplied
/// `stream_manager` is stored as-is.
pub fn with_shared_stream_manager(
    config: ServerConfig,
    dispatcher: Arc<JsonRpcDispatcher>,
    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
    stream_config: StreamConfig,
    stream_manager: Arc<StreamManager>,
) -> Self {
    Self {
        config,
        dispatcher,
        session_storage,
        stream_config,
        stream_manager,
    }
}
194
195 pub fn with_storage(
198 config: ServerConfig,
199 dispatcher: Arc<JsonRpcDispatcher>,
200 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
201 stream_config: StreamConfig,
202 ) -> Self {
203 let stream_manager = Arc::new(StreamManager::with_config(
205 Arc::clone(&session_storage),
206 stream_config.clone()
207 ));
208
209 Self {
210 config,
211 dispatcher,
212 session_storage,
213 stream_config,
214 stream_manager,
215 }
216 }
217
/// Returns a reference to the handler's shared `StreamManager`.
pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
    &self.stream_manager
}
222
223
224 pub async fn handle_mcp_request(
226 &self,
227 req: Request<hyper::body::Incoming>,
228 ) -> Result<Response<UnifiedMcpBody>> {
229 match req.method() {
230 &Method::POST => {
231 let response = self.handle_json_rpc_request(req).await?;
232 Ok(response)
233 },
234 &Method::GET => self.handle_sse_request(req).await,
235 &Method::DELETE => {
236 let response = self.handle_delete_request(req).await?;
237 Ok(response.map(convert_to_unified_body))
238 },
239 &Method::OPTIONS => {
240 let response = self.handle_preflight();
241 Ok(response.map(convert_to_unified_body))
242 },
243 _ => {
244 let response = self.method_not_allowed();
245 Ok(response.map(convert_to_unified_body))
246 }
247 }
248 }
249
250 async fn handle_json_rpc_request(
252 &self,
253 req: Request<hyper::body::Incoming>,
254 ) -> Result<Response<UnifiedMcpBody>> {
255 let protocol_version = extract_protocol_version(req.headers());
257 let session_id = extract_session_id(req.headers());
258
259 debug!("POST request - Protocol: {}, Session: {:?}", protocol_version, session_id);
260
261 let content_type = req.headers()
263 .get(CONTENT_TYPE)
264 .and_then(|ct| ct.to_str().ok())
265 .unwrap_or("");
266
267 if !content_type.starts_with("application/json") {
268 warn!("Invalid content type: {}", content_type);
269 return Ok(bad_request_response("Content-Type must be application/json").map(convert_to_unified_body));
270 }
271
272 let accept_header = req.headers()
274 .get(ACCEPT)
275 .and_then(|accept| accept.to_str().ok())
276 .unwrap_or("application/json");
277
278 let (accept_mode, accepts_sse) = parse_mcp_accept_header(accept_header);
279 debug!("POST request Accept header: '{}', mode: {:?}, will use SSE for tool calls: {}",
280 accept_header, accept_mode, accepts_sse);
281
282 let body = req.into_body();
284 let body_bytes = match body.collect().await {
285 Ok(collected) => collected.to_bytes(),
286 Err(err) => {
287 error!("Failed to read request body: {}", err);
288 return Ok(bad_request_response("Failed to read request body").map(convert_to_unified_body));
289 }
290 };
291
292 if body_bytes.len() > self.config.max_body_size {
294 warn!("Request body too large: {} bytes", body_bytes.len());
295 return Ok(Response::builder()
296 .status(StatusCode::PAYLOAD_TOO_LARGE)
297 .header(CONTENT_TYPE, "application/json")
298 .body(convert_to_unified_body(Full::new(Bytes::from("Request body too large"))))
299 .unwrap());
300 }
301
302 let body_str = match std::str::from_utf8(&body_bytes) {
304 Ok(s) => s,
305 Err(err) => {
306 error!("Invalid UTF-8 in request body: {}", err);
307 return Ok(bad_request_response("Request body must be valid UTF-8").map(convert_to_unified_body));
308 }
309 };
310
311 debug!("Received JSON-RPC request: {}", body_str);
312
313 let message = match parse_json_rpc_message(body_str) {
315 Ok(msg) => msg,
316 Err(rpc_err) => {
317 error!("JSON-RPC parse error: {}", rpc_err);
318 let error_response = serde_json::to_string(&rpc_err)
320 .unwrap_or_else(|_| "{}".to_string());
321 return Ok(Response::builder()
322 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
324 .body(convert_to_unified_body(Full::new(Bytes::from(error_response))))
325 .unwrap());
326 }
327 };
328
329 let (message_result, response_session_id, method_name) = match message {
331 JsonRpcMessage::Request(request) => {
332 debug!("Processing JSON-RPC request: method={}", request.method);
333 let method_name = request.method.clone();
334
335 let (response, response_session_id) = if request.method == "initialize" {
337 debug!("Handling initialize request - creating new session via session storage");
338
339 let capabilities = ServerCapabilities::default();
341 match self.session_storage.create_session(capabilities).await {
342 Ok(session_info) => {
343 debug!("Created new session via session storage: {}", session_info.session_id);
344
345 let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
347 Arc::clone(&self.stream_manager)
348 ));
349 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
350
351 let session_context = SessionContext {
352 session_id: session_info.session_id.clone(),
353 metadata: std::collections::HashMap::new(),
354 broadcaster: Some(broadcaster_any),
355 timestamp: chrono::Utc::now().timestamp_millis() as u64,
356 };
357
358 let response = self.dispatcher.handle_request_with_context(request, session_context).await;
359
360 (response, Some(session_info.session_id))
362 }
363 Err(err) => {
364 error!("Failed to create session during initialize: {}", err);
365 let error_msg = format!("Session creation failed: {}", err);
367 let error_response = turul_mcp_json_rpc_server::JsonRpcResponse::success(
368 request.id,
369 serde_json::json!({"error": error_msg})
370 );
371 (error_response, None)
372 }
373 }
374 } else {
375 let session_id_str = session_id.clone().unwrap_or("unknown".to_string());
377 let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
378 Arc::clone(&self.stream_manager)
379 ));
380 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
381
382 let session_context = SessionContext {
383 session_id: session_id_str,
384 metadata: std::collections::HashMap::new(),
385 broadcaster: Some(broadcaster_any),
386 timestamp: chrono::Utc::now().timestamp_millis() as u64,
387 };
388
389 let response = self.dispatcher.handle_request_with_context(request, session_context).await;
390 (response, session_id)
391 };
392
393 (JsonRpcMessageResult::Response(response), response_session_id, Some(method_name))
394 }
395 JsonRpcMessage::Notification(notification) => {
396 debug!("Processing JSON-RPC notification: method={}", notification.method);
397 let method_name = notification.method.clone();
398
399 let session_context = if let Some(ref session_id) = session_id {
401 let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
402 Arc::clone(&self.stream_manager)
403 ));
404 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
405
406 Some(SessionContext {
407 session_id: session_id.clone(),
408 metadata: std::collections::HashMap::new(),
409 broadcaster: Some(broadcaster_any),
410 timestamp: chrono::Utc::now().timestamp_millis() as u64,
411 })
412 } else {
413 None
414 };
415
416 if let Err(err) = self.dispatcher.handle_notification_with_context(notification, session_context).await {
417 error!("Notification handling error: {}", err);
418 }
419 (JsonRpcMessageResult::NoResponse, session_id.clone(), Some(method_name))
420 }
421 };
422
423 match message_result {
425 JsonRpcMessageResult::Response(response) => {
426 let is_tool_call = method_name.as_ref().map_or(false, |m| m == "tools/call");
429
430 debug!("Decision point: method={:?}, accept_mode={:?}, accepts_sse={}, server_post_sse_enabled={}, session_id={:?}, is_tool_call={}",
431 method_name, accept_mode, accepts_sse, self.config.enable_post_sse, response_session_id, is_tool_call);
432
433 let should_use_sse = match accept_mode {
435 AcceptMode::JsonOnly => false, AcceptMode::Invalid => false, AcceptMode::Compliant => self.config.enable_post_sse && accepts_sse && is_tool_call, AcceptMode::SseOnly => self.config.enable_post_sse && accepts_sse, };
440
441 if should_use_sse && response_session_id.is_some() {
442 debug!("📡 Creating POST SSE stream (mode: {:?}) for tool call with notifications", accept_mode);
443 match self.stream_manager.create_post_sse_stream(
444 response_session_id.clone().unwrap(),
445 response.clone(), ).await {
447 Ok(sse_response) => {
448 debug!("✅ POST SSE stream created successfully");
449 Ok(sse_response.map(|body| body.map_err(|never| match never {}).boxed_unsync()))
450 },
451 Err(e) => {
452 warn!("Failed to create POST SSE stream, falling back to JSON: {}", e);
453 Ok(jsonrpc_response_with_session(response, response_session_id)?.map(convert_to_unified_body))
454 }
455 }
456 } else {
457 debug!("📄 Returning standard JSON response (mode: {:?}) for method: {:?}", accept_mode, method_name);
458 Ok(jsonrpc_response_with_session(response, response_session_id)?.map(convert_to_unified_body))
459 }
460 }
461 JsonRpcMessageResult::Error(error) => {
462 warn!("Sending JSON-RPC error response");
463 let error_json = serde_json::to_string(&error)?;
465 Ok(Response::builder()
466 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
468 .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
469 .unwrap())
470 }
471 JsonRpcMessageResult::NoResponse => {
472 Ok(jsonrpc_notification_response()?.map(convert_to_unified_body))
474 }
475 }
476 }
477
478 async fn handle_sse_request(
483 &self,
484 req: Request<hyper::body::Incoming>,
485 ) -> Result<Response<UnifiedMcpBody>> {
486 let headers = req.headers();
488 let accept = headers
489 .get(ACCEPT)
490 .and_then(|accept| accept.to_str().ok())
491 .unwrap_or("");
492
493 if !accept.contains("text/event-stream") {
494 warn!("GET request received without SSE support - header does not contain 'text/event-stream'");
495 let error = JsonRpcError::new(
496 None,
497 JsonRpcErrorObject::server_error(
498 -32001,
499 "SSE not accepted - missing 'text/event-stream' in Accept header",
500 None
501 )
502 );
503 return jsonrpc_error_to_unified_body(error);
504 }
505
506 if !self.config.enable_get_sse {
508 warn!("GET SSE request received but GET SSE is disabled on server");
509 let error = JsonRpcError::new(
510 None,
511 JsonRpcErrorObject::server_error(
512 -32003,
513 "GET SSE is disabled on this server",
514 None
515 )
516 );
517 return jsonrpc_error_to_unified_body(error);
518 }
519
520 let protocol_version = extract_protocol_version(headers);
522 let session_id = extract_session_id(headers);
523
524 debug!("GET SSE request - Protocol: {}, Session: {:?}", protocol_version, session_id);
525
526 let session_id = match session_id {
528 Some(id) => id,
529 None => {
530 warn!("Missing Mcp-Session-Id header for SSE request");
531 let error = JsonRpcError::new(
532 None,
533 JsonRpcErrorObject::server_error(
534 -32002,
535 "Missing Mcp-Session-Id header",
536 None
537 )
538 );
539 return jsonrpc_error_to_unified_body(error);
540 }
541 };
542
543 if let Err(err) = self.validate_session_exists(&session_id).await {
545 error!("Session validation failed for Session ID {}: {}", session_id, err);
546 let error = JsonRpcError::new(
547 None,
548 JsonRpcErrorObject::server_error(
549 -32003,
550 &format!("Session validation failed: {}", err),
551 None
552 )
553 );
554 return jsonrpc_error_to_unified_body(error);
555 }
556
557 let last_event_id = extract_last_event_id(headers);
559
560 let connection_id = Uuid::now_v7().to_string();
562
563 debug!("Creating SSE stream for session: {} with connection: {}, last_event_id: {:?}",
564 session_id, connection_id, last_event_id);
565
566 match self.stream_manager.handle_sse_connection(
568 session_id,
569 connection_id,
570 last_event_id,
571 ).await {
572 Ok(response) => Ok(response),
573 Err(err) => {
574 error!("Failed to create SSE connection: {}", err);
575 let error = JsonRpcError::new(
576 None,
577 JsonRpcErrorObject::internal_error(
578 Some(format!("SSE connection failed: {}", err))
579 )
580 );
581 jsonrpc_error_to_unified_body(error)
582 }
583 }
584 }
585
586 async fn handle_delete_request(
588 &self,
589 req: Request<hyper::body::Incoming>,
590 ) -> Result<Response<JsonRpcBody>> {
591 let session_id = extract_session_id(req.headers());
592
593 debug!("DELETE request - Session: {:?}", session_id);
594
595 if let Some(session_id) = session_id {
596 match self.session_storage.delete_session(&session_id).await {
597 Ok(true) => {
598 debug!("Session {} removed via DELETE", session_id);
599 Ok(Response::builder()
600 .status(StatusCode::OK)
601 .body(Full::new(Bytes::from("Session removed")))
602 .unwrap())
603 }
604 Ok(false) => {
605 Ok(Response::builder()
606 .status(StatusCode::NOT_FOUND)
607 .body(Full::new(Bytes::from("Session not found")))
608 .unwrap())
609 }
610 Err(err) => {
611 error!("Error deleting session {}: {}", session_id, err);
612 Ok(Response::builder()
613 .status(StatusCode::INTERNAL_SERVER_ERROR)
614 .body(Full::new(Bytes::from("Session deletion error")))
615 .unwrap())
616 }
617 }
618 } else {
619 Ok(Response::builder()
620 .status(StatusCode::BAD_REQUEST)
621 .body(Full::new(Bytes::from("Missing Mcp-Session-Id header")))
622 .unwrap())
623 }
624 }
625
/// Returns the response for an `OPTIONS` (preflight) request by delegating
/// to `options_response()`.
fn handle_preflight(&self) -> Response<JsonRpcBody> {
    options_response()
}
630
/// Returns the response for HTTP methods this handler does not route, by
/// delegating to `method_not_allowed_response()`.
fn method_not_allowed(&self) -> Response<JsonRpcBody> {
    method_not_allowed_response()
}
635
636 async fn validate_session_exists(&self, session_id: &str) -> Result<()> {
638 match self.session_storage.get_session(session_id).await {
640 Ok(Some(_)) => {
641 debug!("Session validation successful: {}", session_id);
642 Ok(())
643 }
644 Ok(None) => {
645 error!("Session not found: {}", session_id);
646 Err(crate::HttpMcpError::InvalidRequest(
647 format!("Session '{}' not found. Sessions must be created via initialize request first.", session_id)
648 ))
649 }
650 Err(err) => {
651 error!("Failed to validate session {}: {}", session_id, err);
652 Err(crate::HttpMcpError::InvalidRequest(format!("Session validation failed: {}", err)))
653 }
654 }
655 }
656}