1use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use futures::Stream;
18use http_body::Body;
19use http_body_util::{BodyExt, Full};
20use hyper::header::{ACCEPT, CONTENT_TYPE};
21use hyper::{HeaderMap, Method, Request, Response, StatusCode};
22use serde_json::Value;
23use tracing::{debug, error, warn};
24use turul_mcp_session_storage::SessionView;
25
26use crate::ServerConfig;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
30pub enum McpProtocolVersion {
31 V2024_11_05,
33 V2025_03_26,
35 #[default]
37 V2025_06_18,
38}
39
40impl McpProtocolVersion {
41 pub fn parse_version(s: &str) -> Option<Self> {
43 match s {
44 "2024-11-05" => Some(Self::V2024_11_05),
45 "2025-03-26" => Some(Self::V2025_03_26),
46 "2025-06-18" => Some(Self::V2025_06_18),
47 _ => None,
48 }
49 }
50
51 pub fn as_str(&self) -> &'static str {
53 match self {
54 Self::V2024_11_05 => "2024-11-05",
55 Self::V2025_03_26 => "2025-03-26",
56 Self::V2025_06_18 => "2025-06-18",
57 }
58 }
59
60 pub fn supports_streamable_http(&self) -> bool {
62 matches!(self, Self::V2025_03_26 | Self::V2025_06_18)
63 }
64
65 pub fn supports_meta_fields(&self) -> bool {
67 matches!(self, Self::V2025_06_18)
68 }
69
70 pub fn supports_cursors(&self) -> bool {
72 matches!(self, Self::V2025_06_18)
73 }
74
75 pub fn supports_progress_tokens(&self) -> bool {
77 matches!(self, Self::V2025_06_18)
78 }
79
80 pub fn supports_elicitation(&self) -> bool {
82 matches!(self, Self::V2025_06_18)
83 }
84
85 pub fn supported_features(&self) -> Vec<&'static str> {
87 let mut features = vec![];
88 if self.supports_streamable_http() {
89 features.push("streamable-http");
90 }
91 if self.supports_meta_fields() {
92 features.push("_meta-fields");
93 }
94 if self.supports_cursors() {
95 features.push("cursor-pagination");
96 }
97 if self.supports_progress_tokens() {
98 features.push("progress-tokens");
99 }
100 if self.supports_elicitation() {
101 features.push("elicitation");
102 }
103 features
104 }
105}
106
107#[derive(Debug, Clone)]
109pub struct StreamableHttpContext {
110 pub protocol_version: McpProtocolVersion,
112 pub session_id: Option<String>,
114 pub wants_sse_stream: bool,
116 pub accepts_stream_frames: bool,
118 pub headers: HashMap<String, String>,
120}
121
122impl StreamableHttpContext {
123 pub fn from_request<T>(req: &Request<T>) -> Self {
125 let headers = req.headers();
126
127 let protocol_version = headers
129 .get("MCP-Protocol-Version")
130 .and_then(|h| h.to_str().ok())
131 .and_then(McpProtocolVersion::parse_version)
132 .unwrap_or_default();
133
134 let session_id = headers
136 .get("Mcp-Session-Id")
137 .and_then(|h| h.to_str().ok())
138 .map(|s| s.to_string());
139
140 let accept_header = headers
142 .get(ACCEPT)
143 .and_then(|h| h.to_str().ok())
144 .unwrap_or_default()
145 .to_ascii_lowercase();
146
147 let wants_sse_stream = accept_header.contains("text/event-stream");
148 let accepts_stream_frames = accept_header.contains("application/json")
149 || accept_header.contains("text/event-stream")
150 || accept_header.contains("*/*");
151
152 let mut header_map = HashMap::new();
154 for (name, value) in headers.iter() {
155 if let Ok(value_str) = value.to_str() {
156 header_map.insert(name.to_string(), value_str.to_string());
157 }
158 }
159
160 Self {
161 protocol_version,
162 session_id,
163 wants_sse_stream,
164 accepts_stream_frames,
165 headers: header_map,
166 }
167 }
168
169 pub fn wants_sse_stream(&self) -> bool {
171 self.wants_sse_stream
172 }
173
174 pub fn wants_streaming_post(&self) -> bool {
176 self.accepts_stream_frames && self.wants_sse_stream
177 }
178
179 pub fn is_streamable_compatible(&self) -> bool {
181 self.protocol_version.supports_streamable_http() && self.accepts_stream_frames
182 }
183
184 pub fn validate(&self, method: &Method) -> std::result::Result<(), String> {
186 if !self.accepts_stream_frames {
187 return Err(
188 "Accept header must include application/json, text/event-stream, or */*"
189 .to_string(),
190 );
191 }
192
193 if self.wants_sse_stream && !self.protocol_version.supports_streamable_http() {
194 return Err(format!(
195 "Protocol version {} does not support streamable HTTP",
196 self.protocol_version.as_str()
197 ));
198 }
199
200 if *method == Method::GET && self.wants_sse_stream && self.session_id.is_none() {
203 return Err("Mcp-Session-Id header required for SSE streaming connections".to_string());
204 }
205
206 Ok(())
207 }
208
209 pub fn response_headers(&self) -> HeaderMap {
211 let mut headers = HeaderMap::new();
212
213 headers.insert(
215 "MCP-Protocol-Version",
216 self.protocol_version.as_str().parse().unwrap(),
217 );
218
219 if let Some(session_id) = &self.session_id {
221 headers.insert("Mcp-Session-Id", session_id.parse().unwrap());
222 }
223
224 let features = self.protocol_version.supported_features();
226 if !features.is_empty() {
227 headers.insert("MCP-Capabilities", features.join(",").parse().unwrap());
228 }
229
230 headers
231 }
232}
233
234pub enum StreamableResponse {
236 Json(Value),
238 Stream(Pin<Box<dyn Stream<Item = std::result::Result<Value, String>> + Send>>),
240 Error { status: StatusCode, message: String },
242}
243
244impl std::fmt::Debug for StreamableResponse {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 Self::Json(value) => f.debug_tuple("Json").field(value).finish(),
248 Self::Stream(_) => f.debug_tuple("Stream").field(&"<stream>").finish(),
249 Self::Error { status, message } => f
250 .debug_struct("Error")
251 .field("status", status)
252 .field("message", message)
253 .finish(),
254 }
255 }
256}
257
258impl StreamableResponse {
259 pub fn into_response(self, context: &StreamableHttpContext) -> Response<Full<Bytes>> {
261 let mut response_headers = context.response_headers();
262
263 match self {
264 StreamableResponse::Json(json) => {
265 response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
266
267 let body = serde_json::to_string(&json)
268 .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
269
270 Response::builder()
271 .status(StatusCode::OK)
272 .body(Full::new(Bytes::from(body)))
273 .unwrap()
274 }
275
276 StreamableResponse::Stream(_stream) => {
277 response_headers.insert(CONTENT_TYPE, "text/event-stream".parse().unwrap());
279 response_headers.insert("Cache-Control", "no-cache, no-transform".parse().unwrap());
280 response_headers.insert("Connection", "keep-alive".parse().unwrap());
281
282 Response::builder()
286 .status(StatusCode::ACCEPTED)
287 .body(Full::new(Bytes::from("Streaming response accepted")))
288 .unwrap()
289 }
290
291 StreamableResponse::Error { status, message } => {
292 response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
293
294 let error_json = serde_json::json!({
295 "error": {
296 "code": status.as_u16(),
297 "message": message
298 }
299 });
300
301 let body = serde_json::to_string(&error_json).unwrap_or_else(|_| {
302 r#"{"error": {"code": 500, "message": "Internal server error"}}"#.to_string()
303 });
304
305 Response::builder()
306 .status(status)
307 .body(Full::new(Bytes::from(body)))
308 .unwrap()
309 }
310 }
311 }
312
313 pub fn into_boxed_response(
315 self,
316 context: &StreamableHttpContext,
317 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
318 self.into_response(context)
319 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
320 }
321}
322
323#[derive(Clone)]
325pub struct StreamableHttpHandler {
326 config: Arc<ServerConfig>,
327 dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
328 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
329 stream_manager: Arc<crate::StreamManager>,
330 server_capabilities: turul_mcp_protocol::ServerCapabilities,
331 pub(crate) middleware_stack: Arc<crate::middleware::MiddlewareStack>,
332}
333
334impl StreamableHttpHandler {
335 pub fn new(
336 config: Arc<ServerConfig>,
337 dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
338 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
339 stream_manager: Arc<crate::StreamManager>,
340 server_capabilities: turul_mcp_protocol::ServerCapabilities,
341 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
342 ) -> Self {
343 Self {
344 config,
345 dispatcher,
346 session_storage,
347 stream_manager,
348 server_capabilities,
349 middleware_stack,
350 }
351 }
352
353 pub async fn handle_request<T>(
355 &self,
356 req: Request<T>,
357 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
358 where
359 T: Body + Send + 'static,
360 T::Data: Send,
361 T::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
362 {
363 debug!(
364 "Streamable handler request: method={}, uri={}",
365 req.method(),
366 req.uri()
367 );
368 let context = StreamableHttpContext::from_request(&req);
370
371 debug!(
372 "Streamable handler entry: method={}, protocol={}, session={:?}, accepts_stream_frames={}, wants_sse_stream={}",
373 req.method(),
374 context.protocol_version.as_str(),
375 context.session_id,
376 context.accepts_stream_frames,
377 context.wants_sse_stream()
378 );
379
380 if let Err(error) = context.validate(req.method()) {
382 warn!("Invalid streamable HTTP request: {}", error);
383 return StreamableResponse::Error {
384 status: StatusCode::BAD_REQUEST,
385 message: error,
386 }
387 .into_boxed_response(&context);
388 }
389
390 match *req.method() {
392 Method::POST => {
393 self.handle_client_message(req, context).await
396 }
397 Method::GET => {
398 self.handle_get_sse_notifications(req, context).await
400 }
401 Method::DELETE => {
402 self.handle_session_delete(req, context).await
404 }
405 _ => StreamableResponse::Error {
406 status: StatusCode::METHOD_NOT_ALLOWED,
407 message: "Method not allowed for this endpoint".to_string(),
408 }
409 .into_boxed_response(&context),
410 }
411 }
412
413 async fn handle_get_sse_notifications<T>(
419 &self,
420 req: Request<T>,
421 context: StreamableHttpContext,
422 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
423 where
424 T: Body + Send + 'static,
425 {
426 debug!(
427 "Opening streaming connection for session: {:?}",
428 context.session_id
429 );
430
431 let session_id = match context.session_id {
433 Some(ref id) => id.clone(),
434 None => {
435 warn!("Missing session ID for streaming GET request");
436 return StreamableResponse::Error {
437 status: StatusCode::BAD_REQUEST,
438 message: "Mcp-Session-Id header required for streaming connection".to_string(),
439 }
440 .into_boxed_response(&context);
441 }
442 };
443
444 match self.validate_session_exists(&session_id).await {
446 Ok(_) => {
447 debug!(
448 "Session validation successful for streaming GET: {}",
449 session_id
450 );
451 }
452 Err(err) => {
453 error!(
454 "Session validation failed for streaming GET {}: {}",
455 session_id, err
456 );
457 return StreamableResponse::Error {
458 status: StatusCode::UNAUTHORIZED,
459 message: format!("Session validation failed: {}", err),
460 }
461 .into_boxed_response(&context);
462 }
463 }
464
465 let last_event_id = req
471 .headers()
472 .get("Last-Event-ID")
473 .and_then(|h| h.to_str().ok())
474 .and_then(|s| s.parse::<u64>().ok());
475
476 let connection_id = uuid::Uuid::now_v7().to_string();
478
479 debug!(
480 "Creating streamable HTTP connection: session={}, connection={}, last_event_id={:?}",
481 session_id, connection_id, last_event_id
482 );
483
484 match self
488 .stream_manager
489 .handle_sse_connection(session_id.clone(), connection_id.clone(), last_event_id)
490 .await
491 {
492 Ok(mut streaming_response) => {
493 debug!(
494 "Streamable HTTP connection established: session={}, connection={}",
495 session_id, connection_id
496 );
497
498 let mcp_headers = context.response_headers();
500 for (key, value) in mcp_headers.iter() {
501 streaming_response.headers_mut().insert(key, value.clone());
502 }
503
504 streaming_response
507 }
508 Err(err) => {
509 error!("Failed to create streamable HTTP connection: {}", err);
510 StreamableResponse::Error {
511 status: StatusCode::INTERNAL_SERVER_ERROR,
512 message: format!("Streaming connection failed: {}", err),
513 }
514 .into_boxed_response(&context)
515 }
516 }
517 }
518
519 async fn validate_session_exists(&self, session_id: &str) -> std::result::Result<(), String> {
521 match self.session_storage.get_session(session_id).await {
522 Ok(Some(_)) => {
523 debug!("Session validation successful: {}", session_id);
524 Ok(())
525 }
526 Ok(None) => {
527 error!("Session not found: {}", session_id);
528 Err(format!(
529 "Session '{}' not found. Sessions must be created via initialize request first.",
530 session_id
531 ))
532 }
533 Err(err) => {
534 error!("Failed to validate session {}: {}", session_id, err);
535 Err(format!("Session validation failed: {}", err))
536 }
537 }
538 }
539
540 #[allow(dead_code)]
542 async fn handle_json_post<T>(
543 &self,
544 req: Request<T>,
545 context: StreamableHttpContext,
546 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
547 where
548 T: Body + Send + 'static,
549 {
550 debug!("Handling JSON POST (non-streaming/legacy)");
551
552 let content_type = req
556 .headers()
557 .get(CONTENT_TYPE)
558 .and_then(|ct| ct.to_str().ok())
559 .unwrap_or("");
560
561 if !content_type.starts_with("application/json") {
562 warn!("Invalid content type for legacy POST: {}", content_type);
563 return StreamableResponse::Error {
564 status: StatusCode::BAD_REQUEST,
565 message: "Content-Type must be application/json".to_string(),
566 }
567 .into_boxed_response(&context);
568 }
569
570 let body_bytes = match req.into_body().collect().await {
572 Ok(collected) => collected.to_bytes(),
573 Err(_err) => {
574 error!("Failed to read legacy POST request body");
575 return StreamableResponse::Error {
576 status: StatusCode::BAD_REQUEST,
577 message: "Failed to read request body".to_string(),
578 }
579 .into_boxed_response(&context);
580 }
581 };
582
583 if body_bytes.len() > self.config.max_body_size {
585 warn!(
586 "Legacy POST request body too large: {} bytes",
587 body_bytes.len()
588 );
589 return StreamableResponse::Error {
590 status: StatusCode::PAYLOAD_TOO_LARGE,
591 message: "Request body too large".to_string(),
592 }
593 .into_boxed_response(&context);
594 }
595
596 let body_str = match std::str::from_utf8(&body_bytes) {
598 Ok(s) => s,
599 Err(err) => {
600 error!("Invalid UTF-8 in legacy POST request body: {}", err);
601 return StreamableResponse::Error {
602 status: StatusCode::BAD_REQUEST,
603 message: "Request body must be valid UTF-8".to_string(),
604 }
605 .into_boxed_response(&context);
606 }
607 };
608
609 debug!("Received legacy POST JSON-RPC request: {}", body_str);
610
611 use turul_mcp_json_rpc_server::dispatch::{
613 JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message,
614 };
615
616 let message = match parse_json_rpc_message(body_str) {
617 Ok(msg) => msg,
618 Err(rpc_err) => {
619 error!("JSON-RPC parse error in legacy POST: {}", rpc_err);
620 let error_json =
621 serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
622 return Response::builder()
623 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
625 .header("MCP-Protocol-Version", context.protocol_version.as_str())
626 .body(Full::new(Bytes::from(error_json)))
627 .unwrap()
628 .map(|body| body.map_err(|never| match never {}).boxed_unsync());
629 }
630 };
631
632 let message_result = match message {
635 JsonRpcMessage::Request(request) => {
636 debug!(
637 "Processing legacy POST JSON-RPC request: method={}",
638 request.method
639 );
640
641 let response = if request.method == "initialize" {
643 debug!("Handling legacy initialize request - creating new session");
644
645 match self
647 .session_storage
648 .create_session(self.server_capabilities.clone())
649 .await
650 {
651 Ok(session_info) => {
652 debug!(
653 "Created new session for legacy client: {}",
654 session_info.session_id
655 );
656
657 use crate::notification_bridge::StreamManagerNotificationBroadcaster;
659 use turul_mcp_json_rpc_server::r#async::SessionContext;
660
661 let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
662 Arc::clone(&self.stream_manager),
663 ));
664 let broadcaster_any =
665 Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
666
667 let session_context = SessionContext {
668 session_id: session_info.session_id.clone(),
669 metadata: std::collections::HashMap::new(),
670 broadcaster: Some(broadcaster_any),
671 timestamp: chrono::Utc::now().timestamp_millis() as u64,
672 };
673
674 self.dispatcher
675 .handle_request_with_context(request, session_context)
676 .await
677 }
678 Err(err) => {
679 error!("Failed to create session during legacy initialize: {}", err);
680 let error_msg = format!("Session creation failed: {}", err);
681 turul_mcp_json_rpc_server::JsonRpcMessage::error(
682 turul_mcp_json_rpc_server::JsonRpcError::internal_error(
683 Some(request.id),
684 Some(error_msg),
685 ),
686 )
687 }
688 }
689 } else {
690 self.dispatcher.handle_request(request).await
692 };
693
694 match response {
696 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
697 JsonRpcMessageResult::Response(resp)
698 }
699 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
700 JsonRpcMessageResult::Error(err)
701 }
702 }
703 }
704 JsonRpcMessage::Notification(notification) => {
705 debug!(
706 "Processing legacy POST JSON-RPC notification: method={}",
707 notification.method
708 );
709
710 let result = self
712 .dispatcher
713 .handle_notification_with_context(notification, None)
714 .await;
715
716 if let Err(err) = result {
717 error!("Legacy POST notification handling error: {}", err);
718 }
719 JsonRpcMessageResult::NoResponse
720 }
721 };
722
723 match message_result {
725 JsonRpcMessageResult::Response(response) => {
726 let response_json = serde_json::to_string(&response)
727 .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
728
729 Response::builder()
730 .status(StatusCode::OK)
731 .header(CONTENT_TYPE, "application/json")
732 .header("MCP-Protocol-Version", context.protocol_version.as_str())
733 .body(Full::new(Bytes::from(response_json)))
734 .unwrap()
735 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
736 }
737 JsonRpcMessageResult::Error(error) => {
738 let error_json = serde_json::to_string(&error)
739 .unwrap_or_else(|_| r#"{"error": "Internal error"}"#.to_string());
740
741 Response::builder()
742 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
744 .header("MCP-Protocol-Version", context.protocol_version.as_str())
745 .body(Full::new(Bytes::from(error_json)))
746 .unwrap()
747 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
748 }
749 JsonRpcMessageResult::NoResponse => {
750 Response::builder()
752 .status(StatusCode::ACCEPTED)
753 .header("MCP-Protocol-Version", context.protocol_version.as_str())
754 .body(Full::new(Bytes::new()))
755 .unwrap()
756 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
757 }
758 }
759 }
760
761 async fn handle_session_delete<T>(
763 &self,
764 _req: Request<T>,
765 context: StreamableHttpContext,
766 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
767 where
768 T: Body + Send + 'static,
769 {
770 if let Some(session_id) = &context.session_id {
771 debug!("Deleting session: {}", session_id);
772
773 let closed_connections = self
776 .stream_manager
777 .close_session_connections(session_id)
778 .await;
779 debug!(
780 "Closed {} streaming connections for session: {}",
781 closed_connections, session_id
782 );
783
784 match self.session_storage.get_session(session_id).await {
786 Ok(Some(mut session_info)) => {
787 session_info
789 .state
790 .insert("terminated".to_string(), serde_json::Value::Bool(true));
791 session_info.state.insert(
792 "terminated_at".to_string(),
793 serde_json::Value::Number(serde_json::Number::from(
794 chrono::Utc::now().timestamp_millis(),
795 )),
796 );
797 session_info.touch();
798
799 match self.session_storage.update_session(session_info).await {
801 Ok(()) => {
802 debug!(
803 "Session {} marked as terminated (TTL will handle cleanup)",
804 session_id
805 );
806
807 Response::builder()
809 .status(StatusCode::OK)
810 .header(CONTENT_TYPE, "application/json")
811 .header("MCP-Protocol-Version", context.protocol_version.as_str())
812 .header("Mcp-Session-Id", session_id)
813 .body(Full::new(Bytes::from(
814 serde_json::to_string(&serde_json::json!({
815 "status": "session_terminated",
816 "session_id": session_id,
817 "closed_connections": closed_connections,
818 "message": "Session marked for cleanup"
819 }))
820 .unwrap_or_else(|_| {
821 r#"{"status":"session_terminated"}"#.to_string()
822 }),
823 )))
824 .unwrap()
825 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
826 }
827 Err(err) => {
828 error!(
829 "Error marking session {} as terminated: {}",
830 session_id, err
831 );
832 match self.session_storage.delete_session(session_id).await {
834 Ok(_) => {
835 debug!("Session {} deleted as fallback", session_id);
836 Response::builder()
837 .status(StatusCode::OK)
838 .header(CONTENT_TYPE, "application/json")
839 .header(
840 "MCP-Protocol-Version",
841 context.protocol_version.as_str(),
842 )
843 .body(Full::new(Bytes::from(
844 serde_json::to_string(&serde_json::json!({
845 "status": "session_deleted",
846 "session_id": session_id,
847 "closed_connections": closed_connections,
848 "message": "Session removed"
849 }))
850 .unwrap_or_else(|_| {
851 r#"{"status":"session_deleted"}"#.to_string()
852 }),
853 )))
854 .unwrap()
855 .map(|body| {
856 body.map_err(|never| match never {}).boxed_unsync()
857 })
858 }
859 Err(delete_err) => {
860 error!(
861 "Error deleting session {} as fallback: {}",
862 session_id, delete_err
863 );
864 StreamableResponse::Error {
865 status: StatusCode::INTERNAL_SERVER_ERROR,
866 message: "Session termination error".to_string(),
867 }
868 .into_boxed_response(&context)
869 }
870 }
871 }
872 }
873 }
874 Ok(None) => {
875 Response::builder()
877 .status(StatusCode::NOT_FOUND)
878 .header(CONTENT_TYPE, "application/json")
879 .header("MCP-Protocol-Version", context.protocol_version.as_str())
880 .body(Full::new(Bytes::from(
881 serde_json::to_string(&serde_json::json!({
882 "status": "session_not_found",
883 "session_id": session_id,
884 "message": "Session not found"
885 }))
886 .unwrap_or_else(|_| r#"{"status":"session_not_found"}"#.to_string()),
887 )))
888 .unwrap()
889 .map(|body| body.map_err(|never| match never {}).boxed_unsync())
890 }
891 Err(err) => {
892 error!(
893 "Error retrieving session {} for termination: {}",
894 session_id, err
895 );
896 StreamableResponse::Error {
897 status: StatusCode::INTERNAL_SERVER_ERROR,
898 message: "Session lookup error".to_string(),
899 }
900 .into_boxed_response(&context)
901 }
902 }
903 } else {
904 StreamableResponse::Error {
905 status: StatusCode::BAD_REQUEST,
906 message: "Mcp-Session-Id header required for session deletion".to_string(),
907 }
908 .into_boxed_response(&context)
909 }
910 }
911
912 async fn handle_post_streamable_http<T>(
921 &self,
922 req: Request<T>,
923 mut context: StreamableHttpContext,
924 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
925 where
926 T: Body + Send + 'static,
927 {
928 debug!("Streaming handler called - using true streaming POST");
929
930 let body_bytes = match req.into_body().collect().await {
932 Ok(collected) => collected.to_bytes(),
933 Err(_err) => {
934 error!("Failed to read streaming POST request body");
935 return StreamableResponse::Error {
936 status: StatusCode::BAD_REQUEST,
937 message: "Failed to read request body".to_string(),
938 }
939 .into_boxed_response(&context);
940 }
941 };
942
943 if body_bytes.len() > self.config.max_body_size {
945 warn!(
946 "Streaming POST request body too large: {} bytes",
947 body_bytes.len()
948 );
949 return StreamableResponse::Error {
950 status: StatusCode::PAYLOAD_TOO_LARGE,
951 message: "Request body too large".to_string(),
952 }
953 .into_boxed_response(&context);
954 }
955
956 let body_str = match std::str::from_utf8(&body_bytes) {
958 Ok(s) => s,
959 Err(err) => {
960 error!("Invalid UTF-8 in streaming POST request body: {}", err);
961 return StreamableResponse::Error {
962 status: StatusCode::BAD_REQUEST,
963 message: "Request body must be valid UTF-8".to_string(),
964 }
965 .into_boxed_response(&context);
966 }
967 };
968
969 debug!("Streaming POST received JSON-RPC request: {}", body_str);
970
971 use turul_mcp_json_rpc_server::dispatch::{JsonRpcMessage, parse_json_rpc_message};
973 use turul_mcp_json_rpc_server::error::JsonRpcErrorObject;
974
975 let message = match parse_json_rpc_message(body_str) {
976 Ok(msg) => msg,
977 Err(rpc_err) => {
978 error!("JSON-RPC parse error in streaming POST: {}", rpc_err);
979 let error_json =
980 serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
981
982 return Response::builder()
984 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
986 .header("MCP-Protocol-Version", context.protocol_version.as_str())
987 .body(
988 Full::new(Bytes::from(error_json))
989 .map_err(|never| match never {})
990 .boxed_unsync(),
991 )
992 .unwrap();
993 }
994 };
995
996 let session_id = match &message {
998 JsonRpcMessage::Request(req) if req.method == "initialize" => {
999 if let Some(existing_id) = &context.session_id {
1001 if let Err(err) = self.validate_session_exists(existing_id).await {
1003 warn!(
1004 "Invalid session ID {} during initialize: {}",
1005 existing_id, err
1006 );
1007 return StreamableResponse::Error {
1008 status: StatusCode::UNAUTHORIZED,
1009 message: "Invalid or expired session".to_string(),
1010 }
1011 .into_boxed_response(&context);
1012 }
1013 existing_id.clone()
1014 } else {
1015 match self
1017 .session_storage
1018 .create_session(self.server_capabilities.clone())
1019 .await
1020 {
1021 Ok(session_info) => {
1022 debug!(
1023 "Created new session for initialize: {}",
1024 session_info.session_id
1025 );
1026 context.session_id = Some(session_info.session_id.clone());
1027 session_info.session_id
1028 }
1029 Err(err) => {
1030 error!("Failed to create session during initialize: {}", err);
1031 return StreamableResponse::Error {
1032 status: StatusCode::INTERNAL_SERVER_ERROR,
1033 message: "Failed to create session".to_string(),
1034 }
1035 .into_boxed_response(&context);
1036 }
1037 }
1038 }
1039 }
1040 JsonRpcMessage::Request(_) | JsonRpcMessage::Notification(_) => {
1041 if let Some(existing_id) = &context.session_id {
1043 if let Err(err) = self.validate_session_exists(existing_id).await {
1045 warn!("Invalid session ID {}: {}", existing_id, err);
1046 return StreamableResponse::Error {
1047 status: StatusCode::UNAUTHORIZED,
1048 message: "Invalid or expired session".to_string(),
1049 }
1050 .into_boxed_response(&context);
1051 }
1052 existing_id.clone()
1053 } else {
1054 let method_name = match &message {
1056 JsonRpcMessage::Request(req) => &req.method,
1057 JsonRpcMessage::Notification(notif) => ¬if.method,
1058 };
1059 let request_id = match &message {
1060 JsonRpcMessage::Request(req) => Some(req.id.clone()),
1061 JsonRpcMessage::Notification(_) => None,
1062 };
1063
1064 warn!("Missing session ID for method: {}", method_name);
1065
1066 let error_response = turul_mcp_json_rpc_server::JsonRpcError::new(
1067 request_id,
1068 JsonRpcErrorObject::server_error(
1069 -32001,
1070 "Missing Mcp-Session-Id header. Call initialize first.",
1071 None::<serde_json::Value>,
1072 ),
1073 );
1074
1075 let error_json =
1076 serde_json::to_string(&error_response).unwrap_or_else(|_| "{}".to_string());
1077
1078 return Response::builder()
1079 .status(StatusCode::UNAUTHORIZED)
1080 .header(CONTENT_TYPE, "application/json")
1081 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1082 .body(
1083 Full::new(Bytes::from(error_json))
1084 .map_err(|never| match never {})
1085 .boxed_unsync(),
1086 )
1087 .unwrap();
1088 }
1089 }
1090 };
1091
1092 debug!("Processing streaming request with session: {}", session_id);
1093
1094 match message {
1096 JsonRpcMessage::Request(request) => {
1097 debug!(
1098 "Processing streaming JSON-RPC request: method={}",
1099 request.method
1100 );
1101 self.create_streaming_response(request, session_id, context)
1102 .await
1103 }
1104 JsonRpcMessage::Notification(notification) => {
1105 debug!(
1106 "Processing streaming JSON-RPC notification: method={}",
1107 notification.method
1108 );
1109
1110 use crate::notification_bridge::StreamManagerNotificationBroadcaster;
1112 use turul_mcp_json_rpc_server::SessionContext;
1113
1114 let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
1115 &self.stream_manager,
1116 )));
1117 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1118
1119 let session_context = SessionContext {
1120 session_id: session_id.clone(),
1121 metadata: std::collections::HashMap::new(),
1122 broadcaster: Some(broadcaster_any),
1123 timestamp: chrono::Utc::now().timestamp_millis() as u64,
1124 };
1125
1126 let dispatcher = Arc::clone(&self.dispatcher);
1128 let notification_clone = notification.clone();
1129
1130 tokio::spawn(async move {
1132 if let Err(e) = dispatcher
1133 .handle_notification_with_context(notification_clone, Some(session_context))
1134 .await
1135 {
1136 error!("Failed to process notification: {}", e);
1137 }
1138 });
1139
1140 Response::builder()
1142 .status(StatusCode::ACCEPTED)
1143 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1144 .header("Mcp-Session-Id", &session_id)
1145 .body(
1146 Full::new(Bytes::new())
1147 .map_err(|never| match never {})
1148 .boxed_unsync(),
1149 )
1150 .unwrap()
1151 }
1152 }
1153 }
1154
1155 async fn create_streaming_response(
1158 &self,
1159 request: turul_mcp_json_rpc_server::JsonRpcRequest,
1160 session_id: String,
1161 context: StreamableHttpContext,
1162 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
1163 debug!(
1164 "Creating streaming response for method: {}, session: {}",
1165 request.method, session_id
1166 );
1167 use http_body_util::StreamBody;
1169 use tokio_stream::StreamExt;
1170 use tokio_stream::wrappers::UnboundedReceiverStream; let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<bytes::Bytes, hyper::Error>>();
1173 let body_stream =
1174 UnboundedReceiverStream::new(rx).map(|item| item.map(http_body::Frame::data));
1175 let body = StreamBody::new(body_stream);
1176
1177 use crate::notification_bridge::{
1179 SharedNotificationBroadcaster, StreamManagerNotificationBroadcaster,
1180 };
1181 use turul_mcp_json_rpc_server::SessionContext;
1182
1183 let broadcaster: SharedNotificationBroadcaster = Arc::new(
1184 StreamManagerNotificationBroadcaster::new(Arc::clone(&self.stream_manager)),
1185 );
1186 let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1187
1188 let session_context = SessionContext {
1189 session_id: session_id.clone(),
1190 metadata: std::collections::HashMap::new(),
1191 broadcaster: Some(broadcaster_any),
1192 timestamp: chrono::Utc::now().timestamp_millis() as u64,
1193 };
1194
1195 let wants_sse = context.wants_sse_stream();
1197 let connection_id = format!("post-{}", uuid::Uuid::now_v7());
1198
1199 let (shutdown_tx, completion_rx) = if wants_sse {
1201 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
1203 let (completion_tx, completion_rx) = tokio::sync::oneshot::channel::<()>();
1204 let (progress_tx, mut progress_rx) = tokio::sync::mpsc::channel(100);
1205
1206 let registration_result = self
1208 .stream_manager
1209 .register_streaming_connection(&session_id, connection_id.clone(), progress_tx)
1210 .await;
1211
1212 if let Err(e) = registration_result {
1213 error!("Failed to register POST streaming connection: {}", e);
1214 (None, None)
1216 } else {
1217 debug!(
1218 "Registered SSE streaming connection for session: {}",
1219 session_id
1220 );
1221
1222 let sender_clone = tx.clone();
1224 let session_id_clone = session_id.clone();
1225 let connection_id_clone = connection_id.clone();
1226 let stream_manager_clone = Arc::clone(&self.stream_manager);
1227
1228 tokio::spawn(async move {
1229 debug!(
1230 "Starting progress forwarding task for session: {}",
1231 session_id_clone
1232 );
1233
1234 loop {
1236 debug!(
1237 "🔍 Progress task entering select loop for session: {}",
1238 session_id_clone
1239 );
1240 tokio::select! {
1241 maybe_event = progress_rx.recv() => {
1243 debug!("🔍 Progress task: progress_rx.recv() branch fired for session: {}", session_id_clone);
1244 match maybe_event {
1245 Some(sse_event) => {
1246 debug!("🔍 Forwarding progress event to POST response: session={}, event={:?}", session_id_clone, sse_event.event_type);
1247
1248 let sse_chunk = sse_event.format();
1250
1251 if let Err(e) = sender_clone.send(Ok(Bytes::from(sse_chunk))) {
1252 error!("Failed to send progress event to POST response: {}", e);
1253 break;
1254 }
1255 }
1256 None => {
1257 debug!("🔍 Progress channel closed naturally for session: {}", session_id_clone);
1259 break;
1260 }
1261 }
1262 }
1263 _ = &mut shutdown_rx => {
1265 debug!("🔍 Progress task: shutdown_rx branch fired! Received explicit shutdown signal for session: {}", session_id_clone);
1266 break;
1267 }
1268 }
1269 }
1270
1271 debug!(
1273 "Progress task unregistering connection for session: {}",
1274 session_id_clone
1275 );
1276 stream_manager_clone
1277 .unregister_connection(&session_id_clone, &connection_id_clone)
1278 .await;
1279
1280 debug!(
1282 "🔍 Progress task: dropping sender_clone for session: {}",
1283 session_id_clone
1284 );
1285 drop(sender_clone);
1286
1287 debug!(
1289 "🔍 Progress task: signaling completion for session: {}",
1290 session_id_clone
1291 );
1292 if completion_tx.send(()).is_err() {
1293 debug!(
1294 "🔍 Progress task: main task already dropped completion_rx for session: {}",
1295 session_id_clone
1296 );
1297 }
1298
1299 debug!(
1300 "🔍 Progress forwarding task completed for session: {}",
1301 session_id_clone
1302 );
1303 });
1304
1305 (Some(shutdown_tx), Some(completion_rx))
1307 }
1308 } else {
1309 (None, None)
1311 };
1312
1313 let request_id = request.id.clone();
1315 let sender = tx; let headers = context.headers.clone();
1319 let self_clone = self.clone();
1320
1321 tokio::spawn(async move {
1322 debug!(
1323 "Spawning streaming task for request ID: {:?}, wants_sse: {}",
1324 request_id, wants_sse
1325 );
1326
1327 let (response, _) = self_clone
1330 .run_middleware_and_dispatch(request, headers, session_context)
1331 .await;
1332
1333 if wants_sse {
1335 let final_frame = match response {
1337 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
1338 turul_mcp_json_rpc_server::JsonRpcFrame::FinalResult {
1339 request_id: request_id.clone(),
1340 result: match resp.result {
1341 turul_mcp_json_rpc_server::response::ResponseResult::Success(
1342 val,
1343 ) => val,
1344 turul_mcp_json_rpc_server::response::ResponseResult::Null => {
1345 serde_json::Value::Null
1346 }
1347 },
1348 }
1349 }
1350 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
1351 turul_mcp_json_rpc_server::JsonRpcFrame::Error {
1352 request_id: request_id.clone(),
1353 error: turul_mcp_json_rpc_server::error::JsonRpcErrorObject {
1354 code: err.error.code,
1355 message: err.error.message,
1356 data: err.error.data,
1357 },
1358 }
1359 }
1360 };
1361
1362 let final_json = final_frame.to_json();
1363 let final_chunk =
1365 format!("data: {}\n\n", serde_json::to_string(&final_json).unwrap());
1366
1367 if let Err(err) = sender.send(Ok(Bytes::from(final_chunk))) {
1368 error!("Failed to send SSE final chunk: {}", err);
1369 }
1370
1371 if let Some(shutdown_tx) = shutdown_tx {
1374 debug!(
1375 "🔍 Main task sending shutdown signal to progress task for request: {:?}",
1376 request_id
1377 );
1378 match shutdown_tx.send(()) {
1379 Ok(()) => {
1380 debug!(
1381 "🔍 Main task: shutdown signal sent successfully for request: {:?}",
1382 request_id
1383 );
1384
1385 if let Some(completion_rx) = completion_rx {
1388 match tokio::time::timeout(
1389 tokio::time::Duration::from_millis(100),
1390 completion_rx,
1391 )
1392 .await
1393 {
1394 Ok(Ok(())) => {
1395 debug!(
1396 "🔍 Main task: progress task completed successfully for request: {:?}",
1397 request_id
1398 );
1399 }
1400 Ok(Err(_)) => {
1401 debug!(
1402 "🔍 Main task: progress task completion signal dropped for request: {:?}",
1403 request_id
1404 );
1405 }
1406 Err(_) => {
1407 debug!(
1408 "🔍 Main task: progress task completion timeout for request: {:?}",
1409 request_id
1410 );
1411 }
1412 }
1413 }
1414 }
1415 Err(_) => {
1416 debug!(
1417 "🔍 Main task: progress task already completed (shutdown_rx dropped) for request: {:?}",
1418 request_id
1419 );
1420 }
1421 }
1422 } else {
1423 debug!(
1424 "🔍 Main task: no shutdown_tx available (not SSE client) for request: {:?}",
1425 request_id
1426 );
1427 }
1428 } else {
1429 let final_json = serde_json::to_string(&response).unwrap();
1431
1432 if let Err(err) = sender.send(Ok(Bytes::from(final_json))) {
1433 error!("Failed to send final JSON response: {}", err);
1434 }
1435 }
1436
1437 debug!(
1438 "🔍 Main task: streaming task completed for request ID: {:?}",
1439 request_id
1440 );
1441
1442 debug!(
1444 "🔍 Main task: dropping main sender for request ID: {:?}",
1445 request_id
1446 );
1447 drop(sender);
1448 });
1449
1450 let content_type = if context.wants_sse_stream() {
1453 "text/event-stream"
1454 } else {
1455 "application/json"
1456 };
1457
1458 let mut response = Response::builder()
1459 .status(StatusCode::OK)
1460 .header(CONTENT_TYPE, content_type)
1461 .header("Transfer-Encoding", "chunked") .header("Cache-Control", "no-cache")
1463 .body(http_body_util::BodyExt::boxed_unsync(body))
1464 .unwrap();
1465
1466 let mcp_headers = context.response_headers();
1468 for (key, value) in mcp_headers.iter() {
1469 response.headers_mut().insert(key, value.clone());
1470 }
1471
1472 response
1473 }
1474
1475 #[allow(dead_code)]
1477 async fn handle_buffered_post<T>(
1478 &self,
1479 _req: Request<T>,
1480 context: StreamableHttpContext,
1481 session_id: String,
1482 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1483 where
1484 T: Body + Send + 'static,
1485 {
1486 debug!(
1487 "Using buffered POST for legacy client, session: {}",
1488 session_id
1489 );
1490
1491 Response::builder()
1495 .status(StatusCode::OK)
1496 .header(CONTENT_TYPE, "application/json")
1497 .header("MCP-Protocol-Version", context.protocol_version.as_str())
1498 .header("Mcp-Session-Id", &session_id)
1499 .body(
1500 Full::new(Bytes::from(
1501 r#"{"jsonrpc":"2.0","id":1,"result":"buffered"}"#,
1502 ))
1503 .map_err(|never| match never {})
1504 .boxed_unsync(),
1505 )
1506 .unwrap()
1507 }
1508
1509 async fn handle_client_message<T>(
1513 &self,
1514 req: Request<T>,
1515 context: StreamableHttpContext,
1516 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1517 where
1518 T: Body + Send + 'static,
1519 {
1520 debug!("Handling client message via POST (MCP 2025-06-18)");
1521
1522 if !context.accepts_stream_frames {
1525 warn!("Client POST missing application/json in Accept header");
1526 return StreamableResponse::Error {
1527 status: StatusCode::BAD_REQUEST,
1528 message: "Accept header must include application/json, text/event-stream, or */*"
1529 .to_string(),
1530 }
1531 .into_boxed_response(&context);
1532 }
1533
1534 let content_type = req
1536 .headers()
1537 .get(CONTENT_TYPE)
1538 .and_then(|ct| ct.to_str().ok())
1539 .unwrap_or("");
1540 if !content_type.starts_with("application/json") {
1541 warn!("Invalid content type for POST: {}", content_type);
1542 return StreamableResponse::Error {
1543 status: StatusCode::BAD_REQUEST,
1544 message: "Content-Type must be application/json".to_string(),
1545 }
1546 .into_boxed_response(&context);
1547 }
1548
1549 debug!("Using streaming POST handler for all requests");
1552 return self.handle_post_streamable_http(req, context).await;
1553 }
1554
1555 async fn run_middleware_and_dispatch(
1570 &self,
1571 request: turul_mcp_json_rpc_server::JsonRpcRequest,
1572 headers: HashMap<String, String>,
1573 session: turul_mcp_json_rpc_server::SessionContext,
1574 ) -> (turul_mcp_json_rpc_server::JsonRpcMessage, Option<crate::middleware::SessionInjection>) {
1575 if self.middleware_stack.is_empty() {
1577 let result = self.dispatcher
1578 .handle_request_with_context(request, session)
1579 .await;
1580 return (result, None);
1581 }
1582
1583 let normalized_headers: HashMap<String, String> = headers
1585 .iter()
1586 .map(|(k, v)| (k.to_lowercase(), v.clone()))
1587 .collect();
1588
1589 let method = request.method.clone();
1592 let session_id = session.session_id.clone();
1593
1594 let params = request.params.clone().map(|p| match p {
1596 turul_mcp_json_rpc_server::RequestParams::Object(map) => {
1597 serde_json::Value::Object(map.into_iter().collect())
1598 }
1599 turul_mcp_json_rpc_server::RequestParams::Array(arr) => serde_json::Value::Array(arr),
1600 });
1601 let mut ctx = crate::middleware::RequestContext::new(&method, params);
1602
1603 for (k, v) in normalized_headers {
1604 ctx.add_metadata(k, serde_json::json!(v));
1605 }
1606
1607 let session_view = crate::middleware::StorageBackedSessionView::new(
1609 session_id.clone(),
1610 Arc::clone(&self.session_storage),
1611 );
1612
1613 let injection = match self.middleware_stack.execute_before(&mut ctx, Some(&session_view)).await {
1615 Ok(inj) => inj,
1616 Err(err) => {
1617 return (Self::map_middleware_error_to_jsonrpc(err, request.id), None);
1619 }
1620 };
1621
1622 if !injection.is_empty() {
1624 for (key, value) in injection.state() {
1625 if let Err(e) = session_view.set_state(key, value.clone()).await {
1626 tracing::warn!("Failed to apply injection state '{}': {}", key, e);
1627 }
1628 }
1629 for (key, value) in injection.metadata() {
1630 if let Err(e) = session_view.set_metadata(key, value.clone()).await {
1631 tracing::warn!("Failed to apply injection metadata '{}': {}", key, e);
1632 }
1633 }
1634 }
1635
1636 let result = self.dispatcher
1638 .handle_request_with_context(request, session)
1639 .await;
1640
1641 let mut dispatcher_result = match &result {
1644 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
1645 match &resp.result {
1646 turul_mcp_json_rpc_server::response::ResponseResult::Success(val) => {
1647 crate::middleware::DispatcherResult::Success(val.clone())
1648 }
1649 turul_mcp_json_rpc_server::response::ResponseResult::Null => {
1650 crate::middleware::DispatcherResult::Success(serde_json::Value::Null)
1651 }
1652 }
1653 }
1654 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
1655 crate::middleware::DispatcherResult::Error(err.error.message.clone())
1656 }
1657 };
1658
1659 let _ = self.middleware_stack.execute_after(&ctx, &mut dispatcher_result).await;
1661
1662 (result, None) }
1664
1665 fn map_middleware_error_to_jsonrpc(
1667 err: crate::middleware::MiddlewareError,
1668 request_id: turul_mcp_json_rpc_server::RequestId,
1669 ) -> turul_mcp_json_rpc_server::JsonRpcMessage {
1670 use crate::middleware::error::error_codes;
1671 use crate::middleware::MiddlewareError;
1672
1673 let (code, message, data) = match err {
1674 MiddlewareError::Unauthenticated(msg) => (error_codes::UNAUTHENTICATED, msg, None),
1675 MiddlewareError::Unauthorized(msg) => (error_codes::UNAUTHORIZED, msg, None),
1676 MiddlewareError::RateLimitExceeded {
1677 message,
1678 retry_after,
1679 } => {
1680 let data = retry_after.map(|s| serde_json::json!({"retryAfter": s}));
1681 (error_codes::RATE_LIMIT_EXCEEDED, message, data)
1682 }
1683 MiddlewareError::InvalidRequest(msg) => (error_codes::INVALID_REQUEST, msg, None),
1684 MiddlewareError::Internal(msg) => (error_codes::INTERNAL_ERROR, msg, None),
1685 MiddlewareError::Custom { message, .. } => (error_codes::INTERNAL_ERROR, message, None),
1686 };
1687
1688 let error_obj = if let Some(d) = data {
1689 turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(code, &message, Some(d))
1690 } else {
1691 turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(
1692 code,
1693 &message,
1694 None::<serde_json::Value>,
1695 )
1696 };
1697
1698 turul_mcp_json_rpc_server::JsonRpcMessage::Error(turul_mcp_json_rpc_server::JsonRpcError::new(
1699 Some(request_id),
1700 error_obj,
1701 ))
1702 }
1703}
1704
1705#[cfg(test)]
1706mod tests {
1707 use super::*;
1708
1709 #[test]
1710 fn test_version_parsing() {
1711 assert_eq!(
1712 McpProtocolVersion::parse_version("2024-11-05"),
1713 Some(McpProtocolVersion::V2024_11_05)
1714 );
1715 assert_eq!(
1716 McpProtocolVersion::parse_version("2025-03-26"),
1717 Some(McpProtocolVersion::V2025_03_26)
1718 );
1719 assert_eq!(
1720 McpProtocolVersion::parse_version("2025-06-18"),
1721 Some(McpProtocolVersion::V2025_06_18)
1722 );
1723 assert_eq!(McpProtocolVersion::parse_version("invalid"), None);
1724 }
1725
1726 #[test]
1727 fn test_version_capabilities() {
1728 let v1 = McpProtocolVersion::V2024_11_05;
1729 assert!(!v1.supports_streamable_http());
1730 assert!(!v1.supports_meta_fields());
1731
1732 let v2 = McpProtocolVersion::V2025_03_26;
1733 assert!(v2.supports_streamable_http());
1734 assert!(!v2.supports_meta_fields());
1735
1736 let v3 = McpProtocolVersion::V2025_06_18;
1737 assert!(v3.supports_streamable_http());
1738 assert!(v3.supports_meta_fields());
1739 assert!(v3.supports_cursors());
1740 assert!(v3.supports_progress_tokens());
1741 assert!(v3.supports_elicitation());
1742 }
1743
1744 #[test]
1745 fn test_context_validation() {
1746 let mut context = StreamableHttpContext {
1747 protocol_version: McpProtocolVersion::V2025_06_18,
1748 session_id: Some("test-session".to_string()),
1749 wants_sse_stream: true,
1750 accepts_stream_frames: true,
1751 headers: HashMap::new(),
1752 };
1753
1754 assert!(context.validate(&Method::POST).is_ok());
1756 assert!(context.validate(&Method::GET).is_ok());
1758
1759 context.accepts_stream_frames = false;
1761 assert!(context.validate(&Method::POST).is_err());
1762
1763 context.accepts_stream_frames = true;
1764 context.protocol_version = McpProtocolVersion::V2024_11_05;
1765 context.wants_sse_stream = true;
1766 assert!(context.validate(&Method::POST).is_err());
1767
1768 context.protocol_version = McpProtocolVersion::V2025_06_18;
1769 context.session_id = None;
1770 assert!(context.validate(&Method::POST).is_ok());
1772 assert!(context.validate(&Method::GET).is_err());
1774 }
1775}