Skip to main content

turul_mcp_aws_lambda/
handler.rs

1//! Lambda MCP handler that delegates to SessionMcpHandler
2//!
3//! This module provides the LambdaMcpHandler that processes Lambda HTTP
4//! requests by delegating to SessionMcpHandler, eliminating code duplication.
5
6use std::sync::Arc;
7
8use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
9use tracing::{debug, info};
10
11use turul_http_mcp_server::{
12    ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
13};
14use turul_mcp_json_rpc_server::JsonRpcDispatcher;
15use turul_mcp_protocol::{McpError, ServerCapabilities};
16use turul_mcp_session_storage::BoxedSessionStorage;
17
18use crate::error::Result;
19
20#[cfg(feature = "cors")]
21use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
22
/// Main handler for Lambda MCP requests
///
/// This handler processes MCP requests in Lambda by delegating to SessionMcpHandler,
/// eliminating 600+ lines of duplicate business logic code.
///
/// Features:
/// 1. Type conversion between lambda_http and hyper
/// 2. Delegation to SessionMcpHandler for all business logic
/// 3. CORS support for browser clients
/// 4. SSE validation to prevent silent failures
///
/// Two protocol handlers are kept side by side: `handle_streaming()` routes to
/// `streamable_handler` when the `MCP-Protocol-Version` header advertises
/// streamable-HTTP support, and to `session_handler` otherwise; `handle()`
/// always uses `session_handler`.
#[derive(Clone)]
pub struct LambdaMcpHandler {
    /// SessionMcpHandler for legacy protocol support
    session_handler: SessionMcpHandler,

    /// StreamableHttpHandler for MCP 2025-11-25 with proper headers
    streamable_handler: StreamableHttpHandler,

    /// Whether SSE is enabled (used for testing and debugging)
    /// Not read by the request paths in this file — kept for tests/diagnostics.
    #[allow(dead_code)]
    sse_enabled: bool,

    /// Custom route registry (e.g., .well-known endpoints)
    /// Checked before MCP delegation in both `handle()` and `handle_streaming()`.
    route_registry: Arc<turul_http_mcp_server::RouteRegistry>,

    /// CORS configuration (if enabled)
    /// `None` means no CORS headers are injected even with the feature on.
    #[cfg(feature = "cors")]
    cors_config: Option<CorsConfig>,
}
52
53impl LambdaMcpHandler {
54    /// Create a new Lambda MCP handler with the framework components
55    #[allow(clippy::too_many_arguments)]
56    pub fn new(
57        dispatcher: JsonRpcDispatcher<McpError>,
58        session_storage: Arc<BoxedSessionStorage>,
59        stream_manager: Arc<StreamManager>,
60        config: ServerConfig,
61        stream_config: StreamConfig,
62        _implementation: turul_mcp_protocol::Implementation,
63        capabilities: ServerCapabilities,
64        sse_enabled: bool,
65        #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
66    ) -> Self {
67        let dispatcher = Arc::new(dispatcher);
68
69        // Create empty middleware stack (shared by both handlers)
70        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
71
72        // Create SessionMcpHandler for legacy protocol support
73        let session_handler = SessionMcpHandler::with_shared_stream_manager(
74            config.clone(),
75            dispatcher.clone(),
76            session_storage.clone(),
77            stream_config.clone(),
78            stream_manager.clone(),
79            middleware_stack.clone(),
80        );
81
82        // Create StreamableHttpHandler for MCP 2025-11-25 support
83        let streamable_handler = StreamableHttpHandler::new(
84            Arc::new(config.clone()),
85            dispatcher.clone(),
86            session_storage.clone(),
87            stream_manager.clone(),
88            capabilities.clone(),
89            middleware_stack,
90        );
91
92        Self {
93            session_handler,
94            streamable_handler,
95            sse_enabled,
96            route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
97            #[cfg(feature = "cors")]
98            cors_config,
99        }
100    }
101
102    /// Create with shared stream manager (for advanced use cases)
103    #[allow(clippy::too_many_arguments)]
104    pub fn with_shared_stream_manager(
105        config: ServerConfig,
106        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
107        session_storage: Arc<BoxedSessionStorage>,
108        stream_manager: Arc<StreamManager>,
109        stream_config: StreamConfig,
110        _implementation: turul_mcp_protocol::Implementation,
111        capabilities: ServerCapabilities,
112        sse_enabled: bool,
113    ) -> Self {
114        // Create empty middleware stack (shared by both handlers)
115        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());
116
117        // Create SessionMcpHandler for legacy protocol support
118        let session_handler = SessionMcpHandler::with_shared_stream_manager(
119            config.clone(),
120            dispatcher.clone(),
121            session_storage.clone(),
122            stream_config.clone(),
123            stream_manager.clone(),
124            middleware_stack.clone(),
125        );
126
127        // Create StreamableHttpHandler for MCP 2025-11-25 support
128        let streamable_handler = StreamableHttpHandler::new(
129            Arc::new(config),
130            dispatcher,
131            session_storage,
132            stream_manager,
133            capabilities,
134            middleware_stack,
135        );
136
137        Self {
138            session_handler,
139            streamable_handler,
140            sse_enabled,
141            route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
142            #[cfg(feature = "cors")]
143            cors_config: None,
144        }
145    }
146
147    /// Create with custom middleware stack (for testing and examples)
148    #[allow(clippy::too_many_arguments)]
149    pub fn with_middleware(
150        config: ServerConfig,
151        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
152        session_storage: Arc<BoxedSessionStorage>,
153        stream_manager: Arc<StreamManager>,
154        stream_config: StreamConfig,
155        capabilities: ServerCapabilities,
156        middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
157        sse_enabled: bool,
158        route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
159    ) -> Self {
160        // Create SessionMcpHandler with custom middleware
161        let session_handler = SessionMcpHandler::with_shared_stream_manager(
162            config.clone(),
163            dispatcher.clone(),
164            session_storage.clone(),
165            stream_config.clone(),
166            stream_manager.clone(),
167            middleware_stack.clone(),
168        );
169
170        // Create StreamableHttpHandler with custom middleware
171        let streamable_handler = StreamableHttpHandler::new(
172            Arc::new(config),
173            dispatcher,
174            session_storage,
175            stream_manager,
176            capabilities,
177            middleware_stack,
178        );
179
180        Self {
181            session_handler,
182            streamable_handler,
183            sse_enabled,
184            route_registry,
185            #[cfg(feature = "cors")]
186            cors_config: None,
187        }
188    }
189
190    /// Set CORS configuration
191    #[cfg(feature = "cors")]
192    pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
193        self.cors_config = Some(cors_config);
194        self
195    }
196
    /// Get access to the underlying stream manager for notifications
    ///
    /// Returns the `StreamManager` shared by both protocol handlers, borrowed
    /// via the legacy `SessionMcpHandler`.
    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
        self.session_handler.get_stream_manager()
    }
201
    /// Handle a Lambda HTTP request (snapshot mode - no real-time SSE)
    ///
    /// This method performs delegation to SessionMcpHandler for all business logic.
    /// It only handles Lambda-specific concerns: CORS and type conversion.
    ///
    /// Note: If SSE is enabled (.sse(true)), SSE responses may not stream properly
    /// with regular Lambda runtime. For proper SSE streaming, use handle_streaming()
    /// with run_with_streaming_response().
    ///
    /// Request flow: CORS preflight → lambda→hyper conversion → custom routes
    /// (e.g. .well-known) → SessionMcpHandler → hyper→lambda conversion →
    /// CORS header injection.
    pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
        let method = req.method().clone();
        let uri = req.uri().clone();

        // Capture the Origin header now; `req` is consumed by the hyper
        // conversion below and the value is needed for CORS injection later.
        let request_origin = req
            .headers()
            .get("origin")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        info!(
            "🌐 Lambda MCP request: {} {} (origin: {:?})",
            method, uri, request_origin
        );

        // Handle CORS preflight requests first (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if method == http::Method::OPTIONS
            && let Some(ref cors_config) = self.cors_config
        {
            debug!("Handling CORS preflight request");
            return create_preflight_response(cors_config, request_origin.as_deref());
        }

        // 🚀 DELEGATION: Convert Lambda request to hyper request
        let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;

        // Check custom routes (e.g., .well-known) before MCP delegation
        let path = hyper_req.uri().path().to_string();
        if !self.route_registry.is_empty() {
            match self.route_registry.match_route(&path) {
                Ok(Some(route_handler)) => {
                    debug!("Custom route matched: {}", path);
                    use http_body_util::BodyExt;
                    // Re-box the body to the type the route handler expects;
                    // this consumes `hyper_req`, so we must return in this arm.
                    let (parts, body) = hyper_req.into_parts();
                    let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
                    let route_resp = route_handler.handle(boxed_req).await;
                    let mut lambda_resp =
                        crate::adapter::hyper_to_lambda_response(route_resp).await?;
                    // Custom-route responses get the same CORS treatment as
                    // MCP responses.
                    #[cfg(feature = "cors")]
                    if let Some(ref cors_config) = self.cors_config {
                        inject_cors_headers(
                            &mut lambda_resp,
                            cors_config,
                            request_origin.as_deref(),
                        )?;
                    }
                    return Ok(lambda_resp);
                }
                Ok(None) => {} // No match, continue to MCP handler
                Err(e) => {
                    // Route validation errors become HTTP responses rather
                    // than handler errors, so clients get a proper status.
                    debug!("Route validation error: {}", e);
                    let route_resp = e.into_response();
                    let mut lambda_resp =
                        crate::adapter::hyper_to_lambda_response(route_resp).await?;
                    #[cfg(feature = "cors")]
                    if let Some(ref cors_config) = self.cors_config {
                        inject_cors_headers(
                            &mut lambda_resp,
                            cors_config,
                            request_origin.as_deref(),
                        )?;
                    }
                    return Ok(lambda_resp);
                }
            }
        }

        // 🚀 DELEGATION: Use SessionMcpHandler for all business logic
        let hyper_resp = self
            .session_handler
            .handle_mcp_request(hyper_req)
            .await
            .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;

        // 🚀 DELEGATION: Convert hyper response back to Lambda response
        let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;

        // Apply CORS headers if configured (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if let Some(ref cors_config) = self.cors_config {
            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
        }

        Ok(lambda_resp)
    }
296
    /// Handle Lambda streaming request (real SSE streaming)
    ///
    /// This method enables real-time SSE streaming using Lambda's streaming response capability.
    /// It delegates all business logic to SessionMcpHandler.
    ///
    /// Unlike `handle()`, this path routes by `MCP-Protocol-Version`: versions
    /// that support streamable HTTP go to `StreamableHttpHandler`, everything
    /// else (including a missing/unparsable header, which defaults to
    /// 2025-06-18) goes to `SessionMcpHandler`.
    pub async fn handle_streaming(
        &self,
        req: LambdaRequest,
    ) -> std::result::Result<
        lambda_http::Response<
            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
        >,
        Box<dyn std::error::Error + Send + Sync>,
    > {
        let method = req.method().clone();
        let uri = req.uri().clone();
        // Capture the Origin header before `req` is consumed below.
        let request_origin = req
            .headers()
            .get("origin")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        debug!(
            "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
            method, uri, request_origin
        );

        // Handle CORS preflight requests first (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if method == http::Method::OPTIONS
            && let Some(ref cors_config) = self.cors_config
        {
            debug!("Handling CORS preflight request (streaming)");
            let preflight_response =
                create_preflight_response(cors_config, request_origin.as_deref())
                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;

            // Convert LambdaResponse<LambdaBody> to streaming response
            return Ok(self.convert_lambda_response_to_streaming(preflight_response));
        }

        // 🚀 DELEGATION: Convert Lambda request to hyper request
        let hyper_req = crate::adapter::lambda_to_hyper_request(req)
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;

        // Check custom routes (e.g., .well-known) before MCP delegation
        let path = hyper_req.uri().path().to_string();
        if !self.route_registry.is_empty() {
            match self.route_registry.match_route(&path) {
                Ok(Some(route_handler)) => {
                    debug!("Custom route matched (streaming): {}", path);
                    use http_body_util::BodyExt;
                    // Route handler responses are already in the streaming
                    // body type, so they can be returned directly (no CORS
                    // injection on this early-return path).
                    let (parts, body) = hyper_req.into_parts();
                    let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
                    return Ok(route_handler.handle(boxed_req).await);
                }
                Ok(None) => {} // No match, continue to MCP handler
                Err(e) => {
                    debug!("Route validation error (streaming): {}", e);
                    return Ok(e.into_response());
                }
            }
        }

        // 🚀 PROTOCOL ROUTING: Check protocol version and route to appropriate handler
        use turul_http_mcp_server::protocol::McpProtocolVersion;
        let protocol_version = hyper_req
            .headers()
            .get("MCP-Protocol-Version")
            .and_then(|h| h.to_str().ok())
            .and_then(McpProtocolVersion::parse_version)
            .unwrap_or(McpProtocolVersion::V2025_06_18);

        // Route based on protocol version
        let hyper_resp = if protocol_version.supports_streamable_http() {
            // Use StreamableHttpHandler for MCP 2025-11-25 (proper headers, chunked SSE)
            debug!(
                "Using StreamableHttpHandler for protocol {}",
                protocol_version.to_string()
            );
            self.streamable_handler.handle_request(hyper_req).await
        } else {
            // Legacy protocol: use SessionMcpHandler
            debug!(
                "Using SessionMcpHandler for legacy protocol {}",
                protocol_version.to_string()
            );
            self.session_handler
                .handle_mcp_request(hyper_req)
                .await
                .map_err(|e| {
                    Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
                        as Box<dyn std::error::Error + Send + Sync>
                })?
        };

        // 🚀 DELEGATION: Convert hyper response to Lambda streaming response (preserves streaming!)
        let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);

        // Apply CORS headers if configured (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if let Some(ref cors_config) = self.cors_config {
            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
        }

        Ok(lambda_resp)
    }
404
405    /// Convert Lambda response to streaming format (helper for CORS preflight)
406    fn convert_lambda_response_to_streaming(
407        &self,
408        lambda_response: LambdaResponse<LambdaBody>,
409    ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
410    {
411        use bytes::Bytes;
412        use http_body_util::{BodyExt, Full};
413
414        let (parts, body) = lambda_response.into_parts();
415        let body_bytes = match body {
416            LambdaBody::Empty => Bytes::new(),
417            LambdaBody::Text(text) => Bytes::from(text),
418            LambdaBody::Binary(bytes) => Bytes::from(bytes),
419            _ => Bytes::new(),
420        };
421
422        // Map error type from Infallible to hyper::Error
423        let streaming_body = Full::new(body_bytes)
424            .map_err(|e: std::convert::Infallible| match e {})
425            .boxed_unsync();
426
427        lambda_http::Response::from_parts(parts, streaming_body)
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434    use http::Request;
435    use turul_mcp_session_storage::InMemorySessionStorage;
436
437    #[tokio::test]
438    async fn test_handler_creation() {
439        let session_storage = Arc::new(InMemorySessionStorage::new());
440        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
441        let dispatcher = JsonRpcDispatcher::new();
442        let config = ServerConfig::default();
443        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
444        let capabilities = ServerCapabilities::default();
445
446        let handler = LambdaMcpHandler::new(
447            dispatcher,
448            session_storage,
449            stream_manager,
450            config,
451            StreamConfig::default(),
452            implementation,
453            capabilities,
454            false, // SSE disabled for test
455            #[cfg(feature = "cors")]
456            None,
457        );
458
459        // Test that handler was created successfully
460        assert!(!handler.sse_enabled);
461    }
462
463    #[tokio::test]
464    async fn test_sse_enabled_with_handle_works() {
465        let session_storage = Arc::new(InMemorySessionStorage::new());
466        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
467        let dispatcher = JsonRpcDispatcher::new();
468        let config = ServerConfig::default();
469        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
470        let capabilities = ServerCapabilities::default();
471
472        // Create handler with SSE enabled
473        let handler = LambdaMcpHandler::new(
474            dispatcher,
475            session_storage,
476            stream_manager,
477            config,
478            StreamConfig::default(),
479            implementation,
480            capabilities,
481            true, // SSE enabled - should work with handle() for snapshot-based SSE
482            #[cfg(feature = "cors")]
483            None,
484        );
485
486        // Create a test Lambda request
487        let lambda_req = Request::builder()
488            .method("POST")
489            .uri("/mcp")
490            .body(LambdaBody::Text(
491                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
492            ))
493            .unwrap();
494
495        // handle() should work (provides snapshot-based SSE rather than real-time streaming)
496        let result = handler.handle(lambda_req).await;
497        assert!(
498            result.is_ok(),
499            "handle() should work with SSE enabled for snapshot-based responses"
500        );
501    }
502
    /// Test that verifies StreamConfig is properly threaded through the delegation
    #[tokio::test]
    async fn test_stream_config_preservation() {
        let session_storage = Arc::new(InMemorySessionStorage::new());
        let dispatcher = JsonRpcDispatcher::new();
        let config = ServerConfig::default();
        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
        let capabilities = ServerCapabilities::default();

        // Create a custom StreamConfig with non-default values
        // (non-default so that accidental Default::default() substitution
        // anywhere in the chain would fail the assertions below)
        let custom_stream_config = StreamConfig {
            channel_buffer_size: 1024,      // Non-default value (default is 1000)
            max_replay_events: 200,         // Non-default value (default is 100)
            keepalive_interval_seconds: 10, // Non-default value (default is 30)
            cors_origin: "https://custom-test.example.com".to_string(), // Non-default value
        };

        // Create stream manager with the custom config
        let stream_manager = Arc::new(StreamManager::with_config(
            session_storage.clone(),
            custom_stream_config.clone(),
        ));

        let handler = LambdaMcpHandler::new(
            dispatcher,
            session_storage,
            stream_manager,
            config,
            custom_stream_config.clone(),
            implementation,
            capabilities,
            false, // SSE disabled for test
            #[cfg(feature = "cors")]
            None,
        );

        // The handler should be created successfully, proving the StreamConfig was accepted
        assert!(!handler.sse_enabled);

        // Verify that the stream manager has the custom configuration
        // (shadowing: this borrows the manager back out of the handler)
        let stream_manager = handler.get_stream_manager();

        // Verify the StreamConfig values were propagated correctly
        let actual_config = stream_manager.get_config();

        assert_eq!(
            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
            "Custom channel_buffer_size was not propagated correctly"
        );
        assert_eq!(
            actual_config.max_replay_events, custom_stream_config.max_replay_events,
            "Custom max_replay_events was not propagated correctly"
        );
        assert_eq!(
            actual_config.keepalive_interval_seconds,
            custom_stream_config.keepalive_interval_seconds,
            "Custom keepalive_interval_seconds was not propagated correctly"
        );
        assert_eq!(
            actual_config.cors_origin, custom_stream_config.cors_origin,
            "Custom cors_origin was not propagated correctly"
        );

        // Verify the stream manager is accessible (proves delegation worked)
        assert!(Arc::strong_count(stream_manager) >= 1);
    }
569
    /// Test the full builder → server → handler chain with StreamConfig
    #[tokio::test]
    async fn test_full_builder_chain_stream_config() {
        use crate::LambdaMcpServerBuilder;
        use turul_mcp_session_storage::InMemorySessionStorage;

        // Create a custom StreamConfig with non-default values
        // (distinct from the values in test_stream_config_preservation so
        // the two tests cannot mask each other's failures)
        let custom_stream_config = turul_http_mcp_server::StreamConfig {
            channel_buffer_size: 2048,      // Non-default value
            max_replay_events: 500,         // Non-default value
            keepalive_interval_seconds: 15, // Non-default value
            cors_origin: "https://full-chain-test.example.com".to_string(),
        };

        // Test the complete builder → server → handler chain
        let server = LambdaMcpServerBuilder::new()
            .name("full-chain-test")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(true) // Enable SSE to test streaming functionality
            .stream_config(custom_stream_config.clone())
            .build()
            .await
            .expect("Server should build successfully");

        // Create handler from server (this is the critical chain step)
        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        // Verify the handler was created successfully
        assert!(handler.sse_enabled, "SSE should be enabled");

        // Verify that the custom StreamConfig was preserved through the entire chain
        let stream_manager = handler.get_stream_manager();
        let actual_config = stream_manager.get_config();

        assert_eq!(
            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
            "Custom channel_buffer_size should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            actual_config.max_replay_events, custom_stream_config.max_replay_events,
            "Custom max_replay_events should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            actual_config.keepalive_interval_seconds,
            custom_stream_config.keepalive_interval_seconds,
            "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            actual_config.cors_origin, custom_stream_config.cors_origin,
            "Custom cors_origin should be preserved through builder → server → handler chain"
        );

        // Verify the stream manager is functional
        assert!(
            Arc::strong_count(stream_manager) >= 1,
            "Stream manager should be properly initialized"
        );

        // Additional verification: Test that the configuration is actually used functionally
        // by verifying the stream manager can be used with the custom configuration
        let test_session_id = uuid::Uuid::now_v7().as_simple().to_string();

        // The stream manager should be able to handle session operations with the custom config
        // This verifies the config isn't just preserved but actually used
        let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
        assert!(
            subscriptions.is_empty(),
            "New session should have no subscriptions initially"
        );

        // Verify the stream manager was constructed with our custom config values
        // This confirms the config propagated through the entire builder → server → handler chain
        assert_eq!(
            stream_manager.get_config().channel_buffer_size,
            2048,
            "Stream manager should be using the custom buffer size functionally"
        );
    }
652
653    /// Test matrix: 4 combinations of streaming runtime vs SSE configuration
654    /// This ensures we don't have runtime hangs or configuration conflicts
655    ///
656    /// Test 1: Non-streaming runtime + sse(false) - This should work (snapshot mode)
657    #[tokio::test]
658    async fn test_non_streaming_runtime_sse_false() {
659        use crate::LambdaMcpServerBuilder;
660        use turul_mcp_session_storage::InMemorySessionStorage;
661
662        let server = LambdaMcpServerBuilder::new()
663            .name("test-non-streaming-sse-false")
664            .version("1.0.0")
665            .storage(Arc::new(InMemorySessionStorage::new()))
666            .sse(false) // Disable SSE for non-streaming runtime
667            .build()
668            .await
669            .expect("Server should build successfully");
670
671        let handler = server
672            .handler()
673            .await
674            .expect("Handler should be created from server");
675
676        // Verify configuration
677        assert!(!handler.sse_enabled, "SSE should be disabled");
678
679        // Create a test request (POST /mcp works in all configs)
680        let lambda_req = Request::builder()
681            .method("POST")
682            .uri("/mcp")
683            .body(LambdaBody::Text(
684                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
685            ))
686            .unwrap();
687
688        // This should work without hanging
689        let result = handler.handle(lambda_req).await;
690        assert!(
691            result.is_ok(),
692            "POST /mcp should work with non-streaming + sse(false)"
693        );
694    }
695
696    /// Test 2: Non-streaming runtime + sse(true) - This should work (snapshot-based SSE)
697    #[tokio::test]
698    async fn test_non_streaming_runtime_sse_true() {
699        use crate::LambdaMcpServerBuilder;
700        use turul_mcp_session_storage::InMemorySessionStorage;
701
702        let server = LambdaMcpServerBuilder::new()
703            .name("test-non-streaming-sse-true")
704            .version("1.0.0")
705            .storage(Arc::new(InMemorySessionStorage::new()))
706            .sse(true) // Enable SSE for snapshot-based responses
707            .build()
708            .await
709            .expect("Server should build successfully");
710
711        let handler = server
712            .handler()
713            .await
714            .expect("Handler should be created from server");
715
716        // Verify configuration
717        assert!(handler.sse_enabled, "SSE should be enabled");
718
719        // Create a test request (POST /mcp works in all configs)
720        let lambda_req = Request::builder()
721            .method("POST")
722            .uri("/mcp")
723            .body(LambdaBody::Text(
724                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
725            ))
726            .unwrap();
727
728        // This should work without hanging (provides snapshot-based SSE)
729        let result = handler.handle(lambda_req).await;
730        assert!(
731            result.is_ok(),
732            "POST /mcp should work with non-streaming + sse(true)"
733        );
734
735        // Note: GET /mcp would provide snapshot events, not real-time streaming
736        // This is the key difference from handle_streaming()
737    }
738
739    /// Test 3: Streaming runtime + sse(false) - This should work (SSE disabled)
740    #[tokio::test]
741    async fn test_streaming_runtime_sse_false() {
742        use crate::LambdaMcpServerBuilder;
743        use turul_mcp_session_storage::InMemorySessionStorage;
744
745        let server = LambdaMcpServerBuilder::new()
746            .name("test-streaming-sse-false")
747            .version("1.0.0")
748            .storage(Arc::new(InMemorySessionStorage::new()))
749            .sse(false) // Disable SSE even with streaming runtime
750            .build()
751            .await
752            .expect("Server should build successfully");
753
754        let handler = server
755            .handler()
756            .await
757            .expect("Handler should be created from server");
758
759        // Verify configuration
760        assert!(!handler.sse_enabled, "SSE should be disabled");
761
762        // Create a test request for streaming handler
763        let lambda_req = Request::builder()
764            .method("POST")
765            .uri("/mcp")
766            .body(LambdaBody::Text(
767                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
768            ))
769            .unwrap();
770
771        // This should work with streaming runtime even when SSE is disabled
772        let result = handler.handle_streaming(lambda_req).await;
773        assert!(
774            result.is_ok(),
775            "Streaming runtime should work with sse(false)"
776        );
777    }
778
779    /// Test 4: Streaming runtime + sse(true) - This should work (real-time SSE streaming)
780    #[tokio::test]
781    async fn test_streaming_runtime_sse_true() {
782        use crate::LambdaMcpServerBuilder;
783        use turul_mcp_session_storage::InMemorySessionStorage;
784
785        let server = LambdaMcpServerBuilder::new()
786            .name("test-streaming-sse-true")
787            .version("1.0.0")
788            .storage(Arc::new(InMemorySessionStorage::new()))
789            .sse(true) // Enable SSE with streaming runtime for real-time streaming
790            .build()
791            .await
792            .expect("Server should build successfully");
793
794        let handler = server
795            .handler()
796            .await
797            .expect("Handler should be created from server");
798
799        // Verify configuration
800        assert!(handler.sse_enabled, "SSE should be enabled");
801
802        // Create a test request for streaming handler
803        let lambda_req = Request::builder()
804            .method("POST")
805            .uri("/mcp")
806            .body(LambdaBody::Text(
807                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
808            ))
809            .unwrap();
810
811        // This should work and provide real-time SSE streaming
812        let result = handler.handle_streaming(lambda_req).await;
813        assert!(
814            result.is_ok(),
815            "Streaming runtime should work with sse(true) for real-time streaming"
816        );
817
818        // Note: GET /mcp would provide real-time streaming events
819        // This is the optimal configuration for real-time notifications
820    }
821
822    // ── Strict lifecycle tests over handle_streaming() ────────────────
823
824    /// Helper: build a Lambda handler with strict lifecycle and a test tool via the builder.
825    async fn build_strict_streaming_handler() -> LambdaMcpHandler {
826        use crate::LambdaMcpServerBuilder;
827        use turul_mcp_session_storage::InMemorySessionStorage;
828
829        let server = LambdaMcpServerBuilder::new()
830            .name("lifecycle-test")
831            .version("1.0.0")
832            .tool(LifecycleTestTool)
833            .storage(Arc::new(InMemorySessionStorage::new()))
834            .sse(true)
835            .build()
836            .await
837            .expect("build should succeed");
838
839        server.handler().await.expect("handler should succeed")
840    }
841
842    // Test tool for lifecycle tests — satisfies all required traits
843    #[derive(Clone, Default)]
844    struct LifecycleTestTool;
845
846    impl turul_mcp_builders::traits::HasBaseMetadata for LifecycleTestTool {
847        fn name(&self) -> &str { "ping_tool" }
848    }
849    impl turul_mcp_builders::traits::HasDescription for LifecycleTestTool {
850        fn description(&self) -> Option<&str> { Some("test tool") }
851    }
852    impl turul_mcp_builders::traits::HasInputSchema for LifecycleTestTool {
853        fn input_schema(&self) -> &turul_mcp_protocol::ToolSchema {
854            static SCHEMA: std::sync::OnceLock<turul_mcp_protocol::ToolSchema> = std::sync::OnceLock::new();
855            SCHEMA.get_or_init(turul_mcp_protocol::ToolSchema::object)
856        }
857    }
858    impl turul_mcp_builders::traits::HasOutputSchema for LifecycleTestTool {
859        fn output_schema(&self) -> Option<&turul_mcp_protocol::ToolSchema> { None }
860    }
861    impl turul_mcp_builders::traits::HasAnnotations for LifecycleTestTool {
862        fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> { None }
863    }
864    impl turul_mcp_builders::traits::HasToolMeta for LifecycleTestTool {
865        fn tool_meta(&self) -> Option<&std::collections::HashMap<String, serde_json::Value>> { None }
866    }
867    impl turul_mcp_builders::traits::HasIcons for LifecycleTestTool {}
868    impl turul_mcp_builders::traits::HasExecution for LifecycleTestTool {}
869
870    #[async_trait::async_trait]
871    impl turul_mcp_server::McpTool for LifecycleTestTool {
872        async fn call(
873            &self,
874            _args: serde_json::Value,
875            _session: Option<turul_mcp_server::SessionContext>,
876        ) -> turul_mcp_server::McpResult<turul_mcp_protocol::tools::CallToolResult> {
877            Ok(turul_mcp_protocol::tools::CallToolResult::success(vec![
878                turul_mcp_protocol::tools::ToolResult::text("pong"),
879            ]))
880        }
881    }
882
883    /// Helper: create a Lambda POST request for handle_streaming()
884    fn streaming_mcp_request(body: &str, session_id: Option<&str>) -> LambdaRequest {
885        let mut builder = Request::builder()
886            .method("POST")
887            .uri("/mcp")
888            .header("Content-Type", "application/json")
889            .header("Accept", "application/json, text/event-stream")
890            .header("MCP-Protocol-Version", "2025-11-25");
891
892        if let Some(sid) = session_id {
893            builder = builder.header("Mcp-Session-Id", sid);
894        }
895
896        builder
897            .body(LambdaBody::Text(body.to_string()))
898            .unwrap()
899    }
900
901    /// Helper: collect streaming response body into a string
902    async fn collect_streaming_body(
903        response: lambda_http::Response<
904            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
905        >,
906    ) -> (http::StatusCode, String) {
907        use http_body_util::BodyExt;
908        let status = response.status();
909        let session_id = response
910            .headers()
911            .get("Mcp-Session-Id")
912            .and_then(|v| v.to_str().ok())
913            .map(String::from);
914        let body_bytes = response
915            .into_body()
916            .collect()
917            .await
918            .map(|c| c.to_bytes())
919            .unwrap_or_default();
920        let body_str = String::from_utf8_lossy(&body_bytes).to_string();
921        let _ = session_id; // available if needed
922        (status, body_str)
923    }
924
925    /// Helper: extract session ID from a streaming response
926    fn extract_session_id(
927        response: &lambda_http::Response<
928            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
929        >,
930    ) -> Option<String> {
931        response
932            .headers()
933            .get("Mcp-Session-Id")
934            .and_then(|v| v.to_str().ok())
935            .map(String::from)
936    }
937
938    /// Helper: parse JSON from a response body (handles SSE "data: " prefix)
939    fn parse_response_json(body: &str) -> serde_json::Value {
940        // Strip SSE framing if present
941        let json_str = body
942            .lines()
943            .find(|line| line.starts_with("data: "))
944            .map(|line| &line[6..])
945            .unwrap_or(body.trim());
946        serde_json::from_str(json_str)
947            .unwrap_or_else(|e| panic!("Failed to parse JSON from body: {e}\nBody: {body}"))
948    }
949
950    /// P0: Full strict lifecycle handshake succeeds on handle_streaming()
951    #[tokio::test]
952    async fn test_lambda_streaming_strict_handshake_succeeds() {
953        let handler = build_strict_streaming_handler().await;
954
955        // Step 1: initialize
956        let init_req = streaming_mcp_request(
957            &serde_json::json!({
958                "jsonrpc": "2.0", "method": "initialize", "id": 1,
959                "params": {
960                    "protocolVersion": "2025-11-25",
961                    "capabilities": {},
962                    "clientInfo": { "name": "test", "version": "1.0.0" }
963                }
964            }).to_string(),
965            None,
966        );
967        let init_resp = handler.handle_streaming(init_req).await.expect("initialize should succeed");
968        let session_id = extract_session_id(&init_resp).expect("must return session ID");
969        let (status, _body) = collect_streaming_body(init_resp).await;
970        assert_eq!(status, 200, "initialize should return 200");
971
972        // Step 2: notifications/initialized
973        let notif_req = streaming_mcp_request(
974            &serde_json::json!({
975                "jsonrpc": "2.0",
976                "method": "notifications/initialized",
977                "params": {}
978            }).to_string(),
979            Some(&session_id),
980        );
981        let notif_resp = handler.handle_streaming(notif_req).await.expect("notification should succeed");
982        let (status, _) = collect_streaming_body(notif_resp).await;
983        assert_eq!(status, 202, "notifications/initialized should return 202");
984
985        // Step 3: tools/list
986        let list_req = streaming_mcp_request(
987            &serde_json::json!({
988                "jsonrpc": "2.0", "method": "tools/list", "id": 2
989            }).to_string(),
990            Some(&session_id),
991        );
992        let list_resp = handler.handle_streaming(list_req).await.expect("tools/list should succeed");
993        let (status, body) = collect_streaming_body(list_resp).await;
994        assert_eq!(status, 200, "tools/list should return 200");
995        let json = parse_response_json(&body);
996        assert!(json["result"]["tools"].is_array(), "tools/list should return tools array: {json}");
997
998        // Step 4: tools/call
999        let call_req = streaming_mcp_request(
1000            &serde_json::json!({
1001                "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1002                "params": { "name": "ping_tool", "arguments": {} }
1003            }).to_string(),
1004            Some(&session_id),
1005        );
1006        let call_resp = handler.handle_streaming(call_req).await.expect("tools/call should succeed");
1007        let (status, body) = collect_streaming_body(call_resp).await;
1008        assert_eq!(status, 200, "tools/call should return 200");
1009        let json = parse_response_json(&body);
1010        assert!(json["result"].is_object(), "tools/call should return result: {json}");
1011    }
1012
1013    /// P0: Strict lifecycle rejects both tools/list and tools/call before notifications/initialized
1014    #[tokio::test]
1015    async fn test_lambda_streaming_strict_rejects_before_initialized() {
1016        let handler = build_strict_streaming_handler().await;
1017
1018        // Initialize to get session
1019        let init_req = streaming_mcp_request(
1020            &serde_json::json!({
1021                "jsonrpc": "2.0", "method": "initialize", "id": 1,
1022                "params": {
1023                    "protocolVersion": "2025-11-25",
1024                    "capabilities": {},
1025                    "clientInfo": { "name": "test", "version": "1.0.0" }
1026                }
1027            }).to_string(),
1028            None,
1029        );
1030        let init_resp = handler.handle_streaming(init_req).await.unwrap();
1031        let session_id = extract_session_id(&init_resp).unwrap();
1032        let _ = collect_streaming_body(init_resp).await;
1033
1034        // tools/list without notifications/initialized — must fail
1035        let list_req = streaming_mcp_request(
1036            &serde_json::json!({
1037                "jsonrpc": "2.0", "method": "tools/list", "id": 2
1038            }).to_string(),
1039            Some(&session_id),
1040        );
1041        let list_resp = handler.handle_streaming(list_req).await.unwrap();
1042        let (_, body) = collect_streaming_body(list_resp).await;
1043        let json = parse_response_json(&body);
1044        assert!(json["error"].is_object(), "tools/list should return JSON-RPC error: {json}");
1045        assert_eq!(
1046            json["error"]["code"].as_i64().unwrap(),
1047            -32031,
1048            "tools/list must return SessionError code -32031, got: {json}"
1049        );
1050        assert!(
1051            json["error"]["message"].as_str().unwrap().contains("notifications/initialized"),
1052            "Error must mention notifications/initialized: {}",
1053            json["error"]["message"]
1054        );
1055
1056        // tools/call without notifications/initialized — must also fail
1057        let call_req = streaming_mcp_request(
1058            &serde_json::json!({
1059                "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1060                "params": { "name": "ping_tool", "arguments": {} }
1061            }).to_string(),
1062            Some(&session_id),
1063        );
1064        let call_resp = handler.handle_streaming(call_req).await.unwrap();
1065        let (_, body) = collect_streaming_body(call_resp).await;
1066        let json = parse_response_json(&body);
1067        assert!(json["error"].is_object(), "tools/call should return JSON-RPC error: {json}");
1068        assert_eq!(
1069            json["error"]["code"].as_i64().unwrap(),
1070            -32031,
1071            "tools/call must return SessionError code -32031, got: {json}"
1072        );
1073        assert!(
1074            json["error"]["message"].as_str().unwrap().contains("notifications/initialized"),
1075            "Error must mention notifications/initialized: {}",
1076            json["error"]["message"]
1077        );
1078    }
1079
1080    /// P0: tools/list succeeds immediately after notifications/initialized (race fix proof)
1081    #[tokio::test]
1082    async fn test_lambda_streaming_initialized_is_effective_immediately() {
1083        let handler = build_strict_streaming_handler().await;
1084
1085        // Initialize
1086        let init_req = streaming_mcp_request(
1087            &serde_json::json!({
1088                "jsonrpc": "2.0", "method": "initialize", "id": 1,
1089                "params": {
1090                    "protocolVersion": "2025-11-25",
1091                    "capabilities": {},
1092                    "clientInfo": { "name": "test", "version": "1.0.0" }
1093                }
1094            }).to_string(),
1095            None,
1096        );
1097        let init_resp = handler.handle_streaming(init_req).await.unwrap();
1098        let session_id = extract_session_id(&init_resp).unwrap();
1099        let _ = collect_streaming_body(init_resp).await;
1100
1101        // notifications/initialized
1102        let notif_req = streaming_mcp_request(
1103            &serde_json::json!({
1104                "jsonrpc": "2.0",
1105                "method": "notifications/initialized",
1106                "params": {}
1107            }).to_string(),
1108            Some(&session_id),
1109        );
1110        let notif_resp = handler.handle_streaming(notif_req).await.unwrap();
1111        let (status, _) = collect_streaming_body(notif_resp).await;
1112        assert_eq!(status, 202);
1113
1114        // Immediately — no delay — send tools/list
1115        let list_req = streaming_mcp_request(
1116            &serde_json::json!({
1117                "jsonrpc": "2.0", "method": "tools/list", "id": 2
1118            }).to_string(),
1119            Some(&session_id),
1120        );
1121        let list_resp = handler.handle_streaming(list_req).await.unwrap();
1122        let (status, body) = collect_streaming_body(list_resp).await;
1123        assert_eq!(status, 200, "tools/list must succeed immediately after initialized");
1124        let json = parse_response_json(&body);
1125        assert!(
1126            json["result"]["tools"].is_array(),
1127            "Must return tools list, not error: {json}"
1128        );
1129    }
1130
1131    /// P1: Lenient mode allows operations without notifications/initialized
1132    #[tokio::test]
1133    async fn test_lambda_streaming_lenient_mode_allows_without_initialized() {
1134        use crate::LambdaMcpServerBuilder;
1135        use turul_mcp_session_storage::InMemorySessionStorage;
1136
1137        let server = LambdaMcpServerBuilder::new()
1138            .name("lenient-test")
1139            .version("1.0.0")
1140            .tool(LifecycleTestTool)
1141            .storage(Arc::new(InMemorySessionStorage::new()))
1142            .strict_lifecycle(false) // lenient mode
1143            .sse(true)
1144            .build()
1145            .await
1146            .unwrap();
1147
1148        let handler = server.handler().await.unwrap();
1149
1150        // Initialize (no notifications/initialized)
1151        let init_req = streaming_mcp_request(
1152            &serde_json::json!({
1153                "jsonrpc": "2.0", "method": "initialize", "id": 1,
1154                "params": {
1155                    "protocolVersion": "2025-11-25",
1156                    "capabilities": {},
1157                    "clientInfo": { "name": "test", "version": "1.0.0" }
1158                }
1159            }).to_string(),
1160            None,
1161        );
1162        let init_resp = handler.handle_streaming(init_req).await.unwrap();
1163        let session_id = extract_session_id(&init_resp).unwrap();
1164        let _ = collect_streaming_body(init_resp).await;
1165
1166        // Skip notifications/initialized — go straight to tools/list
1167        let list_req = streaming_mcp_request(
1168            &serde_json::json!({
1169                "jsonrpc": "2.0", "method": "tools/list", "id": 2
1170            }).to_string(),
1171            Some(&session_id),
1172        );
1173        let list_resp = handler.handle_streaming(list_req).await.unwrap();
1174        let (status, body) = collect_streaming_body(list_resp).await;
1175        assert_eq!(status, 200, "Lenient mode should allow tools/list without initialized");
1176        let json = parse_response_json(&body);
1177        assert!(
1178            json["result"]["tools"].is_array(),
1179            "Must return tools list in lenient mode: {json}"
1180        );
1181    }
1182}