1use std::convert::Infallible;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use tracing::{debug, error, warn};
16
17use bytes::Bytes;
18use futures::Stream;
19use http_body::{Body, Frame};
20use http_body_util::{BodyExt, Full};
21use hyper::header::{ACCEPT, CONTENT_TYPE};
22use hyper::{Method, Request, Response, StatusCode};
23
24use chrono;
25use turul_mcp_json_rpc_server::{
26 JsonRpcDispatcher,
27 r#async::SessionContext,
28 dispatch::{JsonRpcMessage, JsonRpcMessageResult, parse_json_rpc_message},
29 error::{JsonRpcError, JsonRpcErrorObject},
30};
31use turul_mcp_protocol::McpError;
32use turul_mcp_protocol::ServerCapabilities;
33use turul_mcp_session_storage::{InMemorySessionStorage, SessionView};
34use uuid::Uuid;
35
36use crate::{
37 Result, ServerConfig, StreamConfig, StreamManager,
38 json_rpc_responses::*,
39 notification_bridge::{SharedNotificationBroadcaster, StreamManagerNotificationBroadcaster},
40 protocol::{extract_last_event_id, extract_protocol_version, extract_session_id},
41};
42use std::collections::HashMap;
43
44pub struct SessionSseStream {
46 stream: Pin<Box<dyn Stream<Item = std::result::Result<Bytes, Infallible>> + Send>>,
47}
48
49impl SessionSseStream {
50 pub fn new<S>(stream: S) -> Self
51 where
52 S: Stream<Item = std::result::Result<Bytes, Infallible>> + Send + 'static,
53 {
54 Self {
55 stream: Box::pin(stream),
56 }
57 }
58}
59
60impl Drop for SessionSseStream {
61 fn drop(&mut self) {
62 debug!("DROP: SessionSseStream - HTTP response body being cleaned up");
63 debug!("This may indicate early cleanup of SSE response stream");
64 }
65}
66
67impl Body for SessionSseStream {
68 type Data = Bytes;
69 type Error = Infallible;
70
71 fn poll_frame(
72 mut self: Pin<&mut Self>,
73 cx: &mut Context<'_>,
74 ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
75 match self.stream.as_mut().poll_next(cx) {
76 Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(Frame::data(data)))),
77 Poll::Ready(Some(Err(never))) => match never {},
78 Poll::Ready(None) => Poll::Ready(None),
79 Poll::Pending => Poll::Pending,
80 }
81 }
82}
83
84type JsonRpcBody = Full<Bytes>;
86
87type UnifiedMcpBody = http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>;
89
90#[derive(Debug, Clone, PartialEq)]
92enum AcceptMode {
93 Compliant,
95 JsonOnly,
97 SseOnly,
99 Invalid,
101}
102
103fn parse_mcp_accept_header(accept_header: &str) -> (AcceptMode, bool) {
105 let accepts_json = accept_header.contains("application/json") || accept_header.contains("*/*");
106 let accepts_sse = accept_header.contains("text/event-stream");
107
108 let mode = match (accepts_json, accepts_sse) {
109 (true, true) => AcceptMode::Compliant,
110 (true, false) => AcceptMode::JsonOnly, (false, true) => AcceptMode::SseOnly,
112 (false, false) => AcceptMode::Invalid,
113 };
114
115 let should_use_sse = match mode {
118 AcceptMode::Compliant => true, AcceptMode::JsonOnly => false, AcceptMode::SseOnly => true, AcceptMode::Invalid => false, };
123
124 (mode, should_use_sse)
125}
126
127fn convert_to_unified_body(full_body: Full<Bytes>) -> UnifiedMcpBody {
129 full_body.map_err(|never| match never {}).boxed_unsync()
130}
131
132fn jsonrpc_error_to_unified_body(error: JsonRpcError) -> Result<Response<UnifiedMcpBody>> {
134 let error_json = serde_json::to_string(&error)?;
135 Ok(Response::builder()
136 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
138 .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
139 .unwrap())
140}
141
142pub struct SessionMcpHandler {
146 pub(crate) config: ServerConfig,
147 pub(crate) dispatcher: Arc<JsonRpcDispatcher<McpError>>,
148 pub(crate) session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
149 pub(crate) stream_config: StreamConfig,
150 pub(crate) stream_manager: Arc<StreamManager>,
152 pub(crate) middleware_stack: Arc<crate::middleware::MiddlewareStack>,
153}
154
155impl Clone for SessionMcpHandler {
156 fn clone(&self) -> Self {
157 Self {
158 config: self.config.clone(),
159 dispatcher: Arc::clone(&self.dispatcher),
160 session_storage: Arc::clone(&self.session_storage),
161 stream_config: self.stream_config.clone(),
162 stream_manager: Arc::clone(&self.stream_manager),
163 middleware_stack: Arc::clone(&self.middleware_stack),
164 }
165 }
166}
167
168impl SessionMcpHandler {
169 pub fn new(
171 config: ServerConfig,
172 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
173 stream_config: StreamConfig,
174 ) -> Self {
175 let storage: Arc<turul_mcp_session_storage::BoxedSessionStorage> =
176 Arc::new(InMemorySessionStorage::new());
177 let middleware_stack = Arc::new(crate::middleware::MiddlewareStack::new());
178 Self::with_storage(config, dispatcher, storage, stream_config, middleware_stack)
179 }
180
181 pub fn with_shared_stream_manager(
183 config: ServerConfig,
184 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
185 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
186 stream_config: StreamConfig,
187 stream_manager: Arc<StreamManager>,
188 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
189 ) -> Self {
190 Self {
191 config,
192 dispatcher,
193 session_storage,
194 stream_config,
195 stream_manager,
196 middleware_stack,
197 }
198 }
199
200 pub fn with_storage(
203 config: ServerConfig,
204 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
205 session_storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
206 stream_config: StreamConfig,
207 middleware_stack: Arc<crate::middleware::MiddlewareStack>,
208 ) -> Self {
209 let stream_manager = Arc::new(StreamManager::with_config(
211 Arc::clone(&session_storage),
212 stream_config.clone(),
213 ));
214
215 Self {
216 config,
217 dispatcher,
218 session_storage,
219 stream_config,
220 stream_manager,
221 middleware_stack,
222 }
223 }
224
225 pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
227 &self.stream_manager
228 }
229
230 pub async fn handle_mcp_request<B>(&self, req: Request<B>) -> Result<Response<UnifiedMcpBody>>
232 where
233 B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
234 {
235 debug!(
236 "SESSION HANDLER processing {} {}",
237 req.method(),
238 req.uri().path()
239 );
240 match *req.method() {
241 Method::POST => {
242 let response = self.handle_json_rpc_request(req).await?;
243 Ok(response)
244 }
245 Method::GET => self.handle_sse_request(req).await,
246 Method::DELETE => {
247 let response = self.handle_delete_request(req).await?;
248 Ok(response.map(convert_to_unified_body))
249 }
250 Method::OPTIONS => {
251 let response = self.handle_preflight();
252 Ok(response.map(convert_to_unified_body))
253 }
254 _ => {
255 let response = self.method_not_allowed();
256 Ok(response.map(convert_to_unified_body))
257 }
258 }
259 }
260
261 async fn handle_json_rpc_request<B>(&self, req: Request<B>) -> Result<Response<UnifiedMcpBody>>
263 where
264 B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
265 {
266 let headers: HashMap<String, String> = req
268 .headers()
269 .iter()
270 .filter_map(|(k, v)| v.to_str().ok().map(|s| (k.as_str().to_string(), s.to_string())))
271 .collect();
272
273 let protocol_version = extract_protocol_version(req.headers());
275 let session_id = extract_session_id(req.headers());
276
277 debug!(
278 "POST request - Protocol: {}, Session: {:?}",
279 protocol_version, session_id
280 );
281
282 let content_type = req
284 .headers()
285 .get(CONTENT_TYPE)
286 .and_then(|ct| ct.to_str().ok())
287 .unwrap_or("");
288
289 if !content_type.starts_with("application/json") {
290 warn!("Invalid content type: {}", content_type);
291 return Ok(
292 bad_request_response("Content-Type must be application/json")
293 .map(convert_to_unified_body),
294 );
295 }
296
297 let accept_header = req
299 .headers()
300 .get(ACCEPT)
301 .and_then(|accept| accept.to_str().ok())
302 .unwrap_or("application/json");
303
304 let (accept_mode, accepts_sse) = parse_mcp_accept_header(accept_header);
305 debug!(
306 "POST request Accept header: '{}', mode: {:?}, will use SSE for tool calls: {}",
307 accept_header, accept_mode, accepts_sse
308 );
309
310 let body = req.into_body();
312 let body_bytes = match body.collect().await {
313 Ok(collected) => collected.to_bytes(),
314 Err(err) => {
315 error!("Failed to read request body: {}", err);
316 return Ok(bad_request_response("Failed to read request body")
317 .map(convert_to_unified_body));
318 }
319 };
320
321 if body_bytes.len() > self.config.max_body_size {
323 warn!("Request body too large: {} bytes", body_bytes.len());
324 return Ok(Response::builder()
325 .status(StatusCode::PAYLOAD_TOO_LARGE)
326 .header(CONTENT_TYPE, "application/json")
327 .body(convert_to_unified_body(Full::new(Bytes::from(
328 "Request body too large",
329 ))))
330 .unwrap());
331 }
332
333 let body_str = match std::str::from_utf8(&body_bytes) {
335 Ok(s) => s,
336 Err(err) => {
337 error!("Invalid UTF-8 in request body: {}", err);
338 return Ok(bad_request_response("Request body must be valid UTF-8")
339 .map(convert_to_unified_body));
340 }
341 };
342
343 debug!("Received JSON-RPC request: {}", body_str);
344
345 let message = match parse_json_rpc_message(body_str) {
347 Ok(msg) => msg,
348 Err(rpc_err) => {
349 error!("JSON-RPC parse error: {}", rpc_err);
350 let error_response =
352 serde_json::to_string(&rpc_err).unwrap_or_else(|_| "{}".to_string());
353 return Ok(Response::builder()
354 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
356 .body(convert_to_unified_body(Full::new(Bytes::from(
357 error_response,
358 ))))
359 .unwrap());
360 }
361 };
362
363 let (message_result, response_session_id, method_name) = match message {
365 JsonRpcMessage::Request(request) => {
366 debug!("Processing JSON-RPC request: method={}", request.method);
367 let method_name = request.method.clone();
368
369 let (response, response_session_id) = if request.method == "initialize" {
371 debug!(
372 "Handling initialize request - creating new session via session storage"
373 );
374
375 let capabilities = ServerCapabilities::default();
377 match self.session_storage.create_session(capabilities).await {
378 Ok(session_info) => {
379 debug!(
380 "Created new session via session storage: {}",
381 session_info.session_id
382 );
383
384 let broadcaster: SharedNotificationBroadcaster =
386 Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
387 &self.stream_manager,
388 )));
389 let broadcaster_any =
390 Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
391
392 let session_context = SessionContext {
393 session_id: session_info.session_id.clone(),
394 metadata: std::collections::HashMap::new(),
395 broadcaster: Some(broadcaster_any),
396 timestamp: chrono::Utc::now().timestamp_millis() as u64,
397 };
398
399 let (response, _) = self
402 .run_middleware_and_dispatch(request, headers.clone(), session_context)
403 .await;
404
405 (response, Some(session_info.session_id))
407 }
408 Err(err) => {
409 error!("Failed to create session during initialize: {}", err);
410 let error_msg = format!("Session creation failed: {}", err);
412 let error_response = turul_mcp_json_rpc_server::JsonRpcMessage::error(
413 turul_mcp_json_rpc_server::JsonRpcError::internal_error(
414 Some(request.id),
415 Some(error_msg),
416 ),
417 );
418 (error_response, None)
419 }
420 }
421 } else {
422 let session_context = if let Some(ref session_id_str) = session_id {
425 debug!("Processing request with session: {}", session_id_str);
426 let broadcaster: SharedNotificationBroadcaster =
427 Arc::new(StreamManagerNotificationBroadcaster::new(Arc::clone(
428 &self.stream_manager,
429 )));
430 let broadcaster_any =
431 Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
432 Some(SessionContext {
433 session_id: session_id_str.clone(),
434 metadata: std::collections::HashMap::new(),
435 broadcaster: Some(broadcaster_any),
436 timestamp: chrono::Utc::now().timestamp_millis() as u64,
437 })
438 } else {
439 debug!("Processing request without session (lenient mode)");
440 None
441 };
442
443 let (response, _stashed_injection) = if let Some(ctx) = session_context {
445 self.run_middleware_and_dispatch(request, headers.clone(), ctx).await
446 } else {
447 (self.dispatcher.handle_request(request).await, None)
449 };
450 (response, session_id)
451 };
452
453 let message_result = match response {
455 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
456 JsonRpcMessageResult::Response(resp)
457 }
458 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
459 JsonRpcMessageResult::Error(err)
460 }
461 };
462 (message_result, response_session_id, Some(method_name))
463 }
464 JsonRpcMessage::Notification(notification) => {
465 debug!(
466 "Processing JSON-RPC notification: method={}",
467 notification.method
468 );
469 let method_name = notification.method.clone();
470
471 let session_context = if let Some(ref session_id_str) = session_id {
474 debug!("Processing notification with session: {}", session_id_str);
475 let broadcaster: SharedNotificationBroadcaster = Arc::new(
476 StreamManagerNotificationBroadcaster::new(Arc::clone(&self.stream_manager)),
477 );
478 let broadcaster_any =
479 Arc::new(broadcaster) as Arc<dyn std::any::Any + Send + Sync>;
480
481 Some(SessionContext {
482 session_id: session_id_str.clone(),
483 metadata: std::collections::HashMap::new(),
484 broadcaster: Some(broadcaster_any),
485 timestamp: chrono::Utc::now().timestamp_millis() as u64,
486 })
487 } else {
488 debug!("Processing notification without session (lenient mode)");
489 None
490 };
491
492 let result = self
493 .dispatcher
494 .handle_notification_with_context(notification, session_context)
495 .await;
496
497 if let Err(err) = result {
498 error!("Notification handling error: {}", err);
499 }
500 (
501 JsonRpcMessageResult::NoResponse,
502 session_id.clone(),
503 Some(method_name),
504 )
505 }
506 };
507
508 match message_result {
510 JsonRpcMessageResult::Response(response) => {
511 let is_tool_call = method_name.as_ref().is_some_and(|m| m == "tools/call");
514
515 debug!(
516 "Decision point: method={:?}, accept_mode={:?}, accepts_sse={}, server_post_sse_enabled={}, session_id={:?}, is_tool_call={}",
517 method_name,
518 accept_mode,
519 accepts_sse,
520 self.config.enable_post_sse,
521 response_session_id,
522 is_tool_call
523 );
524
525 let should_use_sse = match accept_mode {
527 AcceptMode::JsonOnly => false, AcceptMode::Invalid => false, AcceptMode::Compliant => {
530 self.config.enable_post_sse && accepts_sse && is_tool_call
531 } AcceptMode::SseOnly => self.config.enable_post_sse && accepts_sse, };
534
535 if should_use_sse && response_session_id.is_some() {
536 debug!(
537 "📡 Creating POST SSE stream (mode: {:?}) for tool call with notifications",
538 accept_mode
539 );
540 match self
541 .stream_manager
542 .create_post_sse_stream(
543 response_session_id.clone().unwrap(),
544 response.clone(), )
546 .await
547 {
548 Ok(sse_response) => {
549 debug!("✅ POST SSE stream created successfully");
550 Ok(sse_response
551 .map(|body| body.map_err(|never| match never {}).boxed_unsync()))
552 }
553 Err(e) => {
554 warn!(
555 "Failed to create POST SSE stream, falling back to JSON: {}",
556 e
557 );
558 Ok(
559 jsonrpc_response_with_session(response, response_session_id)?
560 .map(convert_to_unified_body),
561 )
562 }
563 }
564 } else {
565 debug!(
566 "📄 Returning standard JSON response (mode: {:?}) for method: {:?}",
567 accept_mode, method_name
568 );
569 Ok(
570 jsonrpc_response_with_session(response, response_session_id)?
571 .map(convert_to_unified_body),
572 )
573 }
574 }
575 JsonRpcMessageResult::Error(error) => {
576 warn!("Sending JSON-RPC error response");
577 let error_json = serde_json::to_string(&error)?;
579 Ok(Response::builder()
580 .status(StatusCode::OK) .header(CONTENT_TYPE, "application/json")
582 .body(convert_to_unified_body(Full::new(Bytes::from(error_json))))
583 .unwrap())
584 }
585 JsonRpcMessageResult::NoResponse => {
586 Ok(jsonrpc_notification_response()?.map(convert_to_unified_body))
588 }
589 }
590 }
591
592 async fn handle_sse_request<B>(&self, req: Request<B>) -> Result<Response<UnifiedMcpBody>>
597 where
598 B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
599 {
600 let headers = req.headers();
602 let accept = headers
603 .get(ACCEPT)
604 .and_then(|accept| accept.to_str().ok())
605 .unwrap_or("");
606
607 if !accept.contains("text/event-stream") {
608 warn!(
609 "GET request received without SSE support - header does not contain 'text/event-stream'"
610 );
611 let error = JsonRpcError::new(
612 None,
613 JsonRpcErrorObject::server_error(
614 -32001,
615 "SSE not accepted - missing 'text/event-stream' in Accept header",
616 None,
617 ),
618 );
619 return jsonrpc_error_to_unified_body(error);
620 }
621
622 if !self.config.enable_get_sse {
624 warn!("GET SSE request received but GET SSE is disabled on server");
625 let error = JsonRpcError::new(
626 None,
627 JsonRpcErrorObject::server_error(
628 -32003,
629 "GET SSE is disabled on this server",
630 None,
631 ),
632 );
633 return jsonrpc_error_to_unified_body(error);
634 }
635
636 let protocol_version = extract_protocol_version(headers);
638 let session_id = extract_session_id(headers);
639
640 debug!(
641 "GET SSE request - Protocol: {}, Session: {:?}",
642 protocol_version, session_id
643 );
644
645 let session_id = match session_id {
647 Some(id) => id,
648 None => {
649 warn!("Missing Mcp-Session-Id header for SSE request");
650 let error = JsonRpcError::new(
651 None,
652 JsonRpcErrorObject::server_error(-32002, "Missing Mcp-Session-Id header", None),
653 );
654 return jsonrpc_error_to_unified_body(error);
655 }
656 };
657
658 if let Err(err) = self.validate_session_exists(&session_id).await {
660 error!(
661 "Session validation failed for Session ID {}: {}",
662 session_id, err
663 );
664 let error = JsonRpcError::new(
665 None,
666 JsonRpcErrorObject::server_error(
667 -32003,
668 &format!("Session validation failed: {}", err),
669 None,
670 ),
671 );
672 return jsonrpc_error_to_unified_body(error);
673 }
674
675 let last_event_id = extract_last_event_id(headers);
677
678 let connection_id = Uuid::now_v7().to_string();
680
681 debug!(
682 "Creating SSE stream for session: {} with connection: {}, last_event_id: {:?}",
683 session_id, connection_id, last_event_id
684 );
685
686 match self
688 .stream_manager
689 .handle_sse_connection(session_id, connection_id, last_event_id)
690 .await
691 {
692 Ok(response) => Ok(response),
693 Err(err) => {
694 error!("Failed to create SSE connection: {}", err);
695 let error = JsonRpcError::new(
696 None,
697 JsonRpcErrorObject::internal_error(Some(format!(
698 "SSE connection failed: {}",
699 err
700 ))),
701 );
702 jsonrpc_error_to_unified_body(error)
703 }
704 }
705 }
706
707 async fn handle_delete_request<B>(&self, req: Request<B>) -> Result<Response<JsonRpcBody>>
709 where
710 B: http_body::Body<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
711 {
712 let session_id = extract_session_id(req.headers());
713
714 debug!("DELETE request - Session: {:?}", session_id);
715
716 if let Some(session_id) = session_id {
717 let closed_connections = self
719 .stream_manager
720 .close_session_connections(&session_id)
721 .await;
722 debug!(
723 "Closed {} SSE connections for session: {}",
724 closed_connections, session_id
725 );
726
727 match self.session_storage.get_session(&session_id).await {
729 Ok(Some(mut session_info)) => {
730 session_info
732 .state
733 .insert("terminated".to_string(), serde_json::Value::Bool(true));
734 session_info.state.insert(
735 "terminated_at".to_string(),
736 serde_json::Value::Number(serde_json::Number::from(
737 chrono::Utc::now().timestamp_millis(),
738 )),
739 );
740 session_info.touch();
741
742 match self.session_storage.update_session(session_info).await {
743 Ok(()) => {
744 debug!(
745 "Session {} marked as terminated (TTL will handle cleanup)",
746 session_id
747 );
748 Ok(Response::builder()
749 .status(StatusCode::OK)
750 .body(Full::new(Bytes::from("Session terminated")))
751 .unwrap())
752 }
753 Err(err) => {
754 error!(
755 "Error marking session {} as terminated: {}",
756 session_id, err
757 );
758 match self.session_storage.delete_session(&session_id).await {
760 Ok(_) => {
761 debug!("Session {} deleted as fallback", session_id);
762 Ok(Response::builder()
763 .status(StatusCode::OK)
764 .body(Full::new(Bytes::from("Session removed")))
765 .unwrap())
766 }
767 Err(delete_err) => {
768 error!(
769 "Error deleting session {} as fallback: {}",
770 session_id, delete_err
771 );
772 Ok(Response::builder()
773 .status(StatusCode::INTERNAL_SERVER_ERROR)
774 .body(Full::new(Bytes::from("Session termination error")))
775 .unwrap())
776 }
777 }
778 }
779 }
780 }
781 Ok(None) => Ok(Response::builder()
782 .status(StatusCode::NOT_FOUND)
783 .body(Full::new(Bytes::from("Session not found")))
784 .unwrap()),
785 Err(err) => {
786 error!(
787 "Error retrieving session {} for termination: {}",
788 session_id, err
789 );
790 Ok(Response::builder()
791 .status(StatusCode::INTERNAL_SERVER_ERROR)
792 .body(Full::new(Bytes::from("Session lookup error")))
793 .unwrap())
794 }
795 }
796 } else {
797 Ok(Response::builder()
798 .status(StatusCode::BAD_REQUEST)
799 .body(Full::new(Bytes::from("Missing Mcp-Session-Id header")))
800 .unwrap())
801 }
802 }
803
804 fn handle_preflight(&self) -> Response<JsonRpcBody> {
806 options_response()
807 }
808
809 fn method_not_allowed(&self) -> Response<JsonRpcBody> {
811 method_not_allowed_response()
812 }
813
814 async fn validate_session_exists(&self, session_id: &str) -> Result<()> {
816 match self.session_storage.get_session(session_id).await {
818 Ok(Some(_)) => {
819 debug!("Session validation successful: {}", session_id);
820 Ok(())
821 }
822 Ok(None) => {
823 error!("Session not found: {}", session_id);
824 Err(crate::HttpMcpError::InvalidRequest(format!(
825 "Session '{}' not found. Sessions must be created via initialize request first.",
826 session_id
827 )))
828 }
829 Err(err) => {
830 error!("Failed to validate session {}: {}", session_id, err);
831 Err(crate::HttpMcpError::InvalidRequest(format!(
832 "Session validation failed: {}",
833 err
834 )))
835 }
836 }
837 }
838
839 async fn run_middleware_and_dispatch(
842 &self,
843 request: turul_mcp_json_rpc_server::JsonRpcRequest,
844 headers: HashMap<String, String>,
845 session: turul_mcp_json_rpc_server::SessionContext,
846 ) -> (turul_mcp_json_rpc_server::JsonRpcMessage, Option<crate::middleware::SessionInjection>) {
847 if self.middleware_stack.is_empty() {
849 let result = self.dispatcher
850 .handle_request_with_context(request, session)
851 .await;
852 return (result, None);
853 }
854
855 let normalized_headers: HashMap<String, String> = headers
857 .iter()
858 .map(|(k, v)| (k.to_lowercase(), v.clone()))
859 .collect();
860
861 let method = request.method.clone();
864 let session_id = session.session_id.clone();
865
866 let params = request.params.clone().map(|p| match p {
868 turul_mcp_json_rpc_server::RequestParams::Object(map) => {
869 serde_json::Value::Object(map.into_iter().collect())
870 }
871 turul_mcp_json_rpc_server::RequestParams::Array(arr) => serde_json::Value::Array(arr),
872 });
873 let mut ctx = crate::middleware::RequestContext::new(&method, params);
874
875 for (k, v) in normalized_headers {
876 ctx.add_metadata(k, serde_json::json!(v));
877 }
878
879 let session_view = crate::middleware::StorageBackedSessionView::new(
881 session_id.clone(),
882 Arc::clone(&self.session_storage),
883 );
884
885 let injection = match self.middleware_stack.execute_before(&mut ctx, Some(&session_view)).await {
887 Ok(inj) => inj,
888 Err(err) => {
889 return (Self::map_middleware_error_to_jsonrpc(err, request.id), None);
891 }
892 };
893
894 if !injection.is_empty() {
896 for (key, value) in injection.state() {
897 if let Err(e) = session_view.set_state(key, value.clone()).await {
898 tracing::warn!("Failed to apply injection state '{}': {}", key, e);
899 }
900 }
901 for (key, value) in injection.metadata() {
902 if let Err(e) = session_view.set_metadata(key, value.clone()).await {
903 tracing::warn!("Failed to apply injection metadata '{}': {}", key, e);
904 }
905 }
906 }
907
908 let result = self.dispatcher
910 .handle_request_with_context(request, session)
911 .await;
912
913 let mut dispatcher_result = match &result {
916 turul_mcp_json_rpc_server::JsonRpcMessage::Response(resp) => {
917 match &resp.result {
918 turul_mcp_json_rpc_server::response::ResponseResult::Success(val) => {
919 crate::middleware::DispatcherResult::Success(val.clone())
920 }
921 turul_mcp_json_rpc_server::response::ResponseResult::Null => {
922 crate::middleware::DispatcherResult::Success(serde_json::Value::Null)
923 }
924 }
925 }
926 turul_mcp_json_rpc_server::JsonRpcMessage::Error(err) => {
927 crate::middleware::DispatcherResult::Error(err.error.message.clone())
928 }
929 };
930
931 let _ = self.middleware_stack.execute_after(&ctx, &mut dispatcher_result).await;
933
934 (result, None) }
936
937 fn map_middleware_error_to_jsonrpc(
939 err: crate::middleware::MiddlewareError,
940 request_id: turul_mcp_json_rpc_server::RequestId,
941 ) -> turul_mcp_json_rpc_server::JsonRpcMessage {
942 use crate::middleware::error::error_codes;
943 use crate::middleware::MiddlewareError;
944
945 let (code, message, data) = match err {
946 MiddlewareError::Unauthenticated(msg) => (error_codes::UNAUTHENTICATED, msg, None),
947 MiddlewareError::Unauthorized(msg) => (error_codes::UNAUTHORIZED, msg, None),
948 MiddlewareError::RateLimitExceeded {
949 message,
950 retry_after,
951 } => {
952 let data = retry_after.map(|s| serde_json::json!({"retryAfter": s}));
953 (error_codes::RATE_LIMIT_EXCEEDED, message, data)
954 }
955 MiddlewareError::InvalidRequest(msg) => (error_codes::INVALID_REQUEST, msg, None),
956 MiddlewareError::Internal(msg) => (error_codes::INTERNAL_ERROR, msg, None),
957 MiddlewareError::Custom { message, .. } => (error_codes::INTERNAL_ERROR, message, None),
958 };
959
960 let error_obj = if let Some(d) = data {
961 turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(code, &message, Some(d))
962 } else {
963 turul_mcp_json_rpc_server::error::JsonRpcErrorObject::server_error(
964 code,
965 &message,
966 None::<serde_json::Value>,
967 )
968 };
969
970 turul_mcp_json_rpc_server::JsonRpcMessage::Error(turul_mcp_json_rpc_server::JsonRpcError::new(
971 Some(request_id),
972 error_obj,
973 ))
974 }
975}