turul_http_mcp_server/
streamable_http.rs

1//! Streamable HTTP Transport for MCP 2025-06-18
2//!
3//! This module implements the "Streamable HTTP" transport mechanism introduced
4//! in MCP 2025-03-26, which replaces the previous HTTP+SSE approach from 2024-11-05.
5//!
6//! ## Key Improvements over HTTP+SSE
7//! - **Serverless Compatibility**: Enables deployment on AWS Lambda, Google Cloud Run
8//! - **Improved Scalability**: Supports chunked transfer encoding and progressive delivery
9//! - **Session Management**: Cryptographically secure session IDs for connection tracking
10//! - **Enterprise Network Friendly**: No long-lived connections or polling requirements
11
12use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use futures::Stream;
18use http_body::Body;
19use http_body_util::{BodyExt, Full};
20use hyper::header::{ACCEPT, CONTENT_TYPE};
21use hyper::{HeaderMap, Method, Request, Response, StatusCode};
22use serde_json::Value;
23use tracing::{debug, error, warn};
24
25use crate::ServerConfig;
26
/// Revisions of the MCP protocol recognised by this transport.
///
/// `Default` yields the newest revision, [`McpProtocolVersion::V2025_06_18`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum McpProtocolVersion {
    /// Original protocol without streamable HTTP (2024-11-05)
    V2024_11_05,
    /// Protocol including streamable HTTP (2025-03-26)
    V2025_03_26,
    /// Protocol with structured _meta, cursor, progressToken, and elicitation (2025-06-18)
    #[default]
    V2025_06_18,
}

impl McpProtocolVersion {
    /// Map a date string (as carried in a header) to its protocol revision.
    ///
    /// The comparison is exact; any string that is not one of the known
    /// revision dates yields `None`.
    pub fn parse_version(s: &str) -> Option<Self> {
        const KNOWN: [McpProtocolVersion; 3] = [
            McpProtocolVersion::V2024_11_05,
            McpProtocolVersion::V2025_03_26,
            McpProtocolVersion::V2025_06_18,
        ];
        KNOWN.iter().copied().find(|version| version.as_str() == s)
    }

    /// The date string that identifies this revision.
    pub fn as_str(&self) -> &'static str {
        match *self {
            McpProtocolVersion::V2024_11_05 => "2024-11-05",
            McpProtocolVersion::V2025_03_26 => "2025-03-26",
            McpProtocolVersion::V2025_06_18 => "2025-06-18",
        }
    }

    /// `true` for every revision from 2025-03-26 onwards.
    pub fn supports_streamable_http(&self) -> bool {
        match self {
            Self::V2024_11_05 => false,
            Self::V2025_03_26 | Self::V2025_06_18 => true,
        }
    }

    /// `true` only for 2025-06-18 (`_meta` fields).
    pub fn supports_meta_fields(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for 2025-06-18 (cursor-based pagination).
    pub fn supports_cursors(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for 2025-06-18 (progress tokens).
    pub fn supports_progress_tokens(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// `true` only for 2025-06-18 (elicitation).
    pub fn supports_elicitation(&self) -> bool {
        *self == Self::V2025_06_18
    }

    /// Names of the features this revision supports, in a fixed order:
    /// `streamable-http`, `_meta-fields`, `cursor-pagination`,
    /// `progress-tokens`, `elicitation`.
    pub fn supported_features(&self) -> Vec<&'static str> {
        let candidates: [(bool, &'static str); 5] = [
            (self.supports_streamable_http(), "streamable-http"),
            (self.supports_meta_fields(), "_meta-fields"),
            (self.supports_cursors(), "cursor-pagination"),
            (self.supports_progress_tokens(), "progress-tokens"),
            (self.supports_elicitation(), "elicitation"),
        ];
        candidates
            .iter()
            .filter(|entry| entry.0)
            .map(|entry| entry.1)
            .collect()
    }
}
105
/// Per-request view of the headers that matter to the streamable HTTP
/// transport: protocol version, session ID, and what the `Accept` header allows.
#[derive(Debug, Clone)]
pub struct StreamableHttpContext {
    /// Protocol version taken from the `MCP-Protocol-Version` request header;
    /// the default version is used when the header is absent or unrecognised.
    pub protocol_version: McpProtocolVersion,
    /// Value of the `Mcp-Session-Id` request header, if one was sent.
    pub session_id: Option<String>,
    /// Whether the `Accept` header mentions `text/event-stream`.
    pub wants_sse_stream: bool,
    /// Whether the `Accept` header mentions `application/json`,
    /// `text/event-stream`, or `*/*`.
    pub accepts_stream_frames: bool,
    /// Request headers whose values are representable as strings, keyed by
    /// header name (kept for debugging/logging).
    pub headers: HashMap<String, String>,
}
120
121impl StreamableHttpContext {
122    /// Parse context from HTTP request headers
123    pub fn from_request<T>(req: &Request<T>) -> Self {
124        let headers = req.headers();
125
126        // Parse protocol version from MCP-Protocol-Version header
127        let protocol_version = headers
128            .get("MCP-Protocol-Version")
129            .and_then(|h| h.to_str().ok())
130            .and_then(McpProtocolVersion::parse_version)
131            .unwrap_or_default();
132
133        // Extract session ID from Mcp-Session-Id header (note capitalization)
134        let session_id = headers
135            .get("Mcp-Session-Id")
136            .and_then(|h| h.to_str().ok())
137            .map(|s| s.to_string());
138
139        // Check Accept header for streaming and JSON support
140        let accept_header = headers
141            .get(ACCEPT)
142            .and_then(|h| h.to_str().ok())
143            .unwrap_or_default()
144            .to_ascii_lowercase();
145
146        let wants_sse_stream = accept_header.contains("text/event-stream");
147        let accepts_stream_frames = accept_header.contains("application/json")
148            || accept_header.contains("text/event-stream")
149            || accept_header.contains("*/*");
150
151        // Collect additional headers for debugging/logging
152        let mut header_map = HashMap::new();
153        for (name, value) in headers.iter() {
154            if let Ok(value_str) = value.to_str() {
155                header_map.insert(name.to_string(), value_str.to_string());
156            }
157        }
158
159        Self {
160            protocol_version,
161            session_id,
162            wants_sse_stream,
163            accepts_stream_frames,
164            headers: header_map,
165        }
166    }
167
168    /// Whether client wants SSE stream
169    pub fn wants_sse_stream(&self) -> bool {
170        self.wants_sse_stream
171    }
172
173    /// Whether client wants streaming POST responses
174    pub fn wants_streaming_post(&self) -> bool {
175        self.accepts_stream_frames && self.wants_sse_stream
176    }
177
178    /// Check if request is compatible with streamable HTTP
179    pub fn is_streamable_compatible(&self) -> bool {
180        self.protocol_version.supports_streamable_http() && self.accepts_stream_frames
181    }
182
183    /// Validate request for MCP compliance
184    pub fn validate(&self, method: &Method) -> std::result::Result<(), String> {
185        if !self.accepts_stream_frames {
186            return Err(
187                "Accept header must include application/json, text/event-stream, or */*"
188                    .to_string(),
189            );
190        }
191
192        if self.wants_sse_stream && !self.protocol_version.supports_streamable_http() {
193            return Err(format!(
194                "Protocol version {} does not support streamable HTTP",
195                self.protocol_version.as_str()
196            ));
197        }
198
199        // Only enforce session_id for GET requests with SSE streams
200        // POST requests will validate session based on the JSON-RPC method (initialize vs others)
201        if method == &Method::GET && self.wants_sse_stream && self.session_id.is_none() {
202            return Err("Mcp-Session-Id header required for SSE streaming connections".to_string());
203        }
204
205        Ok(())
206    }
207
208    /// Create response headers for this context
209    pub fn response_headers(&self) -> HeaderMap {
210        let mut headers = HeaderMap::new();
211
212        // Always include protocol version in response
213        headers.insert(
214            "MCP-Protocol-Version",
215            self.protocol_version.as_str().parse().unwrap(),
216        );
217
218        // Include session ID if present
219        if let Some(session_id) = &self.session_id {
220            headers.insert("Mcp-Session-Id", session_id.parse().unwrap());
221        }
222
223        // Add capabilities header showing supported features
224        let features = self.protocol_version.supported_features();
225        if !features.is_empty() {
226            headers.insert("MCP-Capabilities", features.join(",").parse().unwrap());
227        }
228
229        headers
230    }
231}
232
/// Streamable HTTP response types
///
/// Converted into an HTTP response by `into_response` /
/// `into_boxed_response`.
pub enum StreamableResponse {
    /// Single JSON response; rendered with status 200 and the serialized value as body.
    Json(Value),
    /// Streaming response with multiple JSON messages; each item is either a
    /// message or an error string. `into_response` currently answers this
    /// variant with a 202 placeholder and does not consume the stream.
    Stream(Pin<Box<dyn Stream<Item = std::result::Result<Value, String>> + Send>>),
    /// Error response; rendered with the given status and a JSON body of the
    /// form `{"error": {"code": <status>, "message": <message>}}`.
    Error { status: StatusCode, message: String },
}
242
243impl std::fmt::Debug for StreamableResponse {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        match self {
246            Self::Json(value) => f.debug_tuple("Json").field(value).finish(),
247            Self::Stream(_) => f.debug_tuple("Stream").field(&"<stream>").finish(),
248            Self::Error { status, message } => f
249                .debug_struct("Error")
250                .field("status", status)
251                .field("message", message)
252                .finish(),
253        }
254    }
255}
256
257impl StreamableResponse {
258    /// Convert to HTTP response
259    pub fn into_response(self, context: &StreamableHttpContext) -> Response<Full<Bytes>> {
260        let mut response_headers = context.response_headers();
261
262        match self {
263            StreamableResponse::Json(json) => {
264                response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
265
266                let body = serde_json::to_string(&json)
267                    .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
268
269                Response::builder()
270                    .status(StatusCode::OK)
271                    .body(Full::new(Bytes::from(body)))
272                    .unwrap()
273            }
274
275            StreamableResponse::Stream(_stream) => {
276                // For streaming responses, set appropriate headers
277                response_headers.insert(CONTENT_TYPE, "text/event-stream".parse().unwrap());
278                response_headers.insert("Cache-Control", "no-cache, no-transform".parse().unwrap());
279                response_headers.insert("Connection", "keep-alive".parse().unwrap());
280
281                // TODO: Implement actual streaming body with chunked transfer encoding
282                // Should stream JSON messages over HTTP with proper Content-Type: text/event-stream
283                // For now, return 202 Accepted to indicate streaming would happen
284                Response::builder()
285                    .status(StatusCode::ACCEPTED)
286                    .body(Full::new(Bytes::from("Streaming response accepted")))
287                    .unwrap()
288            }
289
290            StreamableResponse::Error { status, message } => {
291                response_headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
292
293                let error_json = serde_json::json!({
294                    "error": {
295                        "code": status.as_u16(),
296                        "message": message
297                    }
298                });
299
300                let body = serde_json::to_string(&error_json).unwrap_or_else(|_| {
301                    r#"{"error": {"code": 500, "message": "Internal server error"}}"#.to_string()
302                });
303
304                Response::builder()
305                    .status(status)
306                    .body(Full::new(Bytes::from(body)))
307                    .unwrap()
308            }
309        }
310    }
311
312    /// Convert to HTTP response with UnsyncBoxBody for streaming compatibility
313    pub fn into_boxed_response(
314        self,
315        context: &StreamableHttpContext,
316    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
317        self.into_response(context)
318            .map(|body| body.map_err(|never| match never {}).boxed_unsync())
319    }
320}
321
/// Streamable HTTP transport handler
///
/// Cloning is cheap for the `Arc`-held collaborators; `server_capabilities`
/// is cloned by value.
#[derive(Clone)]
pub struct StreamableHttpHandler {
    /// Server configuration; `max_body_size` is used to bound POST bodies.
    config: Arc<ServerConfig>,
    /// JSON-RPC dispatcher that processes parsed requests and notifications.
    dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
    /// Session store consulted to check that a session ID exists.
    session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
    /// Produces the streaming response for GET connections and backs the
    /// notification broadcaster handed to the dispatcher.
    stream_manager: Arc<crate::StreamManager>,
    /// Server capabilities supplied at construction.
    server_capabilities: turul_mcp_protocol::ServerCapabilities,
}
331
332impl StreamableHttpHandler {
333    pub fn new(
334        config: Arc<ServerConfig>,
335        dispatcher: Arc<turul_mcp_json_rpc_server::JsonRpcDispatcher<turul_mcp_protocol::McpError>>,
336        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
337        stream_manager: Arc<crate::StreamManager>,
338        server_capabilities: turul_mcp_protocol::ServerCapabilities,
339    ) -> Self {
340        Self {
341            config,
342            dispatcher,
343            session_storage,
344            stream_manager,
345            server_capabilities,
346        }
347    }
348
349    /// Handle incoming HTTP request with streamable HTTP support
350    pub async fn handle_request<T>(
351        &self,
352        req: Request<T>,
353    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
354    where
355        T: Body + Send + 'static,
356        T::Data: Send,
357        T::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
358    {
359        debug!(
360            "Streamable handler request: method={}, uri={}",
361            req.method(),
362            req.uri()
363        );
364        // Parse streamable HTTP context from request
365        let context = StreamableHttpContext::from_request(&req);
366
367        debug!(
368            "Streamable handler entry: method={}, protocol={}, session={:?}, accepts_stream_frames={}, wants_sse_stream={}",
369            req.method(),
370            context.protocol_version.as_str(),
371            context.session_id,
372            context.accepts_stream_frames,
373            context.wants_sse_stream()
374        );
375
376        // Validate request
377        if let Err(error) = context.validate(req.method()) {
378            warn!("Invalid streamable HTTP request: {}", error);
379            return StreamableResponse::Error {
380                status: StatusCode::BAD_REQUEST,
381                message: error,
382            }
383            .into_boxed_response(&context);
384        }
385
386        // Route based on MCP 2025-06-18 specification
387        match req.method() {
388            &Method::POST => {
389                // ALL client messages (requests, notifications, responses) come via POST
390                // Server decides whether to respond with JSON or SSE stream
391                self.handle_client_message(req, context).await
392            }
393            &Method::GET => {
394                // Optional SSE stream for server-initiated messages
395                self.handle_sse_stream(req, context).await
396            }
397            &Method::DELETE => {
398                // Optional session cleanup
399                self.handle_session_delete(req, context).await
400            }
401            _ => StreamableResponse::Error {
402                status: StatusCode::METHOD_NOT_ALLOWED,
403                message: "Method not allowed for this endpoint".to_string(),
404            }
405            .into_boxed_response(&context),
406        }
407    }
408
409    /// Handle GET request for optional SSE stream (MCP 2025-06-18 compliant)
410    async fn handle_sse_stream<T>(
411        &self,
412        req: Request<T>,
413        context: StreamableHttpContext,
414    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
415    where
416        T: Body + Send + 'static,
417    {
418        debug!(
419            "Opening streaming connection for session: {:?}",
420            context.session_id
421        );
422
423        // 1. Validate session exists and is authorized
424        let session_id = match context.session_id {
425            Some(ref id) => id.clone(),
426            None => {
427                warn!("Missing session ID for streaming GET request");
428                return StreamableResponse::Error {
429                    status: StatusCode::BAD_REQUEST,
430                    message: "Mcp-Session-Id header required for streaming connection".to_string(),
431                }
432                .into_boxed_response(&context);
433            }
434        };
435
436        // Validate session exists (do NOT create if missing)
437        match self.validate_session_exists(&session_id).await {
438            Ok(_) => {
439                debug!(
440                    "Session validation successful for streaming GET: {}",
441                    session_id
442                );
443            }
444            Err(err) => {
445                error!(
446                    "Session validation failed for streaming GET {}: {}",
447                    session_id, err
448                );
449                return StreamableResponse::Error {
450                    status: StatusCode::UNAUTHORIZED,
451                    message: format!("Session validation failed: {}", err),
452                }
453                .into_boxed_response(&context);
454            }
455        }
456
457        // 2. Create bi-directional stream with chunked transfer encoding
458        // For MCP 2025-06-18 Streamable HTTP, we create a stream that can handle bidirectional JSON-RPC
459        // Unlike SSE which is unidirectional server->client, this supports client->server and server->client
460
461        // Extract Last-Event-ID for resumability (if client supports it)
462        let last_event_id = req
463            .headers()
464            .get("Last-Event-ID")
465            .and_then(|h| h.to_str().ok())
466            .and_then(|s| s.parse::<u64>().ok());
467
468        // Generate unique connection ID for tracking this stream
469        let connection_id = uuid::Uuid::now_v7().to_string();
470
471        debug!(
472            "Creating streamable HTTP connection: session={}, connection={}, last_event_id={:?}",
473            session_id, connection_id, last_event_id
474        );
475
476        // 3. Return streaming response supporting progressive message delivery
477        // ✅ FIXED: Return the actual streaming response from StreamManager
478        // This preserves event replay, resumability, and live streaming capabilities
479        match self
480            .stream_manager
481            .handle_sse_connection(session_id.clone(), connection_id.clone(), last_event_id)
482            .await
483        {
484            Ok(mut streaming_response) => {
485                debug!(
486                    "Streamable HTTP connection established: session={}, connection={}",
487                    session_id, connection_id
488                );
489
490                // Merge MCP headers from context.response_headers()
491                let mcp_headers = context.response_headers();
492                for (key, value) in mcp_headers.iter() {
493                    streaming_response.headers_mut().insert(key, value.clone());
494                }
495
496                // ✅ PRESERVE STREAMING: Return the streaming response with MCP headers
497                // This maintains event replay from session storage and live streaming
498                streaming_response
499            }
500            Err(err) => {
501                error!("Failed to create streamable HTTP connection: {}", err);
502                StreamableResponse::Error {
503                    status: StatusCode::INTERNAL_SERVER_ERROR,
504                    message: format!("Streaming connection failed: {}", err),
505                }
506                .into_boxed_response(&context)
507            }
508        }
509    }
510
511    /// Validate that a session exists - do NOT create if missing
512    async fn validate_session_exists(&self, session_id: &str) -> std::result::Result<(), String> {
513        match self.session_storage.get_session(session_id).await {
514            Ok(Some(_)) => {
515                debug!("Session validation successful: {}", session_id);
516                Ok(())
517            }
518            Ok(None) => {
519                error!("Session not found: {}", session_id);
520                Err(format!(
521                    "Session '{}' not found. Sessions must be created via initialize request first.",
522                    session_id
523                ))
524            }
525            Err(err) => {
526                error!("Failed to validate session {}: {}", session_id, err);
527                Err(format!("Session validation failed: {}", err))
528            }
529        }
530    }
531
532    /// Handle POST request with streaming response
533    #[allow(dead_code)]
534    async fn handle_streaming_post<T>(
535        &self,
536        req: Request<T>,
537        context: StreamableHttpContext,
538    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
539    where
540        T: Body + Send + 'static,
541    {
542        debug!(
543            "Handling streaming POST for session: {:?}",
544            context.session_id
545        );
546
547        // 1. Validate session exists and is authorized
548        let session_id = match context.session_id {
549            Some(ref id) => id.clone(),
550            None => {
551                warn!("Missing session ID for streaming POST request");
552                return StreamableResponse::Error {
553                    status: StatusCode::BAD_REQUEST,
554                    message: "Mcp-Session-Id header required for streaming POST".to_string(),
555                }
556                .into_boxed_response(&context);
557            }
558        };
559
560        // Validate session exists (do NOT create if missing)
561        if let Err(err) = self.validate_session_exists(&session_id).await {
562            error!(
563                "Session validation failed for streaming POST {}: {}",
564                session_id, err
565            );
566            return StreamableResponse::Error {
567                status: StatusCode::UNAUTHORIZED,
568                message: format!("Session validation failed: {}", err),
569            }
570            .into_boxed_response(&context);
571        }
572
573        // Check content type
574        let content_type = req
575            .headers()
576            .get(CONTENT_TYPE)
577            .and_then(|ct| ct.to_str().ok())
578            .unwrap_or("");
579
580        if !content_type.starts_with("application/json") {
581            warn!("Invalid content type for streaming POST: {}", content_type);
582            return StreamableResponse::Error {
583                status: StatusCode::BAD_REQUEST,
584                message: "Content-Type must be application/json".to_string(),
585            }
586            .into_boxed_response(&context);
587        }
588
589        // 2. Parse JSON-RPC request(s) from chunked request body
590        let body_bytes = match req.into_body().collect().await {
591            Ok(collected) => collected.to_bytes(),
592            Err(_err) => {
593                error!("Failed to read streaming POST request body");
594                return StreamableResponse::Error {
595                    status: StatusCode::BAD_REQUEST,
596                    message: "Failed to read request body".to_string(),
597                }
598                .into_boxed_response(&context);
599            }
600        };
601
602        // Check body size
603        if body_bytes.len() > self.config.max_body_size {
604            warn!(
605                "Streaming POST request body too large: {} bytes",
606                body_bytes.len()
607            );
608            return StreamableResponse::Error {
609                status: StatusCode::PAYLOAD_TOO_LARGE,
610                message: "Request body too large".to_string(),
611            }
612            .into_boxed_response(&context);
613        }
614
615        // Parse as UTF-8
616        let body_str = match std::str::from_utf8(&body_bytes) {
617            Ok(s) => s,
618            Err(err) => {
619                error!("Invalid UTF-8 in streaming POST request body: {}", err);
620                return StreamableResponse::Error {
621                    status: StatusCode::BAD_REQUEST,
622                    message: "Request body must be valid UTF-8".to_string(),
623                }
624                .into_boxed_response(&context);
625            }
626        };
627
628        debug!("Received streaming POST JSON-RPC request: {}", body_str);
629
630        // Parse JSON-RPC message
631        use crate::notification_bridge::StreamManagerNotificationBroadcaster;
632        use turul_mcp_json_rpc_server::r#async::SessionContext;
633        use turul_mcp_json_rpc_server::dispatch::{
634            JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message,
635        };
636
637        let message = match parse_json_rpc_message(body_str) {
638            Ok(msg) => msg,
639            Err(rpc_err) => {
640                error!("JSON-RPC parse error in streaming POST: {}", rpc_err);
641                let error_json =
642                    serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
643                return Response::builder()
644                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
645                    .header(CONTENT_TYPE, "application/json")
646                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
647                    .body(Full::new(Bytes::from(error_json)))
648                    .unwrap()
649                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
650                    .map(|body| body.map_err(|never| match never {}).boxed_unsync());
651            }
652        };
653
654        // 3. Process via dispatcher with session context and streaming capabilities
655        let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
656            &self.stream_manager,
657        )));
658        let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
659
660        let session_context = SessionContext {
661            session_id: session_id.clone(),
662            metadata: std::collections::HashMap::new(),
663            broadcaster: Some(broadcaster_any),
664            timestamp: chrono::Utc::now().timestamp_millis() as u64,
665        };
666
667        let message_result = match message {
668            JsonRpcMessage::Request(request) => {
669                debug!(
670                    "Processing streaming POST JSON-RPC request: method={}",
671                    request.method
672                );
673                let response = self
674                    .dispatcher
675                    .handle_request_with_context(request, session_context)
676                    .await;
677
678                // Convert JsonRpcMessage to JsonRpcMessageResult
679                match response {
680                    turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
681                        JsonRpcMessageResult::Response(resp)
682                    }
683                    turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
684                        JsonRpcMessageResult::Error(err)
685                    }
686                }
687            }
688            JsonRpcMessage::Notification(notification) => {
689                debug!(
690                    "Processing streaming POST JSON-RPC notification: method={}",
691                    notification.method
692                );
693
694                let result = self
695                    .dispatcher
696                    .handle_notification_with_context(notification, Some(session_context))
697                    .await;
698
699                if let Err(err) = result {
700                    error!("Streaming POST notification handling error: {}", err);
701                }
702                JsonRpcMessageResult::NoResponse
703            }
704        };
705
706        // 4. Stream responses back with progressive message delivery
707        // For now, return the response immediately - TODO: implement actual streaming
708        match message_result {
709            JsonRpcMessageResult::Response(response) => {
710                let response_json = serde_json::to_string(&response)
711                    .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
712
713                Response::builder()
714                    .status(StatusCode::OK)
715                    .header(CONTENT_TYPE, "application/json")
716                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
717                    .header("Mcp-Session-Id", &session_id)
718                    .body(Full::new(Bytes::from(response_json)))
719                    .unwrap()
720                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
721                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
722            }
723            JsonRpcMessageResult::Error(error) => {
724                let error_json = serde_json::to_string(&error)
725                    .unwrap_or_else(|_| r#"{"error": "Internal error"}"#.to_string());
726
727                Response::builder()
728                    .status(StatusCode::OK) // JSON-RPC errors still return 200 OK
729                    .header(CONTENT_TYPE, "application/json")
730                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
731                    .header("Mcp-Session-Id", &session_id)
732                    .body(Full::new(Bytes::from(error_json)))
733                    .unwrap()
734                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
735                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
736            }
737            JsonRpcMessageResult::NoResponse => {
738                // Notifications return 202 Accepted per MCP spec
739                Response::builder()
740                    .status(StatusCode::ACCEPTED)
741                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
742                    .header("Mcp-Session-Id", &session_id)
743                    .body(Full::new(Bytes::new()))
744                    .unwrap()
745                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
746                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
747            }
748        }
749    }
750
751    /// Handle POST request with JSON response (legacy compatibility)
752    #[allow(dead_code)]
753    async fn handle_json_post<T>(
754        &self,
755        req: Request<T>,
756        context: StreamableHttpContext,
757    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
758    where
759        T: Body + Send + 'static,
760    {
761        debug!("Handling JSON POST (non-streaming/legacy)");
762
763        // 1. Parse JSON-RPC request(s) from request body (legacy clients don't require sessions)
764
765        // Check content type
766        let content_type = req
767            .headers()
768            .get(CONTENT_TYPE)
769            .and_then(|ct| ct.to_str().ok())
770            .unwrap_or("");
771
772        if !content_type.starts_with("application/json") {
773            warn!("Invalid content type for legacy POST: {}", content_type);
774            return StreamableResponse::Error {
775                status: StatusCode::BAD_REQUEST,
776                message: "Content-Type must be application/json".to_string(),
777            }
778            .into_boxed_response(&context);
779        }
780
781        // Read request body
782        let body_bytes = match req.into_body().collect().await {
783            Ok(collected) => collected.to_bytes(),
784            Err(_err) => {
785                error!("Failed to read legacy POST request body");
786                return StreamableResponse::Error {
787                    status: StatusCode::BAD_REQUEST,
788                    message: "Failed to read request body".to_string(),
789                }
790                .into_boxed_response(&context);
791            }
792        };
793
794        // Check body size
795        if body_bytes.len() > self.config.max_body_size {
796            warn!(
797                "Legacy POST request body too large: {} bytes",
798                body_bytes.len()
799            );
800            return StreamableResponse::Error {
801                status: StatusCode::PAYLOAD_TOO_LARGE,
802                message: "Request body too large".to_string(),
803            }
804            .into_boxed_response(&context);
805        }
806
807        // Parse as UTF-8
808        let body_str = match std::str::from_utf8(&body_bytes) {
809            Ok(s) => s,
810            Err(err) => {
811                error!("Invalid UTF-8 in legacy POST request body: {}", err);
812                return StreamableResponse::Error {
813                    status: StatusCode::BAD_REQUEST,
814                    message: "Request body must be valid UTF-8".to_string(),
815                }
816                .into_boxed_response(&context);
817            }
818        };
819
820        debug!("Received legacy POST JSON-RPC request: {}", body_str);
821
822        // Parse JSON-RPC message
823        use turul_mcp_json_rpc_server::dispatch::{
824            JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message,
825        };
826
827        let message = match parse_json_rpc_message(body_str) {
828            Ok(msg) => msg,
829            Err(rpc_err) => {
830                error!("JSON-RPC parse error in legacy POST: {}", rpc_err);
831                let error_json =
832                    serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
833                return Response::builder()
834                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
835                    .header(CONTENT_TYPE, "application/json")
836                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
837                    .body(Full::new(Bytes::from(error_json)))
838                    .unwrap()
839                    .map(|body| body.map_err(|never| match never {}).boxed_unsync());
840            }
841        };
842
843        // 2. Process via dispatcher (no session context for legacy clients)
844        // Legacy clients (MCP 2024-11-05) don't use sessions, so no session context
845        let message_result = match message {
846            JsonRpcMessage::Request(request) => {
847                debug!(
848                    "Processing legacy POST JSON-RPC request: method={}",
849                    request.method
850                );
851
852                // Special handling for initialize requests - legacy clients can create sessions too
853                let response = if request.method == "initialize" {
854                    debug!("Handling legacy initialize request - creating new session");
855
856                    // Let session storage create the session and generate the ID
857                    match self
858                        .session_storage
859                        .create_session(self.server_capabilities.clone())
860                        .await
861                    {
862                        Ok(session_info) => {
863                            debug!(
864                                "Created new session for legacy client: {}",
865                                session_info.session_id
866                            );
867
868                            // Create session context for initialize response
869                            use crate::notification_bridge::StreamManagerNotificationBroadcaster;
870                            use turul_mcp_json_rpc_server::r#async::SessionContext;
871
872                            let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
873                                Arc::clone(&self.stream_manager),
874                            ));
875                            let broadcaster_any =
876                                Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
877
878                            let session_context = SessionContext {
879                                session_id: session_info.session_id.clone(),
880                                metadata: std::collections::HashMap::new(),
881                                broadcaster: Some(broadcaster_any),
882                                timestamp: chrono::Utc::now().timestamp_millis() as u64,
883                            };
884
885                            self.dispatcher
886                                .handle_request_with_context(request, session_context)
887                                .await
888                        }
889                        Err(err) => {
890                            error!("Failed to create session during legacy initialize: {}", err);
891                            let error_msg = format!("Session creation failed: {}", err);
892                            turul_mcp_json_rpc_server::JsonRpcMessage::error(
893                                turul_mcp_json_rpc_server::JsonRpcError::internal_error(
894                                    Some(request.id),
895                                    Some(error_msg),
896                                ),
897                            )
898                        }
899                    }
900                } else {
901                    // For non-initialize requests, process without session context (legacy mode)
902                    self.dispatcher.handle_request(request).await
903                };
904
905                // Convert JsonRpcMessage to JsonRpcMessageResult
906                match response {
907                    turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
908                        JsonRpcMessageResult::Response(resp)
909                    }
910                    turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
911                        JsonRpcMessageResult::Error(err)
912                    }
913                }
914            }
915            JsonRpcMessage::Notification(notification) => {
916                debug!(
917                    "Processing legacy POST JSON-RPC notification: method={}",
918                    notification.method
919                );
920
921                // Process notification without session context (legacy mode)
922                let result = self
923                    .dispatcher
924                    .handle_notification_with_context(notification, None)
925                    .await;
926
927                if let Err(err) = result {
928                    error!("Legacy POST notification handling error: {}", err);
929                }
930                JsonRpcMessageResult::NoResponse
931            }
932        };
933
934        // 3. Return single JSON response (no streaming) - legacy compatibility
935        match message_result {
936            JsonRpcMessageResult::Response(response) => {
937                let response_json = serde_json::to_string(&response)
938                    .unwrap_or_else(|_| r#"{"error": "Failed to serialize response"}"#.to_string());
939
940                Response::builder()
941                    .status(StatusCode::OK)
942                    .header(CONTENT_TYPE, "application/json")
943                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
944                    .body(Full::new(Bytes::from(response_json)))
945                    .unwrap()
946                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
947            }
948            JsonRpcMessageResult::Error(error) => {
949                let error_json = serde_json::to_string(&error)
950                    .unwrap_or_else(|_| r#"{"error": "Internal error"}"#.to_string());
951
952                Response::builder()
953                    .status(StatusCode::OK) // JSON-RPC errors still return 200 OK
954                    .header(CONTENT_TYPE, "application/json")
955                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
956                    .body(Full::new(Bytes::from(error_json)))
957                    .unwrap()
958                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
959            }
960            JsonRpcMessageResult::NoResponse => {
961                // Notifications return 202 Accepted per MCP spec
962                Response::builder()
963                    .status(StatusCode::ACCEPTED)
964                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
965                    .body(Full::new(Bytes::new()))
966                    .unwrap()
967                    .map(|body| body.map_err(|never| match never {}).boxed_unsync())
968            }
969        }
970    }
971
972    /// Handle DELETE request for session cleanup
973    async fn handle_session_delete<T>(
974        &self,
975        _req: Request<T>,
976        context: StreamableHttpContext,
977    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
978    where
979        T: Body + Send + 'static,
980    {
981        if let Some(session_id) = &context.session_id {
982            debug!("Deleting session: {}", session_id);
983
984            // Implement proper session cleanup for Streamable HTTP
985            // 1. Close any active streaming connections for this session
986            let closed_connections = self
987                .stream_manager
988                .close_session_connections(session_id)
989                .await;
990            debug!(
991                "Closed {} streaming connections for session: {}",
992                closed_connections, session_id
993            );
994
995            // 2. Mark session as terminated instead of immediate deletion (for proper lifecycle management)
996            match self.session_storage.get_session(session_id).await {
997                Ok(Some(mut session_info)) => {
998                    // Mark session as terminated in state
999                    session_info
1000                        .state
1001                        .insert("terminated".to_string(), serde_json::Value::Bool(true));
1002                    session_info.state.insert(
1003                        "terminated_at".to_string(),
1004                        serde_json::Value::Number(serde_json::Number::from(
1005                            chrono::Utc::now().timestamp_millis(),
1006                        )),
1007                    );
1008                    session_info.touch();
1009
1010                    // 3. Update session with termination markers
1011                    match self.session_storage.update_session(session_info).await {
1012                        Ok(()) => {
1013                            debug!(
1014                                "Session {} marked as terminated (TTL will handle cleanup)",
1015                                session_id
1016                            );
1017
1018                            // Return success response with proper headers
1019                            Response::builder()
1020                                .status(StatusCode::OK)
1021                                .header(CONTENT_TYPE, "application/json")
1022                                .header("MCP-Protocol-Version", context.protocol_version.as_str())
1023                                .header("Mcp-Session-Id", session_id)
1024                                .body(Full::new(Bytes::from(
1025                                    serde_json::to_string(&serde_json::json!({
1026                                        "status": "session_terminated",
1027                                        "session_id": session_id,
1028                                        "closed_connections": closed_connections,
1029                                        "message": "Session marked for cleanup"
1030                                    }))
1031                                    .unwrap_or_else(|_| {
1032                                        r#"{"status":"session_terminated"}"#.to_string()
1033                                    }),
1034                                )))
1035                                .unwrap()
1036                                .map(|body| body.map_err(|never| match never {}).boxed_unsync())
1037                        }
1038                        Err(err) => {
1039                            error!(
1040                                "Error marking session {} as terminated: {}",
1041                                session_id, err
1042                            );
1043                            // Fallback to deletion if update fails
1044                            match self.session_storage.delete_session(session_id).await {
1045                                Ok(_) => {
1046                                    debug!("Session {} deleted as fallback", session_id);
1047                                    Response::builder()
1048                                        .status(StatusCode::OK)
1049                                        .header(CONTENT_TYPE, "application/json")
1050                                        .header(
1051                                            "MCP-Protocol-Version",
1052                                            context.protocol_version.as_str(),
1053                                        )
1054                                        .body(Full::new(Bytes::from(
1055                                            serde_json::to_string(&serde_json::json!({
1056                                                "status": "session_deleted",
1057                                                "session_id": session_id,
1058                                                "closed_connections": closed_connections,
1059                                                "message": "Session removed"
1060                                            }))
1061                                            .unwrap_or_else(|_| {
1062                                                r#"{"status":"session_deleted"}"#.to_string()
1063                                            }),
1064                                        )))
1065                                        .unwrap()
1066                                        .map(|body| {
1067                                            body.map_err(|never| match never {}).boxed_unsync()
1068                                        })
1069                                }
1070                                Err(delete_err) => {
1071                                    error!(
1072                                        "Error deleting session {} as fallback: {}",
1073                                        session_id, delete_err
1074                                    );
1075                                    StreamableResponse::Error {
1076                                        status: StatusCode::INTERNAL_SERVER_ERROR,
1077                                        message: "Session termination error".to_string(),
1078                                    }
1079                                    .into_boxed_response(&context)
1080                                }
1081                            }
1082                        }
1083                    }
1084                }
1085                Ok(None) => {
1086                    // Session not found
1087                    Response::builder()
1088                        .status(StatusCode::NOT_FOUND)
1089                        .header(CONTENT_TYPE, "application/json")
1090                        .header("MCP-Protocol-Version", context.protocol_version.as_str())
1091                        .body(Full::new(Bytes::from(
1092                            serde_json::to_string(&serde_json::json!({
1093                                "status": "session_not_found",
1094                                "session_id": session_id,
1095                                "message": "Session not found"
1096                            }))
1097                            .unwrap_or_else(|_| r#"{"status":"session_not_found"}"#.to_string()),
1098                        )))
1099                        .unwrap()
1100                        .map(|body| body.map_err(|never| match never {}).boxed_unsync())
1101                }
1102                Err(err) => {
1103                    error!(
1104                        "Error retrieving session {} for termination: {}",
1105                        session_id, err
1106                    );
1107                    StreamableResponse::Error {
1108                        status: StatusCode::INTERNAL_SERVER_ERROR,
1109                        message: "Session lookup error".to_string(),
1110                    }
1111                    .into_boxed_response(&context)
1112                }
1113            }
1114        } else {
1115            StreamableResponse::Error {
1116                status: StatusCode::BAD_REQUEST,
1117                message: "Mcp-Session-Id header required for session deletion".to_string(),
1118            }
1119            .into_boxed_response(&context)
1120        }
1121    }
1122
1123    /// Handle POST with TRUE streaming using hyper::Body::channel()
1124    /// This implements actual MCP 2025-06-18 chunked transfer encoding
1125    async fn handle_streaming_post_real<T>(
1126        &self,
1127        req: Request<T>,
1128        mut context: StreamableHttpContext,
1129    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1130    where
1131        T: Body + Send + 'static,
1132    {
1133        debug!("Streaming handler called - using true streaming POST");
1134
1135        // Parse request body (still need to collect for JSON-RPC parsing)
1136        let body_bytes = match req.into_body().collect().await {
1137            Ok(collected) => collected.to_bytes(),
1138            Err(_err) => {
1139                error!("Failed to read streaming POST request body");
1140                return StreamableResponse::Error {
1141                    status: StatusCode::BAD_REQUEST,
1142                    message: "Failed to read request body".to_string(),
1143                }
1144                .into_boxed_response(&context);
1145            }
1146        };
1147
1148        // Check body size
1149        if body_bytes.len() > self.config.max_body_size {
1150            warn!(
1151                "Streaming POST request body too large: {} bytes",
1152                body_bytes.len()
1153            );
1154            return StreamableResponse::Error {
1155                status: StatusCode::PAYLOAD_TOO_LARGE,
1156                message: "Request body too large".to_string(),
1157            }
1158            .into_boxed_response(&context);
1159        }
1160
1161        // Parse as UTF-8
1162        let body_str = match std::str::from_utf8(&body_bytes) {
1163            Ok(s) => s,
1164            Err(err) => {
1165                error!("Invalid UTF-8 in streaming POST request body: {}", err);
1166                return StreamableResponse::Error {
1167                    status: StatusCode::BAD_REQUEST,
1168                    message: "Request body must be valid UTF-8".to_string(),
1169                }
1170                .into_boxed_response(&context);
1171            }
1172        };
1173
1174        debug!("Streaming POST received JSON-RPC request: {}", body_str);
1175
1176        // Parse JSON-RPC message
1177        use turul_mcp_json_rpc_server::dispatch::{JsonRpcMessage, parse_json_rpc_message};
1178        use turul_mcp_json_rpc_server::error::JsonRpcErrorObject;
1179
1180        let message = match parse_json_rpc_message(body_str) {
1181            Ok(msg) => msg,
1182            Err(rpc_err) => {
1183                error!("JSON-RPC parse error in streaming POST: {}", rpc_err);
1184                let error_json =
1185                    serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
1186
1187                // Return error with MCP headers (no session header for parse errors)
1188                return Response::builder()
1189                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
1190                    .header(CONTENT_TYPE, "application/json")
1191                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
1192                    .body(
1193                        Full::new(Bytes::from(error_json))
1194                            .map_err(|never| match never {})
1195                            .boxed_unsync(),
1196                    )
1197                    .unwrap();
1198            }
1199        };
1200
1201        // Validate session requirements based on method
1202        let session_id = match &message {
1203            JsonRpcMessage::Request(req) if req.method == "initialize" => {
1204                // Initialize can create session if none exists
1205                if let Some(existing_id) = &context.session_id {
1206                    // Validate existing session for initialize
1207                    if let Err(err) = self.validate_session_exists(existing_id).await {
1208                        warn!(
1209                            "Invalid session ID {} during initialize: {}",
1210                            existing_id, err
1211                        );
1212                        return StreamableResponse::Error {
1213                            status: StatusCode::UNAUTHORIZED,
1214                            message: "Invalid or expired session".to_string(),
1215                        }
1216                        .into_boxed_response(&context);
1217                    }
1218                    existing_id.clone()
1219                } else {
1220                    // Create new session for initialize
1221                    match self
1222                        .session_storage
1223                        .create_session(self.server_capabilities.clone())
1224                        .await
1225                    {
1226                        Ok(session_info) => {
1227                            debug!(
1228                                "Created new session for initialize: {}",
1229                                session_info.session_id
1230                            );
1231                            context.session_id = Some(session_info.session_id.clone());
1232                            session_info.session_id
1233                        }
1234                        Err(err) => {
1235                            error!("Failed to create session during initialize: {}", err);
1236                            return StreamableResponse::Error {
1237                                status: StatusCode::INTERNAL_SERVER_ERROR,
1238                                message: "Failed to create session".to_string(),
1239                            }
1240                            .into_boxed_response(&context);
1241                        }
1242                    }
1243                }
1244            }
1245            JsonRpcMessage::Request(_) | JsonRpcMessage::Notification(_) => {
1246                // All other methods REQUIRE session ID
1247                if let Some(existing_id) = &context.session_id {
1248                    // Validate existing session
1249                    if let Err(err) = self.validate_session_exists(existing_id).await {
1250                        warn!("Invalid session ID {}: {}", existing_id, err);
1251                        return StreamableResponse::Error {
1252                            status: StatusCode::UNAUTHORIZED,
1253                            message: "Invalid or expired session".to_string(),
1254                        }
1255                        .into_boxed_response(&context);
1256                    }
1257                    existing_id.clone()
1258                } else {
1259                    // Return 401 JSON-RPC error for missing session
1260                    let method_name = match &message {
1261                        JsonRpcMessage::Request(req) => &req.method,
1262                        JsonRpcMessage::Notification(notif) => &notif.method,
1263                    };
1264                    let request_id = match &message {
1265                        JsonRpcMessage::Request(req) => Some(req.id.clone()),
1266                        JsonRpcMessage::Notification(_) => None,
1267                    };
1268
1269                    warn!("Missing session ID for method: {}", method_name);
1270
1271                    let error_response = turul_mcp_json_rpc_server::JsonRpcError::new(
1272                        request_id,
1273                        JsonRpcErrorObject::server_error(
1274                            -32001,
1275                            "Missing Mcp-Session-Id header. Call initialize first.",
1276                            None::<serde_json::Value>,
1277                        ),
1278                    );
1279
1280                    let error_json =
1281                        serde_json::to_string(&error_response).unwrap_or_else(|_| "{}".to_string());
1282
1283                    return Response::builder()
1284                        .status(StatusCode::UNAUTHORIZED)
1285                        .header(CONTENT_TYPE, "application/json")
1286                        .header("MCP-Protocol-Version", context.protocol_version.as_str())
1287                        .body(
1288                            Full::new(Bytes::from(error_json))
1289                                .map_err(|never| match never {})
1290                                .boxed_unsync(),
1291                        )
1292                        .unwrap();
1293                }
1294            }
1295        };
1296
1297        debug!("Processing streaming request with session: {}", session_id);
1298
1299        // Create streaming response using hyper::Body::channel()
1300        match message {
1301            JsonRpcMessage::Request(request) => {
1302                debug!(
1303                    "Processing streaming JSON-RPC request: method={}",
1304                    request.method
1305                );
1306                self.create_streaming_response(request, session_id, context)
1307                    .await
1308            }
1309            JsonRpcMessage::Notification(notification) => {
1310                debug!(
1311                    "Processing streaming JSON-RPC notification: method={}",
1312                    notification.method
1313                );
1314
1315                // Create session context with notification broadcaster for notifications
1316                use crate::notification_bridge::StreamManagerNotificationBroadcaster;
1317                use turul_mcp_json_rpc_server::SessionContext;
1318
1319                let broadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
1320                    &self.stream_manager,
1321                )));
1322                let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1323
1324                let session_context = SessionContext {
1325                    session_id: session_id.clone(),
1326                    metadata: std::collections::HashMap::new(),
1327                    broadcaster: Some(broadcaster_any),
1328                    timestamp: chrono::Utc::now().timestamp_millis() as u64,
1329                };
1330
1331                // Process notification through dispatcher (notifications don't return responses)
1332                let dispatcher = Arc::clone(&self.dispatcher);
1333                let notification_clone = notification.clone();
1334
1335                // Spawn task to handle notification asynchronously (notifications are fire-and-forget)
1336                tokio::spawn(async move {
1337                    if let Err(e) = dispatcher
1338                        .handle_notification_with_context(notification_clone, Some(session_context))
1339                        .await
1340                    {
1341                        error!("Failed to process notification: {}", e);
1342                    }
1343                });
1344
1345                // Return 202 Accepted with MCP headers (notifications are accepted immediately)
1346                Response::builder()
1347                    .status(StatusCode::ACCEPTED)
1348                    .header("MCP-Protocol-Version", context.protocol_version.as_str())
1349                    .header("Mcp-Session-Id", &session_id)
1350                    .body(
1351                        Full::new(Bytes::new())
1352                            .map_err(|never| match never {})
1353                            .boxed_unsync(),
1354                    )
1355                    .unwrap()
1356            }
1357        }
1358    }
1359
1360    /// Create a streaming response using hyper::Body::channel()
1361    /// This enables true progressive responses with Transfer-Encoding: chunked
1362    async fn create_streaming_response(
1363        &self,
1364        request: turul_mcp_json_rpc_server::JsonRpcRequest,
1365        session_id: String,
1366        context: StreamableHttpContext,
1367    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
1368        debug!(
1369            "Creating streaming response for method: {}, session: {}",
1370            request.method, session_id
1371        );
1372        // Create channel for streaming response
1373        use http_body_util::StreamBody;
1374        use tokio_stream::StreamExt;
1375        use tokio_stream::wrappers::UnboundedReceiverStream; // Add StreamExt for map method
1376
1377        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<bytes::Bytes, hyper::Error>>();
1378        let body_stream = UnboundedReceiverStream::new(rx)
1379            .map(|item| item.map(|bytes| http_body::Frame::data(bytes)));
1380        let body = StreamBody::new(body_stream);
1381
1382        // Create session context with notification broadcaster (same pattern as SessionMcpHandler)
1383        use crate::notification_bridge::{
1384            SharedNotificationBroadcaster, StreamManagerNotificationBroadcaster,
1385        };
1386        use turul_mcp_json_rpc_server::SessionContext;
1387
1388        let broadcaster: SharedNotificationBroadcaster = Arc::new(
1389            StreamManagerNotificationBroadcaster::new(Arc::clone(&self.stream_manager)),
1390        );
1391        let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
1392
1393        let session_context = SessionContext {
1394            session_id: session_id.clone(),
1395            metadata: std::collections::HashMap::new(),
1396            broadcaster: Some(broadcaster_any),
1397            timestamp: chrono::Utc::now().timestamp_millis() as u64,
1398        };
1399
1400        // Register streaming POST connection with StreamManager for progress events
1401        let wants_sse = context.wants_sse_stream();
1402        let connection_id = format!("post-{}", uuid::Uuid::now_v7());
1403
1404        // Progress forwarding only for SSE clients
1405        let (shutdown_tx, completion_rx) = if wants_sse {
1406            // Create shutdown signal for progress task (critical for no-progress-events case)
1407            let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
1408            let (completion_tx, completion_rx) = tokio::sync::oneshot::channel::<()>();
1409            let (progress_tx, mut progress_rx) = tokio::sync::mpsc::channel(100);
1410
1411            // Register with StreamManager to receive progress events
1412            let registration_result = self
1413                .stream_manager
1414                .register_streaming_connection(&session_id, connection_id.clone(), progress_tx)
1415                .await;
1416
1417            if let Err(e) = registration_result {
1418                error!("Failed to register POST streaming connection: {}", e);
1419                // Continue without streaming - will still work as regular POST
1420                (None, None)
1421            } else {
1422                debug!(
1423                    "Registered SSE streaming connection for session: {}",
1424                    session_id
1425                );
1426
1427                // Spawn task to forward progress events to HTTP response
1428                let sender_clone = tx.clone();
1429                let session_id_clone = session_id.clone();
1430                let connection_id_clone = connection_id.clone();
1431                let stream_manager_clone = Arc::clone(&self.stream_manager);
1432
1433                tokio::spawn(async move {
1434                    debug!(
1435                        "Starting progress forwarding task for session: {}",
1436                        session_id_clone
1437                    );
1438
1439                    // CRITICAL: Use select to handle both progress events AND explicit shutdown
1440                    loop {
1441                        debug!(
1442                            "🔍 Progress task entering select loop for session: {}",
1443                            session_id_clone
1444                        );
1445                        tokio::select! {
1446                            // Handle progress events if they arrive
1447                            maybe_event = progress_rx.recv() => {
1448                                debug!("🔍 Progress task: progress_rx.recv() branch fired for session: {}", session_id_clone);
1449                                match maybe_event {
1450                                    Some(sse_event) => {
1451                                        debug!("🔍 Forwarding progress event to POST response: session={}, event={:?}", session_id_clone, sse_event.event_type);
1452
1453                                        // Convert SSE event to fully-formatted SSE chunk with event metadata
1454                                        let sse_chunk = sse_event.format();
1455
1456                                        if let Err(e) = sender_clone.send(Ok(Bytes::from(sse_chunk))) {
1457                                            error!("Failed to send progress event to POST response: {}", e);
1458                                            break;
1459                                        }
1460                                    }
1461                                    None => {
1462                                        // Progress channel closed naturally
1463                                        debug!("🔍 Progress channel closed naturally for session: {}", session_id_clone);
1464                                        break;
1465                                    }
1466                                }
1467                            }
1468                            // Handle explicit shutdown signal from main task
1469                            _ = &mut shutdown_rx => {
1470                                debug!("🔍 Progress task: shutdown_rx branch fired! Received explicit shutdown signal for session: {}", session_id_clone);
1471                                break;
1472                            }
1473                        }
1474                    }
1475
1476                    // Clean up: Unregister from StreamManager to close progress_tx
1477                    debug!(
1478                        "Progress task unregistering connection for session: {}",
1479                        session_id_clone
1480                    );
1481                    stream_manager_clone
1482                        .unregister_connection(&session_id_clone, &connection_id_clone)
1483                        .await;
1484
1485                    // CRITICAL: Drop the sender to ensure stream can close
1486                    debug!(
1487                        "🔍 Progress task: dropping sender_clone for session: {}",
1488                        session_id_clone
1489                    );
1490                    drop(sender_clone);
1491
1492                    // Signal completion to main task
1493                    debug!(
1494                        "🔍 Progress task: signaling completion for session: {}",
1495                        session_id_clone
1496                    );
1497                    if let Err(_) = completion_tx.send(()) {
1498                        debug!(
1499                            "🔍 Progress task: main task already dropped completion_rx for session: {}",
1500                            session_id_clone
1501                        );
1502                    }
1503
1504                    debug!(
1505                        "🔍 Progress forwarding task completed for session: {}",
1506                        session_id_clone
1507                    );
1508                });
1509
1510                // Return shutdown_tx and completion_rx for later use
1511                (Some(shutdown_tx), Some(completion_rx))
1512            }
1513        } else {
1514            // No SSE, no shutdown signal needed
1515            (None, None)
1516        };
1517
1518        // Spawn task to handle streaming dispatch
1519        let dispatcher = Arc::clone(&self.dispatcher);
1520        let request_id = request.id.clone();
1521        let sender = tx; // Rename for clarity
1522
1523        tokio::spawn(async move {
1524            debug!(
1525                "Spawning streaming task for request ID: {:?}, wants_sse: {}",
1526                request_id, wants_sse
1527            );
1528
1529            // Process actual request
1530            let response = dispatcher
1531                .handle_request_with_context(request, session_context)
1532                .await;
1533
1534            // Send final result - format depends on client type
1535            if wants_sse {
1536                // For SSE clients, send as streaming frame with SSE framing
1537                let final_frame = match response {
1538                    turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
1539                        turul_mcp_json_rpc_server::JsonRpcFrame::FinalResult {
1540                            request_id: request_id.clone(),
1541                            result: match resp.result {
1542                                turul_mcp_json_rpc_server::response::ResponseResult::Success(
1543                                    val,
1544                                ) => val,
1545                                turul_mcp_json_rpc_server::response::ResponseResult::Null => {
1546                                    serde_json::Value::Null
1547                                }
1548                            },
1549                        }
1550                    }
1551                    turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
1552                        turul_mcp_json_rpc_server::JsonRpcFrame::Error {
1553                            request_id: request_id.clone(),
1554                            error: turul_mcp_json_rpc_server::error::JsonRpcErrorObject {
1555                                code: err.error.code,
1556                                message: err.error.message,
1557                                data: err.error.data,
1558                            },
1559                        }
1560                    }
1561                };
1562
1563                let final_json = final_frame.to_json();
1564                // SSE framing: data: {json}\n\n
1565                let final_chunk =
1566                    format!("data: {}\n\n", serde_json::to_string(&final_json).unwrap());
1567
1568                if let Err(err) = sender.send(Ok(Bytes::from(final_chunk))) {
1569                    error!("Failed to send SSE final chunk: {}", err);
1570                }
1571
1572                // CRITICAL: Send explicit shutdown signal to progress forwarding task (SSE only)
1573                // This breaks it out of the progress_rx.recv().await loop immediately
1574                if let Some(shutdown_tx) = shutdown_tx {
1575                    debug!(
1576                        "🔍 Main task sending shutdown signal to progress task for request: {:?}",
1577                        request_id
1578                    );
1579                    match shutdown_tx.send(()) {
1580                        Ok(()) => {
1581                            debug!(
1582                                "🔍 Main task: shutdown signal sent successfully for request: {:?}",
1583                                request_id
1584                            );
1585
1586                            // CRITICAL: Wait for progress task to complete and drop its sender_clone
1587                            // This ensures both senders are dropped before the stream tries to close
1588                            if let Some(completion_rx) = completion_rx {
1589                                match tokio::time::timeout(
1590                                    tokio::time::Duration::from_millis(100),
1591                                    completion_rx,
1592                                )
1593                                .await
1594                                {
1595                                    Ok(Ok(())) => {
1596                                        debug!(
1597                                            "🔍 Main task: progress task completed successfully for request: {:?}",
1598                                            request_id
1599                                        );
1600                                    }
1601                                    Ok(Err(_)) => {
1602                                        debug!(
1603                                            "🔍 Main task: progress task completion signal dropped for request: {:?}",
1604                                            request_id
1605                                        );
1606                                    }
1607                                    Err(_) => {
1608                                        debug!(
1609                                            "🔍 Main task: progress task completion timeout for request: {:?}",
1610                                            request_id
1611                                        );
1612                                    }
1613                                }
1614                            }
1615                        }
1616                        Err(_) => {
1617                            debug!(
1618                                "🔍 Main task: progress task already completed (shutdown_rx dropped) for request: {:?}",
1619                                request_id
1620                            );
1621                        }
1622                    }
1623                } else {
1624                    debug!(
1625                        "🔍 Main task: no shutdown_tx available (not SSE client) for request: {:?}",
1626                        request_id
1627                    );
1628                }
1629            } else {
1630                // For JSON-only clients, send as regular JSON-RPC response (no streaming frames)
1631                let final_json = serde_json::to_string(&response).unwrap();
1632
1633                if let Err(err) = sender.send(Ok(Bytes::from(final_json))) {
1634                    error!("Failed to send final JSON response: {}", err);
1635                }
1636            }
1637
1638            debug!(
1639                "🔍 Main task: streaming task completed for request ID: {:?}",
1640                request_id
1641            );
1642
1643            // CRITICAL: Drop the sender to close the stream and signal completion to client
1644            debug!(
1645                "🔍 Main task: dropping main sender for request ID: {:?}",
1646                request_id
1647            );
1648            drop(sender);
1649        });
1650
1651        // Build response with MCP headers merged from context
1652        // Set content type based on client preference
1653        let content_type = if context.wants_sse_stream() {
1654            "text/event-stream"
1655        } else {
1656            "application/json"
1657        };
1658
1659        let mut response = Response::builder()
1660            .status(StatusCode::OK)
1661            .header(CONTENT_TYPE, content_type)
1662            .header("Transfer-Encoding", "chunked") // Key: Enable chunked encoding!
1663            .header("Cache-Control", "no-cache")
1664            .body(http_body_util::BodyExt::boxed_unsync(body))
1665            .unwrap();
1666
1667        // Merge MCP headers from context.response_headers()
1668        let mcp_headers = context.response_headers();
1669        for (key, value) in mcp_headers.iter() {
1670            response.headers_mut().insert(key, value.clone());
1671        }
1672
1673        response
1674    }
1675
1676    /// Handle POST with buffered response (fallback for legacy clients)
1677    #[allow(dead_code)]
1678    async fn handle_buffered_post<T>(
1679        &self,
1680        _req: Request<T>,
1681        context: StreamableHttpContext,
1682        session_id: String,
1683    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1684    where
1685        T: Body + Send + 'static,
1686    {
1687        debug!(
1688            "Using buffered POST for legacy client, session: {}",
1689            session_id
1690        );
1691
1692        // Use the existing logic (simplified version)
1693        // TODO: Extract common logic into helper method
1694
1695        Response::builder()
1696            .status(StatusCode::OK)
1697            .header(CONTENT_TYPE, "application/json")
1698            .header("MCP-Protocol-Version", context.protocol_version.as_str())
1699            .header("Mcp-Session-Id", &session_id)
1700            .body(
1701                Full::new(Bytes::from(
1702                    r#"{"jsonrpc":"2.0","id":1,"result":"buffered"}"#,
1703                ))
1704                .map_err(|never| match never {})
1705                .boxed_unsync(),
1706            )
1707            .unwrap()
1708    }
1709
1710    /// Handle POST request - unified handler for all client messages (MCP 2025-06-18 compliant)
1711    /// Processes JSON-RPC requests, notifications, and responses
1712    /// Server decides whether to respond with JSON or SSE stream based on message type
1713    async fn handle_client_message<T>(
1714        &self,
1715        req: Request<T>,
1716        context: StreamableHttpContext,
1717    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>
1718    where
1719        T: Body + Send + 'static,
1720    {
1721        debug!("Handling client message via POST (MCP 2025-06-18)");
1722
1723        // Reject POST if accepts_stream_frames is false
1724        // Per MCP spec: "Include Accept header with application/json and text/event-stream"
1725        if !context.accepts_stream_frames {
1726            warn!("Client POST missing application/json in Accept header");
1727            return StreamableResponse::Error {
1728                status: StatusCode::BAD_REQUEST,
1729                message: "Accept header must include application/json, text/event-stream, or */*"
1730                    .to_string(),
1731            }
1732            .into_boxed_response(&context);
1733        }
1734
1735        // Check content type
1736        let content_type = req
1737            .headers()
1738            .get(CONTENT_TYPE)
1739            .and_then(|ct| ct.to_str().ok())
1740            .unwrap_or("");
1741        if !content_type.starts_with("application/json") {
1742            warn!("Invalid content type for POST: {}", content_type);
1743            return StreamableResponse::Error {
1744                status: StatusCode::BAD_REQUEST,
1745                message: "Content-Type must be application/json".to_string(),
1746            }
1747            .into_boxed_response(&context);
1748        }
1749
1750        // Use streaming for all POST requests, but adapt based on client needs
1751        // For simple JSON clients, streaming will send only the final result (no progress frames)
1752        debug!("Using streaming POST handler for all requests");
1753        return self.handle_streaming_post_real(req, context).await;
1754    }
1755}
1756
#[cfg(test)]
mod tests {
    use super::*;

    /// Each known version string maps to its variant; unknown input yields `None`.
    #[test]
    fn test_version_parsing() {
        let cases = [
            ("2024-11-05", Some(McpProtocolVersion::V2024_11_05)),
            ("2025-03-26", Some(McpProtocolVersion::V2025_03_26)),
            ("2025-06-18", Some(McpProtocolVersion::V2025_06_18)),
            ("invalid", None),
        ];

        for (raw, expected) in cases {
            assert_eq!(McpProtocolVersion::parse_version(raw), expected);
        }
    }

    /// Capability flags checked per protocol revision.
    #[test]
    fn test_version_capabilities() {
        // 2024-11-05: neither streamable HTTP nor meta fields.
        let legacy = McpProtocolVersion::V2024_11_05;
        assert!(!legacy.supports_streamable_http());
        assert!(!legacy.supports_meta_fields());

        // 2025-03-26: streamable HTTP, but still no meta fields.
        let streamable = McpProtocolVersion::V2025_03_26;
        assert!(streamable.supports_streamable_http());
        assert!(!streamable.supports_meta_fields());

        // 2025-06-18: every capability checked here is available.
        let latest = McpProtocolVersion::V2025_06_18;
        assert!(latest.supports_streamable_http());
        assert!(latest.supports_meta_fields());
        assert!(latest.supports_cursors());
        assert!(latest.supports_progress_tokens());
        assert!(latest.supports_elicitation());
    }

    /// Walks one context through a sequence of mutations and checks `validate`
    /// for POST and GET after each step. The steps build on each other, so their
    /// order matters.
    #[test]
    fn test_context_validation() {
        let post = Method::POST;
        let get = Method::GET;

        let mut ctx = StreamableHttpContext {
            protocol_version: McpProtocolVersion::V2025_06_18,
            session_id: Some(String::from("test-session")),
            wants_sse_stream: true,
            accepts_stream_frames: true,
            headers: HashMap::new(),
        };

        // With a session present, both POST and GET validate.
        assert!(ctx.validate(&post).is_ok());
        assert!(ctx.validate(&get).is_ok());

        // A POST that does not accept stream frames is rejected.
        ctx.accepts_stream_frames = false;
        assert!(ctx.validate(&post).is_err());

        // Requesting an SSE stream on the 2024-11-05 protocol is rejected.
        ctx.accepts_stream_frames = true;
        ctx.protocol_version = McpProtocolVersion::V2024_11_05;
        ctx.wants_sse_stream = true;
        assert!(ctx.validate(&post).is_err());

        // Without a session: POST is still OK (for initialize), GET is not.
        ctx.protocol_version = McpProtocolVersion::V2025_06_18;
        ctx.session_id = None;
        assert!(ctx.validate(&post).is_ok());
        assert!(ctx.validate(&get).is_err());
    }
}