1use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12 ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
23#[derive(Clone)]
34pub struct LambdaMcpHandler {
35 session_handler: SessionMcpHandler,
37
38 streamable_handler: StreamableHttpHandler,
40
41 #[allow(dead_code)]
43 sse_enabled: bool,
44
45 #[cfg(feature = "cors")]
47 cors_config: Option<CorsConfig>,
48}
49
50impl LambdaMcpHandler {
51 #[allow(clippy::too_many_arguments)]
53 pub fn new(
54 dispatcher: JsonRpcDispatcher<McpError>,
55 session_storage: Arc<BoxedSessionStorage>,
56 stream_manager: Arc<StreamManager>,
57 config: ServerConfig,
58 stream_config: StreamConfig,
59 _implementation: turul_mcp_protocol::Implementation,
60 capabilities: ServerCapabilities,
61 sse_enabled: bool,
62 #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
63 ) -> Self {
64 let dispatcher = Arc::new(dispatcher);
65
66 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
68
69 let session_handler = SessionMcpHandler::with_shared_stream_manager(
71 config.clone(),
72 dispatcher.clone(),
73 session_storage.clone(),
74 stream_config.clone(),
75 stream_manager.clone(),
76 middleware_stack.clone(),
77 );
78
79 let streamable_handler = StreamableHttpHandler::new(
81 Arc::new(config.clone()),
82 dispatcher.clone(),
83 session_storage.clone(),
84 stream_manager.clone(),
85 capabilities.clone(),
86 middleware_stack,
87 );
88
89 Self {
90 session_handler,
91 streamable_handler,
92 sse_enabled,
93 #[cfg(feature = "cors")]
94 cors_config,
95 }
96 }
97
98 #[allow(clippy::too_many_arguments)]
100 pub fn with_shared_stream_manager(
101 config: ServerConfig,
102 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
103 session_storage: Arc<BoxedSessionStorage>,
104 stream_manager: Arc<StreamManager>,
105 stream_config: StreamConfig,
106 _implementation: turul_mcp_protocol::Implementation,
107 capabilities: ServerCapabilities,
108 sse_enabled: bool,
109 ) -> Self {
110 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
112
113 let session_handler = SessionMcpHandler::with_shared_stream_manager(
115 config.clone(),
116 dispatcher.clone(),
117 session_storage.clone(),
118 stream_config.clone(),
119 stream_manager.clone(),
120 middleware_stack.clone(),
121 );
122
123 let streamable_handler = StreamableHttpHandler::new(
125 Arc::new(config),
126 dispatcher,
127 session_storage,
128 stream_manager,
129 capabilities,
130 middleware_stack,
131 );
132
133 Self {
134 session_handler,
135 streamable_handler,
136 sse_enabled,
137 #[cfg(feature = "cors")]
138 cors_config: None,
139 }
140 }
141
142 #[allow(clippy::too_many_arguments)]
144 pub fn with_middleware(
145 config: ServerConfig,
146 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
147 session_storage: Arc<BoxedSessionStorage>,
148 stream_manager: Arc<StreamManager>,
149 stream_config: StreamConfig,
150 capabilities: ServerCapabilities,
151 middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
152 sse_enabled: bool,
153 ) -> Self {
154 let session_handler = SessionMcpHandler::with_shared_stream_manager(
156 config.clone(),
157 dispatcher.clone(),
158 session_storage.clone(),
159 stream_config.clone(),
160 stream_manager.clone(),
161 middleware_stack.clone(),
162 );
163
164 let streamable_handler = StreamableHttpHandler::new(
166 Arc::new(config),
167 dispatcher,
168 session_storage,
169 stream_manager,
170 capabilities,
171 middleware_stack,
172 );
173
174 Self {
175 session_handler,
176 streamable_handler,
177 sse_enabled,
178 #[cfg(feature = "cors")]
179 cors_config: None,
180 }
181 }
182
183 #[cfg(feature = "cors")]
185 pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
186 self.cors_config = Some(cors_config);
187 self
188 }
189
190 pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
192 self.session_handler.get_stream_manager()
193 }
194
195 pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
204 let method = req.method().clone();
205 let uri = req.uri().clone();
206
207 let request_origin = req
208 .headers()
209 .get("origin")
210 .and_then(|v| v.to_str().ok())
211 .map(|s| s.to_string());
212
213 info!(
214 "🌐 Lambda MCP request: {} {} (origin: {:?})",
215 method, uri, request_origin
216 );
217
218 #[cfg(feature = "cors")]
220 if method == http::Method::OPTIONS
221 && let Some(ref cors_config) = self.cors_config
222 {
223 debug!("Handling CORS preflight request");
224 return create_preflight_response(cors_config, request_origin.as_deref());
225 }
226
227 let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
229
230 let hyper_resp = self
232 .session_handler
233 .handle_mcp_request(hyper_req)
234 .await
235 .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
236
237 let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
239
240 #[cfg(feature = "cors")]
242 if let Some(ref cors_config) = self.cors_config {
243 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
244 }
245
246 Ok(lambda_resp)
247 }
248
249 pub async fn handle_streaming(
254 &self,
255 req: LambdaRequest,
256 ) -> std::result::Result<
257 lambda_http::Response<
258 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
259 >,
260 Box<dyn std::error::Error + Send + Sync>,
261 > {
262 let method = req.method().clone();
263 let uri = req.uri().clone();
264 let request_origin = req
265 .headers()
266 .get("origin")
267 .and_then(|v| v.to_str().ok())
268 .map(|s| s.to_string());
269
270 debug!(
271 "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
272 method, uri, request_origin
273 );
274
275 #[cfg(feature = "cors")]
277 if method == http::Method::OPTIONS
278 && let Some(ref cors_config) = self.cors_config
279 {
280 debug!("Handling CORS preflight request (streaming)");
281 let preflight_response =
282 create_preflight_response(cors_config, request_origin.as_deref())
283 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
284
285 return Ok(self.convert_lambda_response_to_streaming(preflight_response));
287 }
288
289 let hyper_req = crate::adapter::lambda_to_hyper_request(req)
291 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
292
293 use turul_http_mcp_server::protocol::McpProtocolVersion;
295 let protocol_version = hyper_req
296 .headers()
297 .get("MCP-Protocol-Version")
298 .and_then(|h| h.to_str().ok())
299 .and_then(McpProtocolVersion::parse_version)
300 .unwrap_or(McpProtocolVersion::V2025_06_18);
301
302 let hyper_resp = if protocol_version.supports_streamable_http() {
304 debug!(
306 "Using StreamableHttpHandler for protocol {}",
307 protocol_version.to_string()
308 );
309 self.streamable_handler.handle_request(hyper_req).await
310 } else {
311 debug!(
313 "Using SessionMcpHandler for legacy protocol {}",
314 protocol_version.to_string()
315 );
316 self.session_handler
317 .handle_mcp_request(hyper_req)
318 .await
319 .map_err(|e| {
320 Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
321 as Box<dyn std::error::Error + Send + Sync>
322 })?
323 };
324
325 let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
327
328 #[cfg(feature = "cors")]
330 if let Some(ref cors_config) = self.cors_config {
331 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
332 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
333 }
334
335 Ok(lambda_resp)
336 }
337
338 fn convert_lambda_response_to_streaming(
340 &self,
341 lambda_response: LambdaResponse<LambdaBody>,
342 ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
343 {
344 use bytes::Bytes;
345 use http_body_util::{BodyExt, Full};
346
347 let (parts, body) = lambda_response.into_parts();
348 let body_bytes = match body {
349 LambdaBody::Empty => Bytes::new(),
350 LambdaBody::Text(text) => Bytes::from(text),
351 LambdaBody::Binary(bytes) => Bytes::from(bytes),
352 _ => Bytes::new(),
353 };
354
355 let streaming_body = Full::new(body_bytes)
357 .map_err(|e: std::convert::Infallible| match e {})
358 .boxed_unsync();
359
360 lambda_http::Response::from_parts(parts, streaming_body)
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LambdaMcpServerBuilder;
    use http::Request;
    use turul_mcp_session_storage::InMemorySessionStorage;

    /// Builds the `POST /mcp` request carrying a JSON-RPC `initialize` call
    /// that the request-handling tests send.
    fn initialize_request() -> LambdaRequest {
        Request::builder()
            .method("POST")
            .uri("/mcp")
            .body(LambdaBody::Text(
                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
            ))
            .unwrap()
    }

    /// Builds a handler through `LambdaMcpHandler::new` with in-memory storage
    /// and default configuration, varying only the SSE flag.
    fn default_handler(sse_enabled: bool) -> LambdaMcpHandler {
        let storage = Arc::new(InMemorySessionStorage::new());
        let streams = Arc::new(StreamManager::new(storage.clone()));

        LambdaMcpHandler::new(
            JsonRpcDispatcher::new(),
            storage,
            streams,
            ServerConfig::default(),
            StreamConfig::default(),
            turul_mcp_protocol::Implementation::new("test", "1.0.0"),
            ServerCapabilities::default(),
            sse_enabled,
            #[cfg(feature = "cors")]
            None,
        )
    }

    /// A directly constructed handler keeps the SSE flag it was given.
    #[tokio::test]
    async fn test_handler_creation() {
        let handler = default_handler(false);

        assert!(!handler.sse_enabled);
    }

    /// `handle()` succeeds on an `initialize` request when SSE is enabled.
    #[tokio::test]
    async fn test_sse_enabled_with_handle_works() {
        let handler = default_handler(true);

        let outcome = handler.handle(initialize_request()).await;
        assert!(
            outcome.is_ok(),
            "handle() should work with SSE enabled for snapshot-based responses"
        );
    }

    /// A stream manager built with a custom `StreamConfig` is the one exposed
    /// by the handler, with every configured value intact.
    #[tokio::test]
    async fn test_stream_config_preservation() {
        let storage = Arc::new(InMemorySessionStorage::new());

        let expected = StreamConfig {
            channel_buffer_size: 1024,
            max_replay_events: 200,
            keepalive_interval_seconds: 10,
            cors_origin: "https://custom-test.example.com".to_string(),
        };

        let streams = Arc::new(StreamManager::with_config(
            storage.clone(),
            expected.clone(),
        ));

        let handler = LambdaMcpHandler::new(
            JsonRpcDispatcher::new(),
            storage,
            streams,
            ServerConfig::default(),
            expected.clone(),
            turul_mcp_protocol::Implementation::new("test", "1.0.0"),
            ServerCapabilities::default(),
            false,
            #[cfg(feature = "cors")]
            None,
        );

        assert!(!handler.sse_enabled);

        let manager = handler.get_stream_manager();
        let observed = manager.get_config();

        assert_eq!(
            observed.channel_buffer_size, expected.channel_buffer_size,
            "Custom channel_buffer_size was not propagated correctly"
        );
        assert_eq!(
            observed.max_replay_events, expected.max_replay_events,
            "Custom max_replay_events was not propagated correctly"
        );
        assert_eq!(
            observed.keepalive_interval_seconds, expected.keepalive_interval_seconds,
            "Custom keepalive_interval_seconds was not propagated correctly"
        );
        assert_eq!(
            observed.cors_origin, expected.cors_origin,
            "Custom cors_origin was not propagated correctly"
        );

        assert!(Arc::strong_count(manager) >= 1);
    }

    /// A custom `StreamConfig` given to the builder reaches the handler's
    /// stream manager, and that manager can be queried.
    #[tokio::test]
    async fn test_full_builder_chain_stream_config() {
        let expected = StreamConfig {
            channel_buffer_size: 2048,
            max_replay_events: 500,
            keepalive_interval_seconds: 15,
            cors_origin: "https://full-chain-test.example.com".to_string(),
        };

        let server = LambdaMcpServerBuilder::new()
            .name("full-chain-test")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(true)
            .stream_config(expected.clone())
            .build()
            .await
            .expect("Server should build successfully");

        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        assert!(handler.sse_enabled, "SSE should be enabled");

        let manager = handler.get_stream_manager();
        let observed = manager.get_config();

        assert_eq!(
            observed.channel_buffer_size, expected.channel_buffer_size,
            "Custom channel_buffer_size should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            observed.max_replay_events, expected.max_replay_events,
            "Custom max_replay_events should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            observed.keepalive_interval_seconds, expected.keepalive_interval_seconds,
            "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            observed.cors_origin, expected.cors_origin,
            "Custom cors_origin should be preserved through builder → server → handler chain"
        );

        assert!(
            Arc::strong_count(manager) >= 1,
            "Stream manager should be properly initialized"
        );

        // Query the manager with a session id it has never seen.
        let session_id = uuid::Uuid::now_v7().as_simple().to_string();
        let subscriptions = manager.get_subscriptions(&session_id).await;
        assert!(
            subscriptions.is_empty(),
            "New session should have no subscriptions initially"
        );

        assert_eq!(
            manager.get_config().channel_buffer_size,
            2048,
            "Stream manager should be using the custom buffer size functionally"
        );
    }

    /// Buffered `handle()` succeeds on a builder-created handler with SSE off.
    #[tokio::test]
    async fn test_non_streaming_runtime_sse_false() {
        let server = LambdaMcpServerBuilder::new()
            .name("test-non-streaming-sse-false")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(false)
            .build()
            .await
            .expect("Server should build successfully");

        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        assert!(!handler.sse_enabled, "SSE should be disabled");

        let outcome = handler.handle(initialize_request()).await;
        assert!(
            outcome.is_ok(),
            "POST /mcp should work with non-streaming + sse(false)"
        );
    }

    /// Buffered `handle()` succeeds on a builder-created handler with SSE on.
    #[tokio::test]
    async fn test_non_streaming_runtime_sse_true() {
        let server = LambdaMcpServerBuilder::new()
            .name("test-non-streaming-sse-true")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(true)
            .build()
            .await
            .expect("Server should build successfully");

        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        assert!(handler.sse_enabled, "SSE should be enabled");

        let outcome = handler.handle(initialize_request()).await;
        assert!(
            outcome.is_ok(),
            "POST /mcp should work with non-streaming + sse(true)"
        );
    }

    /// `handle_streaming()` succeeds on a builder-created handler with SSE off.
    #[tokio::test]
    async fn test_streaming_runtime_sse_false() {
        let server = LambdaMcpServerBuilder::new()
            .name("test-streaming-sse-false")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(false)
            .build()
            .await
            .expect("Server should build successfully");

        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        assert!(!handler.sse_enabled, "SSE should be disabled");

        let outcome = handler.handle_streaming(initialize_request()).await;
        assert!(
            outcome.is_ok(),
            "Streaming runtime should work with sse(false)"
        );
    }

    /// `handle_streaming()` succeeds on a builder-created handler with SSE on.
    #[tokio::test]
    async fn test_streaming_runtime_sse_true() {
        let server = LambdaMcpServerBuilder::new()
            .name("test-streaming-sse-true")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(true)
            .build()
            .await
            .expect("Server should build successfully");

        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        assert!(handler.sse_enabled, "SSE should be enabled");

        let outcome = handler.handle_streaming(initialize_request()).await;
        assert!(
            outcome.is_ok(),
            "Streaming runtime should work with sse(true) for real-time streaming"
        );
    }
}