1use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use futures::Stream;
18use http_body::Body;
19use http_body_util::{BodyExt, Full};
20use hyper::header::{ACCEPT, CONTENT_TYPE};
21use hyper::{HeaderMap, Method, Request, Response, StatusCode};
22use serde_json::Value;
23use tracing::{debug, error, warn};
24
25use crate::ServerConfig;
26
/// MCP protocol revisions recognised by this transport.
///
/// Each variant maps to one dated revision string (see [`McpProtocolVersion::as_str`]).
/// The newest revision is the [`Default`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum McpProtocolVersion {
    /// Revision `2024-11-05`.
    V2024_11_05,
    /// Revision `2025-03-26`.
    V2025_03_26,
    /// Revision `2025-06-18`; this is the default.
    #[default]
    V2025_06_18,
}

impl McpProtocolVersion {
    /// Every representable revision, oldest first.
    const KNOWN: [Self; 3] = [Self::V2024_11_05, Self::V2025_03_26, Self::V2025_06_18];

    /// Parses a revision string such as `"2025-03-26"`.
    ///
    /// The comparison is exact (no trimming); unknown strings yield `None`.
    pub fn parse_version(s: &str) -> Option<Self> {
        Self::KNOWN
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }

    /// Returns the dated revision string for this version.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::V2024_11_05 => "2024-11-05",
            Self::V2025_03_26 => "2025-03-26",
            Self::V2025_06_18 => "2025-06-18",
        }
    }

    /// `true` for `2025-03-26` and `2025-06-18`.
    pub fn supports_streamable_http(&self) -> bool {
        match *self {
            Self::V2024_11_05 => false,
            Self::V2025_03_26 | Self::V2025_06_18 => true,
        }
    }

    /// `true` only for `2025-06-18`.
    pub fn supports_meta_fields(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for `2025-06-18`.
    pub fn supports_cursors(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for `2025-06-18`.
    pub fn supports_progress_tokens(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for `2025-06-18`.
    pub fn supports_elicitation(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// Lists the feature names enabled for this version, in a fixed order:
    /// `streamable-http`, `_meta-fields`, `cursor-pagination`,
    /// `progress-tokens`, `elicitation`.
    pub fn supported_features(&self) -> Vec<&'static str> {
        let table: [(bool, &'static str); 5] = [
            (self.supports_streamable_http(), "streamable-http"),
            (self.supports_meta_fields(), "_meta-fields"),
            (self.supports_cursors(), "cursor-pagination"),
            (self.supports_progress_tokens(), "progress-tokens"),
            (self.supports_elicitation(), "elicitation"),
        ];

        table
            .iter()
            .filter(|&&(enabled, _)| enabled)
            .map(|&(_, name)| name)
            .collect()
    }
}
105
106#[derive(Debug, Clone)]
108pub struct StreamableHttpContext {
109 pub protocol_version: McpProtocolVersion,
111 pub session_id: Option<String>,
113 pub wants_sse_stream: bool,
115 pub accepts_stream_frames: bool,
117 pub headers: HashMap<String, String>,
119}
120
121impl StreamableHttpContext {
122 pub fn from_request<T>(req: &Request<T>) -> Self {
124 let headers = req.headers();
125
126 let protocol_version = headers
128 .get("MCP-Protocol-Version")
129 .and_then(|h| h.to_str().ok())
130 .and_then(McpProtocolVersion::parse_version)
131 .unwrap_or_default();
132
133 let session_id = headers
135 .get("Mcp-Session-Id")
136 .and_then(|h| h.to_str().ok())
137 .map(|s| s.to_string());
138
139 let accept_header = headers
141 .get(ACCEPT)
142 .and_then(|h| h.to_str().ok())
143 .unwrap_or_default()
144 .to_ascii_lowercase();
145
146 let wants_sse_stream = accept_header.contains("text/event-stream");
147 let accepts_stream_frames = accept_header.contains("application/json")
148 || accept_header.contains("text/event-stream")
149 || accept_header.contains("*/*");
150
151 let mut header_map = HashMap::new();
153 for (name, value) in headers.iter() {
154 if let Ok(value_str) = value.to_str() {
155 header_map.insert(name.to_string(), value_str.to_string());
156 }
157 }
158
159 Self {
160 protocol_version,
161 session_id,
162 wants_sse_stream,
163 accepts_stream_frames,
164 headers: header_map,
165 }
166 }
167
168 pub fn wants_sse_stream(&self) -> bool {
170 self.wants_sse_stream
171 }
172
173 pub fn wants_streaming_post(&self) -> bool {
175 self.accepts_stream_frames && self.wants_sse_stream
176 }
177
178 pub fn is_streamable_compatible(&self) -> bool {
180 self.protocol_version.supports_streamable_http() && self.accepts_stream_frames
181 }
182
183 pub fn validate(&self, method: &Method) -> std::result::Result<(), String> {
185 if !self.accepts_stream_frames {
186 return Err(
187 "Accept header must include application/json, text/event-stream, or */*"
188 .to_string(),
189 );
190 }
191
192 if self.wants_sse_stream && !self.protocol_version.supports_streamable_http() {
193 return Err(format!(
194 "Protocol version {} does not support streamable HTTP",
195 self.protocol_version.as_str()
196 ));
197 }
198
199 if method == &Method::GET && self.wants_sse_stream && self.session_id.is_none() {
202 return Err("Mcp-Session-Id header required for SSE streaming connections".to_string());
203 }
204
205 Ok(())
206 }
207
208 pub fn response_headers(&self) -> HeaderMap {
210 let mut headers = HeaderMap::new();
211
212 headers.insert(
214 "MCP-Protocol-Version",
215 self.protocol_version.as_str().parse().unwrap(),
216 );
217
218 if let Some(session_id) = &self.session_id {
220 headers.insert("Mcp-Session-Id", session_id.parse().unwrap());
221 }
222
223 let features = self.protocol_version.supported_features();
225 if !features.is_empty() {
226 headers.insert("MCP-Capabilities", features.join(",").parse().unwrap());
227 }
228
229 headers
230 }
231}
232
233pub enum StreamableResponse {
235 Json(Value),
237 Stream(Pin<Box<dyn Stream<Item = std::result::Result<Value, String>> + Send>>),
239 Error { status: StatusCode, message: String },
241}
242
243impl std::fmt::Debug for StreamableResponse {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 match self {
246 Self::Json(value) => f.debug_tuple("Json").field(value).finish(),
247 Self::Stream(_) => f.debug_tuple("Stream").field(&"<stream>").finish(),
248 Self::Error { status, message } => f
249 .debug_struct("Error")
250 .field("status", status)
251 .field("message", message)
252 .finish(),
253 }
254 }
255}
256
257impl StreamableResponse {
258 pub fn into_response(self, context: &StreamableHttpContext) -> Response<Full<Bytes>> {
260 let mut response_headers = context.response_headers();
261
262 match self {
263 StreamableResponse::Json(json) => {
264 response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
265
266 let body = serde_json::to_string(&json)
267 .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
268
269 Response::builder()
270 .status(StatusCode::OK)
271 .body(Full::new(Bytes::from(body)))
272 .unwrap()
273 }
274
275 StreamableResponse::Stream(_stream) => {
276 response_headers.insert(CONTENT_TYPE, "text/event-stream".parse().unwrap());
278 response_headers.insert("Cache-Control", "no-cache, no-transform".parse().unwrap());
279 response_headers.insert("Connection", "keep-alive".parse().unwrap());
280
281 Response::builder()
285 .status(StatusCode::ACCEPTED)
286 .body(Full::new(Bytes::from("Streaming response accepted")))
287 .unwrap()
288 }
289
290 StreamableResponse::Error { status, message } => {
291 response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
292
293 let error_json = serde_json::json!({
294 "error": {
295 "code": status.as_u16(),
296 "message": message
297 }
298 });
299
300 let body = serde_json::to_string(&error_json).unwrap_or_else(|_| {
301 r#"{"error": {"code": 500, "message": "Internal server error"}}"#.to_string()
302 });
303
304 Response::builder()
305 .status(status)
306 .body(Full::new(Bytes::from(body)))
307 .unwrap()
308 }
309 }
310 }
311
312 pub fn into_boxed_response(
314 self,
315 context: &StreamableHttpContext,
316 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
317 self.into_response(context)
318 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
319 }
320}
321
/// Handler for the streamable HTTP endpoints; cloning copies the `Arc` handles
/// and the stored capabilities.
#[derive(Clone)]
pub struct StreamableHttpHandler {
    /// Server configuration; `max_body_size` is read when checking request bodies.
    config: Arc<ServerConfig>,
    /// JSON-RPC dispatcher that processes parsed requests and notifications.
    dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
    /// Session store used to create, look up, update, and delete sessions.
    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
    /// Stream manager used to open SSE connections and to close a session's connections.
    stream_manager: Arc<crate::StreamManager>,
    /// Capabilities passed to `create_session` whenever a new session is created.
    server_capabilities: turul_mcp_protocol::ServerCapabilities,
}
331
332impl StreamableHttpHandler {
333 pub fn new(
334 config: Arc<ServerConfig>,
335 dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
336 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
337 stream_manager: Arc<crate::StreamManager>,
338 server_capabilities: turul_mcp_protocol::ServerCapabilities,
339 ) -> Self {
340 Self {
341 config,
342 dispatcher,
343 session_storage,
344 stream_manager,
345 server_capabilities,
346 }
347 }
348
349 pub async fn handle_request<T>(
351 &self,
352 req: Request<T>,
353 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
354 where
355 T: Body + Send + 'static,
356 T::Data: Send,
357 T::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
358 {
359 debug!(
360 "Streamable handler request: method={}, uri={}",
361 req.method(),
362 req.uri()
363 );
364 let context = StreamableHttpContext::from_request(&req);
366
367 debug!(
368 "Streamable handler entry: method={}, protocol={}, session={:?}, accepts_stream_frames={}, wants_sse_stream={}",
369 req.method(),
370 context.protocol_version.as_str(),
371 context.session_id,
372 context.accepts_stream_frames,
373 context.wants_sse_stream()
374 );
375
376 if let Err(error) = context.validate(req.method()) {
378 warn!("Invalid streamable HTTP request: {}", error);
379 return StreamableResponse::Error {
380 status: StatusCode::BAD_REQUEST,
381 message: error,
382 }
383 .into_boxed_response(&context);
384 }
385
386 match req.method() {
388 &Method::POST => {
389 self.handle_client_message(req, context).await
392 }
393 &Method::GET => {
394 self.handle_sse_stream(req, context).await
396 }
397 &Method::DELETE => {
398 self.handle_session_delete(req, context).await
400 }
401 _ => StreamableResponse::Error {
402 status: StatusCode::METHOD_NOT_ALLOWED,
403 message: "Method not allowed for this endpoint".to_string(),
404 }
405 .into_boxed_response(&context),
406 }
407 }
408
409 async fn handle_sse_stream<T>(
411 &self,
412 req: Request<T>,
413 context: StreamableHttpContext,
414 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
415 where
416 T: Body + Send + 'static,
417 {
418 debug!(
419 "Opening streaming connection for session: {:?}",
420 context.session_id
421 );
422
423 let session_id = match context.session_id {
425 Some(ref id) => id.clone(),
426 None => {
427 warn!("Missing session ID for streaming GET request");
428 return StreamableResponse::Error {
429 status: StatusCode::BAD_REQUEST,
430 message: "Mcp-Session-Id header required for streaming connection".to_string(),
431 }
432 .into_boxed_response(&context);
433 }
434 };
435
436 match self.validate_session_exists(&session_id).await {
438 Ok(_) => {
439 debug!(
440 "Session validation successful for streaming GET: {}",
441 session_id
442 );
443 }
444 Err(err) => {
445 error!(
446 "Session validation failed for streaming GET {}: {}",
447 session_id, err
448 );
449 return StreamableResponse::Error {
450 status: StatusCode::UNAUTHORIZED,
451 message: format!("Session validation failed: {}", err),
452 }
453 .into_boxed_response(&context);
454 }
455 }
456
457 let last_event_id = req
463 .headers()
464 .get("Last-Event-ID")
465 .and_then(|h| h.to_str().ok())
466 .and_then(|s| s.parse::<u64>().ok());
467
468 let connection_id = uuid::Uuid::now_v7().to_string();
470
471 debug!(
472 "Creating streamable HTTP connection: session={}, connection={}, last_event_id={:?}",
473 session_id, connection_id, last_event_id
474 );
475
476 match self
480 .stream_manager
481 .handle_sse_connection(session_id.clone(), connection_id.clone(), last_event_id)
482 .await
483 {
484 Ok(mut streaming_response) => {
485 debug!(
486 "Streamable HTTP connection established: session={}, connection={}",
487 session_id, connection_id
488 );
489
490 let mcp_headers = context.response_headers();
492 for (key, value) in mcp_headers.iter() {
493 streaming_response.headers_mut().insert(key, value.clone());
494 }
495
496 streaming_response
499 }
500 Err(err) => {
501 error!("Failed to create streamable HTTP connection: {}", err);
502 StreamableResponse::Error {
503 status: StatusCode::INTERNAL_SERVER_ERROR,
504 message: format!("Streaming connection failed: {}", err),
505 }
506 .into_boxed_response(&context)
507 }
508 }
509 }
510
511 async fn validate_session_exists(&self, session_id: &str) -> std::result::Result<(), String> {
513 match self.session_storage.get_session(session_id).await {
514 Ok(Some(_)) => {
515 debug!("Session validation successful: {}", session_id);
516 Ok(())
517 }
518 Ok(None) => {
519 error!("Session not found: {}", session_id);
520 Err(format!(
521 "Session '{}' not found. Sessions must be created via initialize request first.",
522 session_id
523 ))
524 }
525 Err(err) => {
526 error!("Failed to validate session {}: {}", session_id, err);
527 Err(format!("Session validation failed: {}", err))
528 }
529 }
530 }
531
532 #[allow(dead_code)]
534 async fn handle_streaming_post<T>(
535 &self,
536 req: Request<T>,
537 context: StreamableHttpContext,
538 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
539 where
540 T: Body + Send + 'static,
541 {
542 debug!(
543 "Handling streaming POST for session: {:?}",
544 context.session_id
545 );
546
547 let session_id = match context.session_id {
549 Some(ref id) => id.clone(),
550 None => {
551 warn!("Missing session ID for streaming POST request");
552 return StreamableResponse::Error {
553 status: StatusCode::BAD_REQUEST,
554 message: "Mcp-Session-Id header required for streaming POST".to_string(),
555 }
556 .into_boxed_response(&context);
557 }
558 };
559
560 if let Err(err) = self.validate_session_exists(&session_id).await {
562 error!(
563 "Session validation failed for streaming POST {}: {}",
564 session_id, err
565 );
566 return StreamableResponse::Error {
567 status: StatusCode::UNAUTHORIZED,
568 message: format!("Session validation failed: {}", err),
569 }
570 .into_boxed_response(&context);
571 }
572
573 let content_type = req
575 .headers()
576 .get(CONTENT_TYPE)
577 .and_then(|ct| ct.to_str().ok())
578 .unwrap_or("");
579
580 if !content_type.starts_with("application/json") {
581 warn!("Invalid content type for streaming POST: {}", content_type);
582 return StreamableResponse::Error {
583 status: StatusCode::BAD_REQUEST,
584 message: "Content-Type must be application/json".to_string(),
585 }
586 .into_boxed_response(&context);
587 }
588
589 let body_bytes = match req.into_body().collect().await {
591 Ok(collected) => collected.to_bytes(),
592 Err(_err) => {
593 error!("Failed to read streaming POST request body");
594 return StreamableResponse::Error {
595 status: StatusCode::BAD_REQUEST,
596 message: "Failed to read request body".to_string(),
597 }
598 .into_boxed_response(&context);
599 }
600 };
601
602 if body_bytes.len() > self.config.max_body_size {
604 warn!(
605 "Streaming POST request body too large: {} bytes",
606 body_bytes.len()
607 );
608 return StreamableResponse::Error {
609 status: StatusCode::PAYLOAD_TOO_LARGE,
610 message: "Request body too large".to_string(),
611 }
612 .into_boxed_response(&context);
613 }
614
615 let body_str = match std::str::from_utf8(&body_bytes) {
617 Ok(s) => s,
618 Err(err) => {
619 error!("Invalid UTF-8 in streaming POST request body: {}", err);
620 return StreamableResponse::Error {
621 status: StatusCode::BAD_REQUEST,
622 message: "Request body must be valid UTF-8".to_string(),
623 }
624 .into_boxed_response(&context);
625 }
626 };
627
628 debug!("Received streaming POST JSON-RPC request: {}", body_str);
629
630 use crate::notification_bridge::StreamManagerNotificationBroadcaster;
632 use turul_mcp_json_rpc_server::r#async::SessionContext;
633 use turul_mcp_json_rpc_server::dispatch::{
634 JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message,
635 };
636
637 let message = match parse_json_rpc_message(body_str) {
638 Ok(msg) => msg,
639 Err(rpc_err) => {
640 error!("JSON-RPC parse error in streaming POST: {}", rpc_err);
641 let error_json =
642 serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
643 return Response::builder()
644 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
646 .header("MCP-Protocol-Version", context.protocol_version.as_str())
647 .body(Full::new(Bytes::from(error_json)))
648 .unwrap()
649 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
650 .map(|body| body.map_err(|never| match never {}).boxed_unsync());
651 }
652 };
653
654 let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
656 &self.stream_manager,
657 )));
658 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
659
660 let session_context = SessionContext {
661 session_id: session_id.clone(),
662 metadata: std::collections::HashMap::new(),
663 broadcaster: Some(broadcaster_any),
664 timestamp: chrono::Utc::now().timestamp_millis() as u64,
665 };
666
667 let message_result = match message {
668 JsonRpcMessage::Request(request) => {
669 debug!(
670 "Processing streaming POST JSON-RPC request: method={}",
671 request.method
672 );
673 let response = self
674 .dispatcher
675 .handle_request_with_context(request, session_context)
676 .await;
677
678 match response {
680 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
681 JsonRpcMessageResult::Response(resp)
682 }
683 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
684 JsonRpcMessageResult::Error(err)
685 }
686 }
687 }
688 JsonRpcMessage::Notification(notification) => {
689 debug!(
690 "Processing streaming POST JSON-RPC notification: method={}",
691 notification.method
692 );
693
694 let result = self
695 .dispatcher
696 .handle_notification_with_context(notification, Some(session_context))
697 .await;
698
699 if let Err(err) = result {
700 error!("Streaming POST notification handling error: {}", err);
701 }
702 JsonRpcMessageResult::NoResponse
703 }
704 };
705
706 match message_result {
709 JsonRpcMessageResult::Response(response) => {
710 let response_json = serde_json::to_string(&response)
711 .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
712
713 Response::builder()
714 .status(StatusCode::OK)
715 .header(CONTENT_TYPE, "application/json")
716 .header("MCP-Protocol-Version", context.protocol_version.as_str())
717 .header("Mcp-Session-Id", &session_id)
718 .body(Full::new(Bytes::from(response_json)))
719 .unwrap()
720 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
721 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
722 }
723 JsonRpcMessageResult::Error(error) => {
724 let error_json = serde_json::to_string(&error)
725 .unwrap_or_else(|_| r#"{"error": "Internal error"}"#.to_string());
726
727 Response::builder()
728 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
730 .header("MCP-Protocol-Version", context.protocol_version.as_str())
731 .header("Mcp-Session-Id", &session_id)
732 .body(Full::new(Bytes::from(error_json)))
733 .unwrap()
734 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
735 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
736 }
737 JsonRpcMessageResult::NoResponse => {
738 Response::builder()
740 .status(StatusCode::ACCEPTED)
741 .header("MCP-Protocol-Version", context.protocol_version.as_str())
742 .header("Mcp-Session-Id", &session_id)
743 .body(Full::new(Bytes::new()))
744 .unwrap()
745 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
746 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
747 }
748 }
749 }
750
751 #[allow(dead_code)]
753 async fn handle_json_post<T>(
754 &self,
755 req: Request<T>,
756 context: StreamableHttpContext,
757 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
758 where
759 T: Body + Send + 'static,
760 {
761 debug!("Handling JSON POST (non-streaming/legacy)");
762
763 let content_type = req
767 .headers()
768 .get(CONTENT_TYPE)
769 .and_then(|ct| ct.to_str().ok())
770 .unwrap_or("");
771
772 if !content_type.starts_with("application/json") {
773 warn!("Invalid content type for legacy POST: {}", content_type);
774 return StreamableResponse::Error {
775 status: StatusCode::BAD_REQUEST,
776 message: "Content-Type must be application/json".to_string(),
777 }
778 .into_boxed_response(&context);
779 }
780
781 let body_bytes = match req.into_body().collect().await {
783 Ok(collected) => collected.to_bytes(),
784 Err(_err) => {
785 error!("Failed to read legacy POST request body");
786 return StreamableResponse::Error {
787 status: StatusCode::BAD_REQUEST,
788 message: "Failed to read request body".to_string(),
789 }
790 .into_boxed_response(&context);
791 }
792 };
793
794 if body_bytes.len() > self.config.max_body_size {
796 warn!(
797 "Legacy POST request body too large: {} bytes",
798 body_bytes.len()
799 );
800 return StreamableResponse::Error {
801 status: StatusCode::PAYLOAD_TOO_LARGE,
802 message: "Request body too large".to_string(),
803 }
804 .into_boxed_response(&context);
805 }
806
807 let body_str = match std::str::from_utf8(&body_bytes) {
809 Ok(s) => s,
810 Err(err) => {
811 error!("Invalid UTF-8 in legacy POST request body: {}", err);
812 return StreamableResponse::Error {
813 status: StatusCode::BAD_REQUEST,
814 message: "Request body must be valid UTF-8".to_string(),
815 }
816 .into_boxed_response(&context);
817 }
818 };
819
820 debug!("Received legacy POST JSON-RPC request: {}", body_str);
821
822 use turul_mcp_json_rpc_server::dispatch::{
824 JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message,
825 };
826
827 let message = match parse_json_rpc_message(body_str) {
828 Ok(msg) => msg,
829 Err(rpc_err) => {
830 error!("JSON-RPC parse error in legacy POST: {}", rpc_err);
831 let error_json =
832 serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
833 return Response::builder()
834 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
836 .header("MCP-Protocol-Version", context.protocol_version.as_str())
837 .body(Full::new(Bytes::from(error_json)))
838 .unwrap()
839 .map(|body| body.map_err(|never| match never {}).boxed_unsync());
840 }
841 };
842
843 let message_result = match message {
846 JsonRpcMessage::Request(request) => {
847 debug!(
848 "Processing legacy POST JSON-RPC request: method={}",
849 request.method
850 );
851
852 let response = if request.method == "initialize" {
854 debug!("Handling legacy initialize request - creating new session");
855
856 match self
858 .session_storage
859 .create_session(self.server_capabilities.clone())
860 .await
861 {
862 Ok(session_info) => {
863 debug!(
864 "Created new session for legacy client: {}",
865 session_info.session_id
866 );
867
868 use crate::notification_bridge::StreamManagerNotificationBroadcaster;
870 use turul_mcp_json_rpc_server::r#async::SessionContext;
871
872 let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
873 Arc::clone(&self.stream_manager),
874 ));
875 let broadcaster_any =
876 Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
877
878 let session_context = SessionContext {
879 session_id: session_info.session_id.clone(),
880 metadata: std::collections::HashMap::new(),
881 broadcaster: Some(broadcaster_any),
882 timestamp: chrono::Utc::now().timestamp_millis() as u64,
883 };
884
885 self.dispatcher
886 .handle_request_with_context(request, session_context)
887 .await
888 }
889 Err(err) => {
890 error!("Failed to create session during legacy initialize: {}", err);
891 let error_msg = format!("Session creation failed: {}", err);
892 turul_mcp_json_rpc_server::JsonRpcMessage::error(
893 turul_mcp_json_rpc_server::JsonRpcError::internal_error(
894 Some(request.id),
895 Some(error_msg),
896 ),
897 )
898 }
899 }
900 } else {
901 self.dispatcher.handle_request(request).await
903 };
904
905 match response {
907 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
908 JsonRpcMessageResult::Response(resp)
909 }
910 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
911 JsonRpcMessageResult::Error(err)
912 }
913 }
914 }
915 JsonRpcMessage::Notification(notification) => {
916 debug!(
917 "Processing legacy POST JSON-RPC notification: method={}",
918 notification.method
919 );
920
921 let result = self
923 .dispatcher
924 .handle_notification_with_context(notification, None)
925 .await;
926
927 if let Err(err) = result {
928 error!("Legacy POST notification handling error: {}", err);
929 }
930 JsonRpcMessageResult::NoResponse
931 }
932 };
933
934 match message_result {
936 JsonRpcMessageResult::Response(response) => {
937 let response_json = serde_json::to_string(&response)
938 .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
939
940 Response::builder()
941 .status(StatusCode::OK)
942 .header(CONTENT_TYPE, "application/json")
943 .header("MCP-Protocol-Version", context.protocol_version.as_str())
944 .body(Full::new(Bytes::from(response_json)))
945 .unwrap()
946 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
947 }
948 JsonRpcMessageResult::Error(error) => {
949 let error_json = serde_json::to_string(&error)
950 .unwrap_or_else(|_| r#"{"error": "Internal error"}"#.to_string());
951
952 Response::builder()
953 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
955 .header("MCP-Protocol-Version", context.protocol_version.as_str())
956 .body(Full::new(Bytes::from(error_json)))
957 .unwrap()
958 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
959 }
960 JsonRpcMessageResult::NoResponse => {
961 Response::builder()
963 .status(StatusCode::ACCEPTED)
964 .header("MCP-Protocol-Version", context.protocol_version.as_str())
965 .body(Full::new(Bytes::new()))
966 .unwrap()
967 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
968 }
969 }
970 }
971
972 async fn handle_session_delete<T>(
974 &self,
975 _req: Request<T>,
976 context: StreamableHttpContext,
977 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
978 where
979 T: Body + Send + 'static,
980 {
981 if let Some(session_id) = &context.session_id {
982 debug!("Deleting session: {}", session_id);
983
984 let closed_connections = self
987 .stream_manager
988 .close_session_connections(session_id)
989 .await;
990 debug!(
991 "Closed {} streaming connections for session: {}",
992 closed_connections, session_id
993 );
994
995 match self.session_storage.get_session(session_id).await {
997 Ok(Some(mut session_info)) => {
998 session_info
1000 .state
1001 .insert("terminated".to_string(), serde_json::Value::Bool(true));
1002 session_info.state.insert(
1003 "terminated_at".to_string(),
1004 serde_json::Value::Number(serde_json::Number::from(
1005 chrono::Utc::now().timestamp_millis(),
1006 )),
1007 );
1008 session_info.touch();
1009
1010 match self.session_storage.update_session(session_info).await {
1012 Ok(()) => {
1013 debug!(
1014 "Session {} marked as terminated (TTL will handle cleanup)",
1015 session_id
1016 );
1017
1018 Response::builder()
1020 .status(StatusCode::OK)
1021 .header(CONTENT_TYPE, "application/json")
1022 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1023 .header("Mcp-Session-Id", session_id)
1024 .body(Full::new(Bytes::from(
1025 serde_json::to_string(&serde_json::json!({
1026 "status": "session_terminated",
1027 "session_id": session_id,
1028 "closed_connections": closed_connections,
1029 "message": "Session marked for cleanup"
1030 }))
1031 .unwrap_or_else(|_| {
1032 r#"{"status":"session_terminated"}"#.to_string()
1033 }),
1034 )))
1035 .unwrap()
1036 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
1037 }
1038 Err(err) => {
1039 error!(
1040 "Error marking session {} as terminated: {}",
1041 session_id, err
1042 );
1043 match self.session_storage.delete_session(session_id).await {
1045 Ok(_) => {
1046 debug!("Session {} deleted as fallback", session_id);
1047 Response::builder()
1048 .status(StatusCode::OK)
1049 .header(CONTENT_TYPE, "application/json")
1050 .header(
1051 "MCP-Protocol-Version",
1052 context.protocol_version.as_str(),
1053 )
1054 .body(Full::new(Bytes::from(
1055 serde_json::to_string(&serde_json::json!({
1056 "status": "session_deleted",
1057 "session_id": session_id,
1058 "closed_connections": closed_connections,
1059 "message": "Session removed"
1060 }))
1061 .unwrap_or_else(|_| {
1062 r#"{"status":"session_deleted"}"#.to_string()
1063 }),
1064 )))
1065 .unwrap()
1066 .map(|body| {
1067 body.map_err(|never| match never {}).boxed_unsync()
1068 })
1069 }
1070 Err(delete_err) => {
1071 error!(
1072 "Error deleting session {} as fallback: {}",
1073 session_id, delete_err
1074 );
1075 StreamableResponse::Error {
1076 status: StatusCode::INTERNAL_SERVER_ERROR,
1077 message: "Session termination error".to_string(),
1078 }
1079 .into_boxed_response(&context)
1080 }
1081 }
1082 }
1083 }
1084 }
1085 Ok(None) => {
1086 Response::builder()
1088 .status(StatusCode::NOT_FOUND)
1089 .header(CONTENT_TYPE, "application/json")
1090 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1091 .body(Full::new(Bytes::from(
1092 serde_json::to_string(&serde_json::json!({
1093 "status": "session_not_found",
1094 "session_id": session_id,
1095 "message": "Session not found"
1096 }))
1097 .unwrap_or_else(|_| r#"{"status":"session_not_found"}"#.to_string()),
1098 )))
1099 .unwrap()
1100 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
1101 }
1102 Err(err) => {
1103 error!(
1104 "Error retrieving session {} for termination: {}",
1105 session_id, err
1106 );
1107 StreamableResponse::Error {
1108 status: StatusCode::INTERNAL_SERVER_ERROR,
1109 message: "Session lookup error".to_string(),
1110 }
1111 .into_boxed_response(&context)
1112 }
1113 }
1114 } else {
1115 StreamableResponse::Error {
1116 status: StatusCode::BAD_REQUEST,
1117 message: "Mcp-Session-Id header required for session deletion".to_string(),
1118 }
1119 .into_boxed_response(&context)
1120 }
1121 }
1122
1123 async fn handle_streaming_post_real<T>(
1126 &self,
1127 req: Request<T>,
1128 mut context: StreamableHttpContext,
1129 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1130 where
1131 T: Body + Send + 'static,
1132 {
1133 debug!("Streaming handler called - using true streaming POST");
1134
1135 let body_bytes = match req.into_body().collect().await {
1137 Ok(collected) => collected.to_bytes(),
1138 Err(_err) => {
1139 error!("Failed to read streaming POST request body");
1140 return StreamableResponse::Error {
1141 status: StatusCode::BAD_REQUEST,
1142 message: "Failed to read request body".to_string(),
1143 }
1144 .into_boxed_response(&context);
1145 }
1146 };
1147
1148 if body_bytes.len() > self.config.max_body_size {
1150 warn!(
1151 "Streaming POST request body too large: {} bytes",
1152 body_bytes.len()
1153 );
1154 return StreamableResponse::Error {
1155 status: StatusCode::PAYLOAD_TOO_LARGE,
1156 message: "Request body too large".to_string(),
1157 }
1158 .into_boxed_response(&context);
1159 }
1160
1161 let body_str = match std::str::from_utf8(&body_bytes) {
1163 Ok(s) => s,
1164 Err(err) => {
1165 error!("Invalid UTF-8 in streaming POST request body: {}", err);
1166 return StreamableResponse::Error {
1167 status: StatusCode::BAD_REQUEST,
1168 message: "Request body must be valid UTF-8".to_string(),
1169 }
1170 .into_boxed_response(&context);
1171 }
1172 };
1173
1174 debug!("Streaming POST received JSON-RPC request: {}", body_str);
1175
1176 use turul_mcp_json_rpc_server::dispatch::{JsonRpcMessage, parse_json_rpc_message};
1178 use turul_mcp_json_rpc_server::error::JsonRpcErrorObject;
1179
1180 let message = match parse_json_rpc_message(body_str) {
1181 Ok(msg) => msg,
1182 Err(rpc_err) => {
1183 error!("JSON-RPC parse error in streaming POST: {}", rpc_err);
1184 let error_json =
1185 serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
1186
1187 return Response::builder()
1189 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
1191 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1192 .body(
1193 Full::new(Bytes::from(error_json))
1194 .map_err(|never| match never {})
1195 .boxed_unsync(),
1196 )
1197 .unwrap();
1198 }
1199 };
1200
1201 let session_id = match &message {
1203 JsonRpcMessage::Request(req) if req.method == "initialize" => {
1204 if let Some(existing_id) = &context.session_id {
1206 if let Err(err) = self.validate_session_exists(existing_id).await {
1208 warn!(
1209 "Invalid session ID {} during initialize: {}",
1210 existing_id, err
1211 );
1212 return StreamableResponse::Error {
1213 status: StatusCode::UNAUTHORIZED,
1214 message: "Invalid or expired session".to_string(),
1215 }
1216 .into_boxed_response(&context);
1217 }
1218 existing_id.clone()
1219 } else {
1220 match self
1222 .session_storage
1223 .create_session(self.server_capabilities.clone())
1224 .await
1225 {
1226 Ok(session_info) => {
1227 debug!(
1228 "Created new session for initialize: {}",
1229 session_info.session_id
1230 );
1231 context.session_id = Some(session_info.session_id.clone());
1232 session_info.session_id
1233 }
1234 Err(err) => {
1235 error!("Failed to create session during initialize: {}", err);
1236 return StreamableResponse::Error {
1237 status: StatusCode::INTERNAL_SERVER_ERROR,
1238 message: "Failed to create session".to_string(),
1239 }
1240 .into_boxed_response(&context);
1241 }
1242 }
1243 }
1244 }
1245 JsonRpcMessage::Request(_) | JsonRpcMessage::Notification(_) => {
1246 if let Some(existing_id) = &context.session_id {
1248 if let Err(err) = self.validate_session_exists(existing_id).await {
1250 warn!("Invalid session ID {}: {}", existing_id, err);
1251 return StreamableResponse::Error {
1252 status: StatusCode::UNAUTHORIZED,
1253 message: "Invalid or expired session".to_string(),
1254 }
1255 .into_boxed_response(&context);
1256 }
1257 existing_id.clone()
1258 } else {
1259 let method_name = match &message {
1261 JsonRpcMessage::Request(req) => &req.method,
1262 JsonRpcMessage::Notification(notif) => ¬if.method,
1263 };
1264 let request_id = match &message {
1265 JsonRpcMessage::Request(req) => Some(req.id.clone()),
1266 JsonRpcMessage::Notification(_) => None,
1267 };
1268
1269 warn!("Missing session ID for method: {}", method_name);
1270
1271 let error_response = turul_mcp_json_rpc_server::JsonRpcError::new(
1272 request_id,
1273 JsonRpcErrorObject::server_error(
1274 -32001,
1275 "Missing Mcp-Session-Id header. Call initialize first.",
1276 None::<serde_json::Value>,
1277 ),
1278 );
1279
1280 let error_json =
1281 serde_json::to_string(&error_response).unwrap_or_else(|_| "{}".to_string());
1282
1283 return Response::builder()
1284 .status(StatusCode::UNAUTHORIZED)
1285 .header(CONTENT_TYPE, "application/json")
1286 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1287 .body(
1288 Full::new(Bytes::from(error_json))
1289 .map_err(|never| match never {})
1290 .boxed_unsync(),
1291 )
1292 .unwrap();
1293 }
1294 }
1295 };
1296
1297 debug!("Processing streaming request with session: {}", session_id);
1298
1299 match message {
1301 JsonRpcMessage::Request(request) => {
1302 debug!(
1303 "Processing streaming JSON-RPC request: method={}",
1304 request.method
1305 );
1306 self.create_streaming_response(request, session_id, context)
1307 .await
1308 }
1309 JsonRpcMessage::Notification(notification) => {
1310 debug!(
1311 "Processing streaming JSON-RPC notification: method={}",
1312 notification.method
1313 );
1314
1315 use crate::notification_bridge::StreamManagerNotificationBroadcaster;
1317 use turul_mcp_json_rpc_server::SessionContext;
1318
1319 let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
1320 &self.stream_manager,
1321 )));
1322 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1323
1324 let session_context = SessionContext {
1325 session_id: session_id.clone(),
1326 metadata: std::collections::HashMap::new(),
1327 broadcaster: Some(broadcaster_any),
1328 timestamp: chrono::Utc::now().timestamp_millis() as u64,
1329 };
1330
1331 let dispatcher = Arc::clone(&self.dispatcher);
1333 let notification_clone = notification.clone();
1334
1335 tokio::spawn(async move {
1337 if let Err(e) = dispatcher
1338 .handle_notification_with_context(notification_clone, Some(session_context))
1339 .await
1340 {
1341 error!("Failed to process notification: {}", e);
1342 }
1343 });
1344
1345 Response::builder()
1347 .status(StatusCode::ACCEPTED)
1348 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1349 .header("Mcp-Session-Id", &session_id)
1350 .body(
1351 Full::new(Bytes::new())
1352 .map_err(|never| match never {})
1353 .boxed_unsync(),
1354 )
1355 .unwrap()
1356 }
1357 }
1358 }
1359
1360 async fn create_streaming_response(
1363 &self,
1364 request: turul_mcp_json_rpc_server::JsonRpcRequest,
1365 session_id: String,
1366 context: StreamableHttpContext,
1367 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
1368 debug!(
1369 "Creating streaming response for method: {}, session: {}",
1370 request.method, session_id
1371 );
1372 use http_body_util::StreamBody;
1374 use tokio_stream::StreamExt;
1375 use tokio_stream::wrappers::UnboundedReceiverStream; let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<bytes::Bytes, hyper::Error>>();
1378 let body_stream = UnboundedReceiverStream::new(rx)
1379 .map(|item| item.map(|bytes| http_body::Frame::data(bytes)));
1380 let body = StreamBody::new(body_stream);
1381
1382 use crate::notification_bridge::{
1384 SharedNotificationBroadcaster, StreamManagerNotificationBroadcaster,
1385 };
1386 use turul_mcp_json_rpc_server::SessionContext;
1387
1388 let broadcaster: SharedNotificationBroadcaster = Arc::new(
1389 StreamManagerNotificationBroadcaster::new(Arc::clone(&self.stream_manager)),
1390 );
1391 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1392
1393 let session_context = SessionContext {
1394 session_id: session_id.clone(),
1395 metadata: std::collections::HashMap::new(),
1396 broadcaster: Some(broadcaster_any),
1397 timestamp: chrono::Utc::now().timestamp_millis() as u64,
1398 };
1399
1400 let wants_sse = context.wants_sse_stream();
1402 let connection_id = format!("post-{}", uuid::Uuid::now_v7());
1403
1404 let (shutdown_tx, completion_rx) = if wants_sse {
1406 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
1408 let (completion_tx, completion_rx) = tokio::sync::oneshot::channel::<()>();
1409 let (progress_tx, mut progress_rx) = tokio::sync::mpsc::channel(100);
1410
1411 let registration_result = self
1413 .stream_manager
1414 .register_streaming_connection(&session_id, connection_id.clone(), progress_tx)
1415 .await;
1416
1417 if let Err(e) = registration_result {
1418 error!("Failed to register POST streaming connection: {}", e);
1419 (None, None)
1421 } else {
1422 debug!(
1423 "Registered SSE streaming connection for session: {}",
1424 session_id
1425 );
1426
1427 let sender_clone = tx.clone();
1429 let session_id_clone = session_id.clone();
1430 let connection_id_clone = connection_id.clone();
1431 let stream_manager_clone = Arc::clone(&self.stream_manager);
1432
1433 tokio::spawn(async move {
1434 debug!(
1435 "Starting progress forwarding task for session: {}",
1436 session_id_clone
1437 );
1438
1439 loop {
1441 debug!(
1442 "🔍 Progress task entering select loop for session: {}",
1443 session_id_clone
1444 );
1445 tokio::select! {
1446 maybe_event = progress_rx.recv() => {
1448 debug!("🔍 Progress task: progress_rx.recv() branch fired for session: {}", session_id_clone);
1449 match maybe_event {
1450 Some(sse_event) => {
1451 debug!("🔍 Forwarding progress event to POST response: session={}, event={:?}", session_id_clone, sse_event.event_type);
1452
1453 let sse_chunk = sse_event.format();
1455
1456 if let Err(e) = sender_clone.send(Ok(Bytes::from(sse_chunk))) {
1457 error!("Failed to send progress event to POST response: {}", e);
1458 break;
1459 }
1460 }
1461 None => {
1462 debug!("🔍 Progress channel closed naturally for session: {}", session_id_clone);
1464 break;
1465 }
1466 }
1467 }
1468 _ = &mut shutdown_rx => {
1470 debug!("🔍 Progress task: shutdown_rx branch fired! Received explicit shutdown signal for session: {}", session_id_clone);
1471 break;
1472 }
1473 }
1474 }
1475
1476 debug!(
1478 "Progress task unregistering connection for session: {}",
1479 session_id_clone
1480 );
1481 stream_manager_clone
1482 .unregister_connection(&session_id_clone, &connection_id_clone)
1483 .await;
1484
1485 debug!(
1487 "🔍 Progress task: dropping sender_clone for session: {}",
1488 session_id_clone
1489 );
1490 drop(sender_clone);
1491
1492 debug!(
1494 "🔍 Progress task: signaling completion for session: {}",
1495 session_id_clone
1496 );
1497 if let Err(_) = completion_tx.send(()) {
1498 debug!(
1499 "🔍 Progress task: main task already dropped completion_rx for session: {}",
1500 session_id_clone
1501 );
1502 }
1503
1504 debug!(
1505 "🔍 Progress forwarding task completed for session: {}",
1506 session_id_clone
1507 );
1508 });
1509
1510 (Some(shutdown_tx), Some(completion_rx))
1512 }
1513 } else {
1514 (None, None)
1516 };
1517
1518 let dispatcher = Arc::clone(&self.dispatcher);
1520 let request_id = request.id.clone();
1521 let sender = tx; tokio::spawn(async move {
1524 debug!(
1525 "Spawning streaming task for request ID: {:?}, wants_sse: {}",
1526 request_id, wants_sse
1527 );
1528
1529 let response = dispatcher
1531 .handle_request_with_context(request, session_context)
1532 .await;
1533
1534 if wants_sse {
1536 let final_frame = match response {
1538 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
1539 turul_mcp_json_rpc_server::JsonRpcFrame::FinalResult {
1540 request_id: request_id.clone(),
1541 result: match resp.result {
1542 turul_mcp_json_rpc_server::response::ResponseResult::Success(
1543 val,
1544 ) => val,
1545 turul_mcp_json_rpc_server::response::ResponseResult::Null => {
1546 serde_json::Value::Null
1547 }
1548 },
1549 }
1550 }
1551 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
1552 turul_mcp_json_rpc_server::JsonRpcFrame::Error {
1553 request_id: request_id.clone(),
1554 error: turul_mcp_json_rpc_server::error::JsonRpcErrorObject {
1555 code: err.error.code,
1556 message: err.error.message,
1557 data: err.error.data,
1558 },
1559 }
1560 }
1561 };
1562
1563 let final_json = final_frame.to_json();
1564 let final_chunk =
1566 format!("data: {}\n\n", serde_json::to_string(&final_json).unwrap());
1567
1568 if let Err(err) = sender.send(Ok(Bytes::from(final_chunk))) {
1569 error!("Failed to send SSE final chunk: {}", err);
1570 }
1571
1572 if let Some(shutdown_tx) = shutdown_tx {
1575 debug!(
1576 "🔍 Main task sending shutdown signal to progress task for request: {:?}",
1577 request_id
1578 );
1579 match shutdown_tx.send(()) {
1580 Ok(()) => {
1581 debug!(
1582 "🔍 Main task: shutdown signal sent successfully for request: {:?}",
1583 request_id
1584 );
1585
1586 if let Some(completion_rx) = completion_rx {
1589 match tokio::time::timeout(
1590 tokio::time::Duration::from_millis(100),
1591 completion_rx,
1592 )
1593 .await
1594 {
1595 Ok(Ok(())) => {
1596 debug!(
1597 "🔍 Main task: progress task completed successfully for request: {:?}",
1598 request_id
1599 );
1600 }
1601 Ok(Err(_)) => {
1602 debug!(
1603 "🔍 Main task: progress task completion signal dropped for request: {:?}",
1604 request_id
1605 );
1606 }
1607 Err(_) => {
1608 debug!(
1609 "🔍 Main task: progress task completion timeout for request: {:?}",
1610 request_id
1611 );
1612 }
1613 }
1614 }
1615 }
1616 Err(_) => {
1617 debug!(
1618 "🔍 Main task: progress task already completed (shutdown_rx dropped) for request: {:?}",
1619 request_id
1620 );
1621 }
1622 }
1623 } else {
1624 debug!(
1625 "🔍 Main task: no shutdown_tx available (not SSE client) for request: {:?}",
1626 request_id
1627 );
1628 }
1629 } else {
1630 let final_json = serde_json::to_string(&response).unwrap();
1632
1633 if let Err(err) = sender.send(Ok(Bytes::from(final_json))) {
1634 error!("Failed to send final JSON response: {}", err);
1635 }
1636 }
1637
1638 debug!(
1639 "🔍 Main task: streaming task completed for request ID: {:?}",
1640 request_id
1641 );
1642
1643 debug!(
1645 "🔍 Main task: dropping main sender for request ID: {:?}",
1646 request_id
1647 );
1648 drop(sender);
1649 });
1650
1651 let content_type = if context.wants_sse_stream() {
1654 "text/event-stream"
1655 } else {
1656 "application/json"
1657 };
1658
1659 let mut response = Response::builder()
1660 .status(StatusCode::OK)
1661 .header(CONTENT_TYPE, content_type)
1662 .header("Transfer-Encoding", "chunked") .header("Cache-Control", "no-cache")
1664 .body(http_body_util::BodyExt::boxed_unsync(body))
1665 .unwrap();
1666
1667 let mcp_headers = context.response_headers();
1669 for (key, value) in mcp_headers.iter() {
1670 response.headers_mut().insert(key, value.clone());
1671 }
1672
1673 response
1674 }
1675
1676 #[allow(dead_code)]
1678 async fn handle_buffered_post<T>(
1679 &self,
1680 _req: Request<T>,
1681 context: StreamableHttpContext,
1682 session_id: String,
1683 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1684 where
1685 T: Body + Send + 'static,
1686 {
1687 debug!(
1688 "Using buffered POST for legacy client, session: {}",
1689 session_id
1690 );
1691
1692 Response::builder()
1696 .status(StatusCode::OK)
1697 .header(CONTENT_TYPE, "application/json")
1698 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1699 .header("Mcp-Session-Id", &session_id)
1700 .body(
1701 Full::new(Bytes::from(
1702 r#"{"jsonrpc":"2.0","id":1,"result":"buffered"}"#,
1703 ))
1704 .map_err(|never| match never {})
1705 .boxed_unsync(),
1706 )
1707 .unwrap()
1708 }
1709
1710 async fn handle_client_message<T>(
1714 &self,
1715 req: Request<T>,
1716 context: StreamableHttpContext,
1717 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1718 where
1719 T: Body + Send + 'static,
1720 {
1721 debug!("Handling client message via POST (MCP 2025-06-18)");
1722
1723 if !context.accepts_stream_frames {
1726 warn!("Client POST missing application/json in Accept header");
1727 return StreamableResponse::Error {
1728 status: StatusCode::BAD_REQUEST,
1729 message: "Accept header must include application/json, text/event-stream, or */*"
1730 .to_string(),
1731 }
1732 .into_boxed_response(&context);
1733 }
1734
1735 let content_type = req
1737 .headers()
1738 .get(CONTENT_TYPE)
1739 .and_then(|ct| ct.to_str().ok())
1740 .unwrap_or("");
1741 if !content_type.starts_with("application/json") {
1742 warn!("Invalid content type for POST: {}", content_type);
1743 return StreamableResponse::Error {
1744 status: StatusCode::BAD_REQUEST,
1745 message: "Content-Type must be application/json".to_string(),
1746 }
1747 .into_boxed_response(&context);
1748 }
1749
1750 debug!("Using streaming POST handler for all requests");
1753 return self.handle_streaming_post_real(req, context).await;
1754 }
1755}
1756
#[cfg(test)]
mod tests {
    use super::*;

    /// Each known version string maps to its variant; anything else yields `None`.
    #[test]
    fn test_version_parsing() {
        let cases = [
            ("2024-11-05", Some(McpProtocolVersion::V2024_11_05)),
            ("2025-03-26", Some(McpProtocolVersion::V2025_03_26)),
            ("2025-06-18", Some(McpProtocolVersion::V2025_06_18)),
            ("invalid", None),
        ];

        for &(raw, expected) in &cases {
            assert_eq!(McpProtocolVersion::parse_version(raw), expected);
        }
    }

    /// Feature flags grow with the protocol version.
    #[test]
    fn test_version_capabilities() {
        let oldest = McpProtocolVersion::V2024_11_05;
        assert_eq!(
            (
                oldest.supports_streamable_http(),
                oldest.supports_meta_fields()
            ),
            (false, false)
        );

        let middle = McpProtocolVersion::V2025_03_26;
        assert_eq!(
            (
                middle.supports_streamable_http(),
                middle.supports_meta_fields()
            ),
            (true, false)
        );

        let newest = McpProtocolVersion::V2025_06_18;
        assert_eq!(
            (
                newest.supports_streamable_http(),
                newest.supports_meta_fields(),
                newest.supports_cursors(),
                newest.supports_progress_tokens(),
                newest.supports_elicitation(),
            ),
            (true, true, true, true, true)
        );
    }

    /// Walks one context through several states and checks `validate` for each.
    #[test]
    fn test_context_validation() {
        let mut ctx = StreamableHttpContext {
            protocol_version: McpProtocolVersion::V2025_06_18,
            session_id: Some("test-session".to_string()),
            wants_sse_stream: true,
            accepts_stream_frames: true,
            headers: HashMap::new(),
        };

        // Fully populated context: both methods validate.
        assert!(ctx.validate(&Method::POST).is_ok());
        assert!(ctx.validate(&Method::GET).is_ok());

        // POST is rejected once stream frames are no longer accepted.
        ctx.accepts_stream_frames = false;
        assert!(ctx.validate(&Method::POST).is_err());

        // The oldest protocol version combined with an SSE request is rejected.
        ctx.accepts_stream_frames = true;
        ctx.protocol_version = McpProtocolVersion::V2024_11_05;
        ctx.wants_sse_stream = true;
        assert!(ctx.validate(&Method::POST).is_err());

        // Without a session id, POST still validates but GET does not.
        ctx.protocol_version = McpProtocolVersion::V2025_06_18;
        ctx.session_id = None;
        assert!(ctx.validate(&Method::POST).is_ok());
        assert!(ctx.validate(&Method::GET).is_err());
    }
}