1use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12 ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
23#[derive(Clone)]
34pub struct LambdaMcpHandler {
35 session_handler: SessionMcpHandler,
37
38 streamable_handler: StreamableHttpHandler,
40
41 #[allow(dead_code)]
43 sse_enabled: bool,
44
45 route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
47
48 #[cfg(feature = "cors")]
50 cors_config: Option<CorsConfig>,
51}
52
53impl LambdaMcpHandler {
54 #[allow(clippy::too_many_arguments)]
56 pub fn new(
57 dispatcher: JsonRpcDispatcher<McpError>,
58 session_storage: Arc<BoxedSessionStorage>,
59 stream_manager: Arc<StreamManager>,
60 config: ServerConfig,
61 stream_config: StreamConfig,
62 _implementation: turul_mcp_protocol::Implementation,
63 capabilities: ServerCapabilities,
64 sse_enabled: bool,
65 #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
66 ) -> Self {
67 let dispatcher = Arc::new(dispatcher);
68
69 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
71
72 let session_handler = SessionMcpHandler::with_shared_stream_manager(
74 config.clone(),
75 dispatcher.clone(),
76 session_storage.clone(),
77 stream_config.clone(),
78 stream_manager.clone(),
79 middleware_stack.clone(),
80 );
81
82 let streamable_handler = StreamableHttpHandler::new(
84 Arc::new(config.clone()),
85 dispatcher.clone(),
86 session_storage.clone(),
87 stream_manager.clone(),
88 capabilities.clone(),
89 middleware_stack,
90 );
91
92 Self {
93 session_handler,
94 streamable_handler,
95 sse_enabled,
96 route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
97 #[cfg(feature = "cors")]
98 cors_config,
99 }
100 }
101
102 #[allow(clippy::too_many_arguments)]
104 pub fn with_shared_stream_manager(
105 config: ServerConfig,
106 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
107 session_storage: Arc<BoxedSessionStorage>,
108 stream_manager: Arc<StreamManager>,
109 stream_config: StreamConfig,
110 _implementation: turul_mcp_protocol::Implementation,
111 capabilities: ServerCapabilities,
112 sse_enabled: bool,
113 ) -> Self {
114 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
116
117 let session_handler = SessionMcpHandler::with_shared_stream_manager(
119 config.clone(),
120 dispatcher.clone(),
121 session_storage.clone(),
122 stream_config.clone(),
123 stream_manager.clone(),
124 middleware_stack.clone(),
125 );
126
127 let streamable_handler = StreamableHttpHandler::new(
129 Arc::new(config),
130 dispatcher,
131 session_storage,
132 stream_manager,
133 capabilities,
134 middleware_stack,
135 );
136
137 Self {
138 session_handler,
139 streamable_handler,
140 sse_enabled,
141 route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
142 #[cfg(feature = "cors")]
143 cors_config: None,
144 }
145 }
146
147 #[allow(clippy::too_many_arguments)]
149 pub fn with_middleware(
150 config: ServerConfig,
151 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
152 session_storage: Arc<BoxedSessionStorage>,
153 stream_manager: Arc<StreamManager>,
154 stream_config: StreamConfig,
155 capabilities: ServerCapabilities,
156 middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
157 sse_enabled: bool,
158 route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
159 ) -> Self {
160 let session_handler = SessionMcpHandler::with_shared_stream_manager(
162 config.clone(),
163 dispatcher.clone(),
164 session_storage.clone(),
165 stream_config.clone(),
166 stream_manager.clone(),
167 middleware_stack.clone(),
168 );
169
170 let streamable_handler = StreamableHttpHandler::new(
172 Arc::new(config),
173 dispatcher,
174 session_storage,
175 stream_manager,
176 capabilities,
177 middleware_stack,
178 );
179
180 Self {
181 session_handler,
182 streamable_handler,
183 sse_enabled,
184 route_registry,
185 #[cfg(feature = "cors")]
186 cors_config: None,
187 }
188 }
189
/// Builder-style setter: returns the handler with `cors_config` installed,
/// replacing any configuration that was set before.
#[cfg(feature = "cors")]
pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
    let _previous = self.cors_config.replace(cors_config);
    self
}
196
197 pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
199 self.session_handler.get_stream_manager()
200 }
201
202 pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
211 let method = req.method().clone();
212 let uri = req.uri().clone();
213
214 let request_origin = req
215 .headers()
216 .get("origin")
217 .and_then(|v| v.to_str().ok())
218 .map(|s| s.to_string());
219
220 info!(
221 "🌐 Lambda MCP request: {} {} (origin: {:?})",
222 method, uri, request_origin
223 );
224
225 #[cfg(feature = "cors")]
227 if method == http::Method::OPTIONS
228 && let Some(ref cors_config) = self.cors_config
229 {
230 debug!("Handling CORS preflight request");
231 return create_preflight_response(cors_config, request_origin.as_deref());
232 }
233
234 let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
236
237 let path = hyper_req.uri().path().to_string();
239 if !self.route_registry.is_empty() {
240 match self.route_registry.match_route(&path) {
241 Ok(Some(route_handler)) => {
242 debug!("Custom route matched: {}", path);
243 use http_body_util::BodyExt;
244 let (parts, body) = hyper_req.into_parts();
245 let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
246 let route_resp = route_handler.handle(boxed_req).await;
247 let mut lambda_resp =
248 crate::adapter::hyper_to_lambda_response(route_resp).await?;
249 #[cfg(feature = "cors")]
250 if let Some(ref cors_config) = self.cors_config {
251 inject_cors_headers(
252 &mut lambda_resp,
253 cors_config,
254 request_origin.as_deref(),
255 )?;
256 }
257 return Ok(lambda_resp);
258 }
259 Ok(None) => {} Err(e) => {
261 debug!("Route validation error: {}", e);
262 let route_resp = e.into_response();
263 let mut lambda_resp =
264 crate::adapter::hyper_to_lambda_response(route_resp).await?;
265 #[cfg(feature = "cors")]
266 if let Some(ref cors_config) = self.cors_config {
267 inject_cors_headers(
268 &mut lambda_resp,
269 cors_config,
270 request_origin.as_deref(),
271 )?;
272 }
273 return Ok(lambda_resp);
274 }
275 }
276 }
277
278 let hyper_resp = self
280 .session_handler
281 .handle_mcp_request(hyper_req)
282 .await
283 .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
284
285 let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
287
288 #[cfg(feature = "cors")]
290 if let Some(ref cors_config) = self.cors_config {
291 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
292 }
293
294 Ok(lambda_resp)
295 }
296
297 pub async fn handle_streaming(
302 &self,
303 req: LambdaRequest,
304 ) -> std::result::Result<
305 lambda_http::Response<
306 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
307 >,
308 Box<dyn std::error::Error + Send + Sync>,
309 > {
310 let method = req.method().clone();
311 let uri = req.uri().clone();
312 let request_origin = req
313 .headers()
314 .get("origin")
315 .and_then(|v| v.to_str().ok())
316 .map(|s| s.to_string());
317
318 debug!(
319 "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
320 method, uri, request_origin
321 );
322
323 #[cfg(feature = "cors")]
325 if method == http::Method::OPTIONS
326 && let Some(ref cors_config) = self.cors_config
327 {
328 debug!("Handling CORS preflight request (streaming)");
329 let preflight_response =
330 create_preflight_response(cors_config, request_origin.as_deref())
331 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
332
333 return Ok(self.convert_lambda_response_to_streaming(preflight_response));
335 }
336
337 let hyper_req = crate::adapter::lambda_to_hyper_request(req)
339 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
340
341 let path = hyper_req.uri().path().to_string();
343 if !self.route_registry.is_empty() {
344 match self.route_registry.match_route(&path) {
345 Ok(Some(route_handler)) => {
346 debug!("Custom route matched (streaming): {}", path);
347 use http_body_util::BodyExt;
348 let (parts, body) = hyper_req.into_parts();
349 let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
350 return Ok(route_handler.handle(boxed_req).await);
351 }
352 Ok(None) => {} Err(e) => {
354 debug!("Route validation error (streaming): {}", e);
355 return Ok(e.into_response());
356 }
357 }
358 }
359
360 use turul_http_mcp_server::protocol::McpProtocolVersion;
362 let protocol_version = hyper_req
363 .headers()
364 .get("MCP-Protocol-Version")
365 .and_then(|h| h.to_str().ok())
366 .and_then(McpProtocolVersion::parse_version)
367 .unwrap_or(McpProtocolVersion::V2025_06_18);
368
369 let hyper_resp = if protocol_version.supports_streamable_http() {
371 debug!(
373 "Using StreamableHttpHandler for protocol {}",
374 protocol_version.to_string()
375 );
376 self.streamable_handler.handle_request(hyper_req).await
377 } else {
378 debug!(
380 "Using SessionMcpHandler for legacy protocol {}",
381 protocol_version.to_string()
382 );
383 self.session_handler
384 .handle_mcp_request(hyper_req)
385 .await
386 .map_err(|e| {
387 Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
388 as Box<dyn std::error::Error + Send + Sync>
389 })?
390 };
391
392 let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
394
395 #[cfg(feature = "cors")]
397 if let Some(ref cors_config) = self.cors_config {
398 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
399 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
400 }
401
402 Ok(lambda_resp)
403 }
404
405 fn convert_lambda_response_to_streaming(
407 &self,
408 lambda_response: LambdaResponse<LambdaBody>,
409 ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
410 {
411 use bytes::Bytes;
412 use http_body_util::{BodyExt, Full};
413
414 let (parts, body) = lambda_response.into_parts();
415 let body_bytes = match body {
416 LambdaBody::Empty => Bytes::new(),
417 LambdaBody::Text(text) => Bytes::from(text),
418 LambdaBody::Binary(bytes) => Bytes::from(bytes),
419 _ => Bytes::new(),
420 };
421
422 let streaming_body = Full::new(body_bytes)
424 .map_err(|e: std::convert::Infallible| match e {})
425 .boxed_unsync();
426
427 lambda_http::Response::from_parts(parts, streaming_body)
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use super::*;
434 use http::Request;
435 use turul_mcp_session_storage::InMemorySessionStorage;
436
437 #[tokio::test]
438 async fn test_handler_creation() {
439 let session_storage = Arc::new(InMemorySessionStorage::new());
440 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
441 let dispatcher = JsonRpcDispatcher::new();
442 let config = ServerConfig::default();
443 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
444 let capabilities = ServerCapabilities::default();
445
446 let handler = LambdaMcpHandler::new(
447 dispatcher,
448 session_storage,
449 stream_manager,
450 config,
451 StreamConfig::default(),
452 implementation,
453 capabilities,
454 false, #[cfg(feature = "cors")]
456 None,
457 );
458
459 assert!(!handler.sse_enabled);
461 }
462
463 #[tokio::test]
464 async fn test_sse_enabled_with_handle_works() {
465 let session_storage = Arc::new(InMemorySessionStorage::new());
466 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
467 let dispatcher = JsonRpcDispatcher::new();
468 let config = ServerConfig::default();
469 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
470 let capabilities = ServerCapabilities::default();
471
472 let handler = LambdaMcpHandler::new(
474 dispatcher,
475 session_storage,
476 stream_manager,
477 config,
478 StreamConfig::default(),
479 implementation,
480 capabilities,
481 true, #[cfg(feature = "cors")]
483 None,
484 );
485
486 let lambda_req = Request::builder()
488 .method("POST")
489 .uri("/mcp")
490 .body(LambdaBody::Text(
491 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
492 ))
493 .unwrap();
494
495 let result = handler.handle(lambda_req).await;
497 assert!(
498 result.is_ok(),
499 "handle() should work with SSE enabled for snapshot-based responses"
500 );
501 }
502
503 #[tokio::test]
505 async fn test_stream_config_preservation() {
506 let session_storage = Arc::new(InMemorySessionStorage::new());
507 let dispatcher = JsonRpcDispatcher::new();
508 let config = ServerConfig::default();
509 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
510 let capabilities = ServerCapabilities::default();
511
512 let custom_stream_config = StreamConfig {
514 channel_buffer_size: 1024, max_replay_events: 200, keepalive_interval_seconds: 10, cors_origin: "https://custom-test.example.com".to_string(), };
519
520 let stream_manager = Arc::new(StreamManager::with_config(
522 session_storage.clone(),
523 custom_stream_config.clone(),
524 ));
525
526 let handler = LambdaMcpHandler::new(
527 dispatcher,
528 session_storage,
529 stream_manager,
530 config,
531 custom_stream_config.clone(),
532 implementation,
533 capabilities,
534 false, #[cfg(feature = "cors")]
536 None,
537 );
538
539 assert!(!handler.sse_enabled);
541
542 let stream_manager = handler.get_stream_manager();
544
545 let actual_config = stream_manager.get_config();
547
548 assert_eq!(
549 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
550 "Custom channel_buffer_size was not propagated correctly"
551 );
552 assert_eq!(
553 actual_config.max_replay_events, custom_stream_config.max_replay_events,
554 "Custom max_replay_events was not propagated correctly"
555 );
556 assert_eq!(
557 actual_config.keepalive_interval_seconds,
558 custom_stream_config.keepalive_interval_seconds,
559 "Custom keepalive_interval_seconds was not propagated correctly"
560 );
561 assert_eq!(
562 actual_config.cors_origin, custom_stream_config.cors_origin,
563 "Custom cors_origin was not propagated correctly"
564 );
565
566 assert!(Arc::strong_count(stream_manager) >= 1);
568 }
569
570 #[tokio::test]
572 async fn test_full_builder_chain_stream_config() {
573 use crate::LambdaMcpServerBuilder;
574 use turul_mcp_session_storage::InMemorySessionStorage;
575
576 let custom_stream_config = turul_http_mcp_server::StreamConfig {
578 channel_buffer_size: 2048, max_replay_events: 500, keepalive_interval_seconds: 15, cors_origin: "https://full-chain-test.example.com".to_string(),
582 };
583
584 let server = LambdaMcpServerBuilder::new()
586 .name("full-chain-test")
587 .version("1.0.0")
588 .storage(Arc::new(InMemorySessionStorage::new()))
589 .sse(true) .stream_config(custom_stream_config.clone())
591 .build()
592 .await
593 .expect("Server should build successfully");
594
595 let handler = server
597 .handler()
598 .await
599 .expect("Handler should be created from server");
600
601 assert!(handler.sse_enabled, "SSE should be enabled");
603
604 let stream_manager = handler.get_stream_manager();
606 let actual_config = stream_manager.get_config();
607
608 assert_eq!(
609 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
610 "Custom channel_buffer_size should be preserved through builder → server → handler chain"
611 );
612 assert_eq!(
613 actual_config.max_replay_events, custom_stream_config.max_replay_events,
614 "Custom max_replay_events should be preserved through builder → server → handler chain"
615 );
616 assert_eq!(
617 actual_config.keepalive_interval_seconds,
618 custom_stream_config.keepalive_interval_seconds,
619 "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
620 );
621 assert_eq!(
622 actual_config.cors_origin, custom_stream_config.cors_origin,
623 "Custom cors_origin should be preserved through builder → server → handler chain"
624 );
625
626 assert!(
628 Arc::strong_count(stream_manager) >= 1,
629 "Stream manager should be properly initialized"
630 );
631
632 let test_session_id = uuid::Uuid::now_v7().as_simple().to_string();
635
636 let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
639 assert!(
640 subscriptions.is_empty(),
641 "New session should have no subscriptions initially"
642 );
643
644 assert_eq!(
647 stream_manager.get_config().channel_buffer_size,
648 2048,
649 "Stream manager should be using the custom buffer size functionally"
650 );
651 }
652
653 #[tokio::test]
658 async fn test_non_streaming_runtime_sse_false() {
659 use crate::LambdaMcpServerBuilder;
660 use turul_mcp_session_storage::InMemorySessionStorage;
661
662 let server = LambdaMcpServerBuilder::new()
663 .name("test-non-streaming-sse-false")
664 .version("1.0.0")
665 .storage(Arc::new(InMemorySessionStorage::new()))
666 .sse(false) .build()
668 .await
669 .expect("Server should build successfully");
670
671 let handler = server
672 .handler()
673 .await
674 .expect("Handler should be created from server");
675
676 assert!(!handler.sse_enabled, "SSE should be disabled");
678
679 let lambda_req = Request::builder()
681 .method("POST")
682 .uri("/mcp")
683 .body(LambdaBody::Text(
684 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
685 ))
686 .unwrap();
687
688 let result = handler.handle(lambda_req).await;
690 assert!(
691 result.is_ok(),
692 "POST /mcp should work with non-streaming + sse(false)"
693 );
694 }
695
696 #[tokio::test]
698 async fn test_non_streaming_runtime_sse_true() {
699 use crate::LambdaMcpServerBuilder;
700 use turul_mcp_session_storage::InMemorySessionStorage;
701
702 let server = LambdaMcpServerBuilder::new()
703 .name("test-non-streaming-sse-true")
704 .version("1.0.0")
705 .storage(Arc::new(InMemorySessionStorage::new()))
706 .sse(true) .build()
708 .await
709 .expect("Server should build successfully");
710
711 let handler = server
712 .handler()
713 .await
714 .expect("Handler should be created from server");
715
716 assert!(handler.sse_enabled, "SSE should be enabled");
718
719 let lambda_req = Request::builder()
721 .method("POST")
722 .uri("/mcp")
723 .body(LambdaBody::Text(
724 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
725 ))
726 .unwrap();
727
728 let result = handler.handle(lambda_req).await;
730 assert!(
731 result.is_ok(),
732 "POST /mcp should work with non-streaming + sse(true)"
733 );
734
735 }
738
739 #[tokio::test]
741 async fn test_streaming_runtime_sse_false() {
742 use crate::LambdaMcpServerBuilder;
743 use turul_mcp_session_storage::InMemorySessionStorage;
744
745 let server = LambdaMcpServerBuilder::new()
746 .name("test-streaming-sse-false")
747 .version("1.0.0")
748 .storage(Arc::new(InMemorySessionStorage::new()))
749 .sse(false) .build()
751 .await
752 .expect("Server should build successfully");
753
754 let handler = server
755 .handler()
756 .await
757 .expect("Handler should be created from server");
758
759 assert!(!handler.sse_enabled, "SSE should be disabled");
761
762 let lambda_req = Request::builder()
764 .method("POST")
765 .uri("/mcp")
766 .body(LambdaBody::Text(
767 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
768 ))
769 .unwrap();
770
771 let result = handler.handle_streaming(lambda_req).await;
773 assert!(
774 result.is_ok(),
775 "Streaming runtime should work with sse(false)"
776 );
777 }
778
779 #[tokio::test]
781 async fn test_streaming_runtime_sse_true() {
782 use crate::LambdaMcpServerBuilder;
783 use turul_mcp_session_storage::InMemorySessionStorage;
784
785 let server = LambdaMcpServerBuilder::new()
786 .name("test-streaming-sse-true")
787 .version("1.0.0")
788 .storage(Arc::new(InMemorySessionStorage::new()))
789 .sse(true) .build()
791 .await
792 .expect("Server should build successfully");
793
794 let handler = server
795 .handler()
796 .await
797 .expect("Handler should be created from server");
798
799 assert!(handler.sse_enabled, "SSE should be enabled");
801
802 let lambda_req = Request::builder()
804 .method("POST")
805 .uri("/mcp")
806 .body(LambdaBody::Text(
807 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
808 ))
809 .unwrap();
810
811 let result = handler.handle_streaming(lambda_req).await;
813 assert!(
814 result.is_ok(),
815 "Streaming runtime should work with sse(true) for real-time streaming"
816 );
817
818 }
821
822 async fn build_strict_streaming_handler() -> LambdaMcpHandler {
826 use crate::LambdaMcpServerBuilder;
827 use turul_mcp_session_storage::InMemorySessionStorage;
828
829 let server = LambdaMcpServerBuilder::new()
830 .name("lifecycle-test")
831 .version("1.0.0")
832 .tool(LifecycleTestTool)
833 .storage(Arc::new(InMemorySessionStorage::new()))
834 .strict_lifecycle(true) .sse(true)
836 .build()
837 .await
838 .expect("build should succeed");
839
840 server.handler().await.expect("handler should succeed")
841 }
842
843 #[derive(Clone, Default)]
845 struct LifecycleTestTool;
846
847 impl turul_mcp_builders::traits::HasBaseMetadata for LifecycleTestTool {
848 fn name(&self) -> &str { "ping_tool" }
849 }
850 impl turul_mcp_builders::traits::HasDescription for LifecycleTestTool {
851 fn description(&self) -> Option<&str> { Some("test tool") }
852 }
853 impl turul_mcp_builders::traits::HasInputSchema for LifecycleTestTool {
854 fn input_schema(&self) -> &turul_mcp_protocol::ToolSchema {
855 static SCHEMA: std::sync::OnceLock<turul_mcp_protocol::ToolSchema> = std::sync::OnceLock::new();
856 SCHEMA.get_or_init(turul_mcp_protocol::ToolSchema::object)
857 }
858 }
859 impl turul_mcp_builders::traits::HasOutputSchema for LifecycleTestTool {
860 fn output_schema(&self) -> Option<&turul_mcp_protocol::ToolSchema> { None }
861 }
862 impl turul_mcp_builders::traits::HasAnnotations for LifecycleTestTool {
863 fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> { None }
864 }
865 impl turul_mcp_builders::traits::HasToolMeta for LifecycleTestTool {
866 fn tool_meta(&self) -> Option<&std::collections::HashMap<String, serde_json::Value>> { None }
867 }
868 impl turul_mcp_builders::traits::HasIcons for LifecycleTestTool {}
869 impl turul_mcp_builders::traits::HasExecution for LifecycleTestTool {}
870
871 #[async_trait::async_trait]
872 impl turul_mcp_server::McpTool for LifecycleTestTool {
873 async fn call(
874 &self,
875 _args: serde_json::Value,
876 _session: Option<turul_mcp_server::SessionContext>,
877 ) -> turul_mcp_server::McpResult<turul_mcp_protocol::tools::CallToolResult> {
878 Ok(turul_mcp_protocol::tools::CallToolResult::success(vec![
879 turul_mcp_protocol::tools::ToolResult::text("pong"),
880 ]))
881 }
882 }
883
884 fn streaming_mcp_request(body: &str, session_id: Option<&str>) -> LambdaRequest {
886 let mut builder = Request::builder()
887 .method("POST")
888 .uri("/mcp")
889 .header("Content-Type", "application/json")
890 .header("Accept", "application/json, text/event-stream")
891 .header("MCP-Protocol-Version", "2025-11-25");
892
893 if let Some(sid) = session_id {
894 builder = builder.header("Mcp-Session-Id", sid);
895 }
896
897 builder
898 .body(LambdaBody::Text(body.to_string()))
899 .unwrap()
900 }
901
902 async fn collect_streaming_body(
904 response: lambda_http::Response<
905 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
906 >,
907 ) -> (http::StatusCode, String) {
908 use http_body_util::BodyExt;
909 let status = response.status();
910 let session_id = response
911 .headers()
912 .get("Mcp-Session-Id")
913 .and_then(|v| v.to_str().ok())
914 .map(String::from);
915 let body_bytes = response
916 .into_body()
917 .collect()
918 .await
919 .map(|c| c.to_bytes())
920 .unwrap_or_default();
921 let body_str = String::from_utf8_lossy(&body_bytes).to_string();
922 let _ = session_id; (status, body_str)
924 }
925
926 fn extract_session_id(
928 response: &lambda_http::Response<
929 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
930 >,
931 ) -> Option<String> {
932 response
933 .headers()
934 .get("Mcp-Session-Id")
935 .and_then(|v| v.to_str().ok())
936 .map(String::from)
937 }
938
939 fn parse_response_json(body: &str) -> serde_json::Value {
941 let json_str = body
943 .lines()
944 .find(|line| line.starts_with("data: "))
945 .map(|line| &line[6..])
946 .unwrap_or(body.trim());
947 serde_json::from_str(json_str)
948 .unwrap_or_else(|e| panic!("Failed to parse JSON from body: {e}\nBody: {body}"))
949 }
950
951 #[tokio::test]
953 async fn test_lambda_streaming_strict_handshake_succeeds() {
954 let handler = build_strict_streaming_handler().await;
955
956 let init_req = streaming_mcp_request(
958 &serde_json::json!({
959 "jsonrpc": "2.0", "method": "initialize", "id": 1,
960 "params": {
961 "protocolVersion": "2025-11-25",
962 "capabilities": {},
963 "clientInfo": { "name": "test", "version": "1.0.0" }
964 }
965 }).to_string(),
966 None,
967 );
968 let init_resp = handler.handle_streaming(init_req).await.expect("initialize should succeed");
969 let session_id = extract_session_id(&init_resp).expect("must return session ID");
970 let (status, _body) = collect_streaming_body(init_resp).await;
971 assert_eq!(status, 200, "initialize should return 200");
972
973 let notif_req = streaming_mcp_request(
975 &serde_json::json!({
976 "jsonrpc": "2.0",
977 "method": "notifications/initialized",
978 "params": {}
979 }).to_string(),
980 Some(&session_id),
981 );
982 let notif_resp = handler.handle_streaming(notif_req).await.expect("notification should succeed");
983 let (status, _) = collect_streaming_body(notif_resp).await;
984 assert_eq!(status, 202, "notifications/initialized should return 202");
985
986 let list_req = streaming_mcp_request(
988 &serde_json::json!({
989 "jsonrpc": "2.0", "method": "tools/list", "id": 2
990 }).to_string(),
991 Some(&session_id),
992 );
993 let list_resp = handler.handle_streaming(list_req).await.expect("tools/list should succeed");
994 let (status, body) = collect_streaming_body(list_resp).await;
995 assert_eq!(status, 200, "tools/list should return 200");
996 let json = parse_response_json(&body);
997 assert!(json["result"]["tools"].is_array(), "tools/list should return tools array: {json}");
998
999 let call_req = streaming_mcp_request(
1001 &serde_json::json!({
1002 "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1003 "params": { "name": "ping_tool", "arguments": {} }
1004 }).to_string(),
1005 Some(&session_id),
1006 );
1007 let call_resp = handler.handle_streaming(call_req).await.expect("tools/call should succeed");
1008 let (status, body) = collect_streaming_body(call_resp).await;
1009 assert_eq!(status, 200, "tools/call should return 200");
1010 let json = parse_response_json(&body);
1011 assert!(json["result"].is_object(), "tools/call should return result: {json}");
1012 }
1013
1014 #[tokio::test]
1016 async fn test_lambda_streaming_strict_rejects_before_initialized() {
1017 let handler = build_strict_streaming_handler().await;
1018
1019 let init_req = streaming_mcp_request(
1021 &serde_json::json!({
1022 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1023 "params": {
1024 "protocolVersion": "2025-11-25",
1025 "capabilities": {},
1026 "clientInfo": { "name": "test", "version": "1.0.0" }
1027 }
1028 }).to_string(),
1029 None,
1030 );
1031 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1032 let session_id = extract_session_id(&init_resp).unwrap();
1033 let _ = collect_streaming_body(init_resp).await;
1034
1035 let list_req = streaming_mcp_request(
1037 &serde_json::json!({
1038 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1039 }).to_string(),
1040 Some(&session_id),
1041 );
1042 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1043 let (_, body) = collect_streaming_body(list_resp).await;
1044 let json = parse_response_json(&body);
1045 assert!(json["error"].is_object(), "tools/list should return JSON-RPC error: {json}");
1046 assert_eq!(
1047 json["error"]["code"].as_i64().unwrap(),
1048 -32031,
1049 "tools/list must return SessionError code -32031, got: {json}"
1050 );
1051 assert!(
1052 json["error"]["message"].as_str().unwrap().contains("notifications/initialized"),
1053 "Error must mention notifications/initialized: {}",
1054 json["error"]["message"]
1055 );
1056
1057 let call_req = streaming_mcp_request(
1059 &serde_json::json!({
1060 "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1061 "params": { "name": "ping_tool", "arguments": {} }
1062 }).to_string(),
1063 Some(&session_id),
1064 );
1065 let call_resp = handler.handle_streaming(call_req).await.unwrap();
1066 let (_, body) = collect_streaming_body(call_resp).await;
1067 let json = parse_response_json(&body);
1068 assert!(json["error"].is_object(), "tools/call should return JSON-RPC error: {json}");
1069 assert_eq!(
1070 json["error"]["code"].as_i64().unwrap(),
1071 -32031,
1072 "tools/call must return SessionError code -32031, got: {json}"
1073 );
1074 assert!(
1075 json["error"]["message"].as_str().unwrap().contains("notifications/initialized"),
1076 "Error must mention notifications/initialized: {}",
1077 json["error"]["message"]
1078 );
1079 }
1080
1081 #[tokio::test]
1083 async fn test_lambda_streaming_initialized_is_effective_immediately() {
1084 let handler = build_strict_streaming_handler().await;
1085
1086 let init_req = streaming_mcp_request(
1088 &serde_json::json!({
1089 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1090 "params": {
1091 "protocolVersion": "2025-11-25",
1092 "capabilities": {},
1093 "clientInfo": { "name": "test", "version": "1.0.0" }
1094 }
1095 }).to_string(),
1096 None,
1097 );
1098 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1099 let session_id = extract_session_id(&init_resp).unwrap();
1100 let _ = collect_streaming_body(init_resp).await;
1101
1102 let notif_req = streaming_mcp_request(
1104 &serde_json::json!({
1105 "jsonrpc": "2.0",
1106 "method": "notifications/initialized",
1107 "params": {}
1108 }).to_string(),
1109 Some(&session_id),
1110 );
1111 let notif_resp = handler.handle_streaming(notif_req).await.unwrap();
1112 let (status, _) = collect_streaming_body(notif_resp).await;
1113 assert_eq!(status, 202);
1114
1115 let list_req = streaming_mcp_request(
1117 &serde_json::json!({
1118 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1119 }).to_string(),
1120 Some(&session_id),
1121 );
1122 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1123 let (status, body) = collect_streaming_body(list_resp).await;
1124 assert_eq!(status, 200, "tools/list must succeed immediately after initialized");
1125 let json = parse_response_json(&body);
1126 assert!(
1127 json["result"]["tools"].is_array(),
1128 "Must return tools list, not error: {json}"
1129 );
1130 }
1131
1132 #[tokio::test]
1134 async fn test_lambda_streaming_lenient_mode_allows_without_initialized() {
1135 use crate::LambdaMcpServerBuilder;
1136 use turul_mcp_session_storage::InMemorySessionStorage;
1137
1138 let server = LambdaMcpServerBuilder::new()
1139 .name("lenient-test")
1140 .version("1.0.0")
1141 .tool(LifecycleTestTool)
1142 .storage(Arc::new(InMemorySessionStorage::new()))
1143 .strict_lifecycle(false) .sse(true)
1145 .build()
1146 .await
1147 .unwrap();
1148
1149 let handler = server.handler().await.unwrap();
1150
1151 let init_req = streaming_mcp_request(
1153 &serde_json::json!({
1154 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1155 "params": {
1156 "protocolVersion": "2025-11-25",
1157 "capabilities": {},
1158 "clientInfo": { "name": "test", "version": "1.0.0" }
1159 }
1160 }).to_string(),
1161 None,
1162 );
1163 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1164 let session_id = extract_session_id(&init_resp).unwrap();
1165 let _ = collect_streaming_body(init_resp).await;
1166
1167 let list_req = streaming_mcp_request(
1169 &serde_json::json!({
1170 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1171 }).to_string(),
1172 Some(&session_id),
1173 );
1174 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1175 let (status, body) = collect_streaming_body(list_resp).await;
1176 assert_eq!(status, 200, "Lenient mode should allow tools/list without initialized");
1177 let json = parse_response_json(&body);
1178 assert!(
1179 json["result"]["tools"].is_array(),
1180 "Must return tools list in lenient mode: {json}"
1181 );
1182 }
1183}