Skip to main content

turul_mcp_aws_lambda/
handler.rs

1//! Lambda MCP handler that delegates to SessionMcpHandler
2//!
3//! This module provides the LambdaMcpHandler that processes Lambda HTTP
4//! requests by delegating to SessionMcpHandler, eliminating code duplication.
5
6use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12    ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
23/// Main handler for Lambda MCP requests
24///
25/// This handler processes MCP requests in Lambda by delegating to SessionMcpHandler,
26/// eliminating 600+ lines of duplicate business logic code.
27///
28/// Features:
29/// 1. Type conversion between lambda_http and hyper
30/// 2. Delegation to SessionMcpHandler for all business logic
31/// 3. CORS support for browser clients
32/// 4. SSE validation to prevent silent failures
33#[derive(Clone)]
34pub struct LambdaMcpHandler {
35    /// SessionMcpHandler for legacy protocol support
36    session_handler: SessionMcpHandler,
37
38    /// StreamableHttpHandler for MCP 2025-11-25 with proper headers
39    streamable_handler: StreamableHttpHandler,
40
41    /// Whether SSE is enabled (used for testing and debugging)
42    #[allow(dead_code)]
43    sse_enabled: bool,
44
45    /// Custom route registry (e.g., .well-known endpoints)
46    route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
47
48    /// CORS configuration (if enabled)
49    #[cfg(feature = "cors")]
50    cors_config: Option<CorsConfig>,
51}
52
53impl LambdaMcpHandler {
54    /// Create a new Lambda MCP handler with the framework components
55    #[allow(clippy::too_many_arguments)]
56    pub fn new(
57        dispatcher: JsonRpcDispatcher<McpError>,
58        session_storage: Arc<BoxedSessionStorage>,
59        stream_manager: Arc<StreamManager>,
60        config: ServerConfig,
61        stream_config: StreamConfig,
62        _implementation: turul_mcp_protocol::Implementation,
63        capabilities: ServerCapabilities,
64        sse_enabled: bool,
65        #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
66    ) -> Self {
67        let dispatcher = Arc::new(dispatcher);
68
69        // Create empty middleware stack (shared by both handlers)
70        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
71
72        // Create SessionMcpHandler for legacy protocol support
73        let session_handler = SessionMcpHandler::with_shared_stream_manager(
74            config.clone(),
75            dispatcher.clone(),
76            session_storage.clone(),
77            stream_config.clone(),
78            stream_manager.clone(),
79            middleware_stack.clone(),
80        );
81
82        // Create StreamableHttpHandler for MCP 2025-11-25 support
83        let streamable_handler = StreamableHttpHandler::new(
84            Arc::new(config.clone()),
85            dispatcher.clone(),
86            session_storage.clone(),
87            stream_manager.clone(),
88            capabilities.clone(),
89            middleware_stack,
90        );
91
92        Self {
93            session_handler,
94            streamable_handler,
95            sse_enabled,
96            route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
97            #[cfg(feature = "cors")]
98            cors_config,
99        }
100    }
101
102    /// Create with shared stream manager (for advanced use cases)
103    #[allow(clippy::too_many_arguments)]
104    pub fn with_shared_stream_manager(
105        config: ServerConfig,
106        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
107        session_storage: Arc<BoxedSessionStorage>,
108        stream_manager: Arc<StreamManager>,
109        stream_config: StreamConfig,
110        _implementation: turul_mcp_protocol::Implementation,
111        capabilities: ServerCapabilities,
112        sse_enabled: bool,
113    ) -> Self {
114        // Create empty middleware stack (shared by both handlers)
115        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
116
117        // Create SessionMcpHandler for legacy protocol support
118        let session_handler = SessionMcpHandler::with_shared_stream_manager(
119            config.clone(),
120            dispatcher.clone(),
121            session_storage.clone(),
122            stream_config.clone(),
123            stream_manager.clone(),
124            middleware_stack.clone(),
125        );
126
127        // Create StreamableHttpHandler for MCP 2025-11-25 support
128        let streamable_handler = StreamableHttpHandler::new(
129            Arc::new(config),
130            dispatcher,
131            session_storage,
132            stream_manager,
133            capabilities,
134            middleware_stack,
135        );
136
137        Self {
138            session_handler,
139            streamable_handler,
140            sse_enabled,
141            route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
142            #[cfg(feature = "cors")]
143            cors_config: None,
144        }
145    }
146
147    /// Create with custom middleware stack (for testing and examples)
148    #[allow(clippy::too_many_arguments)]
149    pub fn with_middleware(
150        config: ServerConfig,
151        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
152        session_storage: Arc<BoxedSessionStorage>,
153        stream_manager: Arc<StreamManager>,
154        stream_config: StreamConfig,
155        capabilities: ServerCapabilities,
156        middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
157        sse_enabled: bool,
158        route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
159    ) -> Self {
160        // Create SessionMcpHandler with custom middleware
161        let session_handler = SessionMcpHandler::with_shared_stream_manager(
162            config.clone(),
163            dispatcher.clone(),
164            session_storage.clone(),
165            stream_config.clone(),
166            stream_manager.clone(),
167            middleware_stack.clone(),
168        );
169
170        // Create StreamableHttpHandler with custom middleware
171        let streamable_handler = StreamableHttpHandler::new(
172            Arc::new(config),
173            dispatcher,
174            session_storage,
175            stream_manager,
176            capabilities,
177            middleware_stack,
178        );
179
180        Self {
181            session_handler,
182            streamable_handler,
183            sse_enabled,
184            route_registry,
185            #[cfg(feature = "cors")]
186            cors_config: None,
187        }
188    }
189
190    /// Set CORS configuration
191    #[cfg(feature = "cors")]
192    pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
193        self.cors_config = Some(cors_config);
194        self
195    }
196
197    /// Get access to the underlying stream manager for notifications
198    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
199        self.session_handler.get_stream_manager()
200    }
201
202    /// Handle a Lambda HTTP request (snapshot mode - no real-time SSE)
203    ///
204    /// This method performs delegation to SessionMcpHandler for all business logic.
205    /// It only handles Lambda-specific concerns: CORS and type conversion.
206    ///
207    /// Note: If SSE is enabled (.sse(true)), SSE responses may not stream properly
208    /// with regular Lambda runtime. For proper SSE streaming, use handle_streaming()
209    /// with run_with_streaming_response().
210    pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
211        let method = req.method().clone();
212        let uri = req.uri().clone();
213
214        let request_origin = req
215            .headers()
216            .get("origin")
217            .and_then(|v| v.to_str().ok())
218            .map(|s| s.to_string());
219
220        info!(
221            "🌐 Lambda MCP request: {} {} (origin: {:?})",
222            method, uri, request_origin
223        );
224
225        // Handle CORS preflight requests first (Lambda-specific logic)
226        #[cfg(feature = "cors")]
227        if method == http::Method::OPTIONS
228            && let Some(ref cors_config) = self.cors_config
229        {
230            debug!("Handling CORS preflight request");
231            return create_preflight_response(cors_config, request_origin.as_deref());
232        }
233
234        // 🚀 DELEGATION: Convert Lambda request to hyper request
235        let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
236
237        // Check custom routes (e.g., .well-known) before MCP delegation
238        let path = hyper_req.uri().path().to_string();
239        if !self.route_registry.is_empty() {
240            match self.route_registry.match_route(&path) {
241                Ok(Some(route_handler)) => {
242                    debug!("Custom route matched: {}", path);
243                    use http_body_util::BodyExt;
244                    let (parts, body) = hyper_req.into_parts();
245                    let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
246                    let route_resp = route_handler.handle(boxed_req).await;
247                    let mut lambda_resp =
248                        crate::adapter::hyper_to_lambda_response(route_resp).await?;
249                    #[cfg(feature = "cors")]
250                    if let Some(ref cors_config) = self.cors_config {
251                        inject_cors_headers(
252                            &mut lambda_resp,
253                            cors_config,
254                            request_origin.as_deref(),
255                        )?;
256                    }
257                    return Ok(lambda_resp);
258                }
259                Ok(None) => {} // No match, continue to MCP handler
260                Err(e) => {
261                    debug!("Route validation error: {}", e);
262                    let route_resp = e.into_response();
263                    let mut lambda_resp =
264                        crate::adapter::hyper_to_lambda_response(route_resp).await?;
265                    #[cfg(feature = "cors")]
266                    if let Some(ref cors_config) = self.cors_config {
267                        inject_cors_headers(
268                            &mut lambda_resp,
269                            cors_config,
270                            request_origin.as_deref(),
271                        )?;
272                    }
273                    return Ok(lambda_resp);
274                }
275            }
276        }
277
278        // 🚀 DELEGATION: Use SessionMcpHandler for all business logic
279        let hyper_resp = self
280            .session_handler
281            .handle_mcp_request(hyper_req)
282            .await
283            .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
284
285        // 🚀 DELEGATION: Convert hyper response back to Lambda response
286        let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
287
288        // Apply CORS headers if configured (Lambda-specific logic)
289        #[cfg(feature = "cors")]
290        if let Some(ref cors_config) = self.cors_config {
291            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
292        }
293
294        Ok(lambda_resp)
295    }
296
297    /// Handle Lambda streaming request (real SSE streaming)
298    ///
299    /// This method enables real-time SSE streaming using Lambda's streaming response capability.
300    /// It delegates all business logic to SessionMcpHandler.
301    pub async fn handle_streaming(
302        &self,
303        req: LambdaRequest,
304    ) -> std::result::Result<
305        lambda_http::Response<
306            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
307        >,
308        Box<dyn std::error::Error + Send + Sync>,
309    > {
310        let method = req.method().clone();
311        let uri = req.uri().clone();
312        let request_origin = req
313            .headers()
314            .get("origin")
315            .and_then(|v| v.to_str().ok())
316            .map(|s| s.to_string());
317
318        debug!(
319            "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
320            method, uri, request_origin
321        );
322
323        // Handle CORS preflight requests first (Lambda-specific logic)
324        #[cfg(feature = "cors")]
325        if method == http::Method::OPTIONS
326            && let Some(ref cors_config) = self.cors_config
327        {
328            debug!("Handling CORS preflight request (streaming)");
329            let preflight_response =
330                create_preflight_response(cors_config, request_origin.as_deref())
331                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
332
333            // Convert LambdaResponse<LambdaBody> to streaming response
334            return Ok(self.convert_lambda_response_to_streaming(preflight_response));
335        }
336
337        // 🚀 DELEGATION: Convert Lambda request to hyper request
338        let hyper_req = crate::adapter::lambda_to_hyper_request(req)
339            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
340
341        // Check custom routes (e.g., .well-known) before MCP delegation
342        let path = hyper_req.uri().path().to_string();
343        if !self.route_registry.is_empty() {
344            match self.route_registry.match_route(&path) {
345                Ok(Some(route_handler)) => {
346                    debug!("Custom route matched (streaming): {}", path);
347                    use http_body_util::BodyExt;
348                    let (parts, body) = hyper_req.into_parts();
349                    let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
350                    return Ok(route_handler.handle(boxed_req).await);
351                }
352                Ok(None) => {} // No match, continue to MCP handler
353                Err(e) => {
354                    debug!("Route validation error (streaming): {}", e);
355                    return Ok(e.into_response());
356                }
357            }
358        }
359
360        // 🚀 PROTOCOL ROUTING: Check protocol version and route to appropriate handler
361        use turul_http_mcp_server::protocol::McpProtocolVersion;
362        let protocol_version = hyper_req
363            .headers()
364            .get("MCP-Protocol-Version")
365            .and_then(|h| h.to_str().ok())
366            .and_then(McpProtocolVersion::parse_version)
367            .unwrap_or(McpProtocolVersion::V2025_06_18);
368
369        // Route based on protocol version
370        let hyper_resp = if protocol_version.supports_streamable_http() {
371            // Use StreamableHttpHandler for MCP 2025-11-25 (proper headers, chunked SSE)
372            debug!(
373                "Using StreamableHttpHandler for protocol {}",
374                protocol_version.to_string()
375            );
376            self.streamable_handler.handle_request(hyper_req).await
377        } else {
378            // Legacy protocol: use SessionMcpHandler
379            debug!(
380                "Using SessionMcpHandler for legacy protocol {}",
381                protocol_version.to_string()
382            );
383            self.session_handler
384                .handle_mcp_request(hyper_req)
385                .await
386                .map_err(|e| {
387                    Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
388                        as Box<dyn std::error::Error + Send + Sync>
389                })?
390        };
391
392        // 🚀 DELEGATION: Convert hyper response to Lambda streaming response (preserves streaming!)
393        let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
394
395        // Apply CORS headers if configured (Lambda-specific logic)
396        #[cfg(feature = "cors")]
397        if let Some(ref cors_config) = self.cors_config {
398            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
399                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
400        }
401
402        Ok(lambda_resp)
403    }
404
405    /// Convert Lambda response to streaming format (helper for CORS preflight)
406    fn convert_lambda_response_to_streaming(
407        &self,
408        lambda_response: LambdaResponse<LambdaBody>,
409    ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
410    {
411        use bytes::Bytes;
412        use http_body_util::{BodyExt, Full};
413
414        let (parts, body) = lambda_response.into_parts();
415        let body_bytes = match body {
416            LambdaBody::Empty => Bytes::new(),
417            LambdaBody::Text(text) => Bytes::from(text),
418            LambdaBody::Binary(bytes) => Bytes::from(bytes),
419            _ => Bytes::new(),
420        };
421
422        // Map error type from Infallible to hyper::Error
423        let streaming_body = Full::new(body_bytes)
424            .map_err(|e: std::convert::Infallible| match e {})
425            .boxed_unsync();
426
427        lambda_http::Response::from_parts(parts, streaming_body)
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434    use http::Request;
435    use turul_mcp_session_storage::InMemorySessionStorage;
436
437    #[tokio::test]
438    async fn test_handler_creation() {
439        let session_storage = Arc::new(InMemorySessionStorage::new());
440        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
441        let dispatcher = JsonRpcDispatcher::new();
442        let config = ServerConfig::default();
443        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
444        let capabilities = ServerCapabilities::default();
445
446        let handler = LambdaMcpHandler::new(
447            dispatcher,
448            session_storage,
449            stream_manager,
450            config,
451            StreamConfig::default(),
452            implementation,
453            capabilities,
454            false, // SSE disabled for test
455            #[cfg(feature = "cors")]
456            None,
457        );
458
459        // Test that handler was created successfully
460        assert!(!handler.sse_enabled);
461    }
462
463    #[tokio::test]
464    async fn test_sse_enabled_with_handle_works() {
465        let session_storage = Arc::new(InMemorySessionStorage::new());
466        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
467        let dispatcher = JsonRpcDispatcher::new();
468        let config = ServerConfig::default();
469        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
470        let capabilities = ServerCapabilities::default();
471
472        // Create handler with SSE enabled
473        let handler = LambdaMcpHandler::new(
474            dispatcher,
475            session_storage,
476            stream_manager,
477            config,
478            StreamConfig::default(),
479            implementation,
480            capabilities,
481            true, // SSE enabled - should work with handle() for snapshot-based SSE
482            #[cfg(feature = "cors")]
483            None,
484        );
485
486        // Create a test Lambda request
487        let lambda_req = Request::builder()
488            .method("POST")
489            .uri("/mcp")
490            .body(LambdaBody::Text(
491                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
492            ))
493            .unwrap();
494
495        // handle() should work (provides snapshot-based SSE rather than real-time streaming)
496        let result = handler.handle(lambda_req).await;
497        assert!(
498            result.is_ok(),
499            "handle() should work with SSE enabled for snapshot-based responses"
500        );
501    }
502
503    /// Test that verifies StreamConfig is properly threaded through the delegation
504    #[tokio::test]
505    async fn test_stream_config_preservation() {
506        let session_storage = Arc::new(InMemorySessionStorage::new());
507        let dispatcher = JsonRpcDispatcher::new();
508        let config = ServerConfig::default();
509        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
510        let capabilities = ServerCapabilities::default();
511
512        // Create a custom StreamConfig with non-default values
513        let custom_stream_config = StreamConfig {
514            channel_buffer_size: 1024,      // Non-default value (default is 1000)
515            max_replay_events: 200,         // Non-default value (default is 100)
516            keepalive_interval_seconds: 10, // Non-default value (default is 30)
517            cors_origin: "https://custom-test.example.com".to_string(), // Non-default value
518        };
519
520        // Create stream manager with the custom config
521        let stream_manager = Arc::new(StreamManager::with_config(
522            session_storage.clone(),
523            custom_stream_config.clone(),
524        ));
525
526        let handler = LambdaMcpHandler::new(
527            dispatcher,
528            session_storage,
529            stream_manager,
530            config,
531            custom_stream_config.clone(),
532            implementation,
533            capabilities,
534            false, // SSE disabled for test
535            #[cfg(feature = "cors")]
536            None,
537        );
538
539        // The handler should be created successfully, proving the StreamConfig was accepted
540        assert!(!handler.sse_enabled);
541
542        // Verify that the stream manager has the custom configuration
543        let stream_manager = handler.get_stream_manager();
544
545        // Verify the StreamConfig values were propagated correctly
546        let actual_config = stream_manager.get_config();
547
548        assert_eq!(
549            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
550            "Custom channel_buffer_size was not propagated correctly"
551        );
552        assert_eq!(
553            actual_config.max_replay_events, custom_stream_config.max_replay_events,
554            "Custom max_replay_events was not propagated correctly"
555        );
556        assert_eq!(
557            actual_config.keepalive_interval_seconds,
558            custom_stream_config.keepalive_interval_seconds,
559            "Custom keepalive_interval_seconds was not propagated correctly"
560        );
561        assert_eq!(
562            actual_config.cors_origin, custom_stream_config.cors_origin,
563            "Custom cors_origin was not propagated correctly"
564        );
565
566        // Verify the stream manager is accessible (proves delegation worked)
567        assert!(Arc::strong_count(stream_manager) >= 1);
568    }
569
570    /// Test the full builder → server → handler chain with StreamConfig
571    #[tokio::test]
572    async fn test_full_builder_chain_stream_config() {
573        use crate::LambdaMcpServerBuilder;
574        use turul_mcp_session_storage::InMemorySessionStorage;
575
576        // Create a custom StreamConfig with non-default values
577        let custom_stream_config = turul_http_mcp_server::StreamConfig {
578            channel_buffer_size: 2048,      // Non-default value
579            max_replay_events: 500,         // Non-default value
580            keepalive_interval_seconds: 15, // Non-default value
581            cors_origin: "https://full-chain-test.example.com".to_string(),
582        };
583
584        // Test the complete builder → server → handler chain
585        let server = LambdaMcpServerBuilder::new()
586            .name("full-chain-test")
587            .version("1.0.0")
588            .storage(Arc::new(InMemorySessionStorage::new()))
589            .sse(true) // Enable SSE to test streaming functionality
590            .stream_config(custom_stream_config.clone())
591            .build()
592            .await
593            .expect("Server should build successfully");
594
595        // Create handler from server (this is the critical chain step)
596        let handler = server
597            .handler()
598            .await
599            .expect("Handler should be created from server");
600
601        // Verify the handler was created successfully
602        assert!(handler.sse_enabled, "SSE should be enabled");
603
604        // Verify that the custom StreamConfig was preserved through the entire chain
605        let stream_manager = handler.get_stream_manager();
606        let actual_config = stream_manager.get_config();
607
608        assert_eq!(
609            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
610            "Custom channel_buffer_size should be preserved through builder → server → handler chain"
611        );
612        assert_eq!(
613            actual_config.max_replay_events, custom_stream_config.max_replay_events,
614            "Custom max_replay_events should be preserved through builder → server → handler chain"
615        );
616        assert_eq!(
617            actual_config.keepalive_interval_seconds,
618            custom_stream_config.keepalive_interval_seconds,
619            "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
620        );
621        assert_eq!(
622            actual_config.cors_origin, custom_stream_config.cors_origin,
623            "Custom cors_origin should be preserved through builder → server → handler chain"
624        );
625
626        // Verify the stream manager is functional
627        assert!(
628            Arc::strong_count(stream_manager) >= 1,
629            "Stream manager should be properly initialized"
630        );
631
632        // Additional verification: Test that the configuration is actually used functionally
633        // by verifying the stream manager can be used with the custom configuration
634        let test_session_id = uuid::Uuid::now_v7().as_simple().to_string();
635
636        // The stream manager should be able to handle session operations with the custom config
637        // This verifies the config isn't just preserved but actually used
638        let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
639        assert!(
640            subscriptions.is_empty(),
641            "New session should have no subscriptions initially"
642        );
643
644        // Verify the stream manager was constructed with our custom config values
645        // This confirms the config propagated through the entire builder → server → handler chain
646        assert_eq!(
647            stream_manager.get_config().channel_buffer_size,
648            2048,
649            "Stream manager should be using the custom buffer size functionally"
650        );
651    }
652
653    /// Test matrix: 4 combinations of streaming runtime vs SSE configuration
654    /// This ensures we don't have runtime hangs or configuration conflicts
655    ///
656    /// Test 1: Non-streaming runtime + sse(false) - This should work (snapshot mode)
657    #[tokio::test]
658    async fn test_non_streaming_runtime_sse_false() {
659        use crate::LambdaMcpServerBuilder;
660        use turul_mcp_session_storage::InMemorySessionStorage;
661
662        let server = LambdaMcpServerBuilder::new()
663            .name("test-non-streaming-sse-false")
664            .version("1.0.0")
665            .storage(Arc::new(InMemorySessionStorage::new()))
666            .sse(false) // Disable SSE for non-streaming runtime
667            .build()
668            .await
669            .expect("Server should build successfully");
670
671        let handler = server
672            .handler()
673            .await
674            .expect("Handler should be created from server");
675
676        // Verify configuration
677        assert!(!handler.sse_enabled, "SSE should be disabled");
678
679        // Create a test request (POST /mcp works in all configs)
680        let lambda_req = Request::builder()
681            .method("POST")
682            .uri("/mcp")
683            .body(LambdaBody::Text(
684                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
685            ))
686            .unwrap();
687
688        // This should work without hanging
689        let result = handler.handle(lambda_req).await;
690        assert!(
691            result.is_ok(),
692            "POST /mcp should work with non-streaming + sse(false)"
693        );
694    }
695
696    /// Test 2: Non-streaming runtime + sse(true) - This should work (snapshot-based SSE)
697    #[tokio::test]
698    async fn test_non_streaming_runtime_sse_true() {
699        use crate::LambdaMcpServerBuilder;
700        use turul_mcp_session_storage::InMemorySessionStorage;
701
702        let server = LambdaMcpServerBuilder::new()
703            .name("test-non-streaming-sse-true")
704            .version("1.0.0")
705            .storage(Arc::new(InMemorySessionStorage::new()))
706            .sse(true) // Enable SSE for snapshot-based responses
707            .build()
708            .await
709            .expect("Server should build successfully");
710
711        let handler = server
712            .handler()
713            .await
714            .expect("Handler should be created from server");
715
716        // Verify configuration
717        assert!(handler.sse_enabled, "SSE should be enabled");
718
719        // Create a test request (POST /mcp works in all configs)
720        let lambda_req = Request::builder()
721            .method("POST")
722            .uri("/mcp")
723            .body(LambdaBody::Text(
724                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
725            ))
726            .unwrap();
727
728        // This should work without hanging (provides snapshot-based SSE)
729        let result = handler.handle(lambda_req).await;
730        assert!(
731            result.is_ok(),
732            "POST /mcp should work with non-streaming + sse(true)"
733        );
734
735        // Note: GET /mcp would provide snapshot events, not real-time streaming
736        // This is the key difference from handle_streaming()
737    }
738
739    /// Test 3: Streaming runtime + sse(false) - This should work (SSE disabled)
740    #[tokio::test]
741    async fn test_streaming_runtime_sse_false() {
742        use crate::LambdaMcpServerBuilder;
743        use turul_mcp_session_storage::InMemorySessionStorage;
744
745        let server = LambdaMcpServerBuilder::new()
746            .name("test-streaming-sse-false")
747            .version("1.0.0")
748            .storage(Arc::new(InMemorySessionStorage::new()))
749            .sse(false) // Disable SSE even with streaming runtime
750            .build()
751            .await
752            .expect("Server should build successfully");
753
754        let handler = server
755            .handler()
756            .await
757            .expect("Handler should be created from server");
758
759        // Verify configuration
760        assert!(!handler.sse_enabled, "SSE should be disabled");
761
762        // Create a test request for streaming handler
763        let lambda_req = Request::builder()
764            .method("POST")
765            .uri("/mcp")
766            .body(LambdaBody::Text(
767                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
768            ))
769            .unwrap();
770
771        // This should work with streaming runtime even when SSE is disabled
772        let result = handler.handle_streaming(lambda_req).await;
773        assert!(
774            result.is_ok(),
775            "Streaming runtime should work with sse(false)"
776        );
777    }
778
779    /// Test 4: Streaming runtime + sse(true) - This should work (real-time SSE streaming)
780    #[tokio::test]
781    async fn test_streaming_runtime_sse_true() {
782        use crate::LambdaMcpServerBuilder;
783        use turul_mcp_session_storage::InMemorySessionStorage;
784
785        let server = LambdaMcpServerBuilder::new()
786            .name("test-streaming-sse-true")
787            .version("1.0.0")
788            .storage(Arc::new(InMemorySessionStorage::new()))
789            .sse(true) // Enable SSE with streaming runtime for real-time streaming
790            .build()
791            .await
792            .expect("Server should build successfully");
793
794        let handler = server
795            .handler()
796            .await
797            .expect("Handler should be created from server");
798
799        // Verify configuration
800        assert!(handler.sse_enabled, "SSE should be enabled");
801
802        // Create a test request for streaming handler
803        let lambda_req = Request::builder()
804            .method("POST")
805            .uri("/mcp")
806            .body(LambdaBody::Text(
807                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
808            ))
809            .unwrap();
810
811        // This should work and provide real-time SSE streaming
812        let result = handler.handle_streaming(lambda_req).await;
813        assert!(
814            result.is_ok(),
815            "Streaming runtime should work with sse(true) for real-time streaming"
816        );
817
818        // Note: GET /mcp would provide real-time streaming events
819        // This is the optimal configuration for real-time notifications
820    }
821}