1use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12 ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
23#[derive(Clone)]
34pub struct LambdaMcpHandler {
35 session_handler: SessionMcpHandler,
37
38 streamable_handler: StreamableHttpHandler,
40
41 #[allow(dead_code)]
43 sse_enabled: bool,
44
45 route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
47
48 #[cfg(feature = "cors")]
50 cors_config: Option<CorsConfig>,
51}
52
53impl LambdaMcpHandler {
54 #[allow(clippy::too_many_arguments)]
56 pub fn new(
57 dispatcher: JsonRpcDispatcher<McpError>,
58 session_storage: Arc<BoxedSessionStorage>,
59 stream_manager: Arc<StreamManager>,
60 config: ServerConfig,
61 stream_config: StreamConfig,
62 _implementation: turul_mcp_protocol::Implementation,
63 capabilities: ServerCapabilities,
64 sse_enabled: bool,
65 #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
66 ) -> Self {
67 let dispatcher = Arc::new(dispatcher);
68
69 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
71
72 let session_handler = SessionMcpHandler::with_shared_stream_manager(
74 config.clone(),
75 dispatcher.clone(),
76 session_storage.clone(),
77 stream_config.clone(),
78 stream_manager.clone(),
79 middleware_stack.clone(),
80 );
81
82 let streamable_handler = StreamableHttpHandler::new(
84 Arc::new(config.clone()),
85 dispatcher.clone(),
86 session_storage.clone(),
87 stream_manager.clone(),
88 capabilities.clone(),
89 middleware_stack,
90 );
91
92 Self {
93 session_handler,
94 streamable_handler,
95 sse_enabled,
96 route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
97 #[cfg(feature = "cors")]
98 cors_config,
99 }
100 }
101
102 #[allow(clippy::too_many_arguments)]
104 pub fn with_shared_stream_manager(
105 config: ServerConfig,
106 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
107 session_storage: Arc<BoxedSessionStorage>,
108 stream_manager: Arc<StreamManager>,
109 stream_config: StreamConfig,
110 _implementation: turul_mcp_protocol::Implementation,
111 capabilities: ServerCapabilities,
112 sse_enabled: bool,
113 ) -> Self {
114 let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
116
117 let session_handler = SessionMcpHandler::with_shared_stream_manager(
119 config.clone(),
120 dispatcher.clone(),
121 session_storage.clone(),
122 stream_config.clone(),
123 stream_manager.clone(),
124 middleware_stack.clone(),
125 );
126
127 let streamable_handler = StreamableHttpHandler::new(
129 Arc::new(config),
130 dispatcher,
131 session_storage,
132 stream_manager,
133 capabilities,
134 middleware_stack,
135 );
136
137 Self {
138 session_handler,
139 streamable_handler,
140 sse_enabled,
141 route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
142 #[cfg(feature = "cors")]
143 cors_config: None,
144 }
145 }
146
147 #[allow(clippy::too_many_arguments)]
149 pub fn with_middleware(
150 config: ServerConfig,
151 dispatcher: Arc<JsonRpcDispatcher<McpError>>,
152 session_storage: Arc<BoxedSessionStorage>,
153 stream_manager: Arc<StreamManager>,
154 stream_config: StreamConfig,
155 capabilities: ServerCapabilities,
156 middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
157 sse_enabled: bool,
158 route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
159 ) -> Self {
160 let session_handler = SessionMcpHandler::with_shared_stream_manager(
162 config.clone(),
163 dispatcher.clone(),
164 session_storage.clone(),
165 stream_config.clone(),
166 stream_manager.clone(),
167 middleware_stack.clone(),
168 );
169
170 let streamable_handler = StreamableHttpHandler::new(
172 Arc::new(config),
173 dispatcher,
174 session_storage,
175 stream_manager,
176 capabilities,
177 middleware_stack,
178 );
179
180 Self {
181 session_handler,
182 streamable_handler,
183 sse_enabled,
184 route_registry,
185 #[cfg(feature = "cors")]
186 cors_config: None,
187 }
188 }
189
190 #[cfg(feature = "cors")]
192 pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
193 self.cors_config = Some(cors_config);
194 self
195 }
196
197 pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
199 self.session_handler.get_stream_manager()
200 }
201
202 pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
211 let method = req.method().clone();
212 let uri = req.uri().clone();
213
214 let request_origin = req
215 .headers()
216 .get("origin")
217 .and_then(|v| v.to_str().ok())
218 .map(|s| s.to_string());
219
220 info!(
221 "🌐 Lambda MCP request: {} {} (origin: {:?})",
222 method, uri, request_origin
223 );
224
225 #[cfg(feature = "cors")]
227 if method == http::Method::OPTIONS
228 && let Some(ref cors_config) = self.cors_config
229 {
230 debug!("Handling CORS preflight request");
231 return create_preflight_response(cors_config, request_origin.as_deref());
232 }
233
234 let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
236
237 let path = hyper_req.uri().path().to_string();
239 if !self.route_registry.is_empty() {
240 match self.route_registry.match_route(&path) {
241 Ok(Some(route_handler)) => {
242 debug!("Custom route matched: {}", path);
243 use http_body_util::BodyExt;
244 let (parts, body) = hyper_req.into_parts();
245 let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
246 let route_resp = route_handler.handle(boxed_req).await;
247 let mut lambda_resp =
248 crate::adapter::hyper_to_lambda_response(route_resp).await?;
249 #[cfg(feature = "cors")]
250 if let Some(ref cors_config) = self.cors_config {
251 inject_cors_headers(
252 &mut lambda_resp,
253 cors_config,
254 request_origin.as_deref(),
255 )?;
256 }
257 return Ok(lambda_resp);
258 }
259 Ok(None) => {} Err(e) => {
261 debug!("Route validation error: {}", e);
262 let route_resp = e.into_response();
263 let mut lambda_resp =
264 crate::adapter::hyper_to_lambda_response(route_resp).await?;
265 #[cfg(feature = "cors")]
266 if let Some(ref cors_config) = self.cors_config {
267 inject_cors_headers(
268 &mut lambda_resp,
269 cors_config,
270 request_origin.as_deref(),
271 )?;
272 }
273 return Ok(lambda_resp);
274 }
275 }
276 }
277
278 let hyper_resp = self
280 .session_handler
281 .handle_mcp_request(hyper_req)
282 .await
283 .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
284
285 let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
287
288 #[cfg(feature = "cors")]
290 if let Some(ref cors_config) = self.cors_config {
291 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
292 }
293
294 Ok(lambda_resp)
295 }
296
297 pub async fn handle_streaming(
302 &self,
303 req: LambdaRequest,
304 ) -> std::result::Result<
305 lambda_http::Response<
306 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
307 >,
308 Box<dyn std::error::Error + Send + Sync>,
309 > {
310 let method = req.method().clone();
311 let uri = req.uri().clone();
312 let request_origin = req
313 .headers()
314 .get("origin")
315 .and_then(|v| v.to_str().ok())
316 .map(|s| s.to_string());
317
318 debug!(
319 "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
320 method, uri, request_origin
321 );
322
323 #[cfg(feature = "cors")]
325 if method == http::Method::OPTIONS
326 && let Some(ref cors_config) = self.cors_config
327 {
328 debug!("Handling CORS preflight request (streaming)");
329 let preflight_response =
330 create_preflight_response(cors_config, request_origin.as_deref())
331 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
332
333 return Ok(self.convert_lambda_response_to_streaming(preflight_response));
335 }
336
337 let hyper_req = crate::adapter::lambda_to_hyper_request(req)
339 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
340
341 let path = hyper_req.uri().path().to_string();
343 if !self.route_registry.is_empty() {
344 match self.route_registry.match_route(&path) {
345 Ok(Some(route_handler)) => {
346 debug!("Custom route matched (streaming): {}", path);
347 use http_body_util::BodyExt;
348 let (parts, body) = hyper_req.into_parts();
349 let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
350 return Ok(route_handler.handle(boxed_req).await);
351 }
352 Ok(None) => {} Err(e) => {
354 debug!("Route validation error (streaming): {}", e);
355 return Ok(e.into_response());
356 }
357 }
358 }
359
360 use turul_http_mcp_server::protocol::McpProtocolVersion;
362 let protocol_version = hyper_req
363 .headers()
364 .get("MCP-Protocol-Version")
365 .and_then(|h| h.to_str().ok())
366 .and_then(McpProtocolVersion::parse_version)
367 .unwrap_or(McpProtocolVersion::V2025_06_18);
368
369 let hyper_resp = if protocol_version.supports_streamable_http() {
371 debug!(
373 "Using StreamableHttpHandler for protocol {}",
374 protocol_version.to_string()
375 );
376 self.streamable_handler.handle_request(hyper_req).await
377 } else {
378 debug!(
380 "Using SessionMcpHandler for legacy protocol {}",
381 protocol_version.to_string()
382 );
383 self.session_handler
384 .handle_mcp_request(hyper_req)
385 .await
386 .map_err(|e| {
387 Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
388 as Box<dyn std::error::Error + Send + Sync>
389 })?
390 };
391
392 let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
394
395 #[cfg(feature = "cors")]
397 if let Some(ref cors_config) = self.cors_config {
398 inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
399 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
400 }
401
402 Ok(lambda_resp)
403 }
404
405 fn convert_lambda_response_to_streaming(
407 &self,
408 lambda_response: LambdaResponse<LambdaBody>,
409 ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
410 {
411 use bytes::Bytes;
412 use http_body_util::{BodyExt, Full};
413
414 let (parts, body) = lambda_response.into_parts();
415 let body_bytes = match body {
416 LambdaBody::Empty => Bytes::new(),
417 LambdaBody::Text(text) => Bytes::from(text),
418 LambdaBody::Binary(bytes) => Bytes::from(bytes),
419 _ => Bytes::new(),
420 };
421
422 let streaming_body = Full::new(body_bytes)
424 .map_err(|e: std::convert::Infallible| match e {})
425 .boxed_unsync();
426
427 lambda_http::Response::from_parts(parts, streaming_body)
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use super::*;
434 use http::Request;
435 use turul_mcp_session_storage::InMemorySessionStorage;
436
437 #[tokio::test]
438 async fn test_handler_creation() {
439 let session_storage = Arc::new(InMemorySessionStorage::new());
440 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
441 let dispatcher = JsonRpcDispatcher::new();
442 let config = ServerConfig::default();
443 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
444 let capabilities = ServerCapabilities::default();
445
446 let handler = LambdaMcpHandler::new(
447 dispatcher,
448 session_storage,
449 stream_manager,
450 config,
451 StreamConfig::default(),
452 implementation,
453 capabilities,
454 false, #[cfg(feature = "cors")]
456 None,
457 );
458
459 assert!(!handler.sse_enabled);
461 }
462
463 #[tokio::test]
464 async fn test_sse_enabled_with_handle_works() {
465 let session_storage = Arc::new(InMemorySessionStorage::new());
466 let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
467 let dispatcher = JsonRpcDispatcher::new();
468 let config = ServerConfig::default();
469 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
470 let capabilities = ServerCapabilities::default();
471
472 let handler = LambdaMcpHandler::new(
474 dispatcher,
475 session_storage,
476 stream_manager,
477 config,
478 StreamConfig::default(),
479 implementation,
480 capabilities,
481 true, #[cfg(feature = "cors")]
483 None,
484 );
485
486 let lambda_req = Request::builder()
488 .method("POST")
489 .uri("/mcp")
490 .body(LambdaBody::Text(
491 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
492 ))
493 .unwrap();
494
495 let result = handler.handle(lambda_req).await;
497 assert!(
498 result.is_ok(),
499 "handle() should work with SSE enabled for snapshot-based responses"
500 );
501 }
502
503 #[tokio::test]
505 async fn test_stream_config_preservation() {
506 let session_storage = Arc::new(InMemorySessionStorage::new());
507 let dispatcher = JsonRpcDispatcher::new();
508 let config = ServerConfig::default();
509 let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
510 let capabilities = ServerCapabilities::default();
511
512 let custom_stream_config = StreamConfig {
514 channel_buffer_size: 1024, max_replay_events: 200, keepalive_interval_seconds: 10, cors_origin: "https://custom-test.example.com".to_string(), };
519
520 let stream_manager = Arc::new(StreamManager::with_config(
522 session_storage.clone(),
523 custom_stream_config.clone(),
524 ));
525
526 let handler = LambdaMcpHandler::new(
527 dispatcher,
528 session_storage,
529 stream_manager,
530 config,
531 custom_stream_config.clone(),
532 implementation,
533 capabilities,
534 false, #[cfg(feature = "cors")]
536 None,
537 );
538
539 assert!(!handler.sse_enabled);
541
542 let stream_manager = handler.get_stream_manager();
544
545 let actual_config = stream_manager.get_config();
547
548 assert_eq!(
549 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
550 "Custom channel_buffer_size was not propagated correctly"
551 );
552 assert_eq!(
553 actual_config.max_replay_events, custom_stream_config.max_replay_events,
554 "Custom max_replay_events was not propagated correctly"
555 );
556 assert_eq!(
557 actual_config.keepalive_interval_seconds,
558 custom_stream_config.keepalive_interval_seconds,
559 "Custom keepalive_interval_seconds was not propagated correctly"
560 );
561 assert_eq!(
562 actual_config.cors_origin, custom_stream_config.cors_origin,
563 "Custom cors_origin was not propagated correctly"
564 );
565
566 assert!(Arc::strong_count(stream_manager) >= 1);
568 }
569
570 #[tokio::test]
572 async fn test_full_builder_chain_stream_config() {
573 use crate::LambdaMcpServerBuilder;
574 use turul_mcp_session_storage::InMemorySessionStorage;
575
576 let custom_stream_config = turul_http_mcp_server::StreamConfig {
578 channel_buffer_size: 2048, max_replay_events: 500, keepalive_interval_seconds: 15, cors_origin: "https://full-chain-test.example.com".to_string(),
582 };
583
584 let server = LambdaMcpServerBuilder::new()
586 .name("full-chain-test")
587 .version("1.0.0")
588 .storage(Arc::new(InMemorySessionStorage::new()))
589 .sse(true) .stream_config(custom_stream_config.clone())
591 .build()
592 .await
593 .expect("Server should build successfully");
594
595 let handler = server
597 .handler()
598 .await
599 .expect("Handler should be created from server");
600
601 assert!(handler.sse_enabled, "SSE should be enabled");
603
604 let stream_manager = handler.get_stream_manager();
606 let actual_config = stream_manager.get_config();
607
608 assert_eq!(
609 actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
610 "Custom channel_buffer_size should be preserved through builder → server → handler chain"
611 );
612 assert_eq!(
613 actual_config.max_replay_events, custom_stream_config.max_replay_events,
614 "Custom max_replay_events should be preserved through builder → server → handler chain"
615 );
616 assert_eq!(
617 actual_config.keepalive_interval_seconds,
618 custom_stream_config.keepalive_interval_seconds,
619 "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
620 );
621 assert_eq!(
622 actual_config.cors_origin, custom_stream_config.cors_origin,
623 "Custom cors_origin should be preserved through builder → server → handler chain"
624 );
625
626 assert!(
628 Arc::strong_count(stream_manager) >= 1,
629 "Stream manager should be properly initialized"
630 );
631
632 let test_session_id = uuid::Uuid::now_v7().as_simple().to_string();
635
636 let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
639 assert!(
640 subscriptions.is_empty(),
641 "New session should have no subscriptions initially"
642 );
643
644 assert_eq!(
647 stream_manager.get_config().channel_buffer_size,
648 2048,
649 "Stream manager should be using the custom buffer size functionally"
650 );
651 }
652
653 #[tokio::test]
658 async fn test_non_streaming_runtime_sse_false() {
659 use crate::LambdaMcpServerBuilder;
660 use turul_mcp_session_storage::InMemorySessionStorage;
661
662 let server = LambdaMcpServerBuilder::new()
663 .name("test-non-streaming-sse-false")
664 .version("1.0.0")
665 .storage(Arc::new(InMemorySessionStorage::new()))
666 .sse(false) .build()
668 .await
669 .expect("Server should build successfully");
670
671 let handler = server
672 .handler()
673 .await
674 .expect("Handler should be created from server");
675
676 assert!(!handler.sse_enabled, "SSE should be disabled");
678
679 let lambda_req = Request::builder()
681 .method("POST")
682 .uri("/mcp")
683 .body(LambdaBody::Text(
684 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
685 ))
686 .unwrap();
687
688 let result = handler.handle(lambda_req).await;
690 assert!(
691 result.is_ok(),
692 "POST /mcp should work with non-streaming + sse(false)"
693 );
694 }
695
696 #[tokio::test]
698 async fn test_non_streaming_runtime_sse_true() {
699 use crate::LambdaMcpServerBuilder;
700 use turul_mcp_session_storage::InMemorySessionStorage;
701
702 let server = LambdaMcpServerBuilder::new()
703 .name("test-non-streaming-sse-true")
704 .version("1.0.0")
705 .storage(Arc::new(InMemorySessionStorage::new()))
706 .sse(true) .build()
708 .await
709 .expect("Server should build successfully");
710
711 let handler = server
712 .handler()
713 .await
714 .expect("Handler should be created from server");
715
716 assert!(handler.sse_enabled, "SSE should be enabled");
718
719 let lambda_req = Request::builder()
721 .method("POST")
722 .uri("/mcp")
723 .body(LambdaBody::Text(
724 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
725 ))
726 .unwrap();
727
728 let result = handler.handle(lambda_req).await;
730 assert!(
731 result.is_ok(),
732 "POST /mcp should work with non-streaming + sse(true)"
733 );
734
735 }
738
739 #[tokio::test]
741 async fn test_streaming_runtime_sse_false() {
742 use crate::LambdaMcpServerBuilder;
743 use turul_mcp_session_storage::InMemorySessionStorage;
744
745 let server = LambdaMcpServerBuilder::new()
746 .name("test-streaming-sse-false")
747 .version("1.0.0")
748 .storage(Arc::new(InMemorySessionStorage::new()))
749 .sse(false) .build()
751 .await
752 .expect("Server should build successfully");
753
754 let handler = server
755 .handler()
756 .await
757 .expect("Handler should be created from server");
758
759 assert!(!handler.sse_enabled, "SSE should be disabled");
761
762 let lambda_req = Request::builder()
764 .method("POST")
765 .uri("/mcp")
766 .body(LambdaBody::Text(
767 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
768 ))
769 .unwrap();
770
771 let result = handler.handle_streaming(lambda_req).await;
773 assert!(
774 result.is_ok(),
775 "Streaming runtime should work with sse(false)"
776 );
777 }
778
779 #[tokio::test]
781 async fn test_streaming_runtime_sse_true() {
782 use crate::LambdaMcpServerBuilder;
783 use turul_mcp_session_storage::InMemorySessionStorage;
784
785 let server = LambdaMcpServerBuilder::new()
786 .name("test-streaming-sse-true")
787 .version("1.0.0")
788 .storage(Arc::new(InMemorySessionStorage::new()))
789 .sse(true) .build()
791 .await
792 .expect("Server should build successfully");
793
794 let handler = server
795 .handler()
796 .await
797 .expect("Handler should be created from server");
798
799 assert!(handler.sse_enabled, "SSE should be enabled");
801
802 let lambda_req = Request::builder()
804 .method("POST")
805 .uri("/mcp")
806 .body(LambdaBody::Text(
807 r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
808 ))
809 .unwrap();
810
811 let result = handler.handle_streaming(lambda_req).await;
813 assert!(
814 result.is_ok(),
815 "Streaming runtime should work with sse(true) for real-time streaming"
816 );
817
818 }
821
822 async fn build_strict_streaming_handler() -> LambdaMcpHandler {
826 use crate::LambdaMcpServerBuilder;
827 use turul_mcp_session_storage::InMemorySessionStorage;
828
829 let server = LambdaMcpServerBuilder::new()
830 .name("lifecycle-test")
831 .version("1.0.0")
832 .tool(LifecycleTestTool)
833 .storage(Arc::new(InMemorySessionStorage::new()))
834 .sse(true)
835 .build()
836 .await
837 .expect("build should succeed");
838
839 server.handler().await.expect("handler should succeed")
840 }
841
842 #[derive(Clone, Default)]
844 struct LifecycleTestTool;
845
846 impl turul_mcp_builders::traits::HasBaseMetadata for LifecycleTestTool {
847 fn name(&self) -> &str { "ping_tool" }
848 }
849 impl turul_mcp_builders::traits::HasDescription for LifecycleTestTool {
850 fn description(&self) -> Option<&str> { Some("test tool") }
851 }
852 impl turul_mcp_builders::traits::HasInputSchema for LifecycleTestTool {
853 fn input_schema(&self) -> &turul_mcp_protocol::ToolSchema {
854 static SCHEMA: std::sync::OnceLock<turul_mcp_protocol::ToolSchema> = std::sync::OnceLock::new();
855 SCHEMA.get_or_init(turul_mcp_protocol::ToolSchema::object)
856 }
857 }
858 impl turul_mcp_builders::traits::HasOutputSchema for LifecycleTestTool {
859 fn output_schema(&self) -> Option<&turul_mcp_protocol::ToolSchema> { None }
860 }
861 impl turul_mcp_builders::traits::HasAnnotations for LifecycleTestTool {
862 fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> { None }
863 }
864 impl turul_mcp_builders::traits::HasToolMeta for LifecycleTestTool {
865 fn tool_meta(&self) -> Option<&std::collections::HashMap<String, serde_json::Value>> { None }
866 }
867 impl turul_mcp_builders::traits::HasIcons for LifecycleTestTool {}
868 impl turul_mcp_builders::traits::HasExecution for LifecycleTestTool {}
869
870 #[async_trait::async_trait]
871 impl turul_mcp_server::McpTool for LifecycleTestTool {
872 async fn call(
873 &self,
874 _args: serde_json::Value,
875 _session: Option<turul_mcp_server::SessionContext>,
876 ) -> turul_mcp_server::McpResult<turul_mcp_protocol::tools::CallToolResult> {
877 Ok(turul_mcp_protocol::tools::CallToolResult::success(vec![
878 turul_mcp_protocol::tools::ToolResult::text("pong"),
879 ]))
880 }
881 }
882
883 fn streaming_mcp_request(body: &str, session_id: Option<&str>) -> LambdaRequest {
885 let mut builder = Request::builder()
886 .method("POST")
887 .uri("/mcp")
888 .header("Content-Type", "application/json")
889 .header("Accept", "application/json, text/event-stream")
890 .header("MCP-Protocol-Version", "2025-11-25");
891
892 if let Some(sid) = session_id {
893 builder = builder.header("Mcp-Session-Id", sid);
894 }
895
896 builder
897 .body(LambdaBody::Text(body.to_string()))
898 .unwrap()
899 }
900
901 async fn collect_streaming_body(
903 response: lambda_http::Response<
904 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
905 >,
906 ) -> (http::StatusCode, String) {
907 use http_body_util::BodyExt;
908 let status = response.status();
909 let session_id = response
910 .headers()
911 .get("Mcp-Session-Id")
912 .and_then(|v| v.to_str().ok())
913 .map(String::from);
914 let body_bytes = response
915 .into_body()
916 .collect()
917 .await
918 .map(|c| c.to_bytes())
919 .unwrap_or_default();
920 let body_str = String::from_utf8_lossy(&body_bytes).to_string();
921 let _ = session_id; (status, body_str)
923 }
924
925 fn extract_session_id(
927 response: &lambda_http::Response<
928 http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
929 >,
930 ) -> Option<String> {
931 response
932 .headers()
933 .get("Mcp-Session-Id")
934 .and_then(|v| v.to_str().ok())
935 .map(String::from)
936 }
937
938 fn parse_response_json(body: &str) -> serde_json::Value {
940 let json_str = body
942 .lines()
943 .find(|line| line.starts_with("data: "))
944 .map(|line| &line[6..])
945 .unwrap_or(body.trim());
946 serde_json::from_str(json_str)
947 .unwrap_or_else(|e| panic!("Failed to parse JSON from body: {e}\nBody: {body}"))
948 }
949
950 #[tokio::test]
952 async fn test_lambda_streaming_strict_handshake_succeeds() {
953 let handler = build_strict_streaming_handler().await;
954
955 let init_req = streaming_mcp_request(
957 &serde_json::json!({
958 "jsonrpc": "2.0", "method": "initialize", "id": 1,
959 "params": {
960 "protocolVersion": "2025-11-25",
961 "capabilities": {},
962 "clientInfo": { "name": "test", "version": "1.0.0" }
963 }
964 }).to_string(),
965 None,
966 );
967 let init_resp = handler.handle_streaming(init_req).await.expect("initialize should succeed");
968 let session_id = extract_session_id(&init_resp).expect("must return session ID");
969 let (status, _body) = collect_streaming_body(init_resp).await;
970 assert_eq!(status, 200, "initialize should return 200");
971
972 let notif_req = streaming_mcp_request(
974 &serde_json::json!({
975 "jsonrpc": "2.0",
976 "method": "notifications/initialized",
977 "params": {}
978 }).to_string(),
979 Some(&session_id),
980 );
981 let notif_resp = handler.handle_streaming(notif_req).await.expect("notification should succeed");
982 let (status, _) = collect_streaming_body(notif_resp).await;
983 assert_eq!(status, 202, "notifications/initialized should return 202");
984
985 let list_req = streaming_mcp_request(
987 &serde_json::json!({
988 "jsonrpc": "2.0", "method": "tools/list", "id": 2
989 }).to_string(),
990 Some(&session_id),
991 );
992 let list_resp = handler.handle_streaming(list_req).await.expect("tools/list should succeed");
993 let (status, body) = collect_streaming_body(list_resp).await;
994 assert_eq!(status, 200, "tools/list should return 200");
995 let json = parse_response_json(&body);
996 assert!(json["result"]["tools"].is_array(), "tools/list should return tools array: {json}");
997
998 let call_req = streaming_mcp_request(
1000 &serde_json::json!({
1001 "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1002 "params": { "name": "ping_tool", "arguments": {} }
1003 }).to_string(),
1004 Some(&session_id),
1005 );
1006 let call_resp = handler.handle_streaming(call_req).await.expect("tools/call should succeed");
1007 let (status, body) = collect_streaming_body(call_resp).await;
1008 assert_eq!(status, 200, "tools/call should return 200");
1009 let json = parse_response_json(&body);
1010 assert!(json["result"].is_object(), "tools/call should return result: {json}");
1011 }
1012
1013 #[tokio::test]
1015 async fn test_lambda_streaming_strict_rejects_before_initialized() {
1016 let handler = build_strict_streaming_handler().await;
1017
1018 let init_req = streaming_mcp_request(
1020 &serde_json::json!({
1021 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1022 "params": {
1023 "protocolVersion": "2025-11-25",
1024 "capabilities": {},
1025 "clientInfo": { "name": "test", "version": "1.0.0" }
1026 }
1027 }).to_string(),
1028 None,
1029 );
1030 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1031 let session_id = extract_session_id(&init_resp).unwrap();
1032 let _ = collect_streaming_body(init_resp).await;
1033
1034 let list_req = streaming_mcp_request(
1036 &serde_json::json!({
1037 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1038 }).to_string(),
1039 Some(&session_id),
1040 );
1041 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1042 let (_, body) = collect_streaming_body(list_resp).await;
1043 let json = parse_response_json(&body);
1044 assert!(json["error"].is_object(), "tools/list should return JSON-RPC error: {json}");
1045 assert_eq!(
1046 json["error"]["code"].as_i64().unwrap(),
1047 -32031,
1048 "tools/list must return SessionError code -32031, got: {json}"
1049 );
1050 assert!(
1051 json["error"]["message"].as_str().unwrap().contains("notifications/initialized"),
1052 "Error must mention notifications/initialized: {}",
1053 json["error"]["message"]
1054 );
1055
1056 let call_req = streaming_mcp_request(
1058 &serde_json::json!({
1059 "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1060 "params": { "name": "ping_tool", "arguments": {} }
1061 }).to_string(),
1062 Some(&session_id),
1063 );
1064 let call_resp = handler.handle_streaming(call_req).await.unwrap();
1065 let (_, body) = collect_streaming_body(call_resp).await;
1066 let json = parse_response_json(&body);
1067 assert!(json["error"].is_object(), "tools/call should return JSON-RPC error: {json}");
1068 assert_eq!(
1069 json["error"]["code"].as_i64().unwrap(),
1070 -32031,
1071 "tools/call must return SessionError code -32031, got: {json}"
1072 );
1073 assert!(
1074 json["error"]["message"].as_str().unwrap().contains("notifications/initialized"),
1075 "Error must mention notifications/initialized: {}",
1076 json["error"]["message"]
1077 );
1078 }
1079
1080 #[tokio::test]
1082 async fn test_lambda_streaming_initialized_is_effective_immediately() {
1083 let handler = build_strict_streaming_handler().await;
1084
1085 let init_req = streaming_mcp_request(
1087 &serde_json::json!({
1088 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1089 "params": {
1090 "protocolVersion": "2025-11-25",
1091 "capabilities": {},
1092 "clientInfo": { "name": "test", "version": "1.0.0" }
1093 }
1094 }).to_string(),
1095 None,
1096 );
1097 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1098 let session_id = extract_session_id(&init_resp).unwrap();
1099 let _ = collect_streaming_body(init_resp).await;
1100
1101 let notif_req = streaming_mcp_request(
1103 &serde_json::json!({
1104 "jsonrpc": "2.0",
1105 "method": "notifications/initialized",
1106 "params": {}
1107 }).to_string(),
1108 Some(&session_id),
1109 );
1110 let notif_resp = handler.handle_streaming(notif_req).await.unwrap();
1111 let (status, _) = collect_streaming_body(notif_resp).await;
1112 assert_eq!(status, 202);
1113
1114 let list_req = streaming_mcp_request(
1116 &serde_json::json!({
1117 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1118 }).to_string(),
1119 Some(&session_id),
1120 );
1121 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1122 let (status, body) = collect_streaming_body(list_resp).await;
1123 assert_eq!(status, 200, "tools/list must succeed immediately after initialized");
1124 let json = parse_response_json(&body);
1125 assert!(
1126 json["result"]["tools"].is_array(),
1127 "Must return tools list, not error: {json}"
1128 );
1129 }
1130
1131 #[tokio::test]
1133 async fn test_lambda_streaming_lenient_mode_allows_without_initialized() {
1134 use crate::LambdaMcpServerBuilder;
1135 use turul_mcp_session_storage::InMemorySessionStorage;
1136
1137 let server = LambdaMcpServerBuilder::new()
1138 .name("lenient-test")
1139 .version("1.0.0")
1140 .tool(LifecycleTestTool)
1141 .storage(Arc::new(InMemorySessionStorage::new()))
1142 .strict_lifecycle(false) .sse(true)
1144 .build()
1145 .await
1146 .unwrap();
1147
1148 let handler = server.handler().await.unwrap();
1149
1150 let init_req = streaming_mcp_request(
1152 &serde_json::json!({
1153 "jsonrpc": "2.0", "method": "initialize", "id": 1,
1154 "params": {
1155 "protocolVersion": "2025-11-25",
1156 "capabilities": {},
1157 "clientInfo": { "name": "test", "version": "1.0.0" }
1158 }
1159 }).to_string(),
1160 None,
1161 );
1162 let init_resp = handler.handle_streaming(init_req).await.unwrap();
1163 let session_id = extract_session_id(&init_resp).unwrap();
1164 let _ = collect_streaming_body(init_resp).await;
1165
1166 let list_req = streaming_mcp_request(
1168 &serde_json::json!({
1169 "jsonrpc": "2.0", "method": "tools/list", "id": 2
1170 }).to_string(),
1171 Some(&session_id),
1172 );
1173 let list_resp = handler.handle_streaming(list_req).await.unwrap();
1174 let (status, body) = collect_streaming_body(list_resp).await;
1175 assert_eq!(status, 200, "Lenient mode should allow tools/list without initialized");
1176 let json = parse_response_json(&body);
1177 assert!(
1178 json["result"]["tools"].is_array(),
1179 "Must return tools list in lenient mode: {json}"
1180 );
1181 }
1182}