1use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12 ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
23#[derive(Clone)]
34pub struct LambdaMcpHandler {
35 session_handler: SessionMcpHandler,
37
38 streamable_handler: StreamableHttpHandler,
40
41 #[allow(dead_code)]
43 sse_enabled: bool,
44
45 route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
47
48 #[cfg(feature = "cors")]
50 cors_config: Option<CorsConfig>,
51}
52
53impl LambdaMcpHandler {
54 #[allow(clippy::too_many_arguments)]
56 pub fn new(
57 dispatcher: JsonRpcDispatcher<McpError>,
58 session_storage: Arc<BoxedSessionStorage>,
59 stream_manager: Arc<StreamManager>,
60 config: ServerConfig,
61 stream_config: StreamConfig,
62 _implementation: turul_mcp_protocol::Implementation,
63 capabilities: ServerCapabilities,
64 sse_enabled: bool,
65 #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
66 ) -> Self {
67 let dispatcher = Arc::new(dispatcher);
68
69 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
71
72 let session_handler = SessionMcpHandler::with_shared_stream_manager(
74 config.clone(),
75 dispatcher.clone(),
76 session_storage.clone(),
77 stream_config.clone(),
78 stream_manager.clone(),
79 middleware_stack.clone(),
80 );
81
82 let streamable_handler = StreamableHttpHandler::new(
84 Arc::new(config.clone()),
85 dispatcher.clone(),
86 session_storage.clone(),
87 stream_manager.clone(),
88 capabilities.clone(),
89 middleware_stack,
90 );
91
92 Self {
93 session_handler,
94 streamable_handler,
95 sse_enabled,
96 route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
97 #[cfg(feature = "cors")]
98 cors_config,
99 }
100 }
101
102 #[allow(clippy::too_many_arguments)]
104 pub fn with_shared_stream_manager(
105 config: ServerConfig,
106 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
107 session_storage: Arc<BoxedSessionStorage>,
108 stream_manager: Arc<StreamManager>,
109 stream_config: StreamConfig,
110 _implementation: turul_mcp_protocol::Implementation,
111 capabilities: ServerCapabilities,
112 sse_enabled: bool,
113 ) -> Self {
114 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
116
117 let session_handler = SessionMcpHandler::with_shared_stream_manager(
119 config.clone(),
120 dispatcher.clone(),
121 session_storage.clone(),
122 stream_config.clone(),
123 stream_manager.clone(),
124 middleware_stack.clone(),
125 );
126
127 let streamable_handler = StreamableHttpHandler::new(
129 Arc::new(config),
130 dispatcher,
131 session_storage,
132 stream_manager,
133 capabilities,
134 middleware_stack,
135 );
136
137 Self {
138 session_handler,
139 streamable_handler,
140 sse_enabled,
141 route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
142 #[cfg(feature = "cors")]
143 cors_config: None,
144 }
145 }
146
147 #[allow(clippy::too_many_arguments)]
149 pub fn with_middleware(
150 config: ServerConfig,
151 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
152 session_storage: Arc<BoxedSessionStorage>,
153 stream_manager: Arc<StreamManager>,
154 stream_config: StreamConfig,
155 capabilities: ServerCapabilities,
156 middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
157 sse_enabled: bool,
158 route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
159 ) -> Self {
160 let session_handler = SessionMcpHandler::with_shared_stream_manager(
162 config.clone(),
163 dispatcher.clone(),
164 session_storage.clone(),
165 stream_config.clone(),
166 stream_manager.clone(),
167 middleware_stack.clone(),
168 );
169
170 let streamable_handler = StreamableHttpHandler::new(
172 Arc::new(config),
173 dispatcher,
174 session_storage,
175 stream_manager,
176 capabilities,
177 middleware_stack,
178 );
179
180 Self {
181 session_handler,
182 streamable_handler,
183 sse_enabled,
184 route_registry,
185 #[cfg(feature = "cors")]
186 cors_config: None,
187 }
188 }
189
190 #[cfg(feature = "cors")]
192 pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
193 self.cors_config = Some(cors_config);
194 self
195 }
196
197 pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
199 self.session_handler.get_stream_manager()
200 }
201
202 pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
211 let method = req.method().clone();
212 let uri = req.uri().clone();
213
214 let request_origin = req
215 .headers()
216 .get("origin")
217 .and_then(|v| v.to_str().ok())
218 .map(|s| s.to_string());
219
220 info!(
221 "🌐 Lambda MCP request: {} {} (origin: {:?})",
222 method, uri, request_origin
223 );
224
225 #[cfg(feature = "cors")]
227 if method == http::Method::OPTIONS
228 && let Some(ref cors_config) = self.cors_config
229 {
230 debug!("Handling CORS preflight request");
231 return create_preflight_response(cors_config, request_origin.as_deref());
232 }
233
234 let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
236
237 let path = hyper_req.uri().path().to_string();
239 if !self.route_registry.is_empty() {
240 match self.route_registry.match_route(&path) {
241 Ok(Some(route_handler)) => {
242 debug!("Custom route matched: {}", path);
243 use http_body_util::BodyExt;
244 let (parts, body) = hyper_req.into_parts();
245 let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
246 let route_resp = route_handler.handle(boxed_req).await;
247 let mut lambda_resp =
248 crate::adapter::hyper_to_lambda_response(route_resp).await?;
249 #[cfg(feature = "cors")]
250 if let Some(ref cors_config) = self.cors_config {
251 inject_cors_headers(
252 &mut lambda_resp,
253 cors_config,
254 request_origin.as_deref(),
255 )?;
256 }
257 return Ok(lambda_resp);
258 }
259 Ok(None) => {} Err(e) => {
261 debug!("Route validation error: {}", e);
262 let route_resp = e.into_response();
263 let mut lambda_resp =
264 crate::adapter::hyper_to_lambda_response(route_resp).await?;
265 #[cfg(feature = "cors")]
266 if let Some(ref cors_config) = self.cors_config {
267 inject_cors_headers(
268 &mut lambda_resp,
269 cors_config,
270 request_origin.as_deref(),
271 )?;
272 }
273 return Ok(lambda_resp);
274 }
275 }
276 }
277
278 let hyper_resp = self
280 .session_handler
281 .handle_mcp_request(hyper_req)
282 .await
283 .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
284
285 let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
287
288 #[cfg(feature = "cors")]
290 if let Some(ref cors_config) = self.cors_config {
291 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
292 }
293
294 Ok(lambda_resp)
295 }
296
297 pub async fn handle_streaming(
302 &self,
303 req: LambdaRequest,
304 ) -> std::result::Result<
305 lambda_http::Response<
306 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
307 >,
308 Box<dyn std::error::Error + Send + Sync>,
309 > {
310 let method = req.method().clone();
311 let uri = req.uri().clone();
312 let request_origin = req
313 .headers()
314 .get("origin")
315 .and_then(|v| v.to_str().ok())
316 .map(|s| s.to_string());
317
318 debug!(
319 "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
320 method, uri, request_origin
321 );
322
323 #[cfg(feature = "cors")]
325 if method == http::Method::OPTIONS
326 && let Some(ref cors_config) = self.cors_config
327 {
328 debug!("Handling CORS preflight request (streaming)");
329 let preflight_response =
330 create_preflight_response(cors_config, request_origin.as_deref())
331 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
332
333 return Ok(self.convert_lambda_response_to_streaming(preflight_response));
335 }
336
337 let hyper_req = crate::adapter::lambda_to_hyper_request(req)
339 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
340
341 let path = hyper_req.uri().path().to_string();
343 if !self.route_registry.is_empty() {
344 match self.route_registry.match_route(&path) {
345 Ok(Some(route_handler)) => {
346 debug!("Custom route matched (streaming): {}", path);
347 use http_body_util::BodyExt;
348 let (parts, body) = hyper_req.into_parts();
349 let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
350 return Ok(route_handler.handle(boxed_req).await);
351 }
352 Ok(None) => {} Err(e) => {
354 debug!("Route validation error (streaming): {}", e);
355 return Ok(e.into_response());
356 }
357 }
358 }
359
360 use turul_http_mcp_server::protocol::McpProtocolVersion;
362 let protocol_version = hyper_req
363 .headers()
364 .get("MCP-Protocol-Version")
365 .and_then(|h| h.to_str().ok())
366 .and_then(McpProtocolVersion::parse_version)
367 .unwrap_or(McpProtocolVersion::V2025_06_18);
368
369 let hyper_resp = if protocol_version.supports_streamable_http() {
371 debug!(
373 "Using StreamableHttpHandler for protocol {}",
374 protocol_version.to_string()
375 );
376 self.streamable_handler.handle_request(hyper_req).await
377 } else {
378 debug!(
380 "Using SessionMcpHandler for legacy protocol {}",
381 protocol_version.to_string()
382 );
383 self.session_handler
384 .handle_mcp_request(hyper_req)
385 .await
386 .map_err(|e| {
387 Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
388 as Box<dyn std::error::Error + Send + Sync>
389 })?
390 };
391
392 let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
394
395 #[cfg(feature = "cors")]
397 if let Some(ref cors_config) = self.cors_config {
398 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
399 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
400 }
401
402 Ok(lambda_resp)
403 }
404
405 fn convert_lambda_response_to_streaming(
407 &self,
408 lambda_response: LambdaResponse<LambdaBody>,
409 ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
410 {
411 use bytes::Bytes;
412 use http_body_util::{BodyExt, Full};
413
414 let (parts, body) = lambda_response.into_parts();
415 let body_bytes = match body {
416 LambdaBody::Empty => Bytes::new(),
417 LambdaBody::Text(text) => Bytes::from(text),
418 LambdaBody::Binary(bytes) => Bytes::from(bytes),
419 _ => Bytes::new(),
420 };
421
422 let streaming_body = Full::new(body_bytes)
424 .map_err(|e: std::convert::Infallible| match e {})
425 .boxed_unsync();
426
427 lambda_http::Response::from_parts(parts, streaming_body)
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use super::*;
434 use http::Request;
435 use turul_mcp_session_storage::InMemorySessionStorage;
436
437 #[tokio::test]
438 async fn test_handler_creation() {
439 let session_storage = Arc::new(InMemorySessionStorage::new());
440 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
441 let dispatcher = JsonRpcDispatcher::new();
442 let config = ServerConfig::default();
443 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
444 let capabilities = ServerCapabilities::default();
445
446 let handler = LambdaMcpHandler::new(
447 dispatcher,
448 session_storage,
449 stream_manager,
450 config,
451 StreamConfig::default(),
452 implementation,
453 capabilities,
454 false, #[cfg(feature = "cors")]
456 None,
457 );
458
459 assert!(!handler.sse_enabled);
461 }
462
463 #[tokio::test]
464 async fn test_sse_enabled_with_handle_works() {
465 let session_storage = Arc::new(InMemorySessionStorage::new());
466 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
467 let dispatcher = JsonRpcDispatcher::new();
468 let config = ServerConfig::default();
469 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
470 let capabilities = ServerCapabilities::default();
471
472 let handler = LambdaMcpHandler::new(
474 dispatcher,
475 session_storage,
476 stream_manager,
477 config,
478 StreamConfig::default(),
479 implementation,
480 capabilities,
481 true, #[cfg(feature = "cors")]
483 None,
484 );
485
486 let lambda_req = Request::builder()
488 .method("POST")
489 .uri("/mcp")
490 .body(LambdaBody::Text(
491 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
492 ))
493 .unwrap();
494
495 let result = handler.handle(lambda_req).await;
497 assert!(
498 result.is_ok(),
499 "handle() should work with SSE enabled for snapshot-based responses"
500 );
501 }
502
503 #[tokio::test]
505 async fn test_stream_config_preservation() {
506 let session_storage = Arc::new(InMemorySessionStorage::new());
507 let dispatcher = JsonRpcDispatcher::new();
508 let config = ServerConfig::default();
509 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
510 let capabilities = ServerCapabilities::default();
511
512 let custom_stream_config = StreamConfig {
514 channel_buffer_size: 1024, max_replay_events: 200, keepalive_interval_seconds: 10, cors_origin: "https://custom-test.example.com".to_string(), };
519
520 let stream_manager = Arc::new(StreamManager::with_config(
522 session_storage.clone(),
523 custom_stream_config.clone(),
524 ));
525
526 let handler = LambdaMcpHandler::new(
527 dispatcher,
528 session_storage,
529 stream_manager,
530 config,
531 custom_stream_config.clone(),
532 implementation,
533 capabilities,
534 false, #[cfg(feature = "cors")]
536 None,
537 );
538
539 assert!(!handler.sse_enabled);
541
542 let stream_manager = handler.get_stream_manager();
544
545 let actual_config = stream_manager.get_config();
547
548 assert_eq!(
549 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
550 "Custom channel_buffer_size was not propagated correctly"
551 );
552 assert_eq!(
553 actual_config.max_replay_events, custom_stream_config.max_replay_events,
554 "Custom max_replay_events was not propagated correctly"
555 );
556 assert_eq!(
557 actual_config.keepalive_interval_seconds,
558 custom_stream_config.keepalive_interval_seconds,
559 "Custom keepalive_interval_seconds was not propagated correctly"
560 );
561 assert_eq!(
562 actual_config.cors_origin, custom_stream_config.cors_origin,
563 "Custom cors_origin was not propagated correctly"
564 );
565
566 assert!(Arc::strong_count(stream_manager) >= 1);
568 }
569
570 #[tokio::test]
572 async fn test_full_builder_chain_stream_config() {
573 use crate::LambdaMcpServerBuilder;
574 use turul_mcp_session_storage::InMemorySessionStorage;
575
576 let custom_stream_config = turul_http_mcp_server::StreamConfig {
578 channel_buffer_size: 2048, max_replay_events: 500, keepalive_interval_seconds: 15, cors_origin: "https://full-chain-test.example.com".to_string(),
582 };
583
584 let server = LambdaMcpServerBuilder::new()
586 .name("full-chain-test")
587 .version("1.0.0")
588 .storage(Arc::new(InMemorySessionStorage::new()))
589 .sse(true) .stream_config(custom_stream_config.clone())
591 .build()
592 .await
593 .expect("Server should build successfully");
594
595 let handler = server
597 .handler()
598 .await
599 .expect("Handler should be created from server");
600
601 assert!(handler.sse_enabled, "SSE should be enabled");
603
604 let stream_manager = handler.get_stream_manager();
606 let actual_config = stream_manager.get_config();
607
608 assert_eq!(
609 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
610 "Custom channel_buffer_size should be preserved through builder → server → handler chain"
611 );
612 assert_eq!(
613 actual_config.max_replay_events, custom_stream_config.max_replay_events,
614 "Custom max_replay_events should be preserved through builder → server → handler chain"
615 );
616 assert_eq!(
617 actual_config.keepalive_interval_seconds,
618 custom_stream_config.keepalive_interval_seconds,
619 "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
620 );
621 assert_eq!(
622 actual_config.cors_origin, custom_stream_config.cors_origin,
623 "Custom cors_origin should be preserved through builder → server → handler chain"
624 );
625
626 assert!(
628 Arc::strong_count(stream_manager) >= 1,
629 "Stream manager should be properly initialized"
630 );
631
632 let test_session_id = uuid::Uuid::now_v7().as_simple().to_string();
635
636 let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
639 assert!(
640 subscriptions.is_empty(),
641 "New session should have no subscriptions initially"
642 );
643
644 assert_eq!(
647 stream_manager.get_config().channel_buffer_size,
648 2048,
649 "Stream manager should be using the custom buffer size functionally"
650 );
651 }
652
653 #[tokio::test]
658 async fn test_non_streaming_runtime_sse_false() {
659 use crate::LambdaMcpServerBuilder;
660 use turul_mcp_session_storage::InMemorySessionStorage;
661
662 let server = LambdaMcpServerBuilder::new()
663 .name("test-non-streaming-sse-false")
664 .version("1.0.0")
665 .storage(Arc::new(InMemorySessionStorage::new()))
666 .sse(false) .build()
668 .await
669 .expect("Server should build successfully");
670
671 let handler = server
672 .handler()
673 .await
674 .expect("Handler should be created from server");
675
676 assert!(!handler.sse_enabled, "SSE should be disabled");
678
679 let lambda_req = Request::builder()
681 .method("POST")
682 .uri("/mcp")
683 .body(LambdaBody::Text(
684 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
685 ))
686 .unwrap();
687
688 let result = handler.handle(lambda_req).await;
690 assert!(
691 result.is_ok(),
692 "POST /mcp should work with non-streaming + sse(false)"
693 );
694 }
695
696 #[tokio::test]
698 async fn test_non_streaming_runtime_sse_true() {
699 use crate::LambdaMcpServerBuilder;
700 use turul_mcp_session_storage::InMemorySessionStorage;
701
702 let server = LambdaMcpServerBuilder::new()
703 .name("test-non-streaming-sse-true")
704 .version("1.0.0")
705 .storage(Arc::new(InMemorySessionStorage::new()))
706 .sse(true) .build()
708 .await
709 .expect("Server should build successfully");
710
711 let handler = server
712 .handler()
713 .await
714 .expect("Handler should be created from server");
715
716 assert!(handler.sse_enabled, "SSE should be enabled");
718
719 let lambda_req = Request::builder()
721 .method("POST")
722 .uri("/mcp")
723 .body(LambdaBody::Text(
724 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
725 ))
726 .unwrap();
727
728 let result = handler.handle(lambda_req).await;
730 assert!(
731 result.is_ok(),
732 "POST /mcp should work with non-streaming + sse(true)"
733 );
734
735 }
738
739 #[tokio::test]
741 async fn test_streaming_runtime_sse_false() {
742 use crate::LambdaMcpServerBuilder;
743 use turul_mcp_session_storage::InMemorySessionStorage;
744
745 let server = LambdaMcpServerBuilder::new()
746 .name("test-streaming-sse-false")
747 .version("1.0.0")
748 .storage(Arc::new(InMemorySessionStorage::new()))
749 .sse(false) .build()
751 .await
752 .expect("Server should build successfully");
753
754 let handler = server
755 .handler()
756 .await
757 .expect("Handler should be created from server");
758
759 assert!(!handler.sse_enabled, "SSE should be disabled");
761
762 let lambda_req = Request::builder()
764 .method("POST")
765 .uri("/mcp")
766 .body(LambdaBody::Text(
767 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
768 ))
769 .unwrap();
770
771 let result = handler.handle_streaming(lambda_req).await;
773 assert!(
774 result.is_ok(),
775 "Streaming runtime should work with sse(false)"
776 );
777 }
778
779 #[tokio::test]
781 async fn test_streaming_runtime_sse_true() {
782 use crate::LambdaMcpServerBuilder;
783 use turul_mcp_session_storage::InMemorySessionStorage;
784
785 let server = LambdaMcpServerBuilder::new()
786 .name("test-streaming-sse-true")
787 .version("1.0.0")
788 .storage(Arc::new(InMemorySessionStorage::new()))
789 .sse(true) .build()
791 .await
792 .expect("Server should build successfully");
793
794 let handler = server
795 .handler()
796 .await
797 .expect("Handler should be created from server");
798
799 assert!(handler.sse_enabled, "SSE should be enabled");
801
802 let lambda_req = Request::builder()
804 .method("POST")
805 .uri("/mcp")
806 .body(LambdaBody::Text(
807 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
808 ))
809 .unwrap();
810
811 let result = handler.handle_streaming(lambda_req).await;
813 assert!(
814 result.is_ok(),
815 "Streaming runtime should work with sse(true) for real-time streaming"
816 );
817
818 }
821
822 async fn build_strict_streaming_handler() -> LambdaMcpHandler {
826 use crate::LambdaMcpServerBuilder;
827 use turul_mcp_session_storage::InMemorySessionStorage;
828
829 let server = LambdaMcpServerBuilder::new()
830 .name("lifecycle-test")
831 .version("1.0.0")
832 .tool(LifecycleTestTool)
833 .storage(Arc::new(InMemorySessionStorage::new()))
834 .strict_lifecycle(true) .sse(true)
836 .build()
837 .await
838 .expect("build should succeed");
839
840 server.handler().await.expect("handler should succeed")
841 }
842
843 #[derive(Clone, Default)]
845 struct LifecycleTestTool;
846
847 impl turul_mcp_builders::traits::HasBaseMetadata for LifecycleTestTool {
848 fn name(&self) -> &str {
849 "ping_tool"
850 }
851 }
852 impl turul_mcp_builders::traits::HasDescription for LifecycleTestTool {
853 fn description(&self) -> Option<&str> {
854 Some("test tool")
855 }
856 }
857 impl turul_mcp_builders::traits::HasInputSchema for LifecycleTestTool {
858 fn input_schema(&self) -> &turul_mcp_protocol::ToolSchema {
859 static SCHEMA: std::sync::OnceLock<turul_mcp_protocol::ToolSchema> =
860 std::sync::OnceLock::new();
861 SCHEMA.get_or_init(turul_mcp_protocol::ToolSchema::object)
862 }
863 }
864 impl turul_mcp_builders::traits::HasOutputSchema for LifecycleTestTool {
865 fn output_schema(&self) -> Option<&turul_mcp_protocol::ToolSchema> {
866 None
867 }
868 }
869 impl turul_mcp_builders::traits::HasAnnotations for LifecycleTestTool {
870 fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> {
871 None
872 }
873 }
874 impl turul_mcp_builders::traits::HasToolMeta for LifecycleTestTool {
875 fn tool_meta(&self) -> Option<&std::collections::HashMap<String, serde_json::Value>> {
876 None
877 }
878 }
879 impl turul_mcp_builders::traits::HasIcons for LifecycleTestTool {}
880 impl turul_mcp_builders::traits::HasExecution for LifecycleTestTool {}
881
882 #[async_trait::async_trait]
883 impl turul_mcp_server::McpTool for LifecycleTestTool {
884 async fn call(
885 &self,
886 _args: serde_json::Value,
887 _session: Option<turul_mcp_server::SessionContext>,
888 ) -> turul_mcp_server::McpResult<turul_mcp_protocol::tools::CallToolResult> {
889 Ok(turul_mcp_protocol::tools::CallToolResult::success(vec![
890 turul_mcp_protocol::tools::ToolResult::text("pong"),
891 ]))
892 }
893 }
894
895 fn streaming_mcp_request(body: &str, session_id: Option<&str>) -> LambdaRequest {
897 let mut builder = Request::builder()
898 .method("POST")
899 .uri("/mcp")
900 .header("Content-Type", "application/json")
901 .header("Accept", "application/json, text/event-stream")
902 .header("MCP-Protocol-Version", "2025-11-25");
903
904 if let Some(sid) = session_id {
905 builder = builder.header("Mcp-Session-Id", sid);
906 }
907
908 builder.body(LambdaBody::Text(body.to_string())).unwrap()
909 }
910
911 async fn collect_streaming_body(
913 response: lambda_http::Response<
914 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
915 >,
916 ) -> (http::StatusCode, String) {
917 use http_body_util::BodyExt;
918 let status = response.status();
919 let session_id = response
920 .headers()
921 .get("Mcp-Session-Id")
922 .and_then(|v| v.to_str().ok())
923 .map(String::from);
924 let body_bytes = response
925 .into_body()
926 .collect()
927 .await
928 .map(|c| c.to_bytes())
929 .unwrap_or_default();
930 let body_str = String::from_utf8_lossy(&body_bytes).to_string();
931 let _ = session_id; (status, body_str)
933 }
934
935 fn extract_session_id(
937 response: &lambda_http::Response<
938 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
939 >,
940 ) -> Option<String> {
941 response
942 .headers()
943 .get("Mcp-Session-Id")
944 .and_then(|v| v.to_str().ok())
945 .map(String::from)
946 }
947
948 fn parse_response_json(body: &str) -> serde_json::Value {
950 let json_str = body
952 .lines()
953 .find(|line| line.starts_with("data: "))
954 .map(|line| &line[6..])
955 .unwrap_or(body.trim());
956 serde_json::from_str(json_str)
957 .unwrap_or_else(|e| panic!("Failed to parse JSON from body: {e}\nBody: {body}"))
958 }
959
960 #[tokio::test]
962 async fn test_lambda_streaming_strict_handshake_succeeds() {
963 let handler = build_strict_streaming_handler().await;
964
965 let init_req = streaming_mcp_request(
967 &serde_json::json!({
968 "jsonrpc": "2.0", "method": "initialize", "id": 1,
969 "params": {
970 "protocolVersion": "2025-11-25",
971 "capabilities": {},
972 "clientInfo": { "name": "test", "version": "1.0.0" }
973 }
974 })
975 .to_string(),
976 None,
977 );
978 let init_resp = handler
979 .handle_streaming(init_req)
980 .await
981 .expect("initialize should succeed");
982 let session_id = extract_session_id(&init_resp).expect("must return session ID");
983 let (status, _body) = collect_streaming_body(init_resp).await;
984 assert_eq!(status, 200, "initialize should return 200");
985
986 let notif_req = streaming_mcp_request(
988 &serde_json::json!({
989 "jsonrpc": "2.0",
990 "method": "notifications/initialized",
991 "params": {}
992 })
993 .to_string(),
994 Some(&session_id),
995 );
996 let notif_resp = handler
997 .handle_streaming(notif_req)
998 .await
999 .expect("notification should succeed");
1000 let (status, _) = collect_streaming_body(notif_resp).await;
1001 assert_eq!(status, 202, "notifications/initialized should return 202");
1002
1003 let list_req = streaming_mcp_request(
1005 &serde_json::json!({
1006 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1007 })
1008 .to_string(),
1009 Some(&session_id),
1010 );
1011 let list_resp = handler
1012 .handle_streaming(list_req)
1013 .await
1014 .expect("tools/list should succeed");
1015 let (status, body) = collect_streaming_body(list_resp).await;
1016 assert_eq!(status, 200, "tools/list should return 200");
1017 let json = parse_response_json(&body);
1018 assert!(
1019 json["result"]["tools"].is_array(),
1020 "tools/list should return tools array: {json}"
1021 );
1022
1023 let call_req = streaming_mcp_request(
1025 &serde_json::json!({
1026 "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1027 "params": { "name": "ping_tool", "arguments": {} }
1028 })
1029 .to_string(),
1030 Some(&session_id),
1031 );
1032 let call_resp = handler
1033 .handle_streaming(call_req)
1034 .await
1035 .expect("tools/call should succeed");
1036 let (status, body) = collect_streaming_body(call_resp).await;
1037 assert_eq!(status, 200, "tools/call should return 200");
1038 let json = parse_response_json(&body);
1039 assert!(
1040 json["result"].is_object(),
1041 "tools/call should return result: {json}"
1042 );
1043 }
1044
1045 #[tokio::test]
1047 async fn test_lambda_streaming_strict_rejects_before_initialized() {
1048 let handler = build_strict_streaming_handler().await;
1049
1050 let init_req = streaming_mcp_request(
1052 &serde_json::json!({
1053 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1054 "params": {
1055 "protocolVersion": "2025-11-25",
1056 "capabilities": {},
1057 "clientInfo": { "name": "test", "version": "1.0.0" }
1058 }
1059 })
1060 .to_string(),
1061 None,
1062 );
1063 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1064 let session_id = extract_session_id(&init_resp).unwrap();
1065 let _ = collect_streaming_body(init_resp).await;
1066
1067 let list_req = streaming_mcp_request(
1069 &serde_json::json!({
1070 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1071 })
1072 .to_string(),
1073 Some(&session_id),
1074 );
1075 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1076 let (_, body) = collect_streaming_body(list_resp).await;
1077 let json = parse_response_json(&body);
1078 assert!(
1079 json["error"].is_object(),
1080 "tools/list should return JSON-RPC error: {json}"
1081 );
1082 assert_eq!(
1083 json["error"]["code"].as_i64().unwrap(),
1084 -32031,
1085 "tools/list must return SessionError code -32031, got: {json}"
1086 );
1087 assert!(
1088 json["error"]["message"]
1089 .as_str()
1090 .unwrap()
1091 .contains("notifications/initialized"),
1092 "Error must mention notifications/initialized: {}",
1093 json["error"]["message"]
1094 );
1095
1096 let call_req = streaming_mcp_request(
1098 &serde_json::json!({
1099 "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1100 "params": { "name": "ping_tool", "arguments": {} }
1101 })
1102 .to_string(),
1103 Some(&session_id),
1104 );
1105 let call_resp = handler.handle_streaming(call_req).await.unwrap();
1106 let (_, body) = collect_streaming_body(call_resp).await;
1107 let json = parse_response_json(&body);
1108 assert!(
1109 json["error"].is_object(),
1110 "tools/call should return JSON-RPC error: {json}"
1111 );
1112 assert_eq!(
1113 json["error"]["code"].as_i64().unwrap(),
1114 -32031,
1115 "tools/call must return SessionError code -32031, got: {json}"
1116 );
1117 assert!(
1118 json["error"]["message"]
1119 .as_str()
1120 .unwrap()
1121 .contains("notifications/initialized"),
1122 "Error must mention notifications/initialized: {}",
1123 json["error"]["message"]
1124 );
1125 }
1126
1127 #[tokio::test]
1129 async fn test_lambda_streaming_initialized_is_effective_immediately() {
1130 let handler = build_strict_streaming_handler().await;
1131
1132 let init_req = streaming_mcp_request(
1134 &serde_json::json!({
1135 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1136 "params": {
1137 "protocolVersion": "2025-11-25",
1138 "capabilities": {},
1139 "clientInfo": { "name": "test", "version": "1.0.0" }
1140 }
1141 })
1142 .to_string(),
1143 None,
1144 );
1145 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1146 let session_id = extract_session_id(&init_resp).unwrap();
1147 let _ = collect_streaming_body(init_resp).await;
1148
1149 let notif_req = streaming_mcp_request(
1151 &serde_json::json!({
1152 "jsonrpc": "2.0",
1153 "method": "notifications/initialized",
1154 "params": {}
1155 })
1156 .to_string(),
1157 Some(&session_id),
1158 );
1159 let notif_resp = handler.handle_streaming(notif_req).await.unwrap();
1160 let (status, _) = collect_streaming_body(notif_resp).await;
1161 assert_eq!(status, 202);
1162
1163 let list_req = streaming_mcp_request(
1165 &serde_json::json!({
1166 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1167 })
1168 .to_string(),
1169 Some(&session_id),
1170 );
1171 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1172 let (status, body) = collect_streaming_body(list_resp).await;
1173 assert_eq!(
1174 status, 200,
1175 "tools/list must succeed immediately after initialized"
1176 );
1177 let json = parse_response_json(&body);
1178 assert!(
1179 json["result"]["tools"].is_array(),
1180 "Must return tools list, not error: {json}"
1181 );
1182 }
1183
1184 #[tokio::test]
1186 async fn test_lambda_streaming_lenient_mode_allows_without_initialized() {
1187 use crate::LambdaMcpServerBuilder;
1188 use turul_mcp_session_storage::InMemorySessionStorage;
1189
1190 let server = LambdaMcpServerBuilder::new()
1191 .name("lenient-test")
1192 .version("1.0.0")
1193 .tool(LifecycleTestTool)
1194 .storage(Arc::new(InMemorySessionStorage::new()))
1195 .strict_lifecycle(false) .sse(true)
1197 .build()
1198 .await
1199 .unwrap();
1200
1201 let handler = server.handler().await.unwrap();
1202
1203 let init_req = streaming_mcp_request(
1205 &serde_json::json!({
1206 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1207 "params": {
1208 "protocolVersion": "2025-11-25",
1209 "capabilities": {},
1210 "clientInfo": { "name": "test", "version": "1.0.0" }
1211 }
1212 })
1213 .to_string(),
1214 None,
1215 );
1216 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1217 let session_id = extract_session_id(&init_resp).unwrap();
1218 let _ = collect_streaming_body(init_resp).await;
1219
1220 let list_req = streaming_mcp_request(
1222 &serde_json::json!({
1223 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1224 })
1225 .to_string(),
1226 Some(&session_id),
1227 );
1228 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1229 let (status, body) = collect_streaming_body(list_resp).await;
1230 assert_eq!(
1231 status, 200,
1232 "Lenient mode should allow tools/list without initialized"
1233 );
1234 let json = parse_response_json(&body);
1235 assert!(
1236 json["result"]["tools"].is_array(),
1237 "Must return tools list in lenient mode: {json}"
1238 );
1239 }
1240}