Skip to main content

turul_mcp_aws_lambda/
handler.rs

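//! Lambda MCP handler that delegates to the framework's HTTP handlers
//!
//! This module provides the LambdaMcpHandler, which processes Lambda HTTP
//! requests by delegating to SessionMcpHandler (and, on the streaming path, to
//! StreamableHttpHandler for protocol versions that support it), eliminating
//! code duplication.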
5
6use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12    ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
/// Main handler for Lambda MCP requests
///
/// This handler processes MCP requests in Lambda by delegating to the framework's
/// HTTP handlers instead of re-implementing their business logic. It wraps two of
/// them: a `SessionMcpHandler` and a `StreamableHttpHandler`.
///
/// Features:
/// 1. Type conversion between lambda_http and hyper
/// 2. Delegation to the wrapped handlers for all business logic
/// 3. CORS support for browser clients (only with the `cors` feature)
///
/// The type is `Clone`; cloning copies the wrapped handlers and configuration.
#[derive(Clone)]
pub struct LambdaMcpHandler {
    /// SessionMcpHandler for legacy protocol support.
    /// Used by `handle()` for every request and by `handle_streaming()` for
    /// protocol versions that do not support streamable HTTP.
    session_handler: SessionMcpHandler,

    /// StreamableHttpHandler for MCP 2025-11-25 with proper headers.
    /// Only reached through `handle_streaming()`.
    streamable_handler: StreamableHttpHandler,

    /// Whether SSE is enabled (used for testing and debugging).
    /// No request-handling method in this file reads it, hence the lint allowance.
    #[allow(dead_code)]
    sse_enabled: bool,

    /// CORS configuration (if enabled); `None` means no CORS handling is applied.
    #[cfg(feature = "cors")]
    cors_config: Option<CorsConfig>,
}
49
50impl LambdaMcpHandler {
51    /// Create a new Lambda MCP handler with the framework components
52    #[allow(clippy::too_many_arguments)]
53    pub fn new(
54        dispatcher: JsonRpcDispatcher<McpError>,
55        session_storage: Arc<BoxedSessionStorage>,
56        stream_manager: Arc<StreamManager>,
57        config: ServerConfig,
58        stream_config: StreamConfig,
59        _implementation: turul_mcp_protocol::Implementation,
60        capabilities: ServerCapabilities,
61        sse_enabled: bool,
62        #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
63    ) -> Self {
64        let dispatcher = Arc::new(dispatcher);
65
66        // Create empty middleware stack (shared by both handlers)
67        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
68
69        // Create SessionMcpHandler for legacy protocol support
70        let session_handler = SessionMcpHandler::with_shared_stream_manager(
71            config.clone(),
72            dispatcher.clone(),
73            session_storage.clone(),
74            stream_config.clone(),
75            stream_manager.clone(),
76            middleware_stack.clone(),
77        );
78
79        // Create StreamableHttpHandler for MCP 2025-11-25 support
80        let streamable_handler = StreamableHttpHandler::new(
81            Arc::new(config.clone()),
82            dispatcher.clone(),
83            session_storage.clone(),
84            stream_manager.clone(),
85            capabilities.clone(),
86            middleware_stack,
87        );
88
89        Self {
90            session_handler,
91            streamable_handler,
92            sse_enabled,
93            #[cfg(feature = "cors")]
94            cors_config,
95        }
96    }
97
98    /// Create with shared stream manager (for advanced use cases)
99    #[allow(clippy::too_many_arguments)]
100    pub fn with_shared_stream_manager(
101        config: ServerConfig,
102        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
103        session_storage: Arc<BoxedSessionStorage>,
104        stream_manager: Arc<StreamManager>,
105        stream_config: StreamConfig,
106        _implementation: turul_mcp_protocol::Implementation,
107        capabilities: ServerCapabilities,
108        sse_enabled: bool,
109    ) -> Self {
110        // Create empty middleware stack (shared by both handlers)
111        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
112
113        // Create SessionMcpHandler for legacy protocol support
114        let session_handler = SessionMcpHandler::with_shared_stream_manager(
115            config.clone(),
116            dispatcher.clone(),
117            session_storage.clone(),
118            stream_config.clone(),
119            stream_manager.clone(),
120            middleware_stack.clone(),
121        );
122
123        // Create StreamableHttpHandler for MCP 2025-11-25 support
124        let streamable_handler = StreamableHttpHandler::new(
125            Arc::new(config),
126            dispatcher,
127            session_storage,
128            stream_manager,
129            capabilities,
130            middleware_stack,
131        );
132
133        Self {
134            session_handler,
135            streamable_handler,
136            sse_enabled,
137            #[cfg(feature = "cors")]
138            cors_config: None,
139        }
140    }
141
142    /// Create with custom middleware stack (for testing and examples)
143    #[allow(clippy::too_many_arguments)]
144    pub fn with_middleware(
145        config: ServerConfig,
146        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
147        session_storage: Arc<BoxedSessionStorage>,
148        stream_manager: Arc<StreamManager>,
149        stream_config: StreamConfig,
150        capabilities: ServerCapabilities,
151        middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
152        sse_enabled: bool,
153    ) -> Self {
154        // Create SessionMcpHandler with custom middleware
155        let session_handler = SessionMcpHandler::with_shared_stream_manager(
156            config.clone(),
157            dispatcher.clone(),
158            session_storage.clone(),
159            stream_config.clone(),
160            stream_manager.clone(),
161            middleware_stack.clone(),
162        );
163
164        // Create StreamableHttpHandler with custom middleware
165        let streamable_handler = StreamableHttpHandler::new(
166            Arc::new(config),
167            dispatcher,
168            session_storage,
169            stream_manager,
170            capabilities,
171            middleware_stack,
172        );
173
174        Self {
175            session_handler,
176            streamable_handler,
177            sse_enabled,
178            #[cfg(feature = "cors")]
179            cors_config: None,
180        }
181    }
182
183    /// Set CORS configuration
184    #[cfg(feature = "cors")]
185    pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
186        self.cors_config = Some(cors_config);
187        self
188    }
189
190    /// Get access to the underlying stream manager for notifications
191    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
192        self.session_handler.get_stream_manager()
193    }
194
195    /// Handle a Lambda HTTP request (snapshot mode - no real-time SSE)
196    ///
197    /// This method performs delegation to SessionMcpHandler for all business logic.
198    /// It only handles Lambda-specific concerns: CORS and type conversion.
199    ///
200    /// Note: If SSE is enabled (.sse(true)), SSE responses may not stream properly
201    /// with regular Lambda runtime. For proper SSE streaming, use handle_streaming()
202    /// with run_with_streaming_response().
203    pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
204        let method = req.method().clone();
205        let uri = req.uri().clone();
206
207        let request_origin = req
208            .headers()
209            .get("origin")
210            .and_then(|v| v.to_str().ok())
211            .map(|s| s.to_string());
212
213        info!(
214            "🌐 Lambda MCP request: {} {} (origin: {:?})",
215            method, uri, request_origin
216        );
217
218        // Handle CORS preflight requests first (Lambda-specific logic)
219        #[cfg(feature = "cors")]
220        if method == http::Method::OPTIONS
221            && let Some(ref cors_config) = self.cors_config
222        {
223            debug!("Handling CORS preflight request");
224            return create_preflight_response(cors_config, request_origin.as_deref());
225        }
226
227        // 🚀 DELEGATION: Convert Lambda request to hyper request
228        let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;
229
230        // 🚀 DELEGATION: Use SessionMcpHandler for all business logic
231        let hyper_resp = self
232            .session_handler
233            .handle_mcp_request(hyper_req)
234            .await
235            .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;
236
237        // 🚀 DELEGATION: Convert hyper response back to Lambda response
238        let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;
239
240        // Apply CORS headers if configured (Lambda-specific logic)
241        #[cfg(feature = "cors")]
242        if let Some(ref cors_config) = self.cors_config {
243            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
244        }
245
246        Ok(lambda_resp)
247    }
248
249    /// Handle Lambda streaming request (real SSE streaming)
250    ///
251    /// This method enables real-time SSE streaming using Lambda's streaming response capability.
252    /// It delegates all business logic to SessionMcpHandler.
253    pub async fn handle_streaming(
254        &self,
255        req: LambdaRequest,
256    ) -> std::result::Result<
257        lambda_http::Response<
258            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
259        >,
260        Box<dyn std::error::Error + Send + Sync>,
261    > {
262        let method = req.method().clone();
263        let uri = req.uri().clone();
264        let request_origin = req
265            .headers()
266            .get("origin")
267            .and_then(|v| v.to_str().ok())
268            .map(|s| s.to_string());
269
270        debug!(
271            "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
272            method, uri, request_origin
273        );
274
275        // Handle CORS preflight requests first (Lambda-specific logic)
276        #[cfg(feature = "cors")]
277        if method == http::Method::OPTIONS
278            && let Some(ref cors_config) = self.cors_config
279        {
280            debug!("Handling CORS preflight request (streaming)");
281            let preflight_response =
282                create_preflight_response(cors_config, request_origin.as_deref())
283                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
284
285            // Convert LambdaResponse<LambdaBody> to streaming response
286            return Ok(self.convert_lambda_response_to_streaming(preflight_response));
287        }
288
289        // 🚀 DELEGATION: Convert Lambda request to hyper request
290        let hyper_req = crate::adapter::lambda_to_hyper_request(req)
291            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
292
293        // 🚀 PROTOCOL ROUTING: Check protocol version and route to appropriate handler
294        use turul_http_mcp_server::protocol::McpProtocolVersion;
295        let protocol_version = hyper_req
296            .headers()
297            .get("MCP-Protocol-Version")
298            .and_then(|h| h.to_str().ok())
299            .and_then(McpProtocolVersion::parse_version)
300            .unwrap_or(McpProtocolVersion::V2025_06_18);
301
302        // Route based on protocol version
303        let hyper_resp = if protocol_version.supports_streamable_http() {
304            // Use StreamableHttpHandler for MCP 2025-11-25 (proper headers, chunked SSE)
305            debug!(
306                "Using StreamableHttpHandler for protocol {}",
307                protocol_version.to_string()
308            );
309            self.streamable_handler.handle_request(hyper_req).await
310        } else {
311            // Legacy protocol: use SessionMcpHandler
312            debug!(
313                "Using SessionMcpHandler for legacy protocol {}",
314                protocol_version.to_string()
315            );
316            self.session_handler
317                .handle_mcp_request(hyper_req)
318                .await
319                .map_err(|e| {
320                    Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
321                        as Box<dyn std::error::Error + Send + Sync>
322                })?
323        };
324
325        // 🚀 DELEGATION: Convert hyper response to Lambda streaming response (preserves streaming!)
326        let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);
327
328        // Apply CORS headers if configured (Lambda-specific logic)
329        #[cfg(feature = "cors")]
330        if let Some(ref cors_config) = self.cors_config {
331            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
332                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
333        }
334
335        Ok(lambda_resp)
336    }
337
338    /// Convert Lambda response to streaming format (helper for CORS preflight)
339    fn convert_lambda_response_to_streaming(
340        &self,
341        lambda_response: LambdaResponse<LambdaBody>,
342    ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
343    {
344        use bytes::Bytes;
345        use http_body_util::{BodyExt, Full};
346
347        let (parts, body) = lambda_response.into_parts();
348        let body_bytes = match body {
349            LambdaBody::Empty => Bytes::new(),
350            LambdaBody::Text(text) => Bytes::from(text),
351            LambdaBody::Binary(bytes) => Bytes::from(bytes),
352            _ => Bytes::new(),
353        };
354
355        // Map error type from Infallible to hyper::Error
356        let streaming_body = Full::new(body_bytes)
357            .map_err(|e: std::convert::Infallible| match e {})
358            .boxed_unsync();
359
360        lambda_http::Response::from_parts(parts, streaming_body)
361    }
362}
363
364#[cfg(test)]
365mod tests {
366    use super::*;
367    use http::Request;
368    use turul_mcp_session_storage::InMemorySessionStorage;
369
370    #[tokio::test]
371    async fn test_handler_creation() {
372        let session_storage = Arc::new(InMemorySessionStorage::new());
373        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
374        let dispatcher = JsonRpcDispatcher::new();
375        let config = ServerConfig::default();
376        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
377        let capabilities = ServerCapabilities::default();
378
379        let handler = LambdaMcpHandler::new(
380            dispatcher,
381            session_storage,
382            stream_manager,
383            config,
384            StreamConfig::default(),
385            implementation,
386            capabilities,
387            false, // SSE disabled for test
388            #[cfg(feature = "cors")]
389            None,
390        );
391
392        // Test that handler was created successfully
393        assert!(!handler.sse_enabled);
394    }
395
396    #[tokio::test]
397    async fn test_sse_enabled_with_handle_works() {
398        let session_storage = Arc::new(InMemorySessionStorage::new());
399        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
400        let dispatcher = JsonRpcDispatcher::new();
401        let config = ServerConfig::default();
402        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
403        let capabilities = ServerCapabilities::default();
404
405        // Create handler with SSE enabled
406        let handler = LambdaMcpHandler::new(
407            dispatcher,
408            session_storage,
409            stream_manager,
410            config,
411            StreamConfig::default(),
412            implementation,
413            capabilities,
414            true, // SSE enabled - should work with handle() for snapshot-based SSE
415            #[cfg(feature = "cors")]
416            None,
417        );
418
419        // Create a test Lambda request
420        let lambda_req = Request::builder()
421            .method("POST")
422            .uri("/mcp")
423            .body(LambdaBody::Text(
424                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
425            ))
426            .unwrap();
427
428        // handle() should work (provides snapshot-based SSE rather than real-time streaming)
429        let result = handler.handle(lambda_req).await;
430        assert!(
431            result.is_ok(),
432            "handle() should work with SSE enabled for snapshot-based responses"
433        );
434    }
435
436    /// Test that verifies StreamConfig is properly threaded through the delegation
437    #[tokio::test]
438    async fn test_stream_config_preservation() {
439        let session_storage = Arc::new(InMemorySessionStorage::new());
440        let dispatcher = JsonRpcDispatcher::new();
441        let config = ServerConfig::default();
442        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
443        let capabilities = ServerCapabilities::default();
444
445        // Create a custom StreamConfig with non-default values
446        let custom_stream_config = StreamConfig {
447            channel_buffer_size: 1024,      // Non-default value (default is 1000)
448            max_replay_events: 200,         // Non-default value (default is 100)
449            keepalive_interval_seconds: 10, // Non-default value (default is 30)
450            cors_origin: "https://custom-test.example.com".to_string(), // Non-default value
451        };
452
453        // Create stream manager with the custom config
454        let stream_manager = Arc::new(StreamManager::with_config(
455            session_storage.clone(),
456            custom_stream_config.clone(),
457        ));
458
459        let handler = LambdaMcpHandler::new(
460            dispatcher,
461            session_storage,
462            stream_manager,
463            config,
464            custom_stream_config.clone(),
465            implementation,
466            capabilities,
467            false, // SSE disabled for test
468            #[cfg(feature = "cors")]
469            None,
470        );
471
472        // The handler should be created successfully, proving the StreamConfig was accepted
473        assert!(!handler.sse_enabled);
474
475        // Verify that the stream manager has the custom configuration
476        let stream_manager = handler.get_stream_manager();
477
478        // Verify the StreamConfig values were propagated correctly
479        let actual_config = stream_manager.get_config();
480
481        assert_eq!(
482            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
483            "Custom channel_buffer_size was not propagated correctly"
484        );
485        assert_eq!(
486            actual_config.max_replay_events, custom_stream_config.max_replay_events,
487            "Custom max_replay_events was not propagated correctly"
488        );
489        assert_eq!(
490            actual_config.keepalive_interval_seconds,
491            custom_stream_config.keepalive_interval_seconds,
492            "Custom keepalive_interval_seconds was not propagated correctly"
493        );
494        assert_eq!(
495            actual_config.cors_origin, custom_stream_config.cors_origin,
496            "Custom cors_origin was not propagated correctly"
497        );
498
499        // Verify the stream manager is accessible (proves delegation worked)
500        assert!(Arc::strong_count(stream_manager) >= 1);
501    }
502
503    /// Test the full builder → server → handler chain with StreamConfig
504    #[tokio::test]
505    async fn test_full_builder_chain_stream_config() {
506        use crate::LambdaMcpServerBuilder;
507        use turul_mcp_session_storage::InMemorySessionStorage;
508
509        // Create a custom StreamConfig with non-default values
510        let custom_stream_config = turul_http_mcp_server::StreamConfig {
511            channel_buffer_size: 2048,      // Non-default value
512            max_replay_events: 500,         // Non-default value
513            keepalive_interval_seconds: 15, // Non-default value
514            cors_origin: "https://full-chain-test.example.com".to_string(),
515        };
516
517        // Test the complete builder → server → handler chain
518        let server = LambdaMcpServerBuilder::new()
519            .name("full-chain-test")
520            .version("1.0.0")
521            .storage(Arc::new(InMemorySessionStorage::new()))
522            .sse(true) // Enable SSE to test streaming functionality
523            .stream_config(custom_stream_config.clone())
524            .build()
525            .await
526            .expect("Server should build successfully");
527
528        // Create handler from server (this is the critical chain step)
529        let handler = server
530            .handler()
531            .await
532            .expect("Handler should be created from server");
533
534        // Verify the handler was created successfully
535        assert!(handler.sse_enabled, "SSE should be enabled");
536
537        // Verify that the custom StreamConfig was preserved through the entire chain
538        let stream_manager = handler.get_stream_manager();
539        let actual_config = stream_manager.get_config();
540
541        assert_eq!(
542            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
543            "Custom channel_buffer_size should be preserved through builder → server → handler chain"
544        );
545        assert_eq!(
546            actual_config.max_replay_events, custom_stream_config.max_replay_events,
547            "Custom max_replay_events should be preserved through builder → server → handler chain"
548        );
549        assert_eq!(
550            actual_config.keepalive_interval_seconds,
551            custom_stream_config.keepalive_interval_seconds,
552            "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
553        );
554        assert_eq!(
555            actual_config.cors_origin, custom_stream_config.cors_origin,
556            "Custom cors_origin should be preserved through builder → server → handler chain"
557        );
558
559        // Verify the stream manager is functional
560        assert!(
561            Arc::strong_count(stream_manager) >= 1,
562            "Stream manager should be properly initialized"
563        );
564
565        // Additional verification: Test that the configuration is actually used functionally
566        // by verifying the stream manager can be used with the custom configuration
567        let test_session_id = uuid::Uuid::now_v7().as_simple().to_string();
568
569        // The stream manager should be able to handle session operations with the custom config
570        // This verifies the config isn't just preserved but actually used
571        let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
572        assert!(
573            subscriptions.is_empty(),
574            "New session should have no subscriptions initially"
575        );
576
577        // Verify the stream manager was constructed with our custom config values
578        // This confirms the config propagated through the entire builder → server → handler chain
579        assert_eq!(
580            stream_manager.get_config().channel_buffer_size,
581            2048,
582            "Stream manager should be using the custom buffer size functionally"
583        );
584    }
585
586    /// Test matrix: 4 combinations of streaming runtime vs SSE configuration
587    /// This ensures we don't have runtime hangs or configuration conflicts
588    ///
589    /// Test 1: Non-streaming runtime + sse(false) - This should work (snapshot mode)
590    #[tokio::test]
591    async fn test_non_streaming_runtime_sse_false() {
592        use crate::LambdaMcpServerBuilder;
593        use turul_mcp_session_storage::InMemorySessionStorage;
594
595        let server = LambdaMcpServerBuilder::new()
596            .name("test-non-streaming-sse-false")
597            .version("1.0.0")
598            .storage(Arc::new(InMemorySessionStorage::new()))
599            .sse(false) // Disable SSE for non-streaming runtime
600            .build()
601            .await
602            .expect("Server should build successfully");
603
604        let handler = server
605            .handler()
606            .await
607            .expect("Handler should be created from server");
608
609        // Verify configuration
610        assert!(!handler.sse_enabled, "SSE should be disabled");
611
612        // Create a test request (POST /mcp works in all configs)
613        let lambda_req = Request::builder()
614            .method("POST")
615            .uri("/mcp")
616            .body(LambdaBody::Text(
617                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
618            ))
619            .unwrap();
620
621        // This should work without hanging
622        let result = handler.handle(lambda_req).await;
623        assert!(
624            result.is_ok(),
625            "POST /mcp should work with non-streaming + sse(false)"
626        );
627    }
628
629    /// Test 2: Non-streaming runtime + sse(true) - This should work (snapshot-based SSE)
630    #[tokio::test]
631    async fn test_non_streaming_runtime_sse_true() {
632        use crate::LambdaMcpServerBuilder;
633        use turul_mcp_session_storage::InMemorySessionStorage;
634
635        let server = LambdaMcpServerBuilder::new()
636            .name("test-non-streaming-sse-true")
637            .version("1.0.0")
638            .storage(Arc::new(InMemorySessionStorage::new()))
639            .sse(true) // Enable SSE for snapshot-based responses
640            .build()
641            .await
642            .expect("Server should build successfully");
643
644        let handler = server
645            .handler()
646            .await
647            .expect("Handler should be created from server");
648
649        // Verify configuration
650        assert!(handler.sse_enabled, "SSE should be enabled");
651
652        // Create a test request (POST /mcp works in all configs)
653        let lambda_req = Request::builder()
654            .method("POST")
655            .uri("/mcp")
656            .body(LambdaBody::Text(
657                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
658            ))
659            .unwrap();
660
661        // This should work without hanging (provides snapshot-based SSE)
662        let result = handler.handle(lambda_req).await;
663        assert!(
664            result.is_ok(),
665            "POST /mcp should work with non-streaming + sse(true)"
666        );
667
668        // Note: GET /mcp would provide snapshot events, not real-time streaming
669        // This is the key difference from handle_streaming()
670    }
671
672    /// Test 3: Streaming runtime + sse(false) - This should work (SSE disabled)
673    #[tokio::test]
674    async fn test_streaming_runtime_sse_false() {
675        use crate::LambdaMcpServerBuilder;
676        use turul_mcp_session_storage::InMemorySessionStorage;
677
678        let server = LambdaMcpServerBuilder::new()
679            .name("test-streaming-sse-false")
680            .version("1.0.0")
681            .storage(Arc::new(InMemorySessionStorage::new()))
682            .sse(false) // Disable SSE even with streaming runtime
683            .build()
684            .await
685            .expect("Server should build successfully");
686
687        let handler = server
688            .handler()
689            .await
690            .expect("Handler should be created from server");
691
692        // Verify configuration
693        assert!(!handler.sse_enabled, "SSE should be disabled");
694
695        // Create a test request for streaming handler
696        let lambda_req = Request::builder()
697            .method("POST")
698            .uri("/mcp")
699            .body(LambdaBody::Text(
700                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
701            ))
702            .unwrap();
703
704        // This should work with streaming runtime even when SSE is disabled
705        let result = handler.handle_streaming(lambda_req).await;
706        assert!(
707            result.is_ok(),
708            "Streaming runtime should work with sse(false)"
709        );
710    }
711
712    /// Test 4: Streaming runtime + sse(true) - This should work (real-time SSE streaming)
713    #[tokio::test]
714    async fn test_streaming_runtime_sse_true() {
715        use crate::LambdaMcpServerBuilder;
716        use turul_mcp_session_storage::InMemorySessionStorage;
717
718        let server = LambdaMcpServerBuilder::new()
719            .name("test-streaming-sse-true")
720            .version("1.0.0")
721            .storage(Arc::new(InMemorySessionStorage::new()))
722            .sse(true) // Enable SSE with streaming runtime for real-time streaming
723            .build()
724            .await
725            .expect("Server should build successfully");
726
727        let handler = server
728            .handler()
729            .await
730            .expect("Handler should be created from server");
731
732        // Verify configuration
733        assert!(handler.sse_enabled, "SSE should be enabled");
734
735        // Create a test request for streaming handler
736        let lambda_req = Request::builder()
737            .method("POST")
738            .uri("/mcp")
739            .body(LambdaBody::Text(
740                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
741            ))
742            .unwrap();
743
744        // This should work and provide real-time SSE streaming
745        let result = handler.handle_streaming(lambda_req).await;
746        assert!(
747            result.is_ok(),
748            "Streaming runtime should work with sse(true) for real-time streaming"
749        );
750
751        // Note: GET /mcp would provide real-time streaming events
752        // This is the optimal configuration for real-time notifications
753    }
754}