turul_http_mcp_server/
session_handler.rs

1//! JSON-RPC 2.0 over HTTP handler for MCP requests with SessionStorage integration
2//!
3//! This handler implements proper JSON-RPC 2.0 server over HTTP transport with
4//! MCP 2025-06-18 compliance, including:
5//! - SessionStorage trait integration (defaults to InMemory)
6//! - StreamManager for SSE with resumability
7//! - 202 Accepted for notifications
8//! - Last-Event-ID header support
9//! - Per-session event targeting
10
11use std::sync::Arc;
12use std::convert::Infallible;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tracing::{debug, warn, error};
16
17use hyper::{Request, Response, Method, StatusCode};
18use bytes::Bytes;
19use hyper::header::{CONTENT_TYPE, ACCEPT};
20use http_body_util::{BodyExt, Full};
21use http_body::{Body, Frame};
22use futures::Stream;
23
24use turul_mcp_json_rpc_server::{
25    JsonRpcDispatcher,
26    r#async::SessionContext,
27    dispatch::{parse_json_rpc_message, JsonRpcMessage, JsonRpcMessageResult},
28    error::{JsonRpcError, JsonRpcErrorObject}
29};
30use turul_mcp_session_storage::InMemorySessionStorage;
31use turul_mcp_protocol::ServerCapabilities;
32use chrono;
33use uuid::Uuid;
34
35use crate::{
36    Result, ServerConfig, StreamConfig,
37    protocol::{extract_protocol_version, extract_session_id, extract_last_event_id},
38    json_rpc_responses::*,
39    StreamManager,
40    notification_bridge::{StreamManagerNotificationBroadcaster, SharedNotificationBroadcaster}
41};
42
43/// SSE stream body that implements hyper's Body trait
44pub struct SessionSseStream {
45    stream: Pin<Box<dyn Stream<Item = std::result::Result<Bytes, Infallible>> + Send>>,
46}
47
48impl SessionSseStream {
49    pub fn new<S>(stream: S) -> Self
50    where
51        S: Stream<Item = std::result::Result<Bytes, Infallible>> + Send + 'static,
52    {
53        Self {
54            stream: Box::pin(stream),
55        }
56    }
57}
58
59impl Drop for SessionSseStream {
60    fn drop(&mut self) {
61        debug!("🔥 DROP: SessionSseStream - HTTP response body being cleaned up");
62        debug!("🔥 This may indicate early cleanup of SSE response stream");
63    }
64}
65
66impl Body for SessionSseStream {
67    type Data = Bytes;
68    type Error = Infallible;
69
70    fn poll_frame(
71        mut self: Pin<&mut Self>,
72        cx: &mut Context<'_>,
73    ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
74        match self.stream.as_mut().poll_next(cx) {
75            Poll::Ready(Some(Ok(data))) => {
76                Poll::Ready(Some(Ok(Frame::data(data))))
77            }
78            Poll::Ready(Some(Err(never))) => match never {},
79            Poll::Ready(None) => Poll::Ready(None),
80            Poll::Pending => Poll::Pending,
81        }
82    }
83}
84
85/// HTTP body type for JSON-RPC responses
86type JsonRpcBody = Full<Bytes>;
87
88/// HTTP body type for unified MCP responses (can handle both JSON-RPC and streaming)
89type UnifiedMcpBody = http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>;
90
91/// Helper function to convert Full<Bytes> to UnsyncBoxBody<Bytes, hyper::Error>
92fn convert_to_unified_body(full_body: Full<Bytes>) -> UnifiedMcpBody {
93    full_body.map_err(|never| match never {}).boxed_unsync()
94}
95
96/// Helper function to create JSON-RPC error response as unified body
97fn jsonrpc_error_to_unified_body(error: JsonRpcError) -> Result<Response<UnifiedMcpBody>> {
98    let error_json = serde_json::to_string(&error)?;
99    Ok(Response::builder()
100        .status(StatusCode::OK) // JSON-RPC errors still use 200 OK
101        .header(CONTENT_TYPE, "application/json")
102        .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
103        .unwrap())
104}
105
106// ✅ CORRECTED ARCHITECTURE: Remove complex registry - use single shared StreamManager
107
108/// JSON-RPC 2.0 over HTTP handler with shared StreamManager
109pub struct SessionMcpHandler {
110    pub(crate) config: ServerConfig,
111    pub(crate) dispatcher: Arc<JsonRpcDispatcher>,
112    pub(crate) session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
113    pub(crate) stream_config: StreamConfig,
114    // ✅ CORRECTED ARCHITECTURE: Single shared StreamManager instance with internal session management
115    pub(crate) stream_manager: Arc<StreamManager>,
116}
117
118impl Clone for SessionMcpHandler {
119    fn clone(&self) -> Self {
120        Self {
121            config: self.config.clone(),
122            dispatcher: Arc::clone(&self.dispatcher),
123            session_storage: Arc::clone(&self.session_storage),
124            stream_config: self.stream_config.clone(),
125            stream_manager: Arc::clone(&self.stream_manager),
126        }
127    }
128}
129
130impl SessionMcpHandler {
131    /// Create a new handler with default in-memory storage (zero-configuration)
132    pub fn new(
133        config: ServerConfig,
134        dispatcher: Arc<JsonRpcDispatcher>,
135        stream_config: StreamConfig,
136    ) -> Self {
137        let storage: Arc<turul_mcp_session_storage::BoxedSessionStorage> = Arc::new(InMemorySessionStorage::new());
138        Self::with_storage(config, dispatcher, storage, stream_config)
139    }
140
141    /// Create handler with shared StreamManager instance (corrected architecture)
142    pub fn with_shared_stream_manager(
143        config: ServerConfig,
144        dispatcher: Arc<JsonRpcDispatcher>,
145        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
146        stream_config: StreamConfig,
147        stream_manager: Arc<StreamManager>,
148    ) -> Self {
149        Self {
150            config,
151            dispatcher,
152            session_storage,
153            stream_config,
154            stream_manager,
155        }
156    }
157
158    /// Create handler with specific session storage backend (creates own StreamManager)
159    /// Note: Use with_shared_stream_manager for correct architecture
160    pub fn with_storage(
161        config: ServerConfig,
162        dispatcher: Arc<JsonRpcDispatcher>,
163        session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
164        stream_config: StreamConfig,
165    ) -> Self {
166        // Create own StreamManager instance (not recommended for production)
167        let stream_manager = Arc::new(StreamManager::with_config(
168            Arc::clone(&session_storage),
169            stream_config.clone()
170        ));
171
172        Self {
173            config,
174            dispatcher,
175            session_storage,
176            stream_config,
177            stream_manager,
178        }
179    }
180
181    /// Get access to the StreamManager for notifications
182    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
183        &self.stream_manager
184    }
185
186
187    /// Handle MCP HTTP requests with full MCP 2025-06-18 compliance
188    pub async fn handle_mcp_request(
189        &self,
190        req: Request<hyper::body::Incoming>,
191    ) -> Result<Response<UnifiedMcpBody>> {
192        match req.method() {
193            &Method::POST => {
194                let response = self.handle_json_rpc_request(req).await?;
195                Ok(response.map(convert_to_unified_body))
196            },
197            &Method::GET => self.handle_sse_request(req).await,
198            &Method::DELETE => {
199                let response = self.handle_delete_request(req).await?;
200                Ok(response.map(convert_to_unified_body))
201            },
202            &Method::OPTIONS => {
203                let response = self.handle_preflight();
204                Ok(response.map(convert_to_unified_body))
205            },
206            _ => {
207                let response = self.method_not_allowed();
208                Ok(response.map(convert_to_unified_body))
209            }
210        }
211    }
212
213    /// Handle JSON-RPC requests over HTTP POST
214    async fn handle_json_rpc_request(
215        &self,
216        req: Request<hyper::body::Incoming>,
217    ) -> Result<Response<Full<Bytes>>> {
218        // Extract protocol version and session ID from headers
219        let protocol_version = extract_protocol_version(req.headers());
220        let session_id = extract_session_id(req.headers());
221
222        debug!("POST request - Protocol: {}, Session: {:?}", protocol_version, session_id);
223
224        // Check content type
225        let content_type = req.headers()
226            .get(CONTENT_TYPE)
227            .and_then(|ct| ct.to_str().ok())
228            .unwrap_or("");
229
230        if !content_type.starts_with("application/json") {
231            warn!("Invalid content type: {}", content_type);
232            return Ok(bad_request_response("Content-Type must be application/json"));
233        }
234
235        // Check if client accepts SSE for streaming responses (MCP Streamable HTTP)
236        let accept_header = req.headers()
237            .get(ACCEPT)
238            .and_then(|accept| accept.to_str().ok())
239            .unwrap_or("application/json");
240
241        let accepts_sse = accept_header.contains("text/event-stream");
242        debug!("POST request Accept header: '{}', will use SSE for tool calls: {}", accept_header, accepts_sse);
243
244        // Read request body
245        let body = req.into_body();
246        let body_bytes = match body.collect().await {
247            Ok(collected) => collected.to_bytes(),
248            Err(err) => {
249                error!("Failed to read request body: {}", err);
250                return Ok(bad_request_response("Failed to read request body"));
251            }
252        };
253
254        // Check body size
255        if body_bytes.len() > self.config.max_body_size {
256            warn!("Request body too large: {} bytes", body_bytes.len());
257            return Ok(Response::builder()
258                .status(StatusCode::PAYLOAD_TOO_LARGE)
259                .header(CONTENT_TYPE, "application/json")
260                .body(Full::new(Bytes::from("Request body too large")))
261                .unwrap());
262        }
263
264        // Parse as UTF-8
265        let body_str = match std::str::from_utf8(&body_bytes) {
266            Ok(s) => s,
267            Err(err) => {
268                error!("Invalid UTF-8 in request body: {}", err);
269                return Ok(bad_request_response("Request body must be valid UTF-8"));
270            }
271        };
272
273        debug!("Received JSON-RPC request: {}", body_str);
274
275        // Parse JSON-RPC message
276        let message = match parse_json_rpc_message(body_str) {
277            Ok(msg) => msg,
278            Err(rpc_err) => {
279                error!("JSON-RPC parse error: {}", rpc_err);
280                // Extract request ID from the error if available
281                let error_response = serde_json::to_string(&rpc_err)
282                    .unwrap_or_else(|_| "{}".to_string());
283                return Ok(Response::builder()
284                    .status(StatusCode::OK) // JSON-RPC parse errors still use 200 OK
285                    .header(CONTENT_TYPE, "application/json")
286                    .body(Full::new(Bytes::from(error_response)))
287                    .unwrap());
288            }
289        };
290
291        // Handle the message using proper JSON-RPC enums
292        let (message_result, response_session_id, method_name) = match message {
293            JsonRpcMessage::Request(request) => {
294                debug!("Processing JSON-RPC request: method={}", request.method);
295                let method_name = request.method.clone();
296
297                // Special handling for initialize requests - they create new sessions
298                let (response, response_session_id) = if request.method == "initialize" {
299                    debug!("Handling initialize request - creating new session via session storage");
300
301                    // Let session storage create the session and generate the ID (GPS pattern)
302                    let capabilities = ServerCapabilities::default();
303                    match self.session_storage.create_session(capabilities).await {
304                        Ok(session_info) => {
305                            debug!("Created new session via session storage: {}", session_info.session_id);
306
307                            // ✅ CORRECTED ARCHITECTURE: Create session-specific notification broadcaster from shared StreamManager
308                            let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
309                                Arc::clone(&self.stream_manager)
310                            ));
311                            let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
312
313                            let session_context = SessionContext {
314                                session_id: session_info.session_id.clone(),
315                                metadata: std::collections::HashMap::new(),
316                                broadcaster: Some(broadcaster_any),
317                                timestamp: chrono::Utc::now().timestamp_millis() as u64,
318                            };
319
320                            let response = self.dispatcher.handle_request_with_context(request, session_context).await;
321
322                            // Return the session ID created by session storage for the HTTP header
323                            (response, Some(session_info.session_id))
324                        }
325                        Err(err) => {
326                            error!("Failed to create session during initialize: {}", err);
327                            // Return error response - this will be converted to proper error response by dispatcher
328                            let error_msg = format!("Session creation failed: {}", err);
329                            let error_response = turul_mcp_json_rpc_server::JsonRpcResponse::success(
330                                request.id,
331                                serde_json::json!({"error": error_msg})
332                            );
333                            (error_response, None)
334                        }
335                    }
336                } else {
337                    // ✅ CORRECTED ARCHITECTURE: Use shared StreamManager for notification broadcaster
338                    let session_id_str = session_id.clone().unwrap_or("unknown".to_string());
339                    let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
340                        Arc::clone(&self.stream_manager)
341                    ));
342                    let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
343
344                    let session_context = SessionContext {
345                        session_id: session_id_str,
346                        metadata: std::collections::HashMap::new(),
347                        broadcaster: Some(broadcaster_any),
348                        timestamp: chrono::Utc::now().timestamp_millis() as u64,
349                    };
350
351                    let response = self.dispatcher.handle_request_with_context(request, session_context).await;
352                    (response, session_id)
353                };
354
355                (JsonRpcMessageResult::Response(response), response_session_id, Some(method_name))
356            }
357            JsonRpcMessage::Notification(notification) => {
358                debug!("Processing JSON-RPC notification: method={}", notification.method);
359                let method_name = notification.method.clone();
360
361                // ✅ CORRECTED ARCHITECTURE: Create session context with shared StreamManager broadcaster
362                let session_context = if let Some(ref session_id) = session_id {
363                    let broadcaster: SharedNotificationBroadcaster = Arc::new(StreamManagerNotificationBroadcaster::new(
364                        Arc::clone(&self.stream_manager)
365                    ));
366                    let broadcaster_any = Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
367
368                    Some(SessionContext {
369                        session_id: session_id.clone(),
370                        metadata: std::collections::HashMap::new(),
371                        broadcaster: Some(broadcaster_any),
372                        timestamp: chrono::Utc::now().timestamp_millis() as u64,
373                    })
374                } else {
375                    None
376                };
377
378                if let Err(err) = self.dispatcher.handle_notification_with_context(notification, session_context).await {
379                    error!("Notification handling error: {}", err);
380                }
381                (JsonRpcMessageResult::NoResponse, session_id.clone(), Some(method_name))
382            }
383        };
384
385        // Convert message result to HTTP response
386        match message_result {
387            JsonRpcMessageResult::Response(response) => {
388                // Check if this is a tool call that should return SSE
389                // Only use SSE if explicitly requested via Accept: text/event-stream header
390                let is_tool_call = method_name.as_ref().map_or(false, |m| m == "tools/call");
391
392                debug!("Decision point: method={:?}, accepts_sse={}, session_id={:?}, is_tool_call={}",
393                       method_name, accepts_sse, response_session_id, is_tool_call);
394
395                // TEMPORARY FIX: Disable MCP Streamable HTTP for tool calls to ensure MCP Inspector compatibility
396                // Always return JSON responses for all operations until SSE implementation is fixed
397                debug!("🔧 COMPATIBILITY MODE: Always returning JSON response for method: {:?} (SSE disabled for tool calls)", method_name);
398                Ok(jsonrpc_response_with_session(response, response_session_id)?)
399            }
400            JsonRpcMessageResult::Error(error) => {
401                warn!("Sending JSON-RPC error response");
402                // Convert JsonRpcError to proper HTTP response
403                let error_json = serde_json::to_string(&error)?;
404                Ok(Response::builder()
405                    .status(StatusCode::OK) // JSON-RPC errors still return 200 OK
406                    .header(CONTENT_TYPE, "application/json")
407                    .body(Full::new(Bytes::from(error_json)))
408                    .unwrap())
409            }
410            JsonRpcMessageResult::NoResponse => {
411                // Notifications don't return responses (204 No Content)
412                Ok(jsonrpc_notification_response()?)
413            }
414        }
415    }
416
417    // Note: create_post_sse_response method removed as it's unused in MCP Inspector compatibility mode
418    // SSE for tool calls is temporarily disabled - see WORKING_MEMORY.md for details
419
420    /// Handle Server-Sent Events requests (SSE for streaming)
421    async fn handle_sse_request(
422        &self,
423        req: Request<hyper::body::Incoming>,
424    ) -> Result<Response<UnifiedMcpBody>> {
425        // Check if client accepts SSE
426        let headers = req.headers();
427        let accept = headers
428            .get(ACCEPT)
429            .and_then(|accept| accept.to_str().ok())
430            .unwrap_or("");
431
432        if !accept.contains("text/event-stream") {
433            warn!("GET request received without SSE support - header does not contain 'text/event-stream'");
434            let error = JsonRpcError::new(
435                None,
436                JsonRpcErrorObject::server_error(
437                    -32001,
438                    "SSE not accepted - missing 'text/event-stream' in Accept header",
439                    None
440                )
441            );
442            return jsonrpc_error_to_unified_body(error);
443        }
444
445        // Extract protocol version and session ID
446        let protocol_version = extract_protocol_version(headers);
447        let session_id = extract_session_id(headers);
448
449        debug!("GET SSE request - Protocol: {}, Session: {:?}", protocol_version, session_id);
450
451        // Session ID is required for SSE
452        let session_id = match session_id {
453            Some(id) => id,
454            None => {
455                warn!("Missing Mcp-Session-Id header for SSE request");
456                let error = JsonRpcError::new(
457                    None,
458                    JsonRpcErrorObject::server_error(
459                        -32002,
460                        "Missing Mcp-Session-Id header",
461                        None
462                    )
463                );
464                return jsonrpc_error_to_unified_body(error);
465            }
466        };
467
468        // Validate session exists (do NOT create if missing)
469        if let Err(err) = self.validate_session_exists(&session_id).await {
470            error!("Session validation failed for Session ID {}: {}", session_id, err);
471            let error = JsonRpcError::new(
472                None,
473                JsonRpcErrorObject::server_error(
474                    -32003,
475                    &format!("Session validation failed: {}", err),
476                    None
477                )
478            );
479            return jsonrpc_error_to_unified_body(error);
480        }
481
482        // Extract Last-Event-ID for resumability
483        let last_event_id = extract_last_event_id(headers);
484
485        // Generate unique connection ID for MCP spec compliance
486        let connection_id = Uuid::now_v7().to_string();
487
488        debug!("Creating SSE stream for session: {} with connection: {}, last_event_id: {:?}",
489               session_id, connection_id, last_event_id);
490
491        // ✅ CORRECTED ARCHITECTURE: Use shared StreamManager directly (no registry needed)
492        match self.stream_manager.handle_sse_connection(
493            session_id,
494            connection_id,
495            last_event_id,
496        ).await {
497            Ok(response) => Ok(response),
498            Err(err) => {
499                error!("Failed to create SSE connection: {}", err);
500                let error = JsonRpcError::new(
501                    None,
502                    JsonRpcErrorObject::internal_error(
503                        Some(format!("SSE connection failed: {}", err))
504                    )
505                );
506                jsonrpc_error_to_unified_body(error)
507            }
508        }
509    }
510
511    /// Handle DELETE requests for session cleanup
512    async fn handle_delete_request(
513        &self,
514        req: Request<hyper::body::Incoming>,
515    ) -> Result<Response<JsonRpcBody>> {
516        let session_id = extract_session_id(req.headers());
517
518        debug!("DELETE request - Session: {:?}", session_id);
519
520        if let Some(session_id) = session_id {
521            match self.session_storage.delete_session(&session_id).await {
522                Ok(true) => {
523                    debug!("Session {} removed via DELETE", session_id);
524                    Ok(Response::builder()
525                        .status(StatusCode::OK)
526                        .body(Full::new(Bytes::from("Session removed")))
527                        .unwrap())
528                }
529                Ok(false) => {
530                    Ok(Response::builder()
531                        .status(StatusCode::NOT_FOUND)
532                        .body(Full::new(Bytes::from("Session not found")))
533                        .unwrap())
534                }
535                Err(err) => {
536                    error!("Error deleting session {}: {}", session_id, err);
537                    Ok(Response::builder()
538                        .status(StatusCode::INTERNAL_SERVER_ERROR)
539                        .body(Full::new(Bytes::from("Session deletion error")))
540                        .unwrap())
541                }
542            }
543        } else {
544            Ok(Response::builder()
545                .status(StatusCode::BAD_REQUEST)
546                .body(Full::new(Bytes::from("Missing Mcp-Session-Id header")))
547                .unwrap())
548        }
549    }
550
551    /// Handle OPTIONS preflight requests - these are essential for CORS
552    fn handle_preflight(&self) -> Response<JsonRpcBody> {
553        options_response()
554    }
555
556    /// Return method not allowed response
557    fn method_not_allowed(&self) -> Response<JsonRpcBody> {
558        method_not_allowed_response()
559    }
560
561    /// Validate that a session exists - do NOT create if missing
562    async fn validate_session_exists(&self, session_id: &str) -> Result<()> {
563        // Check if session already exists
564        match self.session_storage.get_session(session_id).await {
565            Ok(Some(_)) => {
566                debug!("Session validation successful: {}", session_id);
567                Ok(())
568            }
569            Ok(None) => {
570                error!("Session not found: {}", session_id);
571                Err(crate::HttpMcpError::InvalidRequest(
572                    format!("Session '{}' not found. Sessions must be created via initialize request first.", session_id)
573                ))
574            }
575            Err(err) => {
576                error!("Failed to validate session {}: {}", session_id, err);
577                Err(crate::HttpMcpError::InvalidRequest(format!("Session validation failed: {}", err)))
578            }
579        }
580    }
581}