turul_http_mcp_server/
session_handler.rs

1//! JSON-RPC 2.0 over HTTP handler for MCP requests with SessionStorage integration
2//!
3//! This handler implements proper JSON-RPC 2.0 server over HTTP transport with
4//! MCP 2025-06-18 compliance, including:
5//! - SessionStorage trait integration (defaults to InMemory)
6//! - StreamManager for SSE with resumability
7//! - 202 Accepted for notifications
8//! - Last-Event-ID header support
9//! - Per-session event targeting
10
11use std::sync::Arc;
12use std::convert::Infallible;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tracing::{debug, warn, error};
16
17use hyper::{Request, Response, Method, StatusCode};
18use bytes::Bytes;
19use hyper::header::{CONTENT_TYPE, ACCEPT};
20use http_body_util::{BodyExt, Full};
21use http_body::{Body, Frame};
22use futures::Stream;
23
24use turul_mcp_json_rpc_server::{
25    JsonRpcDispatcher,
26    r#async::SessionContext,
27    dispatch::{parse_json_rpc_message, JsonRpcMessage, JsonRpcMessageResult},
28    error::{JsonRpcError, JsonRpcErrorObject}
29};
30use turul_mcp_session_storage::InMemorySessionStorage;
31use turul_mcp_protocol::ServerCapabilities;
32use chrono;
33use uuid::Uuid;
34
35use crate::{
36    Result, ServerConfig, StreamConfig,
37    protocol::{extract_protocol_version, extract_session_id, extract_last_event_id},
38    json_rpc_responses::*,
39    StreamManager,
40    notification_bridge::{StreamManagerNotificationBroadcaster, SharedNotificationBroadcaster}
41};
42
43/// SSE stream body that implements hyper's Body trait
44pub struct SessionSseStream {
45    stream: Pin<Box<dyn Stream<Item = std::result::Result<Bytes, Infallible>> + Send>>,
46}
47
48impl SessionSseStream {
49    pub fn new<S>(stream: S) -> Self
50    where
51        S: Stream<Item = std::result::Result<Bytes, Infallible>> + Send + 'static,
52    {
53        Self {
54            stream: Box::pin(stream),
55        }
56    }
57}
58
59impl Drop for SessionSseStream {
60    fn drop(&mut self) {
61        debug!("🔥 DROP: SessionSseStream - HTTP response body being cleaned up");
62        debug!("🔥 This may indicate early cleanup of SSE response stream");
63    }
64}
65
66impl Body for SessionSseStream {
67    type Data = Bytes;
68    type Error = Infallible;
69
70    fn poll_frame(
71        mut self: Pin<&mut Self>,
72        cx: &mut Context<'_>,
73    ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
74        match self.stream.as_mut().poll_next(cx) {
75            Poll::Ready(Some(Ok(data))) => {
76                Poll::Ready(Some(Ok(Frame::data(data))))
77            }
78            Poll::Ready(Some(Err(never))) => match never {},
79            Poll::Ready(None) => Poll::Ready(None),
80            Poll::Pending => Poll::Pending,
81        }
82    }
83}
84
85/// HTTP body type for JSON-RPC responses
86type JsonRpcBody = Full<Bytes>;
87
88/// HTTP body type for unified MCP responses (can handle both JSON-RPC and streaming)
89type UnifiedMcpBody = http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>;
90
/// Accept header compliance mode for MCP Streamable HTTP
#[derive(Debug, Clone, PartialEq)]
enum AcceptMode {
    /// Client sends both application/json and text/event-stream (MCP spec compliant)
    Compliant,
    /// Client sends only application/json (compatibility mode for non-compliant clients)
    JsonOnly,
    /// Client sends only text/event-stream (SSE only)
    SseOnly,
    /// Client sends neither or something else entirely
    Invalid,
}

/// Parse an `Accept` header value and classify it for MCP Streamable HTTP.
///
/// The header is split into comma-separated media ranges; parameters after `;`
/// (such as `q=` or `charset=`) are ignored and media types are compared
/// case-insensitively. `*/*` counts as accepting JSON only, not SSE.
///
/// Returns the detected [`AcceptMode`] and a flag telling whether an SSE
/// response is permissible for that mode (`Compliant` and `SseOnly`).
fn parse_mcp_accept_header(accept_header: &str) -> (AcceptMode, bool) {
    let mut accepts_json = false;
    let mut accepts_sse = false;

    for media_range in accept_header.split(',') {
        // Keep only the `type/subtype` part; drop `;q=0.9`, `;charset=utf-8`, ...
        let media_type = media_range.split(';').next().unwrap_or("").trim();

        // Media types are case-insensitive, and whole-token comparison avoids
        // treating e.g. `application/jsonl` as `application/json`.
        if media_type.eq_ignore_ascii_case("application/json") || media_type == "*/*" {
            accepts_json = true;
        } else if media_type.eq_ignore_ascii_case("text/event-stream") {
            accepts_sse = true;
        }
    }

    let mode = match (accepts_json, accepts_sse) {
        (true, true) => AcceptMode::Compliant,
        (true, false) => AcceptMode::JsonOnly, // MCP Inspector case
        (false, true) => AcceptMode::SseOnly,
        (false, false) => AcceptMode::Invalid,
    };

    // Exhaustive on purpose: a new mode must decide explicitly whether SSE is allowed.
    let should_use_sse = match mode {
        AcceptMode::Compliant => true, // Server can choose
        AcceptMode::JsonOnly => false, // Force JSON for compatibility
        AcceptMode::SseOnly => true,   // Force SSE
        AcceptMode::Invalid => false,  // Fallback to JSON
    };

    (mode, should_use_sse)
}
127
128/// Helper function to convert Full<Bytes> to UnsyncBoxBody<Bytes, hyper::Error>
129fn convert_to_unified_body(full_body: Full<Bytes>) -> UnifiedMcpBody {
130    full_body.map_err(|never| match never {}).boxed_unsync()
131}
132
133/// Helper function to create JSON-RPC error response as unified body
134fn jsonrpc_error_to_unified_body(error: JsonRpcError) -> Result<Response<UnifiedMcpBody>> {
135    let error_json = serde_json::to_string(&error)?;
136    Ok(Response::builder()
137        .status(StatusCode::OK) // JSON-RPC errors still use 200 OK
138        .header(CONTENT_TYPE, "application/json")
139        .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
140        .unwrap())
141}
142
143// ✅ CORRECTED ARCHITECTURE: Remove complex registry - use single shared StreamManager
144
145/// JSON-RPC 2.0 over HTTP handler with shared StreamManager
146pub struct SessionMcpHandler {
147    pub(crate) config: ServerConfig,
148    pub(crate) dispatcher: Arc<JsonRpcDispatcher>,
149    pub(crate) session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
150    pub(crate) stream_config: StreamConfig,
151    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance with internal session management
152    pub(crate) stream_manager: Arc<StreamManager>,
153}
154
155impl Clone for SessionMcpHandler {
156    fn clone(&self) -> Self {
157        Self {
158            config: self.config.clone(),
159            dispatcher: Arc::clone(&self.dispatcher),
160            session_storage: Arc::clone(&self.session_storage),
161            stream_config: self.stream_config.clone(),
162            stream_manager: Arc::clone(&self.stream_manager),
163        }
164    }
165}
166
167impl SessionMcpHandler {
168    /// Create a new handler with default in-memory storage (zero-configuration)
169    pub fn new(
170        config: ServerConfig,
171        dispatcher: Arc<JsonRpcDispatcher>,
172        stream_config: StreamConfig,
173    ) -> Self {
174        let storage: Arc<turul_mcp_session_storage::BoxedSessionStorage> = Arc::new(InMemorySessionStorage::new());
175        Self::with_storage(config, dispatcher, storage, stream_config)
176    }
177
178    /// Create handler with shared StreamManager instance (corrected architecture)
179    pub fn with_shared_stream_manager(
180        config: ServerConfig,
181        dispatcher: Arc<JsonRpcDispatcher>,
182        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
183        stream_config: StreamConfig,
184        stream_manager: Arc<StreamManager>,
185    ) -> Self {
186        Self {
187            config,
188            dispatcher,
189            session_storage,
190            stream_config,
191            stream_manager,
192        }
193    }
194
195    /// Create handler with specific session storage backend (creates own StreamManager)
196    /// Note: Use with_shared_stream_manager for correct architecture
197    pub fn with_storage(
198        config: ServerConfig,
199        dispatcher: Arc<JsonRpcDispatcher>,
200        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
201        stream_config: StreamConfig,
202    ) -> Self {
203        // Create own StreamManager instance (not recommended for production)
204        let stream_manager = Arc::new(StreamManager::with_config(
205            Arc::clone(&session_storage),
206            stream_config.clone()
207        ));
208
209        Self {
210            config,
211            dispatcher,
212            session_storage,
213            stream_config,
214            stream_manager,
215        }
216    }
217
218    /// Get access to the StreamManager for notifications
219    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
220        &self.stream_manager
221    }
222
223
224    /// Handle MCP HTTP requests with full MCP 2025-06-18 compliance
225    pub async fn handle_mcp_request(
226        &self,
227        req: Request<hyper::body::Incoming>,
228    ) -> Result<Response<UnifiedMcpBody>> {
229        match req.method() {
230            &Method::POST => {
231                let response = self.handle_json_rpc_request(req).await?;
232                Ok(response)
233            },
234            &Method::GET => self.handle_sse_request(req).await,
235            &Method::DELETE => {
236                let response = self.handle_delete_request(req).await?;
237                Ok(response.map(convert_to_unified_body))
238            },
239            &Method::OPTIONS => {
240                let response = self.handle_preflight();
241                Ok(response.map(convert_to_unified_body))
242            },
243            _ => {
244                let response = self.method_not_allowed();
245                Ok(response.map(convert_to_unified_body))
246            }
247        }
248    }
249
250    /// Handle JSON-RPC requests over HTTP POST
251    async fn handle_json_rpc_request(
252        &self,
253        req: Request<hyper::body::Incoming>,
254    ) -> Result<Response<UnifiedMcpBody>> {
255        // Extract protocol version and session ID from headers
256        let protocol_version = extract_protocol_version(req.headers());
257        let session_id = extract_session_id(req.headers());
258
259        debug!("POST request - Protocol: {}, Session: {:?}", protocol_version, session_id);
260
261        // Check content type
262        let content_type = req.headers()
263            .get(CONTENT_TYPE)
264            .and_then(|ct| ct.to_str().ok())
265            .unwrap_or("");
266
267        if !content_type.starts_with("application/json") {
268            warn!("Invalid content type: {}", content_type);
269            return Ok(bad_request_response("Content-Type must be application/json").map(convert_to_unified_body));
270        }
271
272        // Parse Accept header for MCP Streamable HTTP compliance
273        let accept_header = req.headers()
274            .get(ACCEPT)
275            .and_then(|accept| accept.to_str().ok())
276            .unwrap_or("application/json");
277
278        let (accept_mode, accepts_sse) = parse_mcp_accept_header(accept_header);
279        debug!("POST request Accept header: '{}', mode: {:?}, will use SSE for tool calls: {}", 
280               accept_header, accept_mode, accepts_sse);
281
282        // Read request body
283        let body = req.into_body();
284        let body_bytes = match body.collect().await {
285            Ok(collected) => collected.to_bytes(),
286            Err(err) => {
287                error!("Failed to read request body: {}", err);
288                return Ok(bad_request_response("Failed to read request body").map(convert_to_unified_body));
289            }
290        };
291
292        // Check body size
293        if body_bytes.len() > self.config.max_body_size {
294            warn!("Request body too large: {} bytes", body_bytes.len());
295            return Ok(Response::builder()
296                .status(StatusCode::PAYLOAD_TOO_LARGE)
297                .header(CONTENT_TYPE, "application/json")
298                .body(convert_to_unified_body(Full::new(Bytes::from("Request body too large"))))
299                .unwrap());
300        }
301
302        // Parse as UTF-8
303        let body_str = match std::str::from_utf8(&body_bytes) {
304            Ok(s) => s,
305            Err(err) => {
306                error!("Invalid UTF-8 in request body: {}", err);
307                return Ok(bad_request_response("Request body must be valid UTF-8").map(convert_to_unified_body));
308            }
309        };
310
311        debug!("Received JSON-RPC request: {}", body_str);
312
313        // Parse JSON-RPC message
314        let message = match parse_json_rpc_message(body_str) {
315            Ok(msg) => msg,
316            Err(rpc_err) => {
317                error!("JSON-RPC parse error: {}", rpc_err);
318                // Extract request ID from the error if available
319                let error_response = serde_json::to_string(&rpc_err)
320                    .unwrap_or_else(|_| "{}".to_string());
321                return Ok(Response::builder()
322                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
323                    .header(CONTENT_TYPE, "application/json")
324                    .body(convert_to_unified_body(Full::new(Bytes::from(error_response))))
325                    .unwrap());
326            }
327        };
328
329        // Handle the message using proper JSON-RPC enums
330        let (message_result, response_session_id, method_name) = match message {
331            JsonRpcMessage::Request(request) => {
332                debug!("Processing JSON-RPC request: method={}", request.method);
333                let method_name = request.method.clone();
334
335                // Special handling for initialize requests - they create new sessions
336                let (response, response_session_id) = if request.method == "initialize" {
337                    debug!("Handling initialize request - creating new session via session storage");
338
339                    // Let session storage create the session and generate the ID (GPS pattern)
340                    let capabilities = ServerCapabilities::default();
341                    match self.session_storage.create_session(capabilities).await {
342                        Ok(session_info) => {
343                            debug!("Created new session via session storage: {}", session_info.session_id);
344
345                            // ✅ CORRECTED ARCHITECTURE: Create session-specific notification broadcaster from shared StreamManager
346                            let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
347                                Arc::clone(&self.stream_manager)
348                            ));
349                            let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
350
351                            let session_context = SessionContext {
352                                session_id: session_info.session_id.clone(),
353                                metadata: std::collections::HashMap::new(),
354                                broadcaster: Some(broadcaster_any),
355                                timestamp: chrono::Utc::now().timestamp_millis() as u64,
356                            };
357
358                            let response = self.dispatcher.handle_request_with_context(request, session_context).await;
359
360                            // Return the session ID created by session storage for the HTTP header
361                            (response, Some(session_info.session_id))
362                        }
363                        Err(err) => {
364                            error!("Failed to create session during initialize: {}", err);
365                            // Return error response - this will be converted to proper error response by dispatcher
366                            let error_msg = format!("Session creation failed: {}", err);
367                            let error_response = turul_mcp_json_rpc_server::JsonRpcResponse::success(
368                                request.id,
369                                serde_json::json!({"error": error_msg})
370                            );
371                            (error_response, None)
372                        }
373                    }
374                } else {
375                    // ✅ CORRECTED ARCHITECTURE: Use shared StreamManager for notification broadcaster
376                    let session_id_str = session_id.clone().unwrap_or("unknown".to_string());
377                    let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
378                        Arc::clone(&self.stream_manager)
379                    ));
380                    let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
381
382                    let session_context = SessionContext {
383                        session_id: session_id_str,
384                        metadata: std::collections::HashMap::new(),
385                        broadcaster: Some(broadcaster_any),
386                        timestamp: chrono::Utc::now().timestamp_millis() as u64,
387                    };
388
389                    let response = self.dispatcher.handle_request_with_context(request, session_context).await;
390                    (response, session_id)
391                };
392
393                (JsonRpcMessageResult::Response(response), response_session_id, Some(method_name))
394            }
395            JsonRpcMessage::Notification(notification) => {
396                debug!("Processing JSON-RPC notification: method={}", notification.method);
397                let method_name = notification.method.clone();
398
399                // ✅ CORRECTED ARCHITECTURE: Create session context with shared StreamManager broadcaster
400                let session_context = if let Some(ref session_id) = session_id {
401                    let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
402                        Arc::clone(&self.stream_manager)
403                    ));
404                    let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
405
406                    Some(SessionContext {
407                        session_id: session_id.clone(),
408                        metadata: std::collections::HashMap::new(),
409                        broadcaster: Some(broadcaster_any),
410                        timestamp: chrono::Utc::now().timestamp_millis() as u64,
411                    })
412                } else {
413                    None
414                };
415
416                if let Err(err) = self.dispatcher.handle_notification_with_context(notification, session_context).await {
417                    error!("Notification handling error: {}", err);
418                }
419                (JsonRpcMessageResult::NoResponse, session_id.clone(), Some(method_name))
420            }
421        };
422
423        // Convert message result to HTTP response
424        match message_result {
425            JsonRpcMessageResult::Response(response) => {
426                // Check if this is a tool call that should return SSE
427                // Only use SSE if explicitly requested via Accept: text/event-stream header
428                let is_tool_call = method_name.as_ref().map_or(false, |m| m == "tools/call");
429
430                debug!("Decision point: method={:?}, accept_mode={:?}, accepts_sse={}, server_post_sse_enabled={}, session_id={:?}, is_tool_call={}",
431                       method_name, accept_mode, accepts_sse, self.config.enable_post_sse, response_session_id, is_tool_call);
432
433                // MCP Streamable HTTP decision logic based on Accept header compliance AND server configuration
434                let should_use_sse = match accept_mode {
435                    AcceptMode::JsonOnly => false,    // Force JSON for compatibility (MCP Inspector)
436                    AcceptMode::Invalid => false,     // Fallback to JSON for invalid headers
437                    AcceptMode::Compliant => self.config.enable_post_sse && accepts_sse && is_tool_call, // Server chooses for compliant clients
438                    AcceptMode::SseOnly => self.config.enable_post_sse && accepts_sse,  // Force SSE if server allows and client accepts
439                };
440
441                if should_use_sse && response_session_id.is_some() {
442                    debug!("📡 Creating POST SSE stream (mode: {:?}) for tool call with notifications", accept_mode);
443                    match self.stream_manager.create_post_sse_stream(
444                        response_session_id.clone().unwrap(),
445                        response.clone(), // Clone the response for SSE stream creation
446                    ).await {
447                        Ok(sse_response) => {
448                            debug!("✅ POST SSE stream created successfully");
449                            Ok(sse_response.map(|body| body.map_err(|never| match never {}).boxed_unsync()))
450                        },
451                        Err(e) => {
452                            warn!("Failed to create POST SSE stream, falling back to JSON: {}", e);
453                            Ok(jsonrpc_response_with_session(response, response_session_id)?.map(convert_to_unified_body))
454                        }
455                    }
456                } else {
457                    debug!("📄 Returning standard JSON response (mode: {:?}) for method: {:?}", accept_mode, method_name);
458                    Ok(jsonrpc_response_with_session(response, response_session_id)?.map(convert_to_unified_body))
459                }
460            }
461            JsonRpcMessageResult::Error(error) => {
462                warn!("Sending JSON-RPC error response");
463                // Convert JsonRpcError to proper HTTP response
464                let error_json = serde_json::to_string(&error)?;
465                Ok(Response::builder()
466                    .status(StatusCode::OK) // JSON-RPC errors still return 200 OK
467                    .header(CONTENT_TYPE, "application/json")
468                    .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
469                    .unwrap())
470            }
471            JsonRpcMessageResult::NoResponse => {
472                // Notifications don't return responses (204 No Content)
473                Ok(jsonrpc_notification_response()?.map(convert_to_unified_body))
474            }
475        }
476    }
477
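    // Note: the create_post_sse_response method was removed because it is unused in
    // MCP Inspector compatibility mode; see WORKING_MEMORY.md for details.
    // NOTE(review): the earlier remark that SSE for tool calls is temporarily disabled
    // looks stale - handle_json_rpc_request still streams `tools/call` responses over
    // SSE when `config.enable_post_sse` is set and the client accepts
    // `text/event-stream`. Confirm and update.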
480
481    /// Handle Server-Sent Events requests (SSE for streaming)
482    async fn handle_sse_request(
483        &self,
484        req: Request<hyper::body::Incoming>,
485    ) -> Result<Response<UnifiedMcpBody>> {
486        // Check if client accepts SSE
487        let headers = req.headers();
488        let accept = headers
489            .get(ACCEPT)
490            .and_then(|accept| accept.to_str().ok())
491            .unwrap_or("");
492
493        if !accept.contains("text/event-stream") {
494            warn!("GET request received without SSE support - header does not contain 'text/event-stream'");
495            let error = JsonRpcError::new(
496                None,
497                JsonRpcErrorObject::server_error(
498                    -32001,
499                    "SSE not accepted - missing 'text/event-stream' in Accept header",
500                    None
501                )
502            );
503            return jsonrpc_error_to_unified_body(error);
504        }
505
506        // Check if GET SSE is enabled on the server
507        if !self.config.enable_get_sse {
508            warn!("GET SSE request received but GET SSE is disabled on server");
509            let error = JsonRpcError::new(
510                None,
511                JsonRpcErrorObject::server_error(
512                    -32003,
513                    "GET SSE is disabled on this server",
514                    None
515                )
516            );
517            return jsonrpc_error_to_unified_body(error);
518        }
519
520        // Extract protocol version and session ID
521        let protocol_version = extract_protocol_version(headers);
522        let session_id = extract_session_id(headers);
523
524        debug!("GET SSE request - Protocol: {}, Session: {:?}", protocol_version, session_id);
525
526        // Session ID is required for SSE
527        let session_id = match session_id {
528            Some(id) => id,
529            None => {
530                warn!("Missing Mcp-Session-Id header for SSE request");
531                let error = JsonRpcError::new(
532                    None,
533                    JsonRpcErrorObject::server_error(
534                        -32002,
535                        "Missing Mcp-Session-Id header",
536                        None
537                    )
538                );
539                return jsonrpc_error_to_unified_body(error);
540            }
541        };
542
543        // Validate session exists (do NOT create if missing)
544        if let Err(err) = self.validate_session_exists(&session_id).await {
545            error!("Session validation failed for Session ID {}: {}", session_id, err);
546            let error = JsonRpcError::new(
547                None,
548                JsonRpcErrorObject::server_error(
549                    -32003,
550                    &format!("Session validation failed: {}", err),
551                    None
552                )
553            );
554            return jsonrpc_error_to_unified_body(error);
555        }
556
557        // Extract Last-Event-ID for resumability
558        let last_event_id = extract_last_event_id(headers);
559
560        // Generate unique connection ID for MCP spec compliance
561        let connection_id = Uuid::now_v7().to_string();
562
563        debug!("Creating SSE stream for session: {} with connection: {}, last_event_id: {:?}",
564               session_id, connection_id, last_event_id);
565
566        // ✅ CORRECTED ARCHITECTURE: Use shared StreamManager directly (no registry needed)
567        match self.stream_manager.handle_sse_connection(
568            session_id,
569            connection_id,
570            last_event_id,
571        ).await {
572            Ok(response) => Ok(response),
573            Err(err) => {
574                error!("Failed to create SSE connection: {}", err);
575                let error = JsonRpcError::new(
576                    None,
577                    JsonRpcErrorObject::internal_error(
578                        Some(format!("SSE connection failed: {}", err))
579                    )
580                );
581                jsonrpc_error_to_unified_body(error)
582            }
583        }
584    }
585
586    /// Handle DELETE requests for session cleanup
587    async fn handle_delete_request(
588        &self,
589        req: Request<hyper::body::Incoming>,
590    ) -> Result<Response<JsonRpcBody>> {
591        let session_id = extract_session_id(req.headers());
592
593        debug!("DELETE request - Session: {:?}", session_id);
594
595        if let Some(session_id) = session_id {
596            match self.session_storage.delete_session(&session_id).await {
597                Ok(true) => {
598                    debug!("Session {} removed via DELETE", session_id);
599                    Ok(Response::builder()
600                        .status(StatusCode::OK)
601                        .body(Full::new(Bytes::from("Session removed")))
602                        .unwrap())
603                }
604                Ok(false) => {
605                    Ok(Response::builder()
606                        .status(StatusCode::NOT_FOUND)
607                        .body(Full::new(Bytes::from("Session not found")))
608                        .unwrap())
609                }
610                Err(err) => {
611                    error!("Error deleting session {}: {}", session_id, err);
612                    Ok(Response::builder()
613                        .status(StatusCode::INTERNAL_SERVER_ERROR)
614                        .body(Full::new(Bytes::from("Session deletion error")))
615                        .unwrap())
616                }
617            }
618        } else {
619            Ok(Response::builder()
620                .status(StatusCode::BAD_REQUEST)
621                .body(Full::new(Bytes::from("Missing Mcp-Session-Id header")))
622                .unwrap())
623        }
624    }
625
626    /// Handle OPTIONS preflight requests - these are essential for CORS
627    fn handle_preflight(&self) -> Response<JsonRpcBody> {
628        options_response()
629    }
630
631    /// Return method not allowed response
632    fn method_not_allowed(&self) -> Response<JsonRpcBody> {
633        method_not_allowed_response()
634    }
635
636    /// Validate that a session exists - do NOT create if missing
637    async fn validate_session_exists(&self, session_id: &str) -> Result<()> {
638        // Check if session already exists
639        match self.session_storage.get_session(session_id).await {
640            Ok(Some(_)) => {
641                debug!("Session validation successful: {}", session_id);
642                Ok(())
643            }
644            Ok(None) => {
645                error!("Session not found: {}", session_id);
646                Err(crate::HttpMcpError::InvalidRequest(
647                    format!("Session '{}' not found. Sessions must be created via initialize request first.", session_id)
648                ))
649            }
650            Err(err) => {
651                error!("Failed to validate session {}: {}", session_id, err);
652                Err(crate::HttpMcpError::InvalidRequest(format!("Session validation failed: {}", err)))
653            }
654        }
655    }
656}