1use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12 ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
23#[derive(Clone)]
34pub struct LambdaMcpHandler {
35 session_handler: SessionMcpHandler,
37
38 streamable_handler: StreamableHttpHandler,
40
41 #[allow(dead_code)]
43 sse_enabled: bool,
44
45 #[cfg(feature = "cors")]
47 cors_config: Option<CorsConfig>,
48}
49
50impl LambdaMcpHandler {
51 #[allow(clippy::too_many_arguments)]
53 pub fn new(
54 dispatcher: JsonRpcDispatcher<McpError>,
55 session_storage: Arc<BoxedSessionStorage>,
56 stream_manager: Arc<StreamManager>,
57 config: ServerConfig,
58 stream_config: StreamConfig,
59 _implementation: turul_mcp_protocol::Implementation,
60 capabilities: ServerCapabilities,
61 sse_enabled: bool,
62 #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
63 ) -> Self {
64 let dispatcher = Arc::new(dispatcher);
65
66 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
68
69 let session_handler = SessionMcpHandler::with_shared_stream_manager(
71 config.clone(),
72 dispatcher.clone(),
73 session_storage.clone(),
74 stream_config.clone(),
75 stream_manager.clone(),
76 middleware_stack.clone(),
77 );
78
79 let streamable_handler = StreamableHttpHandler::new(
81 Arc::new(config.clone()),
82 dispatcher.clone(),
83 session_storage.clone(),
84 stream_manager.clone(),
85 capabilities.clone(),
86 middleware_stack,
87 );
88
89 Self {
90 session_handler,
91 streamable_handler,
92 sse_enabled,
93 #[cfg(feature = "cors")]
94 cors_config,
95 }
96 }
97
98 #[allow(clippy::too_many_arguments)]
100 pub fn with_shared_stream_manager(
101 config: ServerConfig,
102 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
103 session_storage: Arc<BoxedSessionStorage>,
104 stream_manager: Arc<StreamManager>,
105 stream_config: StreamConfig,
106 _implementation: turul_mcp_protocol::Implementation,
107 capabilities: ServerCapabilities,
108 sse_enabled: bool,
109 ) -> Self {
110 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
112
113 let session_handler = SessionMcpHandler::with_shared_stream_manager(
115 config.clone(),
116 dispatcher.clone(),
117 session_storage.clone(),
118 stream_config.clone(),
119 stream_manager.clone(),
120 middleware_stack.clone(),
121 );
122
123 let streamable_handler = StreamableHttpHandler::new(
125 Arc::new(config),
126 dispatcher,
127 session_storage,
128 stream_manager,
129 capabilities,
130 middleware_stack,
131 );
132
133 Self {
134 session_handler,
135 streamable_handler,
136 sse_enabled,
137 #[cfg(feature = "cors")]
138 cors_config: None,
139 }
140 }
141
142 #[allow(clippy::too_many_arguments)]
144 pub fn with_middleware(
145 config: ServerConfig,
146 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
147 session_storage: Arc<BoxedSessionStorage>,
148 stream_manager: Arc<StreamManager>,
149 stream_config: StreamConfig,
150 capabilities: ServerCapabilities,
151 middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
152 sse_enabled: bool,
153 ) -> Self {
154 let session_handler = SessionMcpHandler::with_shared_stream_manager(
156 config.clone(),
157 dispatcher.clone(),
158 session_storage.clone(),
159 stream_config.clone(),
160 stream_manager.clone(),
161 middleware_stack.clone(),
162 );
163
164 let streamable_handler = StreamableHttpHandler::new(
166 Arc::new(config),
167 dispatcher,
168 session_storage,
169 stream_manager,
170 capabilities,
171 middleware_stack,
172 );
173
174 Self {
175 session_handler,
176 streamable_handler,
177 sse_enabled,
178 #[cfg(feature = "cors")]
179 cors_config: None,
180 }
181 }
182
/// Consumes the handler and returns it with `cors_config` installed,
/// replacing any CORS configuration that was set before.
#[cfg(feature = "cors")]
pub fn with_cors(self, cors_config: CorsConfig) -> Self {
    let mut handler = self;
    handler.cors_config = Some(cors_config);
    handler
}
189
190 pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
192 self.session_handler.get_stream_manager()
193 }
194
195 pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
204 let method = req.method().clone();
205 let uri = req.uri().clone();
206
207 let request_origin = req
208 .headers()
209 .get("origin")
210 .and_then(|v| v.to_str().ok())
211 .map(|s| s.to_string());
212
213 info!(
214 "🌐 Lambda MCP request: {} {} (origin: {:?})",
215 method, uri, request_origin
216 );
217
218 #[cfg(feature = "cors")]
220 if method == http::Method::OPTIONS
221 && let Some(ref cors_config) = self.cors_config
222 {
223 debug!("Handling CORS preflight request");
224 return create_preflight_response(cors_config, request_origin.as_deref());
225 }
226
227 let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
229
230 let hyper_resp = self
232 .session_handler
233 .handle_mcp_request(hyper_req)
234 .await
235 .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
236
237 let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
239
240 #[cfg(feature = "cors")]
242 if let Some(ref cors_config) = self.cors_config {
243 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
244 }
245
246 Ok(lambda_resp)
247 }
248
249 pub async fn handle_streaming(
254 &self,
255 req: LambdaRequest,
256 ) -> std::result::Result<
257 lambda_http::Response<
258 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
259 >,
260 Box<dyn std::error::Error + Send + Sync>,
261 > {
262 let method = req.method().clone();
263 let uri = req.uri().clone();
264 let request_origin = req
265 .headers()
266 .get("origin")
267 .and_then(|v| v.to_str().ok())
268 .map(|s| s.to_string());
269
270 info!(
271 "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
272 method, uri, request_origin
273 );
274
275 #[cfg(feature = "cors")]
277 if method == http::Method::OPTIONS
278 && let Some(ref cors_config) = self.cors_config
279 {
280 debug!("Handling CORS preflight request (streaming)");
281 let preflight_response =
282 create_preflight_response(cors_config, request_origin.as_deref())
283 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
284
285 return Ok(self.convert_lambda_response_to_streaming(preflight_response));
287 }
288
289 let hyper_req = crate::adapter::lambda_to_hyper_request(req)
291 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
292
293 use turul_http_mcp_server::protocol::McpProtocolVersion;
295 let protocol_version = hyper_req
296 .headers()
297 .get("MCP-Protocol-Version")
298 .and_then(|h| h.to_str().ok())
299 .and_then(McpProtocolVersion::parse_version)
300 .unwrap_or(McpProtocolVersion::V2025_06_18);
301
302 let hyper_resp = if protocol_version.supports_streamable_http() {
304 debug!(
306 "Using StreamableHttpHandler for protocol {}",
307 protocol_version.to_string()
308 );
309 self.streamable_handler.handle_request(hyper_req).await
310 } else {
311 debug!(
313 "Using SessionMcpHandler for legacy protocol {}",
314 protocol_version.to_string()
315 );
316 self.session_handler
317 .handle_mcp_request(hyper_req)
318 .await
319 .map_err(|e| {
320 Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
321 as Box<dyn std::error::Error + Send + Sync>
322 })?
323 };
324
325 let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
327
328 #[cfg(feature = "cors")]
330 if let Some(ref cors_config) = self.cors_config {
331 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
332 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
333 }
334
335 Ok(lambda_resp)
336 }
337
338 fn convert_lambda_response_to_streaming(
340 &self,
341 lambda_response: LambdaResponse<LambdaBody>,
342 ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
343 {
344 use bytes::Bytes;
345 use http_body_util::{BodyExt, Full};
346
347 let (parts, body) = lambda_response.into_parts();
348 let body_bytes = match body {
349 LambdaBody::Empty => Bytes::new(),
350 LambdaBody::Text(text) => Bytes::from(text),
351 LambdaBody::Binary(bytes) => Bytes::from(bytes),
352 };
353
354 let streaming_body = Full::new(body_bytes)
356 .map_err(|e: std::convert::Infallible| match e {})
357 .boxed_unsync();
358
359 lambda_http::Response::from_parts(parts, streaming_body)
360 }
361}
362
363#[cfg(test)]
364mod tests {
365 use super::*;
366 use http::Request;
367 use turul_mcp_session_storage::InMemorySessionStorage;
368
369 #[tokio::test]
370 async fn test_handler_creation() {
371 let session_storage = Arc::new(InMemorySessionStorage::new());
372 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
373 let dispatcher = JsonRpcDispatcher::new();
374 let config = ServerConfig::default();
375 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
376 let capabilities = ServerCapabilities::default();
377
378 let handler = LambdaMcpHandler::new(
379 dispatcher,
380 session_storage,
381 stream_manager,
382 config,
383 StreamConfig::default(),
384 implementation,
385 capabilities,
386 false, #[cfg(feature = "cors")]
388 None,
389 );
390
391 assert!(!handler.sse_enabled);
393 }
394
395 #[tokio::test]
396 async fn test_sse_enabled_with_handle_works() {
397 let session_storage = Arc::new(InMemorySessionStorage::new());
398 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
399 let dispatcher = JsonRpcDispatcher::new();
400 let config = ServerConfig::default();
401 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
402 let capabilities = ServerCapabilities::default();
403
404 let handler = LambdaMcpHandler::new(
406 dispatcher,
407 session_storage,
408 stream_manager,
409 config,
410 StreamConfig::default(),
411 implementation,
412 capabilities,
413 true, #[cfg(feature = "cors")]
415 None,
416 );
417
418 let lambda_req = Request::builder()
420 .method("POST")
421 .uri("/mcp")
422 .body(LambdaBody::Text(
423 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
424 ))
425 .unwrap();
426
427 let result = handler.handle(lambda_req).await;
429 assert!(
430 result.is_ok(),
431 "handle() should work with SSE enabled for snapshot-based responses"
432 );
433 }
434
435 #[tokio::test]
437 async fn test_stream_config_preservation() {
438 let session_storage = Arc::new(InMemorySessionStorage::new());
439 let dispatcher = JsonRpcDispatcher::new();
440 let config = ServerConfig::default();
441 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
442 let capabilities = ServerCapabilities::default();
443
444 let custom_stream_config = StreamConfig {
446 channel_buffer_size: 1024, max_replay_events: 200, keepalive_interval_seconds: 10, cors_origin: "https://custom-test.example.com".to_string(), };
451
452 let stream_manager = Arc::new(StreamManager::with_config(
454 session_storage.clone(),
455 custom_stream_config.clone(),
456 ));
457
458 let handler = LambdaMcpHandler::new(
459 dispatcher,
460 session_storage,
461 stream_manager,
462 config,
463 custom_stream_config.clone(),
464 implementation,
465 capabilities,
466 false, #[cfg(feature = "cors")]
468 None,
469 );
470
471 assert!(!handler.sse_enabled);
473
474 let stream_manager = handler.get_stream_manager();
476
477 let actual_config = stream_manager.get_config();
479
480 assert_eq!(
481 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
482 "Custom channel_buffer_size was not propagated correctly"
483 );
484 assert_eq!(
485 actual_config.max_replay_events, custom_stream_config.max_replay_events,
486 "Custom max_replay_events was not propagated correctly"
487 );
488 assert_eq!(
489 actual_config.keepalive_interval_seconds,
490 custom_stream_config.keepalive_interval_seconds,
491 "Custom keepalive_interval_seconds was not propagated correctly"
492 );
493 assert_eq!(
494 actual_config.cors_origin, custom_stream_config.cors_origin,
495 "Custom cors_origin was not propagated correctly"
496 );
497
498 assert!(Arc::strong_count(stream_manager) >= 1);
500 }
501
502 #[tokio::test]
504 async fn test_full_builder_chain_stream_config() {
505 use crate::LambdaMcpServerBuilder;
506 use turul_mcp_session_storage::InMemorySessionStorage;
507
508 let custom_stream_config = turul_http_mcp_server::StreamConfig {
510 channel_buffer_size: 2048, max_replay_events: 500, keepalive_interval_seconds: 15, cors_origin: "https://full-chain-test.example.com".to_string(),
514 };
515
516 let server = LambdaMcpServerBuilder::new()
518 .name("full-chain-test")
519 .version("1.0.0")
520 .storage(Arc::new(InMemorySessionStorage::new()))
521 .sse(true) .stream_config(custom_stream_config.clone())
523 .build()
524 .await
525 .expect("Server should build successfully");
526
527 let handler = server
529 .handler()
530 .await
531 .expect("Handler should be created from server");
532
533 assert!(handler.sse_enabled, "SSE should be enabled");
535
536 let stream_manager = handler.get_stream_manager();
538 let actual_config = stream_manager.get_config();
539
540 assert_eq!(
541 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
542 "Custom channel_buffer_size should be preserved through builder → server → handler chain"
543 );
544 assert_eq!(
545 actual_config.max_replay_events, custom_stream_config.max_replay_events,
546 "Custom max_replay_events should be preserved through builder → server → handler chain"
547 );
548 assert_eq!(
549 actual_config.keepalive_interval_seconds,
550 custom_stream_config.keepalive_interval_seconds,
551 "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
552 );
553 assert_eq!(
554 actual_config.cors_origin, custom_stream_config.cors_origin,
555 "Custom cors_origin should be preserved through builder → server → handler chain"
556 );
557
558 assert!(
560 Arc::strong_count(stream_manager) >= 1,
561 "Stream manager should be properly initialized"
562 );
563
564 let test_session_id = uuid::Uuid::now_v7().to_string();
567
568 let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
571 assert!(
572 subscriptions.is_empty(),
573 "New session should have no subscriptions initially"
574 );
575
576 assert_eq!(
579 stream_manager.get_config().channel_buffer_size,
580 2048,
581 "Stream manager should be using the custom buffer size functionally"
582 );
583 }
584
585 #[tokio::test]
590 async fn test_non_streaming_runtime_sse_false() {
591 use crate::LambdaMcpServerBuilder;
592 use turul_mcp_session_storage::InMemorySessionStorage;
593
594 let server = LambdaMcpServerBuilder::new()
595 .name("test-non-streaming-sse-false")
596 .version("1.0.0")
597 .storage(Arc::new(InMemorySessionStorage::new()))
598 .sse(false) .build()
600 .await
601 .expect("Server should build successfully");
602
603 let handler = server
604 .handler()
605 .await
606 .expect("Handler should be created from server");
607
608 assert!(!handler.sse_enabled, "SSE should be disabled");
610
611 let lambda_req = Request::builder()
613 .method("POST")
614 .uri("/mcp")
615 .body(LambdaBody::Text(
616 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
617 ))
618 .unwrap();
619
620 let result = handler.handle(lambda_req).await;
622 assert!(
623 result.is_ok(),
624 "POST /mcp should work with non-streaming + sse(false)"
625 );
626 }
627
628 #[tokio::test]
630 async fn test_non_streaming_runtime_sse_true() {
631 use crate::LambdaMcpServerBuilder;
632 use turul_mcp_session_storage::InMemorySessionStorage;
633
634 let server = LambdaMcpServerBuilder::new()
635 .name("test-non-streaming-sse-true")
636 .version("1.0.0")
637 .storage(Arc::new(InMemorySessionStorage::new()))
638 .sse(true) .build()
640 .await
641 .expect("Server should build successfully");
642
643 let handler = server
644 .handler()
645 .await
646 .expect("Handler should be created from server");
647
648 assert!(handler.sse_enabled, "SSE should be enabled");
650
651 let lambda_req = Request::builder()
653 .method("POST")
654 .uri("/mcp")
655 .body(LambdaBody::Text(
656 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
657 ))
658 .unwrap();
659
660 let result = handler.handle(lambda_req).await;
662 assert!(
663 result.is_ok(),
664 "POST /mcp should work with non-streaming + sse(true)"
665 );
666
667 }
670
671 #[tokio::test]
673 async fn test_streaming_runtime_sse_false() {
674 use crate::LambdaMcpServerBuilder;
675 use turul_mcp_session_storage::InMemorySessionStorage;
676
677 let server = LambdaMcpServerBuilder::new()
678 .name("test-streaming-sse-false")
679 .version("1.0.0")
680 .storage(Arc::new(InMemorySessionStorage::new()))
681 .sse(false) .build()
683 .await
684 .expect("Server should build successfully");
685
686 let handler = server
687 .handler()
688 .await
689 .expect("Handler should be created from server");
690
691 assert!(!handler.sse_enabled, "SSE should be disabled");
693
694 let lambda_req = Request::builder()
696 .method("POST")
697 .uri("/mcp")
698 .body(LambdaBody::Text(
699 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
700 ))
701 .unwrap();
702
703 let result = handler.handle_streaming(lambda_req).await;
705 assert!(
706 result.is_ok(),
707 "Streaming runtime should work with sse(false)"
708 );
709 }
710
711 #[tokio::test]
713 async fn test_streaming_runtime_sse_true() {
714 use crate::LambdaMcpServerBuilder;
715 use turul_mcp_session_storage::InMemorySessionStorage;
716
717 let server = LambdaMcpServerBuilder::new()
718 .name("test-streaming-sse-true")
719 .version("1.0.0")
720 .storage(Arc::new(InMemorySessionStorage::new()))
721 .sse(true) .build()
723 .await
724 .expect("Server should build successfully");
725
726 let handler = server
727 .handler()
728 .await
729 .expect("Handler should be created from server");
730
731 assert!(handler.sse_enabled, "SSE should be enabled");
733
734 let lambda_req = Request::builder()
736 .method("POST")
737 .uri("/mcp")
738 .body(LambdaBody::Text(
739 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
740 ))
741 .unwrap();
742
743 let result = handler.handle_streaming(lambda_req).await;
745 assert!(
746 result.is_ok(),
747 "Streaming runtime should work with sse(true) for real-time streaming"
748 );
749
750 }
753}