turul_mcp_aws_lambda/
handler.rs

1//! Lambda MCP handler that delegates to SessionMcpHandler
2//!
3//! This module provides the LambdaMcpHandler that processes Lambda HTTP
4//! requests by delegating to SessionMcpHandler, eliminating code duplication.
5
6use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12    ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
/// Main handler for Lambda MCP requests
///
/// Bridges `lambda_http` requests and responses to the hyper-based MCP
/// handlers from `turul_http_mcp_server`, so the MCP business logic is not
/// re-implemented in this crate.
///
/// What the methods on this type do:
/// 1. Type conversion between lambda_http and hyper (via `crate::adapter`)
/// 2. Delegation to SessionMcpHandler / StreamableHttpHandler for request handling
/// 3. CORS preflight responses and header injection (only with the `cors` feature)
///
/// `Clone` is derived, so cloning the handler clones each contained handler.
#[derive(Clone)]
pub struct LambdaMcpHandler {
    /// SessionMcpHandler for legacy protocol support.
    ///
    /// `handle()` sends every request here; `handle_streaming()` uses it only
    /// for protocol versions that do not support streamable HTTP.
    session_handler: SessionMcpHandler,

    /// StreamableHttpHandler for MCP 2025-06-18 with proper headers.
    ///
    /// Only used by `handle_streaming()`.
    streamable_handler: StreamableHttpHandler,

    /// Whether SSE is enabled (used for testing and debugging).
    ///
    /// Stored as given to the constructor; no request-handling method in this
    /// file reads it, hence the `dead_code` allowance.
    #[allow(dead_code)]
    sse_enabled: bool,

    /// CORS configuration (if enabled). `None` means no preflight handling and
    /// no CORS headers are added to responses.
    #[cfg(feature = "cors")]
    cors_config: Option<CorsConfig>,
}
49
50impl LambdaMcpHandler {
51    /// Create a new Lambda MCP handler with the framework components
52    #[allow(clippy::too_many_arguments)]
53    pub fn new(
54        dispatcher: JsonRpcDispatcher<McpError>,
55        session_storage: Arc<BoxedSessionStorage>,
56        stream_manager: Arc<StreamManager>,
57        config: ServerConfig,
58        stream_config: StreamConfig,
59        _implementation: turul_mcp_protocol::Implementation,
60        capabilities: ServerCapabilities,
61        sse_enabled: bool,
62        #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
63    ) -> Self {
64        let dispatcher = Arc::new(dispatcher);
65
66        // Create empty middleware stack (shared by both handlers)
67        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
68
69        // Create SessionMcpHandler for legacy protocol support
70        let session_handler = SessionMcpHandler::with_shared_stream_manager(
71            config.clone(),
72            dispatcher.clone(),
73            session_storage.clone(),
74            stream_config.clone(),
75            stream_manager.clone(),
76            middleware_stack.clone(),
77        );
78
79        // Create StreamableHttpHandler for MCP 2025-06-18 support
80        let streamable_handler = StreamableHttpHandler::new(
81            Arc::new(config.clone()),
82            dispatcher.clone(),
83            session_storage.clone(),
84            stream_manager.clone(),
85            capabilities.clone(),
86            middleware_stack,
87        );
88
89        Self {
90            session_handler,
91            streamable_handler,
92            sse_enabled,
93            #[cfg(feature = "cors")]
94            cors_config,
95        }
96    }
97
98    /// Create with shared stream manager (for advanced use cases)
99    #[allow(clippy::too_many_arguments)]
100    pub fn with_shared_stream_manager(
101        config: ServerConfig,
102        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
103        session_storage: Arc<BoxedSessionStorage>,
104        stream_manager: Arc<StreamManager>,
105        stream_config: StreamConfig,
106        _implementation: turul_mcp_protocol::Implementation,
107        capabilities: ServerCapabilities,
108        sse_enabled: bool,
109    ) -> Self {
110        // Create empty middleware stack (shared by both handlers)
111        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
112
113        // Create SessionMcpHandler for legacy protocol support
114        let session_handler = SessionMcpHandler::with_shared_stream_manager(
115            config.clone(),
116            dispatcher.clone(),
117            session_storage.clone(),
118            stream_config.clone(),
119            stream_manager.clone(),
120            middleware_stack.clone(),
121        );
122
123        // Create StreamableHttpHandler for MCP 2025-06-18 support
124        let streamable_handler = StreamableHttpHandler::new(
125            Arc::new(config),
126            dispatcher,
127            session_storage,
128            stream_manager,
129            capabilities,
130            middleware_stack,
131        );
132
133        Self {
134            session_handler,
135            streamable_handler,
136            sse_enabled,
137            #[cfg(feature = "cors")]
138            cors_config: None,
139        }
140    }
141
142    /// Create with custom middleware stack (for testing and examples)
143    #[allow(clippy::too_many_arguments)]
144    pub fn with_middleware(
145        config: ServerConfig,
146        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
147        session_storage: Arc<BoxedSessionStorage>,
148        stream_manager: Arc<StreamManager>,
149        stream_config: StreamConfig,
150        capabilities: ServerCapabilities,
151        middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
152        sse_enabled: bool,
153    ) -> Self {
154        // Create SessionMcpHandler with custom middleware
155        let session_handler = SessionMcpHandler::with_shared_stream_manager(
156            config.clone(),
157            dispatcher.clone(),
158            session_storage.clone(),
159            stream_config.clone(),
160            stream_manager.clone(),
161            middleware_stack.clone(),
162        );
163
164        // Create StreamableHttpHandler with custom middleware
165        let streamable_handler = StreamableHttpHandler::new(
166            Arc::new(config),
167            dispatcher,
168            session_storage,
169            stream_manager,
170            capabilities,
171            middleware_stack,
172        );
173
174        Self {
175            session_handler,
176            streamable_handler,
177            sse_enabled,
178            #[cfg(feature = "cors")]
179            cors_config: None,
180        }
181    }
182
183    /// Set CORS configuration
184    #[cfg(feature = "cors")]
185    pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
186        self.cors_config = Some(cors_config);
187        self
188    }
189
    /// Get access to the underlying stream manager for notifications
    ///
    /// Returns the `Arc<StreamManager>` reference exposed by the inner
    /// SessionMcpHandler; the caller may clone the `Arc` to keep it.
    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
        self.session_handler.get_stream_manager()
    }
194
195    /// Handle a Lambda HTTP request (snapshot mode - no real-time SSE)
196    ///
197    /// This method performs delegation to SessionMcpHandler for all business logic.
198    /// It only handles Lambda-specific concerns: CORS and type conversion.
199    ///
200    /// Note: If SSE is enabled (.sse(true)), SSE responses may not stream properly
201    /// with regular Lambda runtime. For proper SSE streaming, use handle_streaming()
202    /// with run_with_streaming_response().
203    pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
204        let method = req.method().clone();
205        let uri = req.uri().clone();
206
207        let request_origin = req
208            .headers()
209            .get("origin")
210            .and_then(|v| v.to_str().ok())
211            .map(|s| s.to_string());
212
213        info!(
214            "🌐 Lambda MCP request: {} {} (origin: {:?})",
215            method, uri, request_origin
216        );
217
218        // Handle CORS preflight requests first (Lambda-specific logic)
219        #[cfg(feature = "cors")]
220        if method == http::Method::OPTIONS
221            && let Some(ref cors_config) = self.cors_config
222        {
223            debug!("Handling CORS preflight request");
224            return create_preflight_response(cors_config, request_origin.as_deref());
225        }
226
227        // 🚀 DELEGATION: Convert Lambda request to hyper request
228        let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
229
230        // 🚀 DELEGATION: Use SessionMcpHandler for all business logic
231        let hyper_resp = self
232            .session_handler
233            .handle_mcp_request(hyper_req)
234            .await
235            .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
236
237        // 🚀 DELEGATION: Convert hyper response back to Lambda response
238        let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
239
240        // Apply CORS headers if configured (Lambda-specific logic)
241        #[cfg(feature = "cors")]
242        if let Some(ref cors_config) = self.cors_config {
243            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
244        }
245
246        Ok(lambda_resp)
247    }
248
249    /// Handle Lambda streaming request (real SSE streaming)
250    ///
251    /// This method enables real-time SSE streaming using Lambda's streaming response capability.
252    /// It delegates all business logic to SessionMcpHandler.
253    pub async fn handle_streaming(
254        &self,
255        req: LambdaRequest,
256    ) -> std::result::Result<
257        lambda_http::Response<
258            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
259        >,
260        Box<dyn std::error::Error + Send + Sync>,
261    > {
262        let method = req.method().clone();
263        let uri = req.uri().clone();
264        let request_origin = req
265            .headers()
266            .get("origin")
267            .and_then(|v| v.to_str().ok())
268            .map(|s| s.to_string());
269
270        info!(
271            "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
272            method, uri, request_origin
273        );
274
275        // Handle CORS preflight requests first (Lambda-specific logic)
276        #[cfg(feature = "cors")]
277        if method == http::Method::OPTIONS
278            && let Some(ref cors_config) = self.cors_config
279        {
280            debug!("Handling CORS preflight request (streaming)");
281            let preflight_response =
282                create_preflight_response(cors_config, request_origin.as_deref())
283                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
284
285            // Convert LambdaResponse<LambdaBody> to streaming response
286            return Ok(self.convert_lambda_response_to_streaming(preflight_response));
287        }
288
289        // 🚀 DELEGATION: Convert Lambda request to hyper request
290        let hyper_req = crate::adapter::lambda_to_hyper_request(req)
291            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
292
293        // 🚀 PROTOCOL ROUTING: Check protocol version and route to appropriate handler
294        use turul_http_mcp_server::protocol::McpProtocolVersion;
295        let protocol_version = hyper_req
296            .headers()
297            .get("MCP-Protocol-Version")
298            .and_then(|h| h.to_str().ok())
299            .and_then(McpProtocolVersion::parse_version)
300            .unwrap_or(McpProtocolVersion::V2025_06_18);
301
302        // Route based on protocol version
303        let hyper_resp = if protocol_version.supports_streamable_http() {
304            // Use StreamableHttpHandler for MCP 2025-06-18 (proper headers, chunked SSE)
305            debug!(
306                "Using StreamableHttpHandler for protocol {}",
307                protocol_version.to_string()
308            );
309            self.streamable_handler.handle_request(hyper_req).await
310        } else {
311            // Legacy protocol: use SessionMcpHandler
312            debug!(
313                "Using SessionMcpHandler for legacy protocol {}",
314                protocol_version.to_string()
315            );
316            self.session_handler
317                .handle_mcp_request(hyper_req)
318                .await
319                .map_err(|e| {
320                    Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
321                        as Box<dyn std::error::Error + Send + Sync>
322                })?
323        };
324
325        // 🚀 DELEGATION: Convert hyper response to Lambda streaming response (preserves streaming!)
326        let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
327
328        // Apply CORS headers if configured (Lambda-specific logic)
329        #[cfg(feature = "cors")]
330        if let Some(ref cors_config) = self.cors_config {
331            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
332                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
333        }
334
335        Ok(lambda_resp)
336    }
337
338    /// Convert Lambda response to streaming format (helper for CORS preflight)
339    fn convert_lambda_response_to_streaming(
340        &self,
341        lambda_response: LambdaResponse<LambdaBody>,
342    ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
343    {
344        use bytes::Bytes;
345        use http_body_util::{BodyExt, Full};
346
347        let (parts, body) = lambda_response.into_parts();
348        let body_bytes = match body {
349            LambdaBody::Empty => Bytes::new(),
350            LambdaBody::Text(text) => Bytes::from(text),
351            LambdaBody::Binary(bytes) => Bytes::from(bytes),
352        };
353
354        // Map error type from Infallible to hyper::Error
355        let streaming_body = Full::new(body_bytes)
356            .map_err(|e: std::convert::Infallible| match e {})
357            .boxed_unsync();
358
359        lambda_http::Response::from_parts(parts, streaming_body)
360    }
361}
362
363#[cfg(test)]
364mod tests {
365    use super::*;
366    use http::Request;
367    use turul_mcp_session_storage::InMemorySessionStorage;
368
369    #[tokio::test]
370    async fn test_handler_creation() {
371        let session_storage = Arc::new(InMemorySessionStorage::new());
372        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
373        let dispatcher = JsonRpcDispatcher::new();
374        let config = ServerConfig::default();
375        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
376        let capabilities = ServerCapabilities::default();
377
378        let handler = LambdaMcpHandler::new(
379            dispatcher,
380            session_storage,
381            stream_manager,
382            config,
383            StreamConfig::default(),
384            implementation,
385            capabilities,
386            false, // SSE disabled for test
387            #[cfg(feature = "cors")]
388            None,
389        );
390
391        // Test that handler was created successfully
392        assert!(!handler.sse_enabled);
393    }
394
395    #[tokio::test]
396    async fn test_sse_enabled_with_handle_works() {
397        let session_storage = Arc::new(InMemorySessionStorage::new());
398        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
399        let dispatcher = JsonRpcDispatcher::new();
400        let config = ServerConfig::default();
401        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
402        let capabilities = ServerCapabilities::default();
403
404        // Create handler with SSE enabled
405        let handler = LambdaMcpHandler::new(
406            dispatcher,
407            session_storage,
408            stream_manager,
409            config,
410            StreamConfig::default(),
411            implementation,
412            capabilities,
413            true, // SSE enabled - should work with handle() for snapshot-based SSE
414            #[cfg(feature = "cors")]
415            None,
416        );
417
418        // Create a test Lambda request
419        let lambda_req = Request::builder()
420            .method("POST")
421            .uri("/mcp")
422            .body(LambdaBody::Text(
423                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
424            ))
425            .unwrap();
426
427        // handle() should work (provides snapshot-based SSE rather than real-time streaming)
428        let result = handler.handle(lambda_req).await;
429        assert!(
430            result.is_ok(),
431            "handle() should work with SSE enabled for snapshot-based responses"
432        );
433    }
434
435    /// Test that verifies StreamConfig is properly threaded through the delegation
436    #[tokio::test]
437    async fn test_stream_config_preservation() {
438        let session_storage = Arc::new(InMemorySessionStorage::new());
439        let dispatcher = JsonRpcDispatcher::new();
440        let config = ServerConfig::default();
441        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
442        let capabilities = ServerCapabilities::default();
443
444        // Create a custom StreamConfig with non-default values
445        let custom_stream_config = StreamConfig {
446            channel_buffer_size: 1024,      // Non-default value (default is 1000)
447            max_replay_events: 200,         // Non-default value (default is 100)
448            keepalive_interval_seconds: 10, // Non-default value (default is 30)
449            cors_origin: "https://custom-test.example.com".to_string(), // Non-default value
450        };
451
452        // Create stream manager with the custom config
453        let stream_manager = Arc::new(StreamManager::with_config(
454            session_storage.clone(),
455            custom_stream_config.clone(),
456        ));
457
458        let handler = LambdaMcpHandler::new(
459            dispatcher,
460            session_storage,
461            stream_manager,
462            config,
463            custom_stream_config.clone(),
464            implementation,
465            capabilities,
466            false, // SSE disabled for test
467            #[cfg(feature = "cors")]
468            None,
469        );
470
471        // The handler should be created successfully, proving the StreamConfig was accepted
472        assert!(!handler.sse_enabled);
473
474        // Verify that the stream manager has the custom configuration
475        let stream_manager = handler.get_stream_manager();
476
477        // Verify the StreamConfig values were propagated correctly
478        let actual_config = stream_manager.get_config();
479
480        assert_eq!(
481            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
482            "Custom channel_buffer_size was not propagated correctly"
483        );
484        assert_eq!(
485            actual_config.max_replay_events, custom_stream_config.max_replay_events,
486            "Custom max_replay_events was not propagated correctly"
487        );
488        assert_eq!(
489            actual_config.keepalive_interval_seconds,
490            custom_stream_config.keepalive_interval_seconds,
491            "Custom keepalive_interval_seconds was not propagated correctly"
492        );
493        assert_eq!(
494            actual_config.cors_origin, custom_stream_config.cors_origin,
495            "Custom cors_origin was not propagated correctly"
496        );
497
498        // Verify the stream manager is accessible (proves delegation worked)
499        assert!(Arc::strong_count(stream_manager) >= 1);
500    }
501
502    /// Test the full builder → server → handler chain with StreamConfig
503    #[tokio::test]
504    async fn test_full_builder_chain_stream_config() {
505        use crate::LambdaMcpServerBuilder;
506        use turul_mcp_session_storage::InMemorySessionStorage;
507
508        // Create a custom StreamConfig with non-default values
509        let custom_stream_config = turul_http_mcp_server::StreamConfig {
510            channel_buffer_size: 2048,      // Non-default value
511            max_replay_events: 500,         // Non-default value
512            keepalive_interval_seconds: 15, // Non-default value
513            cors_origin: "https://full-chain-test.example.com".to_string(),
514        };
515
516        // Test the complete builder → server → handler chain
517        let server = LambdaMcpServerBuilder::new()
518            .name("full-chain-test")
519            .version("1.0.0")
520            .storage(Arc::new(InMemorySessionStorage::new()))
521            .sse(true) // Enable SSE to test streaming functionality
522            .stream_config(custom_stream_config.clone())
523            .build()
524            .await
525            .expect("Server should build successfully");
526
527        // Create handler from server (this is the critical chain step)
528        let handler = server
529            .handler()
530            .await
531            .expect("Handler should be created from server");
532
533        // Verify the handler was created successfully
534        assert!(handler.sse_enabled, "SSE should be enabled");
535
536        // Verify that the custom StreamConfig was preserved through the entire chain
537        let stream_manager = handler.get_stream_manager();
538        let actual_config = stream_manager.get_config();
539
540        assert_eq!(
541            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
542            "Custom channel_buffer_size should be preserved through builder → server → handler chain"
543        );
544        assert_eq!(
545            actual_config.max_replay_events, custom_stream_config.max_replay_events,
546            "Custom max_replay_events should be preserved through builder → server → handler chain"
547        );
548        assert_eq!(
549            actual_config.keepalive_interval_seconds,
550            custom_stream_config.keepalive_interval_seconds,
551            "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
552        );
553        assert_eq!(
554            actual_config.cors_origin, custom_stream_config.cors_origin,
555            "Custom cors_origin should be preserved through builder → server → handler chain"
556        );
557
558        // Verify the stream manager is functional
559        assert!(
560            Arc::strong_count(stream_manager) >= 1,
561            "Stream manager should be properly initialized"
562        );
563
564        // Additional verification: Test that the configuration is actually used functionally
565        // by verifying the stream manager can be used with the custom configuration
566        let test_session_id = uuid::Uuid::now_v7().to_string();
567
568        // The stream manager should be able to handle session operations with the custom config
569        // This verifies the config isn't just preserved but actually used
570        let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
571        assert!(
572            subscriptions.is_empty(),
573            "New session should have no subscriptions initially"
574        );
575
576        // Verify the stream manager was constructed with our custom config values
577        // This confirms the config propagated through the entire builder → server → handler chain
578        assert_eq!(
579            stream_manager.get_config().channel_buffer_size,
580            2048,
581            "Stream manager should be using the custom buffer size functionally"
582        );
583    }
584
585    /// Test matrix: 4 combinations of streaming runtime vs SSE configuration
586    /// This ensures we don't have runtime hangs or configuration conflicts
587    ///
588    /// Test 1: Non-streaming runtime + sse(false) - This should work (snapshot mode)
589    #[tokio::test]
590    async fn test_non_streaming_runtime_sse_false() {
591        use crate::LambdaMcpServerBuilder;
592        use turul_mcp_session_storage::InMemorySessionStorage;
593
594        let server = LambdaMcpServerBuilder::new()
595            .name("test-non-streaming-sse-false")
596            .version("1.0.0")
597            .storage(Arc::new(InMemorySessionStorage::new()))
598            .sse(false) // Disable SSE for non-streaming runtime
599            .build()
600            .await
601            .expect("Server should build successfully");
602
603        let handler = server
604            .handler()
605            .await
606            .expect("Handler should be created from server");
607
608        // Verify configuration
609        assert!(!handler.sse_enabled, "SSE should be disabled");
610
611        // Create a test request (POST /mcp works in all configs)
612        let lambda_req = Request::builder()
613            .method("POST")
614            .uri("/mcp")
615            .body(LambdaBody::Text(
616                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
617            ))
618            .unwrap();
619
620        // This should work without hanging
621        let result = handler.handle(lambda_req).await;
622        assert!(
623            result.is_ok(),
624            "POST /mcp should work with non-streaming + sse(false)"
625        );
626    }
627
628    /// Test 2: Non-streaming runtime + sse(true) - This should work (snapshot-based SSE)
629    #[tokio::test]
630    async fn test_non_streaming_runtime_sse_true() {
631        use crate::LambdaMcpServerBuilder;
632        use turul_mcp_session_storage::InMemorySessionStorage;
633
634        let server = LambdaMcpServerBuilder::new()
635            .name("test-non-streaming-sse-true")
636            .version("1.0.0")
637            .storage(Arc::new(InMemorySessionStorage::new()))
638            .sse(true) // Enable SSE for snapshot-based responses
639            .build()
640            .await
641            .expect("Server should build successfully");
642
643        let handler = server
644            .handler()
645            .await
646            .expect("Handler should be created from server");
647
648        // Verify configuration
649        assert!(handler.sse_enabled, "SSE should be enabled");
650
651        // Create a test request (POST /mcp works in all configs)
652        let lambda_req = Request::builder()
653            .method("POST")
654            .uri("/mcp")
655            .body(LambdaBody::Text(
656                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
657            ))
658            .unwrap();
659
660        // This should work without hanging (provides snapshot-based SSE)
661        let result = handler.handle(lambda_req).await;
662        assert!(
663            result.is_ok(),
664            "POST /mcp should work with non-streaming + sse(true)"
665        );
666
667        // Note: GET /mcp would provide snapshot events, not real-time streaming
668        // This is the key difference from handle_streaming()
669    }
670
671    /// Test 3: Streaming runtime + sse(false) - This should work (SSE disabled)
672    #[tokio::test]
673    async fn test_streaming_runtime_sse_false() {
674        use crate::LambdaMcpServerBuilder;
675        use turul_mcp_session_storage::InMemorySessionStorage;
676
677        let server = LambdaMcpServerBuilder::new()
678            .name("test-streaming-sse-false")
679            .version("1.0.0")
680            .storage(Arc::new(InMemorySessionStorage::new()))
681            .sse(false) // Disable SSE even with streaming runtime
682            .build()
683            .await
684            .expect("Server should build successfully");
685
686        let handler = server
687            .handler()
688            .await
689            .expect("Handler should be created from server");
690
691        // Verify configuration
692        assert!(!handler.sse_enabled, "SSE should be disabled");
693
694        // Create a test request for streaming handler
695        let lambda_req = Request::builder()
696            .method("POST")
697            .uri("/mcp")
698            .body(LambdaBody::Text(
699                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
700            ))
701            .unwrap();
702
703        // This should work with streaming runtime even when SSE is disabled
704        let result = handler.handle_streaming(lambda_req).await;
705        assert!(
706            result.is_ok(),
707            "Streaming runtime should work with sse(false)"
708        );
709    }
710
711    /// Test 4: Streaming runtime + sse(true) - This should work (real-time SSE streaming)
712    #[tokio::test]
713    async fn test_streaming_runtime_sse_true() {
714        use crate::LambdaMcpServerBuilder;
715        use turul_mcp_session_storage::InMemorySessionStorage;
716
717        let server = LambdaMcpServerBuilder::new()
718            .name("test-streaming-sse-true")
719            .version("1.0.0")
720            .storage(Arc::new(InMemorySessionStorage::new()))
721            .sse(true) // Enable SSE with streaming runtime for real-time streaming
722            .build()
723            .await
724            .expect("Server should build successfully");
725
726        let handler = server
727            .handler()
728            .await
729            .expect("Handler should be created from server");
730
731        // Verify configuration
732        assert!(handler.sse_enabled, "SSE should be enabled");
733
734        // Create a test request for streaming handler
735        let lambda_req = Request::builder()
736            .method("POST")
737            .uri("/mcp")
738            .body(LambdaBody::Text(
739                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
740            ))
741            .unwrap();
742
743        // This should work and provide real-time SSE streaming
744        let result = handler.handle_streaming(lambda_req).await;
745        assert!(
746            result.is_ok(),
747            "Streaming runtime should work with sse(true) for real-time streaming"
748        );
749
750        // Note: GET /mcp would provide real-time streaming events
751        // This is the optimal configuration for real-time notifications
752    }
753}