turul_http_mcp_server/
session_handler.rs

1//! JSON-RPC 2.0 over HTTP handler for MCP requests with SessionStorage integration
2//!
3//! This handler implements proper JSON-RPC 2.0 server over HTTP transport with
4//! MCP 2025-06-18 compliance, including:
5//! - SessionStorage trait integration (defaults to InMemory)
6//! - StreamManager for SSE with resumability
7//! - 202 Accepted for notifications
8//! - Last-Event-ID header support
9//! - Per-session event targeting
10
11use std::convert::Infallible;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use tracing::{debug, error, warn};
16
17use bytes::Bytes;
18use futures::Stream;
19use http_body::{Body, Frame};
20use http_body_util::{BodyExt, Full};
21use hyper::header::{ACCEPT, CONTENT_TYPE};
22use hyper::{Method, Request, Response, StatusCode};
23
24use chrono;
25use turul_mcp_json_rpc_server::{
26    JsonRpcDispatcher,
27    r#async::SessionContext,
28    dispatch::{JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message},
29    error::{JsonRpcError, JsonRpcErrorObject},
30};
31use turul_mcp_protocol::McpError;
32use turul_mcp_protocol::ServerCapabilities;
33use turul_mcp_session_storage::{InMemorySessionStorage, SessionView};
34use uuid::Uuid;
35
36use crate::{
37    Result, ServerConfig, StreamConfig, StreamManager,
38    json_rpc_responses::*,
39    notification_bridge::{SharedNotificationBroadcaster, StreamManagerNotificationBroadcaster},
40    protocol::{extract_last_event_id, extract_protocol_version, extract_session_id},
41};
42use std::collections::HashMap;
43
44/// SSE stream body that implements hyper's Body trait
45pub struct SessionSseStream {
46    stream: Pin<Box<dyn Stream<Item = std::result::Result<Bytes, Infallible>> + Send>>,
47}
48
49impl SessionSseStream {
50    pub fn new<S>(stream: S) -> Self
51    where
52        S: Stream<Item = std::result::Result<Bytes, Infallible>> + Send + 'static,
53    {
54        Self {
55            stream: Box::pin(stream),
56        }
57    }
58}
59
60impl Drop for SessionSseStream {
61    fn drop(&mut self) {
62        debug!("DROP: SessionSseStream - HTTP response body being cleaned up");
63        debug!("This may indicate early cleanup of SSE response stream");
64    }
65}
66
67impl Body for SessionSseStream {
68    type Data = Bytes;
69    type Error = Infallible;
70
71    fn poll_frame(
72        mut self: Pin<&mut Self>,
73        cx: &mut Context<'_>,
74    ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
75        match self.stream.as_mut().poll_next(cx) {
76            Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(Frame::data(data)))),
77            Poll::Ready(Some(Err(never))) => match never {},
78            Poll::Ready(None) => Poll::Ready(None),
79            Poll::Pending => Poll::Pending,
80        }
81    }
82}
83
/// HTTP body type for JSON-RPC responses: one in-memory buffer (`Full<Bytes>`)
type JsonRpcBody = Full<Bytes>;

/// HTTP body type for unified MCP responses (can handle both JSON-RPC and streaming)
///
/// This is a type-erased, boxed body; `convert_to_unified_body` adapts a
/// `Full<Bytes>` body to it.
type UnifiedMcpBody = http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>;
89
/// Accept header compliance mode for MCP Streamable HTTP
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AcceptMode {
    /// Client sends both application/json and text/event-stream (MCP spec compliant)
    Compliant,
    /// Client sends only application/json (compatibility mode for non-compliant clients)
    JsonOnly,
    /// Client sends only text/event-stream (SSE only)
    SseOnly,
    /// Client sends neither or something else entirely
    Invalid,
}

/// Parse MCP Accept header and determine compliance mode
///
/// The header is split into comma-separated media ranges. For each range the
/// parameters after `;` are stripped and the media type is compared
/// case-insensitively, so `Application/JSON; charset=utf-8` counts as JSON
/// while `application/json-seq` does not. A range carrying `q=0` is treated as
/// explicitly not acceptable and is ignored.
///
/// `*/*` counts as accepting JSON only; it never opts the client into SSE.
///
/// Returns the detected [`AcceptMode`] and whether the client accepts
/// `text/event-stream` at all (`true` for `Compliant` and `SseOnly`).
fn parse_mcp_accept_header(accept_header: &str) -> (AcceptMode, bool) {
    let mut accepts_json = false;
    let mut accepts_sse = false;

    for media_range in accept_header.split(',') {
        let mut params = media_range.split(';');
        let media_type = params.next().unwrap_or("").trim();

        // A quality value of zero means "not acceptable".
        let not_acceptable = params.any(|param| {
            param.split_once('=').is_some_and(|(key, value)| {
                key.trim().eq_ignore_ascii_case("q")
                    && matches!(value.trim().parse::<f32>(), Ok(q) if q <= 0.0)
            })
        });
        if not_acceptable {
            continue;
        }

        if media_type.eq_ignore_ascii_case("application/json") || media_type == "*/*" {
            accepts_json = true;
        } else if media_type.eq_ignore_ascii_case("text/event-stream") {
            accepts_sse = true;
        }
    }

    let mode = match (accepts_json, accepts_sse) {
        (true, true) => AcceptMode::Compliant,
        (true, false) => AcceptMode::JsonOnly, // MCP Inspector case
        (false, true) => AcceptMode::SseOnly,
        (false, false) => AcceptMode::Invalid,
    };

    // SSE may only be used when the client listed text/event-stream, i.e. in
    // `Compliant` (server may choose) and `SseOnly` (SSE required) modes.
    (mode, accepts_sse)
}
126
127/// Helper function to convert Full<Bytes> to UnsyncBoxBody<Bytes, hyper::Error>
128fn convert_to_unified_body(full_body: Full<Bytes>) -> UnifiedMcpBody {
129    full_body.map_err(|never| match never {}).boxed_unsync()
130}
131
132/// Helper function to create JSON-RPC error response as unified body
133fn jsonrpc_error_to_unified_body(error: JsonRpcError) -> Result<Response<UnifiedMcpBody>> {
134    let error_json = serde_json::to_string(&error)?;
135    Ok(Response::builder()
136        .status(StatusCode::OK) // JSON-RPC errors still use 200 OK
137        .header(CONTENT_TYPE, "application/json")
138        .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
139        .unwrap())
140}
141
142// ✅ CORRECTED ARCHITECTURE: Remove complex registry - use single shared StreamManager
143
144/// JSON-RPC 2.0 over HTTP handler with shared StreamManager
145pub struct SessionMcpHandler {
146    pub(crate) config: ServerConfig,
147    pub(crate) dispatcher: Arc<JsonRpcDispatcher<McpError>>,
148    pub(crate) session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
149    pub(crate) stream_config: StreamConfig,
150    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance with internal session management
151    pub(crate) stream_manager: Arc<StreamManager>,
152    pub(crate) middleware_stack: Arc<crate::middleware::MiddlewareStack>,
153}
154
155impl Clone for SessionMcpHandler {
156    fn clone(&self) -> Self {
157        Self {
158            config: self.config.clone(),
159            dispatcher: Arc::clone(&self.dispatcher),
160            session_storage: Arc::clone(&self.session_storage),
161            stream_config: self.stream_config.clone(),
162            stream_manager: Arc::clone(&self.stream_manager),
163            middleware_stack: Arc::clone(&self.middleware_stack),
164        }
165    }
166}
167
168impl SessionMcpHandler {
169    /// Create a new handler with default in-memory storage (zero-configuration)
170    pub fn new(
171        config: ServerConfig,
172        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
173        stream_config: StreamConfig,
174    ) -> Self {
175        let storage: Arc<turul_mcp_session_storage::BoxedSessionStorage> =
176            Arc::new(InMemorySessionStorage::new());
177        let middleware_stack = Arc::new(crate::middleware::MiddlewareStack::new());
178        Self::with_storage(config, dispatcher, storage, stream_config, middleware_stack)
179    }
180
181    /// Create handler with shared StreamManager instance (corrected architecture)
182    pub fn with_shared_stream_manager(
183        config: ServerConfig,
184        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
185        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
186        stream_config: StreamConfig,
187        stream_manager: Arc<StreamManager>,
188        middleware_stack: Arc<crate::middleware::MiddlewareStack>,
189    ) -> Self {
190        Self {
191            config,
192            dispatcher,
193            session_storage,
194            stream_config,
195            stream_manager,
196            middleware_stack,
197        }
198    }
199
200    /// Create handler with specific session storage backend (creates own StreamManager)
201    /// Note: Use with_shared_stream_manager for correct architecture
202    pub fn with_storage(
203        config: ServerConfig,
204        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
205        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
206        stream_config: StreamConfig,
207        middleware_stack: Arc<crate::middleware::MiddlewareStack>,
208    ) -> Self {
209        // Create own StreamManager instance (not recommended for production)
210        let stream_manager = Arc::new(StreamManager::with_config(
211            Arc::clone(&session_storage),
212            stream_config.clone(),
213        ));
214
215        Self {
216            config,
217            dispatcher,
218            session_storage,
219            stream_config,
220            stream_manager,
221            middleware_stack,
222        }
223    }
224
225    /// Get access to the StreamManager for notifications
226    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
227        &self.stream_manager
228    }
229
230    /// Handle MCP HTTP requests with full MCP 2025-06-18 compliance
231    pub async fn handle_mcp_request<B>(&self, req: Request<B>) -> Result<Response<UnifiedMcpBody>>
232    where
233        B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
234    {
235        debug!(
236            "SESSION HANDLER processing {} {}",
237            req.method(),
238            req.uri().path()
239        );
240        match *req.method() {
241            Method::POST => {
242                let response = self.handle_json_rpc_request(req).await?;
243                Ok(response)
244            }
245            Method::GET => self.handle_sse_request(req).await,
246            Method::DELETE => {
247                let response = self.handle_delete_request(req).await?;
248                Ok(response.map(convert_to_unified_body))
249            }
250            Method::OPTIONS => {
251                let response = self.handle_preflight();
252                Ok(response.map(convert_to_unified_body))
253            }
254            _ => {
255                let response = self.method_not_allowed();
256                Ok(response.map(convert_to_unified_body))
257            }
258        }
259    }
260
261    /// Handle JSON-RPC requests over HTTP POST
262    async fn handle_json_rpc_request<B>(&self, req: Request<B>) -> Result<Response<UnifiedMcpBody>>
263    where
264        B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
265    {
266        // Extract all headers for middleware before body is consumed
267        let headers: HashMap<String, String> = req
268            .headers()
269            .iter()
270            .filter_map(|(k, v)| v.to_str().ok().map(|s| (k.as_str().to_string(), s.to_string())))
271            .collect();
272
273        // Extract protocol version and session ID from headers
274        let protocol_version = extract_protocol_version(req.headers());
275        let session_id = extract_session_id(req.headers());
276
277        debug!(
278            "POST request - Protocol: {}, Session: {:?}",
279            protocol_version, session_id
280        );
281
282        // Check content type
283        let content_type = req
284            .headers()
285            .get(CONTENT_TYPE)
286            .and_then(|ct| ct.to_str().ok())
287            .unwrap_or("");
288
289        if !content_type.starts_with("application/json") {
290            warn!("Invalid content type: {}", content_type);
291            return Ok(
292                bad_request_response("Content-Type must be application/json")
293                    .map(convert_to_unified_body),
294            );
295        }
296
297        // Parse Accept header for MCP Streamable HTTP compliance
298        let accept_header = req
299            .headers()
300            .get(ACCEPT)
301            .and_then(|accept| accept.to_str().ok())
302            .unwrap_or("application/json");
303
304        let (accept_mode, accepts_sse) = parse_mcp_accept_header(accept_header);
305        debug!(
306            "POST request Accept header: '{}', mode: {:?}, will use SSE for tool calls: {}",
307            accept_header, accept_mode, accepts_sse
308        );
309
310        // Read request body
311        let body = req.into_body();
312        let body_bytes = match body.collect().await {
313            Ok(collected) => collected.to_bytes(),
314            Err(err) => {
315                error!("Failed to read request body: {}", err);
316                return Ok(bad_request_response("Failed to read request body")
317                    .map(convert_to_unified_body));
318            }
319        };
320
321        // Check body size
322        if body_bytes.len() > self.config.max_body_size {
323            warn!("Request body too large: {} bytes", body_bytes.len());
324            return Ok(Response::builder()
325                .status(StatusCode::PAYLOAD_TOO_LARGE)
326                .header(CONTENT_TYPE, "application/json")
327                .body(convert_to_unified_body(Full::new(Bytes::from(
328                    "Request body too large",
329                ))))
330                .unwrap());
331        }
332
333        // Parse as UTF-8
334        let body_str = match std::str::from_utf8(&body_bytes) {
335            Ok(s) => s,
336            Err(err) => {
337                error!("Invalid UTF-8 in request body: {}", err);
338                return Ok(bad_request_response("Request body must be valid UTF-8")
339                    .map(convert_to_unified_body));
340            }
341        };
342
343        debug!("Received JSON-RPC request: {}", body_str);
344
345        // Parse JSON-RPC message
346        let message = match parse_json_rpc_message(body_str) {
347            Ok(msg) => msg,
348            Err(rpc_err) => {
349                error!("JSON-RPC parse error: {}", rpc_err);
350                // Extract request ID from the error if available
351                let error_response =
352                    serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
353                return Ok(Response::builder()
354                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
355                    .header(CONTENT_TYPE, "application/json")
356                    .body(convert_to_unified_body(Full::new(Bytes::from(
357                        error_response,
358                    ))))
359                    .unwrap());
360            }
361        };
362
363        // Handle the message using proper JSON-RPC enums
364        let (message_result, response_session_id, method_name) = match message {
365            JsonRpcMessage::Request(request) => {
366                debug!("Processing JSON-RPC request: method={}", request.method);
367                let method_name = request.method.clone();
368
369                // Special handling for initialize requests - they create new sessions
370                let (response, response_session_id) = if request.method == "initialize" {
371                    debug!(
372                        "Handling initialize request - creating new session via session storage"
373                    );
374
375                    // Let session storage create the session and generate the ID (GPS pattern)
376                    let capabilities = ServerCapabilities::default();
377                    match self.session_storage.create_session(capabilities).await {
378                        Ok(session_info) => {
379                            debug!(
380                                "Created new session via session storage: {}",
381                                session_info.session_id
382                            );
383
384                            // ✅ CORRECTED ARCHITECTURE: Create session-specific notification broadcaster from shared StreamManager
385                            let broadcaster: SharedNotificationBroadcaster =
386                                Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
387                                    &self.stream_manager,
388                                )));
389                            let broadcaster_any =
390                                Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
391
392                            let session_context = SessionContext {
393                                session_id: session_info.session_id.clone(),
394                                metadata: std::collections::HashMap::new(),
395                                broadcaster: Some(broadcaster_any),
396                                timestamp: chrono::Utc::now().timestamp_millis() as u64,
397                            };
398
399                            // Run middleware pipeline and dispatch
400                            // Injection is applied immediately inside run_middleware_and_dispatch
401                            let (response, _) = self
402                                .run_middleware_and_dispatch(request, headers.clone(), session_context)
403                                .await;
404
405                            // Return the session ID created by session storage for the HTTP header
406                            (response, Some(session_info.session_id))
407                        }
408                        Err(err) => {
409                            error!("Failed to create session during initialize: {}", err);
410                            // Return error response using proper JSON-RPC error format
411                            let error_msg = format!("Session creation failed: {}", err);
412                            let error_response = turul_mcp_json_rpc_server::JsonRpcMessage::error(
413                                turul_mcp_json_rpc_server::JsonRpcError::internal_error(
414                                    Some(request.id),
415                                    Some(error_msg),
416                                ),
417                            );
418                            (error_response, None)
419                        }
420                    }
421                } else {
422                    // For non-initialize requests, create session context if session ID is provided
423                    // Let server-level handlers decide whether to enforce session requirements
424                    let session_context = if let Some(ref session_id_str) = session_id {
425                        debug!("Processing request with session: {}", session_id_str);
426                        let broadcaster: SharedNotificationBroadcaster =
427                            Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
428                                &self.stream_manager,
429                            )));
430                        let broadcaster_any =
431                            Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
432                        Some(SessionContext {
433                            session_id: session_id_str.clone(),
434                            metadata: std::collections::HashMap::new(),
435                            broadcaster: Some(broadcaster_any),
436                            timestamp: chrono::Utc::now().timestamp_millis() as u64,
437                        })
438                    } else {
439                        debug!("Processing request without session (lenient mode)");
440                        None
441                    };
442
443                    // Run middleware pipeline and dispatch
444                    let (response, _stashed_injection) = if let Some(ctx) = session_context {
445                        self.run_middleware_and_dispatch(request, headers.clone(), ctx).await
446                    } else {
447                        // No session - fast path (no middleware, just dispatch)
448                        (self.dispatcher.handle_request(request).await, None)
449                    };
450                    (response, session_id)
451                };
452
453                // Convert JsonRpcMessage to JsonRpcMessageResult
454                let message_result = match response {
455                    turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
456                        JsonRpcMessageResult::Response(resp)
457                    }
458                    turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
459                        JsonRpcMessageResult::Error(err)
460                    }
461                };
462                (message_result, response_session_id, Some(method_name))
463            }
464            JsonRpcMessage::Notification(notification) => {
465                debug!(
466                    "Processing JSON-RPC notification: method={}",
467                    notification.method
468                );
469                let method_name = notification.method.clone();
470
471                // For notifications, create session context if session ID is provided
472                // Let server-level handlers decide whether to enforce session requirements
473                let session_context = if let Some(ref session_id_str) = session_id {
474                    debug!("Processing notification with session: {}", session_id_str);
475                    let broadcaster: SharedNotificationBroadcaster = Arc::new(
476                        StreamManagerNotificationBroadcaster::new(Arc::clone(&self.stream_manager)),
477                    );
478                    let broadcaster_any =
479                        Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
480
481                    Some(SessionContext {
482                        session_id: session_id_str.clone(),
483                        metadata: std::collections::HashMap::new(),
484                        broadcaster: Some(broadcaster_any),
485                        timestamp: chrono::Utc::now().timestamp_millis() as u64,
486                    })
487                } else {
488                    debug!("Processing notification without session (lenient mode)");
489                    None
490                };
491
492                let result = self
493                    .dispatcher
494                    .handle_notification_with_context(notification, session_context)
495                    .await;
496
497                if let Err(err) = result {
498                    error!("Notification handling error: {}", err);
499                }
500                (
501                    JsonRpcMessageResult::NoResponse,
502                    session_id.clone(),
503                    Some(method_name),
504                )
505            }
506        };
507
508        // Convert message result to HTTP response
509        match message_result {
510            JsonRpcMessageResult::Response(response) => {
511                // Check if this is a tool call that should return SSE
512                // Only use SSE if explicitly requested via Accept: text/event-stream header
513                let is_tool_call = method_name.as_ref().is_some_and(|m| m == "tools/call");
514
515                debug!(
516                    "Decision point: method={:?}, accept_mode={:?}, accepts_sse={}, server_post_sse_enabled={}, session_id={:?}, is_tool_call={}",
517                    method_name,
518                    accept_mode,
519                    accepts_sse,
520                    self.config.enable_post_sse,
521                    response_session_id,
522                    is_tool_call
523                );
524
525                // MCP Streamable HTTP decision logic based on Accept header compliance AND server configuration
526                let should_use_sse = match accept_mode {
527                    AcceptMode::JsonOnly => false, // Force JSON for compatibility (MCP Inspector)
528                    AcceptMode::Invalid => false,  // Fallback to JSON for invalid headers
529                    AcceptMode::Compliant => {
530                        self.config.enable_post_sse && accepts_sse && is_tool_call
531                    } // Server chooses for compliant clients
532                    AcceptMode::SseOnly => self.config.enable_post_sse && accepts_sse, // Force SSE if server allows and client accepts
533                };
534
535                if should_use_sse && response_session_id.is_some() {
536                    debug!(
537                        "📡 Creating POST SSE stream (mode: {:?}) for tool call with notifications",
538                        accept_mode
539                    );
540                    match self
541                        .stream_manager
542                        .create_post_sse_stream(
543                            response_session_id.clone().unwrap(),
544                            response.clone(), // Clone the response for SSE stream creation
545                        )
546                        .await
547                    {
548                        Ok(sse_response) => {
549                            debug!("✅ POST SSE stream created successfully");
550                            Ok(sse_response
551                                .map(|body| body.map_err(|never| match never {}).boxed_unsync()))
552                        }
553                        Err(e) => {
554                            warn!(
555                                "Failed to create POST SSE stream, falling back to JSON: {}",
556                                e
557                            );
558                            Ok(
559                                jsonrpc_response_with_session(response, response_session_id)?
560                                    .map(convert_to_unified_body),
561                            )
562                        }
563                    }
564                } else {
565                    debug!(
566                        "📄 Returning standard JSON response (mode: {:?}) for method: {:?}",
567                        accept_mode, method_name
568                    );
569                    Ok(
570                        jsonrpc_response_with_session(response, response_session_id)?
571                            .map(convert_to_unified_body),
572                    )
573                }
574            }
575            JsonRpcMessageResult::Error(error) => {
576                warn!("Sending JSON-RPC error response");
577                // Convert JsonRpcError to proper HTTP response
578                let error_json = serde_json::to_string(&error)?;
579                Ok(Response::builder()
580                    .status(StatusCode::OK) // JSON-RPC errors still return 200 OK
581                    .header(CONTENT_TYPE, "application/json")
582                    .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
583                    .unwrap())
584            }
585            JsonRpcMessageResult::NoResponse => {
586                // Notifications don't return responses (204 No Content)
587                Ok(jsonrpc_notification_response()?.map(convert_to_unified_body))
588            }
589        }
590    }
591
    // Note: the create_post_sse_response method was removed as unused. POST SSE streams for
    // tool calls are created via StreamManager::create_post_sse_stream in handle_json_rpc_request
    // and are gated by config.enable_post_sse - see WORKING_MEMORY.md for background
594
595    /// Handle Server-Sent Events requests (SSE for streaming)
596    async fn handle_sse_request<B>(&self, req: Request<B>) -> Result<Response<UnifiedMcpBody>>
597    where
598        B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
599    {
600        // Check if client accepts SSE
601        let headers = req.headers();
602        let accept = headers
603            .get(ACCEPT)
604            .and_then(|accept| accept.to_str().ok())
605            .unwrap_or("");
606
607        if !accept.contains("text/event-stream") {
608            warn!(
609                "GET request received without SSE support - header does not contain 'text/event-stream'"
610            );
611            let error = JsonRpcError::new(
612                None,
613                JsonRpcErrorObject::server_error(
614                    -32001,
615                    "SSE not accepted - missing 'text/event-stream' in Accept header",
616                    None,
617                ),
618            );
619            return jsonrpc_error_to_unified_body(error);
620        }
621
622        // Check if GET SSE is enabled on the server
623        if !self.config.enable_get_sse {
624            warn!("GET SSE request received but GET SSE is disabled on server");
625            let error = JsonRpcError::new(
626                None,
627                JsonRpcErrorObject::server_error(
628                    -32003,
629                    "GET SSE is disabled on this server",
630                    None,
631                ),
632            );
633            return jsonrpc_error_to_unified_body(error);
634        }
635
636        // Extract protocol version and session ID
637        let protocol_version = extract_protocol_version(headers);
638        let session_id = extract_session_id(headers);
639
640        debug!(
641            "GET SSE request - Protocol: {}, Session: {:?}",
642            protocol_version, session_id
643        );
644
645        // Session ID is required for SSE
646        let session_id = match session_id {
647            Some(id) => id,
648            None => {
649                warn!("Missing Mcp-Session-Id header for SSE request");
650                let error = JsonRpcError::new(
651                    None,
652                    JsonRpcErrorObject::server_error(-32002, "Missing Mcp-Session-Id header", None),
653                );
654                return jsonrpc_error_to_unified_body(error);
655            }
656        };
657
658        // Validate session exists (do NOT create if missing)
659        if let Err(err) = self.validate_session_exists(&session_id).await {
660            error!(
661                "Session validation failed for Session ID {}: {}",
662                session_id, err
663            );
664            let error = JsonRpcError::new(
665                None,
666                JsonRpcErrorObject::server_error(
667                    -32003,
668                    &format!("Session validation failed: {}", err),
669                    None,
670                ),
671            );
672            return jsonrpc_error_to_unified_body(error);
673        }
674
675        // Extract Last-Event-ID for resumability
676        let last_event_id = extract_last_event_id(headers);
677
678        // Generate unique connection ID for MCP spec compliance
679        let connection_id = Uuid::now_v7().to_string();
680
681        debug!(
682            "Creating SSE stream for session: {} with connection: {}, last_event_id: {:?}",
683            session_id, connection_id, last_event_id
684        );
685
686        // ✅ CORRECTED ARCHITECTURE: Use shared StreamManager directly (no registry needed)
687        match self
688            .stream_manager
689            .handle_sse_connection(session_id, connection_id, last_event_id)
690            .await
691        {
692            Ok(response) => Ok(response),
693            Err(err) => {
694                error!("Failed to create SSE connection: {}", err);
695                let error = JsonRpcError::new(
696                    None,
697                    JsonRpcErrorObject::internal_error(Some(format!(
698                        "SSE connection failed: {}",
699                        err
700                    ))),
701                );
702                jsonrpc_error_to_unified_body(error)
703            }
704        }
705    }
706
707    /// Handle DELETE requests for session cleanup
708    async fn handle_delete_request<B>(&self, req: Request<B>) -> Result<Response<JsonRpcBody>>
709    where
710        B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
711    {
712        let session_id = extract_session_id(req.headers());
713
714        debug!("DELETE request - Session: {:?}", session_id);
715
716        if let Some(session_id) = session_id {
717            // First, close any active SSE connections for this session
718            let closed_connections = self
719                .stream_manager
720                .close_session_connections(&session_id)
721                .await;
722            debug!(
723                "Closed {} SSE connections for session: {}",
724                closed_connections, session_id
725            );
726
727            // Mark session as terminated instead of immediate deletion (for proper lifecycle management)
728            match self.session_storage.get_session(&session_id).await {
729                Ok(Some(mut session_info)) => {
730                    // Mark session as terminated in state
731                    session_info
732                        .state
733                        .insert("terminated".to_string(), serde_json::Value::Bool(true));
734                    session_info.state.insert(
735                        "terminated_at".to_string(),
736                        serde_json::Value::Number(serde_json::Number::from(
737                            chrono::Utc::now().timestamp_millis(),
738                        )),
739                    );
740                    session_info.touch();
741
742                    match self.session_storage.update_session(session_info).await {
743                        Ok(()) => {
744                            debug!(
745                                "Session {} marked as terminated (TTL will handle cleanup)",
746                                session_id
747                            );
748                            Ok(Response::builder()
749                                .status(StatusCode::OK)
750                                .body(Full::new(Bytes::from("Session terminated")))
751                                .unwrap())
752                        }
753                        Err(err) => {
754                            error!(
755                                "Error marking session {} as terminated: {}",
756                                session_id, err
757                            );
758                            // Fallback to deletion if update fails
759                            match self.session_storage.delete_session(&session_id).await {
760                                Ok(_) => {
761                                    debug!("Session {} deleted as fallback", session_id);
762                                    Ok(Response::builder()
763                                        .status(StatusCode::OK)
764                                        .body(Full::new(Bytes::from("Session removed")))
765                                        .unwrap())
766                                }
767                                Err(delete_err) => {
768                                    error!(
769                                        "Error deleting session {} as fallback: {}",
770                                        session_id, delete_err
771                                    );
772                                    Ok(Response::builder()
773                                        .status(StatusCode::INTERNAL_SERVER_ERROR)
774                                        .body(Full::new(Bytes::from("Session termination error")))
775                                        .unwrap())
776                                }
777                            }
778                        }
779                    }
780                }
781                Ok(None) => Ok(Response::builder()
782                    .status(StatusCode::NOT_FOUND)
783                    .body(Full::new(Bytes::from("Session not found")))
784                    .unwrap()),
785                Err(err) => {
786                    error!(
787                        "Error retrieving session {} for termination: {}",
788                        session_id, err
789                    );
790                    Ok(Response::builder()
791                        .status(StatusCode::INTERNAL_SERVER_ERROR)
792                        .body(Full::new(Bytes::from("Session lookup error")))
793                        .unwrap())
794                }
795            }
796        } else {
797            Ok(Response::builder()
798                .status(StatusCode::BAD_REQUEST)
799                .body(Full::new(Bytes::from("Missing Mcp-Session-Id header")))
800                .unwrap())
801        }
802    }
803
804    /// Handle OPTIONS preflight requests - these are essential for CORS
805    fn handle_preflight(&self) -> Response<JsonRpcBody> {
806        options_response()
807    }
808
809    /// Return method not allowed response
810    fn method_not_allowed(&self) -> Response<JsonRpcBody> {
811        method_not_allowed_response()
812    }
813
814    /// Validate that a session exists - do NOT create if missing
815    async fn validate_session_exists(&self, session_id: &str) -> Result<()> {
816        // Check if session already exists
817        match self.session_storage.get_session(session_id).await {
818            Ok(Some(_)) => {
819                debug!("Session validation successful: {}", session_id);
820                Ok(())
821            }
822            Ok(None) => {
823                error!("Session not found: {}", session_id);
824                Err(crate::HttpMcpError::InvalidRequest(format!(
825                    "Session '{}' not found. Sessions must be created via initialize request first.",
826                    session_id
827                )))
828            }
829            Err(err) => {
830                error!("Failed to validate session {}: {}", session_id, err);
831                Err(crate::HttpMcpError::InvalidRequest(format!(
832                    "Session validation failed: {}",
833                    err
834                )))
835            }
836        }
837    }
838
839    /// Helper method to run middleware pipeline and dispatch request
840    /// Shared logic between StreamableHttpHandler and SessionMcpHandler
841    async fn run_middleware_and_dispatch(
842        &self,
843        request: turul_mcp_json_rpc_server::JsonRpcRequest,
844        headers: HashMap<String, String>,
845        session: turul_mcp_json_rpc_server::SessionContext,
846    ) -> (turul_mcp_json_rpc_server::JsonRpcMessage, Option<crate::middleware::SessionInjection>) {
847        // Fast path: if middleware stack is empty, dispatch directly
848        if self.middleware_stack.is_empty() {
849            let result = self.dispatcher
850                .handle_request_with_context(request, session)
851                .await;
852            return (result, None);
853        }
854
855        // Normalize headers: lowercase String keys
856        let normalized_headers: HashMap<String, String> = headers
857            .iter()
858            .map(|(k, v)| (k.to_lowercase(), v.clone()))
859            .collect();
860
861        // Build RequestContext with method and headers
862        // Clone method and session_id for ctx (request will be moved to dispatcher)
863        let method = request.method.clone();
864        let session_id = session.session_id.clone();
865
866        // Convert params to Option<Value>
867        let params = request.params.clone().map(|p| match p {
868            turul_mcp_json_rpc_server::RequestParams::Object(map) => {
869                serde_json::Value::Object(map.into_iter().collect())
870            }
871            turul_mcp_json_rpc_server::RequestParams::Array(arr) => serde_json::Value::Array(arr),
872        });
873        let mut ctx = crate::middleware::RequestContext::new(&method, params);
874
875        for (k, v) in normalized_headers {
876            ctx.add_metadata(k, serde_json::json!(v));
877        }
878
879        // Create SessionView adapter for middleware to access storage-backed session
880        let session_view = crate::middleware::StorageBackedSessionView::new(
881            session_id.clone(),
882            Arc::clone(&self.session_storage),
883        );
884
885        // Execute before_dispatch with SessionView
886        let injection = match self.middleware_stack.execute_before(&mut ctx, Some(&session_view)).await {
887            Ok(inj) => inj,
888            Err(err) => {
889                // Map middleware error to proper JSON-RPC error code
890                return (Self::map_middleware_error_to_jsonrpc(err, request.id), None);
891            }
892        };
893
894        // Apply injection immediately to session storage
895        if !injection.is_empty() {
896            for (key, value) in injection.state() {
897                if let Err(e) = session_view.set_state(key, value.clone()).await {
898                    tracing::warn!("Failed to apply injection state '{}': {}", key, e);
899                }
900            }
901            for (key, value) in injection.metadata() {
902                if let Err(e) = session_view.set_metadata(key, value.clone()).await {
903                    tracing::warn!("Failed to apply injection metadata '{}': {}", key, e);
904                }
905            }
906        }
907
908        // Dispatch the request
909        let result = self.dispatcher
910            .handle_request_with_context(request, session)
911            .await;
912
913        // Execute after_dispatch
914        // Convert JsonRpcMessage to DispatcherResult for middleware
915        let mut dispatcher_result = match &result {
916            turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
917                match &resp.result {
918                    turul_mcp_json_rpc_server::response::ResponseResult::Success(val) => {
919                        crate::middleware::DispatcherResult::Success(val.clone())
920                    }
921                    turul_mcp_json_rpc_server::response::ResponseResult::Null => {
922                        crate::middleware::DispatcherResult::Success(serde_json::Value::Null)
923                    }
924                }
925            }
926            turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
927                crate::middleware::DispatcherResult::Error(err.error.message.clone())
928            }
929        };
930
931        // Ignore errors from after_dispatch (they shouldn't prevent returning the result)
932        let _ = self.middleware_stack.execute_after(&ctx, &mut dispatcher_result).await;
933
934        (result, None) // Injection already applied, no need to return it
935    }
936
937    /// Map MiddlewareError to JSON-RPC error with semantic error codes
938    fn map_middleware_error_to_jsonrpc(
939        err: crate::middleware::MiddlewareError,
940        request_id: turul_mcp_json_rpc_server::RequestId,
941    ) -> turul_mcp_json_rpc_server::JsonRpcMessage {
942        use crate::middleware::error::error_codes;
943        use crate::middleware::MiddlewareError;
944
945        let (code, message, data) = match err {
946            MiddlewareError::Unauthenticated(msg) => (error_codes::UNAUTHENTICATED, msg, None),
947            MiddlewareError::Unauthorized(msg) => (error_codes::UNAUTHORIZED, msg, None),
948            MiddlewareError::RateLimitExceeded {
949                message,
950                retry_after,
951            } => {
952                let data = retry_after.map(|s| serde_json::json!({"retryAfter": s}));
953                (error_codes::RATE_LIMIT_EXCEEDED, message, data)
954            }
955            MiddlewareError::InvalidRequest(msg) => (error_codes::INVALID_REQUEST, msg, None),
956            MiddlewareError::Internal(msg) => (error_codes::INTERNAL_ERROR, msg, None),
957            MiddlewareError::Custom { message, .. } => (error_codes::INTERNAL_ERROR, message, None),
958        };
959
960        let error_obj = if let Some(d) = data {
961            turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(code, &message, Some(d))
962        } else {
963            turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(
964                code,
965                &message,
966                None::<serde_json::Value>,
967            )
968        };
969
970        turul_mcp_json_rpc_server::JsonRpcMessage::Error(turul_mcp_json_rpc_server::JsonRpcError::new(
971            Some(request_id),
972            error_obj,
973        ))
974    }
975}