// turul_http_mcp_server/streamable_http.rs

//! Streamable HTTP Transport for MCP 2025-06-18
//!
//! This module implements the "Streamable HTTP" transport mechanism introduced
//! in MCP 2025-03-26, which replaces the previous HTTP+SSE approach from 2024-11-05.
//!
//! ## Key Improvements over HTTP+SSE
//! - **Serverless Compatibility**: Enables deployment on AWS Lambda, Google Cloud Run
//! - **Improved Scalability**: Supports chunked transfer encoding and progressive delivery
//! - **Session Management**: Cryptographically secure session IDs for connection tracking
//! - **Enterprise Network Friendly**: No mandatory long-lived connections or polling (the GET SSE stream is optional)
11
12use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use futures::Stream;
18use http_body::Body;
19use http_body_util::{BodyExt, Full};
20use hyper::header::{ACCEPT, CONTENT_TYPE};
21use hyper::{HeaderMap, Method, Request, Response, StatusCode};
22use serde_json::Value;
23use tracing::{debug, error, warn};
24use turul_mcp_session_storage::SessionView;
25
26use crate::ServerConfig;
27
/// Revisions of the Model Context Protocol understood by this transport.
///
/// [`Default`] yields the newest revision, `2025-06-18`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum McpProtocolVersion {
    /// `2024-11-05`: the original revision, which predates streamable HTTP.
    V2024_11_05,
    /// `2025-03-26`: the revision that introduced streamable HTTP.
    V2025_03_26,
    /// `2025-06-18`: adds structured `_meta`, cursors, progress tokens and elicitation.
    #[default]
    V2025_06_18,
}

impl McpProtocolVersion {
    /// Look up a revision by its exact wire string (e.g. `"2025-06-18"`).
    ///
    /// Returns `None` for anything that is not one of the known revisions.
    pub fn parse_version(s: &str) -> Option<Self> {
        [Self::V2024_11_05, Self::V2025_03_26, Self::V2025_06_18]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }

    /// The wire string for this revision, as used in the `MCP-Protocol-Version` header.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::V2024_11_05 => "2024-11-05",
            Self::V2025_03_26 => "2025-03-26",
            Self::V2025_06_18 => "2025-06-18",
        }
    }

    /// `true` for every revision that defines the streamable HTTP transport.
    pub fn supports_streamable_http(&self) -> bool {
        match self {
            Self::V2024_11_05 => false,
            Self::V2025_03_26 | Self::V2025_06_18 => true,
        }
    }

    /// `true` only for `2025-06-18`, which structures `_meta` fields.
    pub fn supports_meta_fields(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for `2025-06-18` (cursor-based pagination).
    pub fn supports_cursors(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for `2025-06-18` (progress tokens).
    pub fn supports_progress_tokens(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for `2025-06-18` (elicitation).
    pub fn supports_elicitation(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// Names of the features this revision supports, in a fixed order:
    /// `streamable-http`, `_meta-fields`, `cursor-pagination`, `progress-tokens`,
    /// `elicitation` (each included only when supported).
    pub fn supported_features(&self) -> Vec<&'static str> {
        let table: [(bool, &'static str); 5] = [
            (self.supports_streamable_http(), "streamable-http"),
            (self.supports_meta_fields(), "_meta-fields"),
            (self.supports_cursors(), "cursor-pagination"),
            (self.supports_progress_tokens(), "progress-tokens"),
            (self.supports_elicitation(), "elicitation"),
        ];
        table
            .iter()
            .filter(|(enabled, _)| *enabled)
            .map(|(name, _)| name)
            .map(|_| ())
            .zip(table.iter().filter(|(enabled, _)| *enabled))
            .map(|(_, (_, name))| *name)
            .collect()
    }
}
106
107/// Streamable HTTP request context
108#[derive(Debug, Clone)]
109pub struct StreamableHttpContext {
110    /// Protocol version negotiated
111    pub protocol_version: McpProtocolVersion,
112    /// Session ID if provided
113    pub session_id: Option<String>,
114    /// Whether client wants SSE stream (text/event-stream)
115    pub wants_sse_stream: bool,
116    /// Whether client accepts stream frames (application/json, text/event-stream, or */*)
117    pub accepts_stream_frames: bool,
118    /// Additional request headers
119    pub headers: HashMap<String, String>,
120}
121
122impl StreamableHttpContext {
123    /// Parse context from HTTP request headers
124    pub fn from_request<T>(req: &Request<T>) -> Self {
125        let headers = req.headers();
126
127        // Parse protocol version from MCP-Protocol-Version header
128        let protocol_version = headers
129            .get("MCP-Protocol-Version")
130            .and_then(|h| h.to_str().ok())
131            .and_then(McpProtocolVersion::parse_version)
132            .unwrap_or_default();
133
134        // Extract session ID from Mcp-Session-Id header (note capitalization)
135        let session_id = headers
136            .get("Mcp-Session-Id")
137            .and_then(|h| h.to_str().ok())
138            .map(|s| s.to_string());
139
140        // Check Accept header for streaming and JSON support
141        let accept_header = headers
142            .get(ACCEPT)
143            .and_then(|h| h.to_str().ok())
144            .unwrap_or_default()
145            .to_ascii_lowercase();
146
147        let wants_sse_stream = accept_header.contains("text/event-stream");
148        let accepts_stream_frames = accept_header.contains("application/json")
149            || accept_header.contains("text/event-stream")
150            || accept_header.contains("*/*");
151
152        // Collect additional headers for debugging/logging
153        let mut header_map = HashMap::new();
154        for (name, value) in headers.iter() {
155            if let Ok(value_str) = value.to_str() {
156                header_map.insert(name.to_string(), value_str.to_string());
157            }
158        }
159
160        Self {
161            protocol_version,
162            session_id,
163            wants_sse_stream,
164            accepts_stream_frames,
165            headers: header_map,
166        }
167    }
168
169    /// Whether client wants SSE stream
170    pub fn wants_sse_stream(&self) -> bool {
171        self.wants_sse_stream
172    }
173
174    /// Whether client wants streaming POST responses
175    pub fn wants_streaming_post(&self) -> bool {
176        self.accepts_stream_frames && self.wants_sse_stream
177    }
178
179    /// Check if request is compatible with streamable HTTP
180    pub fn is_streamable_compatible(&self) -> bool {
181        self.protocol_version.supports_streamable_http() && self.accepts_stream_frames
182    }
183
184    /// Validate request for MCP compliance
185    pub fn validate(&self, method: &Method) -> std::result::Result<(), String> {
186        if !self.accepts_stream_frames {
187            return Err(
188                "Accept header must include application/json, text/event-stream, or */*"
189                    .to_string(),
190            );
191        }
192
193        if self.wants_sse_stream && !self.protocol_version.supports_streamable_http() {
194            return Err(format!(
195                "Protocol version {} does not support streamable HTTP",
196                self.protocol_version.as_str()
197            ));
198        }
199
200        // Only enforce session_id for GET requests with SSE streams
201        // POST requests will validate session based on the JSON-RPC method (initialize vs others)
202        if *method == Method::GET && self.wants_sse_stream && self.session_id.is_none() {
203            return Err("Mcp-Session-Id header required for SSE streaming connections".to_string());
204        }
205
206        Ok(())
207    }
208
209    /// Create response headers for this context
210    pub fn response_headers(&self) -> HeaderMap {
211        let mut headers = HeaderMap::new();
212
213        // Always include protocol version in response
214        headers.insert(
215            "MCP-Protocol-Version",
216            self.protocol_version.as_str().parse().unwrap(),
217        );
218
219        // Include session ID if present
220        if let Some(session_id) = &self.session_id {
221            headers.insert("Mcp-Session-Id", session_id.parse().unwrap());
222        }
223
224        // Add capabilities header showing supported features
225        let features = self.protocol_version.supported_features();
226        if !features.is_empty() {
227            headers.insert("MCP-Capabilities", features.join(",").parse().unwrap());
228        }
229
230        headers
231    }
232}
233
234/// Streamable HTTP response types
235pub enum StreamableResponse {
236    /// Single JSON response
237    Json(Value),
238    /// Streaming response with multiple JSON messages
239    Stream(Pin<Box<dyn Stream<Item = std::result::Result<Value, String>> + Send>>),
240    /// Error response
241    Error { status: StatusCode, message: String },
242}
243
244impl std::fmt::Debug for StreamableResponse {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        match self {
247            Self::Json(value) => f.debug_tuple("Json").field(value).finish(),
248            Self::Stream(_) => f.debug_tuple("Stream").field(&"<stream>").finish(),
249            Self::Error { status, message } => f
250                .debug_struct("Error")
251                .field("status", status)
252                .field("message", message)
253                .finish(),
254        }
255    }
256}
257
258impl StreamableResponse {
259    /// Convert to HTTP response
260    pub fn into_response(self, context: &StreamableHttpContext) -> Response<Full<Bytes>> {
261        let mut response_headers = context.response_headers();
262
263        match self {
264            StreamableResponse::Json(json) => {
265                response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
266
267                let body = serde_json::to_string(&json)
268                    .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
269
270                Response::builder()
271                    .status(StatusCode::OK)
272                    .body(Full::new(Bytes::from(body)))
273                    .unwrap()
274            }
275
276            StreamableResponse::Stream(_stream) => {
277                // For streaming responses, set appropriate headers
278                response_headers.insert(CONTENT_TYPE, "text/event-stream".parse().unwrap());
279                response_headers.insert("Cache-Control", "no-cache, no-transform".parse().unwrap());
280                response_headers.insert("Connection", "keep-alive".parse().unwrap());
281
282                // TODO: Implement actual streaming body with chunked transfer encoding
283                // Should stream JSON messages over HTTP with proper Content-Type: text/event-stream
284                // For now, return 202 Accepted to indicate streaming would happen
285                Response::builder()
286                    .status(StatusCode::ACCEPTED)
287                    .body(Full::new(Bytes::from("Streaming response accepted")))
288                    .unwrap()
289            }
290
291            StreamableResponse::Error { status, message } => {
292                response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
293
294                let error_json = serde_json::json!({
295                    "error": {
296                        "code": status.as_u16(),
297                        "message": message
298                    }
299                });
300
301                let body = serde_json::to_string(&error_json).unwrap_or_else(|_| {
302                    r#"{"error": {"code": 500, "message": "Internal server error"}}"#.to_string()
303                });
304
305                Response::builder()
306                    .status(status)
307                    .body(Full::new(Bytes::from(body)))
308                    .unwrap()
309            }
310        }
311    }
312
313    /// Convert to HTTP response with UnsyncBoxBody for streaming compatibility
314    pub fn into_boxed_response(
315        self,
316        context: &StreamableHttpContext,
317    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
318        self.into_response(context)
319            .map(|body| body.map_err(|never| match never {}).boxed_unsync())
320    }
321}
322
323/// Streamable HTTP transport handler
324#[derive(Clone)]
325pub struct StreamableHttpHandler {
326    config: Arc<ServerConfig>,
327    dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
328    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
329    stream_manager: Arc<crate::StreamManager>,
330    server_capabilities: turul_mcp_protocol::ServerCapabilities,
331    pub(crate) middleware_stack: Arc<crate::middleware::MiddlewareStack>,
332}
333
334impl StreamableHttpHandler {
335    pub fn new(
336        config: Arc<ServerConfig>,
337        dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
338        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
339        stream_manager: Arc<crate::StreamManager>,
340        server_capabilities: turul_mcp_protocol::ServerCapabilities,
341        middleware_stack: Arc<crate::middleware::MiddlewareStack>,
342    ) -> Self {
343        Self {
344            config,
345            dispatcher,
346            session_storage,
347            stream_manager,
348            server_capabilities,
349            middleware_stack,
350        }
351    }
352
353    /// Handle incoming HTTP request with streamable HTTP support
354    pub async fn handle_request<T>(
355        &self,
356        req: Request<T>,
357    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
358    where
359        T: Body + Send + 'static,
360        T::Data: Send,
361        T::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
362    {
363        debug!(
364            "Streamable handler request: method={}, uri={}",
365            req.method(),
366            req.uri()
367        );
368        // Parse streamable HTTP context from request
369        let context = StreamableHttpContext::from_request(&req);
370
371        debug!(
372            "Streamable handler entry: method={}, protocol={}, session={:?}, accepts_stream_frames={}, wants_sse_stream={}",
373            req.method(),
374            context.protocol_version.as_str(),
375            context.session_id,
376            context.accepts_stream_frames,
377            context.wants_sse_stream()
378        );
379
380        // Validate request
381        if let Err(error) = context.validate(req.method()) {
382            warn!("Invalid streamable HTTP request: {}", error);
383            return StreamableResponse::Error {
384                status: StatusCode::BAD_REQUEST,
385                message: error,
386            }
387            .into_boxed_response(&context);
388        }
389
390        // Route based on MCP 2025-06-18 specification
391        match *req.method() {
392            Method::POST => {
393                // ALL client messages (requests, notifications, responses) come via POST
394                // Server decides whether to respond with JSON or SSE stream
395                self.handle_client_message(req, context).await
396            }
397            Method::GET => {
398                // Optional SSE stream for server-initiated messages
399                self.handle_get_sse_notifications(req, context).await
400            }
401            Method::DELETE => {
402                // Optional session cleanup
403                self.handle_session_delete(req, context).await
404            }
405            _ => StreamableResponse::Error {
406                status: StatusCode::METHOD_NOT_ALLOWED,
407                message: "Method not allowed for this endpoint".to_string(),
408            }
409            .into_boxed_response(&context),
410        }
411    }
412
413    /// Handle GET request for long-lived server-initiated notifications (GET SSE)
414    ///
415    /// This is traditional Server-Sent Events - a long-lived GET connection for
416    /// server-initiated notifications unrelated to specific client requests.
417    /// NOT used for tool progress (that's POST Streamable HTTP).
418    async fn handle_get_sse_notifications<T>(
419        &self,
420        req: Request<T>,
421        context: StreamableHttpContext,
422    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
423    where
424        T: Body + Send + 'static,
425    {
426        debug!(
427            "Opening streaming connection for session: {:?}",
428            context.session_id
429        );
430
431        // 1. Validate session exists and is authorized
432        let session_id = match context.session_id {
433            Some(ref id) => id.clone(),
434            None => {
435                warn!("Missing session ID for streaming GET request");
436                return StreamableResponse::Error {
437                    status: StatusCode::BAD_REQUEST,
438                    message: "Mcp-Session-Id header required for streaming connection".to_string(),
439                }
440                .into_boxed_response(&context);
441            }
442        };
443
444        // Validate session exists (do NOT create if missing)
445        match self.validate_session_exists(&session_id).await {
446            Ok(_) => {
447                debug!(
448                    "Session validation successful for streaming GET: {}",
449                    session_id
450                );
451            }
452            Err(err) => {
453                error!(
454                    "Session validation failed for streaming GET {}: {}",
455                    session_id, err
456                );
457                return StreamableResponse::Error {
458                    status: StatusCode::UNAUTHORIZED,
459                    message: format!("Session validation failed: {}", err),
460                }
461                .into_boxed_response(&context);
462            }
463        }
464
465        // 2. Create bi-directional stream with chunked transfer encoding
466        // For MCP 2025-06-18 Streamable HTTP, we create a stream that can handle bidirectional JSON-RPC
467        // Unlike SSE which is unidirectional server->client, this supports client->server and server->client
468
469        // Extract Last-Event-ID for resumability (if client supports it)
470        let last_event_id = req
471            .headers()
472            .get("Last-Event-ID")
473            .and_then(|h| h.to_str().ok())
474            .and_then(|s| s.parse::<u64>().ok());
475
476        // Generate unique connection ID for tracking this stream
477        let connection_id = uuid::Uuid::now_v7().to_string();
478
479        debug!(
480            "Creating streamable HTTP connection: session={}, connection={}, last_event_id={:?}",
481            session_id, connection_id, last_event_id
482        );
483
484        // 3. Return streaming response supporting progressive message delivery
485        // ✅ FIXED: Return the actual streaming response from StreamManager
486        // This preserves event replay, resumability, and live streaming capabilities
487        match self
488            .stream_manager
489            .handle_sse_connection(session_id.clone(), connection_id.clone(), last_event_id)
490            .await
491        {
492            Ok(mut streaming_response) => {
493                debug!(
494                    "Streamable HTTP connection established: session={}, connection={}",
495                    session_id, connection_id
496                );
497
498                // Merge MCP headers from context.response_headers()
499                let mcp_headers = context.response_headers();
500                for (key, value) in mcp_headers.iter() {
501                    streaming_response.headers_mut().insert(key, value.clone());
502                }
503
504                // ✅ PRESERVE STREAMING: Return the streaming response with MCP headers
505                // This maintains event replay from session storage and live streaming
506                streaming_response
507            }
508            Err(err) => {
509                error!("Failed to create streamable HTTP connection: {}", err);
510                StreamableResponse::Error {
511                    status: StatusCode::INTERNAL_SERVER_ERROR,
512                    message: format!("Streaming connection failed: {}", err),
513                }
514                .into_boxed_response(&context)
515            }
516        }
517    }
518
519    /// Validate that a session exists - do NOT create if missing
520    async fn validate_session_exists(&self, session_id: &str) -> std::result::Result<(), String> {
521        match self.session_storage.get_session(session_id).await {
522            Ok(Some(_)) => {
523                debug!("Session validation successful: {}", session_id);
524                Ok(())
525            }
526            Ok(None) => {
527                error!("Session not found: {}", session_id);
528                Err(format!(
529                    "Session '{}' not found. Sessions must be created via initialize request first.",
530                    session_id
531                ))
532            }
533            Err(err) => {
534                error!("Failed to validate session {}: {}", session_id, err);
535                Err(format!("Session validation failed: {}", err))
536            }
537        }
538    }
539
540    /// Handle POST request with JSON response (legacy compatibility)
541    #[allow(dead_code)]
542    async fn handle_json_post<T>(
543        &self,
544        req: Request<T>,
545        context: StreamableHttpContext,
546    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
547    where
548        T: Body + Send + 'static,
549    {
550        debug!("Handling JSON POST (non-streaming/legacy)");
551
552        // 1. Parse JSON-RPC request(s) from request body (legacy clients don't require sessions)
553
554        // Check content type
555        let content_type = req
556            .headers()
557            .get(CONTENT_TYPE)
558            .and_then(|ct| ct.to_str().ok())
559            .unwrap_or("");
560
561        if !content_type.starts_with("application/json") {
562            warn!("Invalid content type for legacy POST: {}", content_type);
563            return StreamableResponse::Error {
564                status: StatusCode::BAD_REQUEST,
565                message: "Content-Type must be application/json".to_string(),
566            }
567            .into_boxed_response(&context);
568        }
569
570        // Read request body
571        let body_bytes = match req.into_body().collect().await {
572            Ok(collected) => collected.to_bytes(),
573            Err(_err) => {
574                error!("Failed to read legacy POST request body");
575                return StreamableResponse::Error {
576                    status: StatusCode::BAD_REQUEST,
577                    message: "Failed to read request body".to_string(),
578                }
579                .into_boxed_response(&context);
580            }
581        };
582
583        // Check body size
584        if body_bytes.len() > self.config.max_body_size {
585            warn!(
586                "Legacy POST request body too large: {} bytes",
587                body_bytes.len()
588            );
589            return StreamableResponse::Error {
590                status: StatusCode::PAYLOAD_TOO_LARGE,
591                message: "Request body too large".to_string(),
592            }
593            .into_boxed_response(&context);
594        }
595
596        // Parse as UTF-8
597        let body_str = match std::str::from_utf8(&body_bytes) {
598            Ok(s) => s,
599            Err(err) => {
600                error!("Invalid UTF-8 in legacy POST request body: {}", err);
601                return StreamableResponse::Error {
602                    status: StatusCode::BAD_REQUEST,
603                    message: "Request body must be valid UTF-8".to_string(),
604                }
605                .into_boxed_response(&context);
606            }
607        };
608
609        debug!("Received legacy POST JSON-RPC request: {}", body_str);
610
611        // Parse JSON-RPC message
612        use turul_mcp_json_rpc_server::dispatch::{
613            JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message,
614        };
615
616        let message = match parse_json_rpc_message(body_str) {
617            Ok(msg) => msg,
618            Err(rpc_err) => {
619                error!("JSON-RPC parse error in legacy POST: {}", rpc_err);
620                let error_json =
621                    serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
622                return Response::builder()
623                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
624                    .header(CONTENT_TYPE, "application/json")
625                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
626                    .body(Full::new(Bytes::from(error_json)))
627                    .unwrap()
628                    .map(|body| body.map_err(|never| match never {}).boxed_unsync());
629            }
630        };
631
632        // 2. Process via dispatcher (no session context for legacy clients)
633        // Legacy clients (MCP 2024-11-05) don't use sessions, so no session context
634        let message_result = match message {
635            JsonRpcMessage::Request(request) => {
636                debug!(
637                    "Processing legacy POST JSON-RPC request: method={}",
638                    request.method
639                );
640
641                // Special handling for initialize requests - legacy clients can create sessions too
642                let response = if request.method == "initialize" {
643                    debug!("Handling legacy initialize request - creating new session");
644
645                    // Let session storage create the session and generate the ID
646                    match self
647                        .session_storage
648                        .create_session(self.server_capabilities.clone())
649                        .await
650                    {
651                        Ok(session_info) => {
652                            debug!(
653                                "Created new session for legacy client: {}",
654                                session_info.session_id
655                            );
656
657                            // Create session context for initialize response
658                            use crate::notification_bridge::StreamManagerNotificationBroadcaster;
659                            use turul_mcp_json_rpc_server::r#async::SessionContext;
660
661                            let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
662                                Arc::clone(&self.stream_manager),
663                            ));
664                            let broadcaster_any =
665                                Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
666
667                            let session_context = SessionContext {
668                                session_id: session_info.session_id.clone(),
669                                metadata: std::collections::HashMap::new(),
670                                broadcaster: Some(broadcaster_any),
671                                timestamp: chrono::Utc::now().timestamp_millis() as u64,
672                            };
673
674                            self.dispatcher
675                                .handle_request_with_context(request, session_context)
676                                .await
677                        }
678                        Err(err) => {
679                            error!("Failed to create session during legacy initialize: {}", err);
680                            let error_msg = format!("Session creation failed: {}", err);
681                            turul_mcp_json_rpc_server::JsonRpcMessage::error(
682                                turul_mcp_json_rpc_server::JsonRpcError::internal_error(
683                                    Some(request.id),
684                                    Some(error_msg),
685                                ),
686                            )
687                        }
688                    }
689                } else {
690                    // For non-initialize requests, process without session context (legacy mode)
691                    self.dispatcher.handle_request(request).await
692                };
693
694                // Convert JsonRpcMessage to JsonRpcMessageResult
695                match response {
696                    turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
697                        JsonRpcMessageResult::Response(resp)
698                    }
699                    turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
700                        JsonRpcMessageResult::Error(err)
701                    }
702                }
703            }
704            JsonRpcMessage::Notification(notification) => {
705                debug!(
706                    "Processing legacy POST JSON-RPC notification: method={}",
707                    notification.method
708                );
709
710                // Process notification without session context (legacy mode)
711                let result = self
712                    .dispatcher
713                    .handle_notification_with_context(notification, None)
714                    .await;
715
716                if let Err(err) = result {
717                    error!("Legacy POST notification handling error: {}", err);
718                }
719                JsonRpcMessageResult::NoResponse
720            }
721        };
722
723        // 3. Return single JSON response (no streaming) - legacy compatibility
724        match message_result {
725            JsonRpcMessageResult::Response(response) => {
726                let response_json = serde_json::to_string(&response)
727                    .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
728
729                Response::builder()
730                    .status(StatusCode::OK)
731                    .header(CONTENT_TYPE, "application/json")
732                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
733                    .body(Full::new(Bytes::from(response_json)))
734                    .unwrap()
735                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
736            }
737            JsonRpcMessageResult::Error(error) => {
738                let error_json = serde_json::to_string(&error)
739                    .unwrap_or_else(|_| r#"{"error": "Internal error"}"#.to_string());
740
741                Response::builder()
742                    .status(StatusCode::OK) // JSON-RPC errors still return 200 OK
743                    .header(CONTENT_TYPE, "application/json")
744                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
745                    .body(Full::new(Bytes::from(error_json)))
746                    .unwrap()
747                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
748            }
749            JsonRpcMessageResult::NoResponse => {
750                // Notifications return 202 Accepted per MCP spec
751                Response::builder()
752                    .status(StatusCode::ACCEPTED)
753                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
754                    .body(Full::new(Bytes::new()))
755                    .unwrap()
756                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
757            }
758        }
759    }
760
761    /// Handle DELETE request for session cleanup
762    async fn handle_session_delete<T>(
763        &self,
764        _req: Request<T>,
765        context: StreamableHttpContext,
766    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
767    where
768        T: Body + Send + 'static,
769    {
770        if let Some(session_id) = &context.session_id {
771            debug!("Deleting session: {}", session_id);
772
773            // Implement proper session cleanup for Streamable HTTP
774            // 1. Close any active streaming connections for this session
775            let closed_connections = self
776                .stream_manager
777                .close_session_connections(session_id)
778                .await;
779            debug!(
780                "Closed {} streaming connections for session: {}",
781                closed_connections, session_id
782            );
783
784            // 2. Mark session as terminated instead of immediate deletion (for proper lifecycle management)
785            match self.session_storage.get_session(session_id).await {
786                Ok(Some(mut session_info)) => {
787                    // Mark session as terminated in state
788                    session_info
789                        .state
790                        .insert("terminated".to_string(), serde_json::Value::Bool(true));
791                    session_info.state.insert(
792                        "terminated_at".to_string(),
793                        serde_json::Value::Number(serde_json::Number::from(
794                            chrono::Utc::now().timestamp_millis(),
795                        )),
796                    );
797                    session_info.touch();
798
799                    // 3. Update session with termination markers
800                    match self.session_storage.update_session(session_info).await {
801                        Ok(()) => {
802                            debug!(
803                                "Session {} marked as terminated (TTL will handle cleanup)",
804                                session_id
805                            );
806
807                            // Return success response with proper headers
808                            Response::builder()
809                                .status(StatusCode::OK)
810                                .header(CONTENT_TYPE, "application/json")
811                                .header("MCP-Protocol-Version", context.protocol_version.as_str())
812                                .header("Mcp-Session-Id", session_id)
813                                .body(Full::new(Bytes::from(
814                                    serde_json::to_string(&serde_json::json!({
815                                        "status": "session_terminated",
816                                        "session_id": session_id,
817                                        "closed_connections": closed_connections,
818                                        "message": "Session marked for cleanup"
819                                    }))
820                                    .unwrap_or_else(|_| {
821                                        r#"{"status":"session_terminated"}"#.to_string()
822                                    }),
823                                )))
824                                .unwrap()
825                                .map(|body| body.map_err(|never| match never {}).boxed_unsync())
826                        }
827                        Err(err) => {
828                            error!(
829                                "Error marking session {} as terminated: {}",
830                                session_id, err
831                            );
832                            // Fallback to deletion if update fails
833                            match self.session_storage.delete_session(session_id).await {
834                                Ok(_) => {
835                                    debug!("Session {} deleted as fallback", session_id);
836                                    Response::builder()
837                                        .status(StatusCode::OK)
838                                        .header(CONTENT_TYPE, "application/json")
839                                        .header(
840                                            "MCP-Protocol-Version",
841                                            context.protocol_version.as_str(),
842                                        )
843                                        .body(Full::new(Bytes::from(
844                                            serde_json::to_string(&serde_json::json!({
845                                                "status": "session_deleted",
846                                                "session_id": session_id,
847                                                "closed_connections": closed_connections,
848                                                "message": "Session removed"
849                                            }))
850                                            .unwrap_or_else(|_| {
851                                                r#"{"status":"session_deleted"}"#.to_string()
852                                            }),
853                                        )))
854                                        .unwrap()
855                                        .map(|body| {
856                                            body.map_err(|never| match never {}).boxed_unsync()
857                                        })
858                                }
859                                Err(delete_err) => {
860                                    error!(
861                                        "Error deleting session {} as fallback: {}",
862                                        session_id, delete_err
863                                    );
864                                    StreamableResponse::Error {
865                                        status: StatusCode::INTERNAL_SERVER_ERROR,
866                                        message: "Session termination error".to_string(),
867                                    }
868                                    .into_boxed_response(&context)
869                                }
870                            }
871                        }
872                    }
873                }
874                Ok(None) => {
875                    // Session not found
876                    Response::builder()
877                        .status(StatusCode::NOT_FOUND)
878                        .header(CONTENT_TYPE, "application/json")
879                        .header("MCP-Protocol-Version", context.protocol_version.as_str())
880                        .body(Full::new(Bytes::from(
881                            serde_json::to_string(&serde_json::json!({
882                                "status": "session_not_found",
883                                "session_id": session_id,
884                                "message": "Session not found"
885                            }))
886                            .unwrap_or_else(|_| r#"{"status":"session_not_found"}"#.to_string()),
887                        )))
888                        .unwrap()
889                        .map(|body| body.map_err(|never| match never {}).boxed_unsync())
890                }
891                Err(err) => {
892                    error!(
893                        "Error retrieving session {} for termination: {}",
894                        session_id, err
895                    );
896                    StreamableResponse::Error {
897                        status: StatusCode::INTERNAL_SERVER_ERROR,
898                        message: "Session lookup error".to_string(),
899                    }
900                    .into_boxed_response(&context)
901                }
902            }
903        } else {
904            StreamableResponse::Error {
905                status: StatusCode::BAD_REQUEST,
906                message: "Mcp-Session-Id header required for session deletion".to_string(),
907            }
908            .into_boxed_response(&context)
909        }
910    }
911
912    /// Handle POST with tool progress notifications (POST Streamable HTTP)
913    ///
914    /// Implements MCP 2025-06-18 Streamable HTTP where POST requests receive
915    /// chunked responses containing both progress notifications AND final result.
916    /// This is request-response, NOT a long-lived connection (that's GET SSE).
917    ///
918    /// Uses hyper::Body::channel() for chunked transfer encoding with background
919    /// task forwarding (works for long-running servers, BROKEN in Lambda).
920    async fn handle_post_streamable_http<T>(
921        &self,
922        req: Request<T>,
923        mut context: StreamableHttpContext,
924    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
925    where
926        T: Body + Send + 'static,
927    {
928        debug!("Streaming handler called - using true streaming POST");
929
930        // Parse request body (still need to collect for JSON-RPC parsing)
931        let body_bytes = match req.into_body().collect().await {
932            Ok(collected) => collected.to_bytes(),
933            Err(_err) => {
934                error!("Failed to read streaming POST request body");
935                return StreamableResponse::Error {
936                    status: StatusCode::BAD_REQUEST,
937                    message: "Failed to read request body".to_string(),
938                }
939                .into_boxed_response(&context);
940            }
941        };
942
943        // Check body size
944        if body_bytes.len() > self.config.max_body_size {
945            warn!(
946                "Streaming POST request body too large: {} bytes",
947                body_bytes.len()
948            );
949            return StreamableResponse::Error {
950                status: StatusCode::PAYLOAD_TOO_LARGE,
951                message: "Request body too large".to_string(),
952            }
953            .into_boxed_response(&context);
954        }
955
956        // Parse as UTF-8
957        let body_str = match std::str::from_utf8(&body_bytes) {
958            Ok(s) => s,
959            Err(err) => {
960                error!("Invalid UTF-8 in streaming POST request body: {}", err);
961                return StreamableResponse::Error {
962                    status: StatusCode::BAD_REQUEST,
963                    message: "Request body must be valid UTF-8".to_string(),
964                }
965                .into_boxed_response(&context);
966            }
967        };
968
969        debug!("Streaming POST received JSON-RPC request: {}", body_str);
970
971        // Parse JSON-RPC message
972        use turul_mcp_json_rpc_server::dispatch::{JsonRpcMessage, parse_json_rpc_message};
973        use turul_mcp_json_rpc_server::error::JsonRpcErrorObject;
974
975        let message = match parse_json_rpc_message(body_str) {
976            Ok(msg) => msg,
977            Err(rpc_err) => {
978                error!("JSON-RPC parse error in streaming POST: {}", rpc_err);
979                let error_json =
980                    serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
981
982                // Return error with MCP headers (no session header for parse errors)
983                return Response::builder()
984                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
985                    .header(CONTENT_TYPE, "application/json")
986                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
987                    .body(
988                        Full::new(Bytes::from(error_json))
989                            .map_err(|never| match never {})
990                            .boxed_unsync(),
991                    )
992                    .unwrap();
993            }
994        };
995
996        // Validate session requirements based on method
997        let session_id = match &message {
998            JsonRpcMessage::Request(req) if req.method == "initialize" => {
999                // Initialize can create session if none exists
1000                if let Some(existing_id) = &context.session_id {
1001                    // Validate existing session for initialize
1002                    if let Err(err) = self.validate_session_exists(existing_id).await {
1003                        warn!(
1004                            "Invalid session ID {} during initialize: {}",
1005                            existing_id, err
1006                        );
1007                        return StreamableResponse::Error {
1008                            status: StatusCode::UNAUTHORIZED,
1009                            message: "Invalid or expired session".to_string(),
1010                        }
1011                        .into_boxed_response(&context);
1012                    }
1013                    existing_id.clone()
1014                } else {
1015                    // Create new session for initialize
1016                    match self
1017                        .session_storage
1018                        .create_session(self.server_capabilities.clone())
1019                        .await
1020                    {
1021                        Ok(session_info) => {
1022                            debug!(
1023                                "Created new session for initialize: {}",
1024                                session_info.session_id
1025                            );
1026                            context.session_id = Some(session_info.session_id.clone());
1027                            session_info.session_id
1028                        }
1029                        Err(err) => {
1030                            error!("Failed to create session during initialize: {}", err);
1031                            return StreamableResponse::Error {
1032                                status: StatusCode::INTERNAL_SERVER_ERROR,
1033                                message: "Failed to create session".to_string(),
1034                            }
1035                            .into_boxed_response(&context);
1036                        }
1037                    }
1038                }
1039            }
1040            JsonRpcMessage::Request(_) | JsonRpcMessage::Notification(_) => {
1041                // All other methods REQUIRE session ID
1042                if let Some(existing_id) = &context.session_id {
1043                    // Validate existing session
1044                    if let Err(err) = self.validate_session_exists(existing_id).await {
1045                        warn!("Invalid session ID {}: {}", existing_id, err);
1046                        return StreamableResponse::Error {
1047                            status: StatusCode::UNAUTHORIZED,
1048                            message: "Invalid or expired session".to_string(),
1049                        }
1050                        .into_boxed_response(&context);
1051                    }
1052                    existing_id.clone()
1053                } else {
1054                    // Return 401 JSON-RPC error for missing session
1055                    let method_name = match &message {
1056                        JsonRpcMessage::Request(req) => &req.method,
1057                        JsonRpcMessage::Notification(notif) => &notif.method,
1058                    };
1059                    let request_id = match &message {
1060                        JsonRpcMessage::Request(req) => Some(req.id.clone()),
1061                        JsonRpcMessage::Notification(_) => None,
1062                    };
1063
1064                    warn!("Missing session ID for method: {}", method_name);
1065
1066                    let error_response = turul_mcp_json_rpc_server::JsonRpcError::new(
1067                        request_id,
1068                        JsonRpcErrorObject::server_error(
1069                            -32001,
1070                            "Missing Mcp-Session-Id header. Call initialize first.",
1071                            None::<serde_json::Value>,
1072                        ),
1073                    );
1074
1075                    let error_json =
1076                        serde_json::to_string(&error_response).unwrap_or_else(|_| "{}".to_string());
1077
1078                    return Response::builder()
1079                        .status(StatusCode::UNAUTHORIZED)
1080                        .header(CONTENT_TYPE, "application/json")
1081                        .header("MCP-Protocol-Version", context.protocol_version.as_str())
1082                        .body(
1083                            Full::new(Bytes::from(error_json))
1084                                .map_err(|never| match never {})
1085                                .boxed_unsync(),
1086                        )
1087                        .unwrap();
1088                }
1089            }
1090        };
1091
1092        debug!("Processing streaming request with session: {}", session_id);
1093
1094        // Create streaming response using hyper::Body::channel()
1095        match message {
1096            JsonRpcMessage::Request(request) => {
1097                debug!(
1098                    "Processing streaming JSON-RPC request: method={}",
1099                    request.method
1100                );
1101                self.create_streaming_response(request, session_id, context)
1102                    .await
1103            }
1104            JsonRpcMessage::Notification(notification) => {
1105                debug!(
1106                    "Processing streaming JSON-RPC notification: method={}",
1107                    notification.method
1108                );
1109
1110                // Create session context with notification broadcaster for notifications
1111                use crate::notification_bridge::StreamManagerNotificationBroadcaster;
1112                use turul_mcp_json_rpc_server::SessionContext;
1113
1114                let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
1115                    &self.stream_manager,
1116                )));
1117                let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1118
1119                let session_context = SessionContext {
1120                    session_id: session_id.clone(),
1121                    metadata: std::collections::HashMap::new(),
1122                    broadcaster: Some(broadcaster_any),
1123                    timestamp: chrono::Utc::now().timestamp_millis() as u64,
1124                };
1125
1126                // Process notification through dispatcher (notifications don't return responses)
1127                let dispatcher = Arc::clone(&self.dispatcher);
1128                let notification_clone = notification.clone();
1129
1130                // Spawn task to handle notification asynchronously (notifications are fire-and-forget)
1131                tokio::spawn(async move {
1132                    if let Err(e) = dispatcher
1133                        .handle_notification_with_context(notification_clone, Some(session_context))
1134                        .await
1135                    {
1136                        error!("Failed to process notification: {}", e);
1137                    }
1138                });
1139
1140                // Return 202 Accepted with MCP headers (notifications are accepted immediately)
1141                Response::builder()
1142                    .status(StatusCode::ACCEPTED)
1143                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
1144                    .header("Mcp-Session-Id", &session_id)
1145                    .body(
1146                        Full::new(Bytes::new())
1147                            .map_err(|never| match never {})
1148                            .boxed_unsync(),
1149                    )
1150                    .unwrap()
1151            }
1152        }
1153    }
1154
1155    /// Create a streaming response using hyper::Body::channel()
1156    /// This enables true progressive responses with Transfer-Encoding: chunked
1157    async fn create_streaming_response(
1158        &self,
1159        request: turul_mcp_json_rpc_server::JsonRpcRequest,
1160        session_id: String,
1161        context: StreamableHttpContext,
1162    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
1163        debug!(
1164            "Creating streaming response for method: {}, session: {}",
1165            request.method, session_id
1166        );
1167        // Create channel for streaming response
1168        use http_body_util::StreamBody;
1169        use tokio_stream::StreamExt;
1170        use tokio_stream::wrappers::UnboundedReceiverStream; // Add StreamExt for map method
1171
1172        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<bytes::Bytes, hyper::Error>>();
1173        let body_stream =
1174            UnboundedReceiverStream::new(rx).map(|item| item.map(http_body::Frame::data));
1175        let body = StreamBody::new(body_stream);
1176
1177        // Create session context with notification broadcaster (same pattern as SessionMcpHandler)
1178        use crate::notification_bridge::{
1179            SharedNotificationBroadcaster, StreamManagerNotificationBroadcaster,
1180        };
1181        use turul_mcp_json_rpc_server::SessionContext;
1182
1183        let broadcaster: SharedNotificationBroadcaster = Arc::new(
1184            StreamManagerNotificationBroadcaster::new(Arc::clone(&self.stream_manager)),
1185        );
1186        let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1187
1188        let session_context = SessionContext {
1189            session_id: session_id.clone(),
1190            metadata: std::collections::HashMap::new(),
1191            broadcaster: Some(broadcaster_any),
1192            timestamp: chrono::Utc::now().timestamp_millis() as u64,
1193        };
1194
1195        // Register streaming POST connection with StreamManager for progress events
1196        let wants_sse = context.wants_sse_stream();
1197        let connection_id = format!("post-{}", uuid::Uuid::now_v7());
1198
1199        // Progress forwarding only for SSE clients
1200        let (shutdown_tx, completion_rx) = if wants_sse {
1201            // Create shutdown signal for progress task (critical for no-progress-events case)
1202            let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
1203            let (completion_tx, completion_rx) = tokio::sync::oneshot::channel::<()>();
1204            let (progress_tx, mut progress_rx) = tokio::sync::mpsc::channel(100);
1205
1206            // Register with StreamManager to receive progress events
1207            let registration_result = self
1208                .stream_manager
1209                .register_streaming_connection(&session_id, connection_id.clone(), progress_tx)
1210                .await;
1211
1212            if let Err(e) = registration_result {
1213                error!("Failed to register POST streaming connection: {}", e);
1214                // Continue without streaming - will still work as regular POST
1215                (None, None)
1216            } else {
1217                debug!(
1218                    "Registered SSE streaming connection for session: {}",
1219                    session_id
1220                );
1221
1222                // Spawn task to forward progress events to HTTP response
1223                let sender_clone = tx.clone();
1224                let session_id_clone = session_id.clone();
1225                let connection_id_clone = connection_id.clone();
1226                let stream_manager_clone = Arc::clone(&self.stream_manager);
1227
1228                tokio::spawn(async move {
1229                    debug!(
1230                        "Starting progress forwarding task for session: {}",
1231                        session_id_clone
1232                    );
1233
1234                    // CRITICAL: Use select to handle both progress events AND explicit shutdown
1235                    loop {
1236                        debug!(
1237                            "🔍 Progress task entering select loop for session: {}",
1238                            session_id_clone
1239                        );
1240                        tokio::select! {
1241                            // Handle progress events if they arrive
1242                            maybe_event = progress_rx.recv() => {
1243                                debug!("🔍 Progress task: progress_rx.recv() branch fired for session: {}", session_id_clone);
1244                                match maybe_event {
1245                                    Some(sse_event) => {
1246                                        debug!("🔍 Forwarding progress event to POST response: session={}, event={:?}", session_id_clone, sse_event.event_type);
1247
1248                                        // Convert SSE event to fully-formatted SSE chunk with event metadata
1249                                        let sse_chunk = sse_event.format();
1250
1251                                        if let Err(e) = sender_clone.send(Ok(Bytes::from(sse_chunk))) {
1252                                            error!("Failed to send progress event to POST response: {}", e);
1253                                            break;
1254                                        }
1255                                    }
1256                                    None => {
1257                                        // Progress channel closed naturally
1258                                        debug!("🔍 Progress channel closed naturally for session: {}", session_id_clone);
1259                                        break;
1260                                    }
1261                                }
1262                            }
1263                            // Handle explicit shutdown signal from main task
1264                            _ = &mut shutdown_rx => {
1265                                debug!("🔍 Progress task: shutdown_rx branch fired! Received explicit shutdown signal for session: {}", session_id_clone);
1266                                break;
1267                            }
1268                        }
1269                    }
1270
1271                    // Clean up: Unregister from StreamManager to close progress_tx
1272                    debug!(
1273                        "Progress task unregistering connection for session: {}",
1274                        session_id_clone
1275                    );
1276                    stream_manager_clone
1277                        .unregister_connection(&session_id_clone, &connection_id_clone)
1278                        .await;
1279
1280                    // CRITICAL: Drop the sender to ensure stream can close
1281                    debug!(
1282                        "🔍 Progress task: dropping sender_clone for session: {}",
1283                        session_id_clone
1284                    );
1285                    drop(sender_clone);
1286
1287                    // Signal completion to main task
1288                    debug!(
1289                        "🔍 Progress task: signaling completion for session: {}",
1290                        session_id_clone
1291                    );
1292                    if completion_tx.send(()).is_err() {
1293                        debug!(
1294                            "🔍 Progress task: main task already dropped completion_rx for session: {}",
1295                            session_id_clone
1296                        );
1297                    }
1298
1299                    debug!(
1300                        "🔍 Progress forwarding task completed for session: {}",
1301                        session_id_clone
1302                    );
1303                });
1304
1305                // Return shutdown_tx and completion_rx for later use
1306                (Some(shutdown_tx), Some(completion_rx))
1307            }
1308        } else {
1309            // No SSE, no shutdown signal needed
1310            (None, None)
1311        };
1312
1313        // Spawn task to handle streaming dispatch
1314        let request_id = request.id.clone();
1315        let sender = tx; // Rename for clarity
1316
1317        // Capture headers for middleware (clone before move into spawn)
1318        let headers = context.headers.clone();
1319        let self_clone = self.clone();
1320
1321        tokio::spawn(async move {
1322            debug!(
1323                "Spawning streaming task for request ID: {:?}, wants_sse: {}",
1324                request_id, wants_sse
1325            );
1326
1327            // Process actual request through middleware pipeline
1328            // Injection is applied immediately inside run_middleware_and_dispatch
1329            let (response, _) = self_clone
1330                .run_middleware_and_dispatch(request, headers, session_context)
1331                .await;
1332
1333            // Send final result - format depends on client type
1334            if wants_sse {
1335                // For SSE clients, send as streaming frame with SSE framing
1336                let final_frame = match response {
1337                    turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
1338                        turul_mcp_json_rpc_server::JsonRpcFrame::FinalResult {
1339                            request_id: request_id.clone(),
1340                            result: match resp.result {
1341                                turul_mcp_json_rpc_server::response::ResponseResult::Success(
1342                                    val,
1343                                ) => val,
1344                                turul_mcp_json_rpc_server::response::ResponseResult::Null => {
1345                                    serde_json::Value::Null
1346                                }
1347                            },
1348                        }
1349                    }
1350                    turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
1351                        turul_mcp_json_rpc_server::JsonRpcFrame::Error {
1352                            request_id: request_id.clone(),
1353                            error: turul_mcp_json_rpc_server::error::JsonRpcErrorObject {
1354                                code: err.error.code,
1355                                message: err.error.message,
1356                                data: err.error.data,
1357                            },
1358                        }
1359                    }
1360                };
1361
1362                let final_json = final_frame.to_json();
1363                // SSE framing: data: {json}\n\n
1364                let final_chunk =
1365                    format!("data: {}\n\n", serde_json::to_string(&final_json).unwrap());
1366
1367                if let Err(err) = sender.send(Ok(Bytes::from(final_chunk))) {
1368                    error!("Failed to send SSE final chunk: {}", err);
1369                }
1370
1371                // CRITICAL: Send explicit shutdown signal to progress forwarding task (SSE only)
1372                // This breaks it out of the progress_rx.recv().await loop immediately
1373                if let Some(shutdown_tx) = shutdown_tx {
1374                    debug!(
1375                        "🔍 Main task sending shutdown signal to progress task for request: {:?}",
1376                        request_id
1377                    );
1378                    match shutdown_tx.send(()) {
1379                        Ok(()) => {
1380                            debug!(
1381                                "🔍 Main task: shutdown signal sent successfully for request: {:?}",
1382                                request_id
1383                            );
1384
1385                            // CRITICAL: Wait for progress task to complete and drop its sender_clone
1386                            // This ensures both senders are dropped before the stream tries to close
1387                            if let Some(completion_rx) = completion_rx {
1388                                match tokio::time::timeout(
1389                                    tokio::time::Duration::from_millis(100),
1390                                    completion_rx,
1391                                )
1392                                .await
1393                                {
1394                                    Ok(Ok(())) => {
1395                                        debug!(
1396                                            "🔍 Main task: progress task completed successfully for request: {:?}",
1397                                            request_id
1398                                        );
1399                                    }
1400                                    Ok(Err(_)) => {
1401                                        debug!(
1402                                            "🔍 Main task: progress task completion signal dropped for request: {:?}",
1403                                            request_id
1404                                        );
1405                                    }
1406                                    Err(_) => {
1407                                        debug!(
1408                                            "🔍 Main task: progress task completion timeout for request: {:?}",
1409                                            request_id
1410                                        );
1411                                    }
1412                                }
1413                            }
1414                        }
1415                        Err(_) => {
1416                            debug!(
1417                                "🔍 Main task: progress task already completed (shutdown_rx dropped) for request: {:?}",
1418                                request_id
1419                            );
1420                        }
1421                    }
1422                } else {
1423                    debug!(
1424                        "🔍 Main task: no shutdown_tx available (not SSE client) for request: {:?}",
1425                        request_id
1426                    );
1427                }
1428            } else {
1429                // For JSON-only clients, send as regular JSON-RPC response (no streaming frames)
1430                let final_json = serde_json::to_string(&response).unwrap();
1431
1432                if let Err(err) = sender.send(Ok(Bytes::from(final_json))) {
1433                    error!("Failed to send final JSON response: {}", err);
1434                }
1435            }
1436
1437            debug!(
1438                "🔍 Main task: streaming task completed for request ID: {:?}",
1439                request_id
1440            );
1441
1442            // CRITICAL: Drop the sender to close the stream and signal completion to client
1443            debug!(
1444                "🔍 Main task: dropping main sender for request ID: {:?}",
1445                request_id
1446            );
1447            drop(sender);
1448        });
1449
1450        // Build response with MCP headers merged from context
1451        // Set content type based on client preference
1452        let content_type = if context.wants_sse_stream() {
1453            "text/event-stream"
1454        } else {
1455            "application/json"
1456        };
1457
1458        let mut response = Response::builder()
1459            .status(StatusCode::OK)
1460            .header(CONTENT_TYPE, content_type)
1461            .header("Transfer-Encoding", "chunked") // Key: Enable chunked encoding!
1462            .header("Cache-Control", "no-cache")
1463            .body(http_body_util::BodyExt::boxed_unsync(body))
1464            .unwrap();
1465
1466        // Merge MCP headers from context.response_headers()
1467        let mcp_headers = context.response_headers();
1468        for (key, value) in mcp_headers.iter() {
1469            response.headers_mut().insert(key, value.clone());
1470        }
1471
1472        response
1473    }
1474
1475    /// Handle POST with buffered response (fallback for legacy clients)
1476    #[allow(dead_code)]
1477    async fn handle_buffered_post<T>(
1478        &self,
1479        _req: Request<T>,
1480        context: StreamableHttpContext,
1481        session_id: String,
1482    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1483    where
1484        T: Body + Send + 'static,
1485    {
1486        debug!(
1487            "Using buffered POST for legacy client, session: {}",
1488            session_id
1489        );
1490
1491        // Use the existing logic (simplified version)
1492        // TODO: Extract common logic into helper method
1493
1494        Response::builder()
1495            .status(StatusCode::OK)
1496            .header(CONTENT_TYPE, "application/json")
1497            .header("MCP-Protocol-Version", context.protocol_version.as_str())
1498            .header("Mcp-Session-Id", &session_id)
1499            .body(
1500                Full::new(Bytes::from(
1501                    r#"{"jsonrpc":"2.0","id":1,"result":"buffered"}"#,
1502                ))
1503                .map_err(|never| match never {})
1504                .boxed_unsync(),
1505            )
1506            .unwrap()
1507    }
1508
1509    /// Handle POST request - unified handler for all client messages (MCP 2025-06-18 compliant)
1510    /// Processes JSON-RPC requests, notifications, and responses
1511    /// Server decides whether to respond with JSON or SSE stream based on message type
1512    async fn handle_client_message<T>(
1513        &self,
1514        req: Request<T>,
1515        context: StreamableHttpContext,
1516    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1517    where
1518        T: Body + Send + 'static,
1519    {
1520        debug!("Handling client message via POST (MCP 2025-06-18)");
1521
1522        // Reject POST if accepts_stream_frames is false
1523        // Per MCP spec: "Include Accept header with application/json and text/event-stream"
1524        if !context.accepts_stream_frames {
1525            warn!("Client POST missing application/json in Accept header");
1526            return StreamableResponse::Error {
1527                status: StatusCode::BAD_REQUEST,
1528                message: "Accept header must include application/json, text/event-stream, or */*"
1529                    .to_string(),
1530            }
1531            .into_boxed_response(&context);
1532        }
1533
1534        // Check content type
1535        let content_type = req
1536            .headers()
1537            .get(CONTENT_TYPE)
1538            .and_then(|ct| ct.to_str().ok())
1539            .unwrap_or("");
1540        if !content_type.starts_with("application/json") {
1541            warn!("Invalid content type for POST: {}", content_type);
1542            return StreamableResponse::Error {
1543                status: StatusCode::BAD_REQUEST,
1544                message: "Content-Type must be application/json".to_string(),
1545            }
1546            .into_boxed_response(&context);
1547        }
1548
1549        // Use streaming for all POST requests, but adapt based on client needs
1550        // For simple JSON clients, streaming will send only the final result (no progress frames)
1551        debug!("Using streaming POST handler for all requests");
1552        return self.handle_post_streamable_http(req, context).await;
1553    }
1554
1555    /// Run middleware stack around dispatcher call
1556    ///
1557    /// This helper:
1558    /// 1. Fast-paths when middleware stack is empty
1559    /// 2. Normalizes headers to lowercase String keys
1560    /// 3. Builds RequestContext with method + headers
1561    /// 4. Executes before_dispatch
1562    /// 5. Applies or stashes SessionInjection
1563    /// 6. Calls dispatcher
1564    /// 7. Executes after_dispatch
1565    ///
1566    /// Returns (JsonRpcMessage, Option<SessionInjection>) where the injection
1567    /// is Some when session was None (initialize case) and needs to be applied
1568    /// after session creation.
1569    async fn run_middleware_and_dispatch(
1570        &self,
1571        request: turul_mcp_json_rpc_server::JsonRpcRequest,
1572        headers: HashMap<String, String>,
1573        session: turul_mcp_json_rpc_server::SessionContext,
1574    ) -> (turul_mcp_json_rpc_server::JsonRpcMessage, Option<crate::middleware::SessionInjection>) {
1575        // Fast path: if middleware stack is empty, dispatch directly
1576        if self.middleware_stack.is_empty() {
1577            let result = self.dispatcher
1578                .handle_request_with_context(request, session)
1579                .await;
1580            return (result, None);
1581        }
1582
1583        // Normalize headers: lowercase String keys
1584        let normalized_headers: HashMap<String, String> = headers
1585            .iter()
1586            .map(|(k, v)| (k.to_lowercase(), v.clone()))
1587            .collect();
1588
1589        // Build RequestContext with method and headers
1590        // Clone method and session_id for ctx (request will be moved to dispatcher)
1591        let method = request.method.clone();
1592        let session_id = session.session_id.clone();
1593
1594        // Convert params to Option<Value>
1595        let params = request.params.clone().map(|p| match p {
1596            turul_mcp_json_rpc_server::RequestParams::Object(map) => {
1597                serde_json::Value::Object(map.into_iter().collect())
1598            }
1599            turul_mcp_json_rpc_server::RequestParams::Array(arr) => serde_json::Value::Array(arr),
1600        });
1601        let mut ctx = crate::middleware::RequestContext::new(&method, params);
1602
1603        for (k, v) in normalized_headers {
1604            ctx.add_metadata(k, serde_json::json!(v));
1605        }
1606
1607        // Create SessionView adapter for middleware to access storage-backed session
1608        let session_view = crate::middleware::StorageBackedSessionView::new(
1609            session_id.clone(),
1610            Arc::clone(&self.session_storage),
1611        );
1612
1613        // Execute before_dispatch with SessionView
1614        let injection = match self.middleware_stack.execute_before(&mut ctx, Some(&session_view)).await {
1615            Ok(inj) => inj,
1616            Err(err) => {
1617                // Map middleware error to proper JSON-RPC error code
1618                return (Self::map_middleware_error_to_jsonrpc(err, request.id), None);
1619            }
1620        };
1621
1622        // Apply injection immediately to session storage
1623        if !injection.is_empty() {
1624            for (key, value) in injection.state() {
1625                if let Err(e) = session_view.set_state(key, value.clone()).await {
1626                    tracing::warn!("Failed to apply injection state '{}': {}", key, e);
1627                }
1628            }
1629            for (key, value) in injection.metadata() {
1630                if let Err(e) = session_view.set_metadata(key, value.clone()).await {
1631                    tracing::warn!("Failed to apply injection metadata '{}': {}", key, e);
1632                }
1633            }
1634        }
1635
1636        // Dispatch the request
1637        let result = self.dispatcher
1638            .handle_request_with_context(request, session)
1639            .await;
1640
1641        // Execute after_dispatch
1642        // Convert JsonRpcMessage to DispatcherResult for middleware
1643        let mut dispatcher_result = match &result {
1644            turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
1645                match &resp.result {
1646                    turul_mcp_json_rpc_server::response::ResponseResult::Success(val) => {
1647                        crate::middleware::DispatcherResult::Success(val.clone())
1648                    }
1649                    turul_mcp_json_rpc_server::response::ResponseResult::Null => {
1650                        crate::middleware::DispatcherResult::Success(serde_json::Value::Null)
1651                    }
1652                }
1653            }
1654            turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
1655                crate::middleware::DispatcherResult::Error(err.error.message.clone())
1656            }
1657        };
1658
1659        // Ignore errors from after_dispatch (they shouldn't prevent returning the result)
1660        let _ = self.middleware_stack.execute_after(&ctx, &mut dispatcher_result).await;
1661
1662        (result, None) // Injection already applied, no need to return it
1663    }
1664
1665    /// Map MiddlewareError to JSON-RPC error with semantic error codes
1666    fn map_middleware_error_to_jsonrpc(
1667        err: crate::middleware::MiddlewareError,
1668        request_id: turul_mcp_json_rpc_server::RequestId,
1669    ) -> turul_mcp_json_rpc_server::JsonRpcMessage {
1670        use crate::middleware::error::error_codes;
1671        use crate::middleware::MiddlewareError;
1672
1673        let (code, message, data) = match err {
1674            MiddlewareError::Unauthenticated(msg) => (error_codes::UNAUTHENTICATED, msg, None),
1675            MiddlewareError::Unauthorized(msg) => (error_codes::UNAUTHORIZED, msg, None),
1676            MiddlewareError::RateLimitExceeded {
1677                message,
1678                retry_after,
1679            } => {
1680                let data = retry_after.map(|s| serde_json::json!({"retryAfter": s}));
1681                (error_codes::RATE_LIMIT_EXCEEDED, message, data)
1682            }
1683            MiddlewareError::InvalidRequest(msg) => (error_codes::INVALID_REQUEST, msg, None),
1684            MiddlewareError::Internal(msg) => (error_codes::INTERNAL_ERROR, msg, None),
1685            MiddlewareError::Custom { message, .. } => (error_codes::INTERNAL_ERROR, message, None),
1686        };
1687
1688        let error_obj = if let Some(d) = data {
1689            turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(code, &message, Some(d))
1690        } else {
1691            turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(
1692                code,
1693                &message,
1694                None::<serde_json::Value>,
1695            )
1696        };
1697
1698        turul_mcp_json_rpc_server::JsonRpcMessage::Error(turul_mcp_json_rpc_server::JsonRpcError::new(
1699            Some(request_id),
1700            error_obj,
1701        ))
1702    }
1703}
1704
#[cfg(test)]
mod tests {
    use super::*;

    /// Each known version string maps to its variant, and unknown strings are rejected.
    #[test]
    fn test_version_parsing() {
        let table = [
            ("2024-11-05", Some(McpProtocolVersion::V2024_11_05)),
            ("2025-03-26", Some(McpProtocolVersion::V2025_03_26)),
            ("2025-06-18", Some(McpProtocolVersion::V2025_06_18)),
            ("invalid", None),
        ];
        for &(raw, expected) in table.iter() {
            assert_eq!(
                McpProtocolVersion::parse_version(raw),
                expected,
                "parse_version({:?})",
                raw
            );
        }
    }

    /// Capability flags grow monotonically with protocol revisions.
    #[test]
    fn test_version_capabilities() {
        let legacy = McpProtocolVersion::V2024_11_05;
        assert!(!legacy.supports_streamable_http());
        assert!(!legacy.supports_meta_fields());

        let streamable = McpProtocolVersion::V2025_03_26;
        assert!(streamable.supports_streamable_http());
        assert!(!streamable.supports_meta_fields());

        let current = McpProtocolVersion::V2025_06_18;
        assert!(current.supports_streamable_http());
        assert!(current.supports_meta_fields());
        assert!(current.supports_cursors());
        assert!(current.supports_progress_tokens());
        assert!(current.supports_elicitation());
    }

    /// Walks one context through a sequence of valid and invalid configurations.
    #[test]
    fn test_context_validation() {
        let mut ctx = StreamableHttpContext {
            protocol_version: McpProtocolVersion::V2025_06_18,
            session_id: Some("test-session".to_string()),
            wants_sse_stream: true,
            accepts_stream_frames: true,
            headers: HashMap::new(),
        };

        // With a session, both POST and GET validate.
        assert!(ctx.validate(&Method::POST).is_ok());
        assert!(ctx.validate(&Method::GET).is_ok());

        // A POST from a client that does not accept stream frames is rejected.
        ctx.accepts_stream_frames = false;
        assert!(ctx.validate(&Method::POST).is_err());

        // The legacy protocol combined with an SSE request is rejected. Presumably this
        // is because 2024-11-05 lacks streamable HTTP; confirm against `validate`.
        ctx.accepts_stream_frames = true;
        ctx.protocol_version = McpProtocolVersion::V2024_11_05;
        ctx.wants_sse_stream = true;
        assert!(ctx.validate(&Method::POST).is_err());

        // Without a session, POST is still allowed (initialize) but GET is not.
        ctx.protocol_version = McpProtocolVersion::V2025_06_18;
        ctx.session_id = None;
        assert!(ctx.validate(&Method::POST).is_ok());
        assert!(ctx.validate(&Method::GET).is_err());
    }
}