// turul_mcp_aws_lambda/handler.rs
//! Lambda MCP handler that delegates to SessionMcpHandler
//!
//! This module provides the LambdaMcpHandler that processes Lambda HTTP
//! requests by delegating to SessionMcpHandler, eliminating code duplication.

use std::sync::Arc;

use lambda_http::{Body as LambdaBody, Request as LambdaRequest, Response as LambdaResponse};
use tracing::{debug, info};

use turul_http_mcp_server::{
    ServerConfig, SessionMcpHandler, StreamConfig, StreamManager, StreamableHttpHandler,
};
use turul_mcp_json_rpc_server::JsonRpcDispatcher;
use turul_mcp_protocol::{McpError, ServerCapabilities};
use turul_mcp_session_storage::BoxedSessionStorage;

use crate::error::Result;

#[cfg(feature = "cors")]
use crate::cors::{CorsConfig, create_preflight_response, inject_cors_headers};
/// Main handler for Lambda MCP requests
///
/// This handler processes MCP requests in Lambda by delegating to SessionMcpHandler,
/// eliminating 600+ lines of duplicate business logic code.
///
/// Features:
/// 1. Type conversion between lambda_http and hyper
/// 2. Delegation to SessionMcpHandler for all business logic
/// 3. CORS support for browser clients
/// 4. SSE validation to prevent silent failures
#[derive(Clone)]
pub struct LambdaMcpHandler {
    /// SessionMcpHandler for legacy protocol support
    session_handler: SessionMcpHandler,

    /// StreamableHttpHandler for MCP 2025-11-25 with proper headers
    streamable_handler: StreamableHttpHandler,

    /// Whether SSE is enabled (used for testing and debugging)
    // Read only by tests in this module; kept for debugging visibility.
    #[allow(dead_code)]
    sse_enabled: bool,

    /// Custom route registry (e.g., .well-known endpoints)
    route_registry: Arc<turul_http_mcp_server::RouteRegistry>,

    /// CORS configuration (if enabled)
    // Only compiled in when the "cors" feature is active.
    #[cfg(feature = "cors")]
    cors_config: Option<CorsConfig>,
}
52
impl LambdaMcpHandler {
    /// Create a new Lambda MCP handler with the framework components.
    ///
    /// Builds both a `SessionMcpHandler` (legacy protocols) and a
    /// `StreamableHttpHandler` (MCP 2025-11-25) over the same dispatcher,
    /// session storage, and stream manager, sharing one empty middleware stack.
    ///
    /// `_implementation` is accepted but unused here — presumably kept so the
    /// builder's call signature stays stable; TODO confirm against the builder.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        dispatcher: JsonRpcDispatcher<McpError>,
        session_storage: Arc<BoxedSessionStorage>,
        stream_manager: Arc<StreamManager>,
        config: ServerConfig,
        stream_config: StreamConfig,
        _implementation: turul_mcp_protocol::Implementation,
        capabilities: ServerCapabilities,
        sse_enabled: bool,
        #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
    ) -> Self {
        let dispatcher = Arc::new(dispatcher);

        // Create empty middleware stack (shared by both handlers)
        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());

        // Create SessionMcpHandler for legacy protocol support
        let session_handler = SessionMcpHandler::with_shared_stream_manager(
            config.clone(),
            dispatcher.clone(),
            session_storage.clone(),
            stream_config.clone(),
            stream_manager.clone(),
            middleware_stack.clone(),
        );

        // Create StreamableHttpHandler for MCP 2025-11-25 support
        let streamable_handler = StreamableHttpHandler::new(
            Arc::new(config.clone()),
            dispatcher.clone(),
            session_storage.clone(),
            stream_manager.clone(),
            capabilities.clone(),
            middleware_stack,
        );

        Self {
            session_handler,
            streamable_handler,
            sse_enabled,
            route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
            #[cfg(feature = "cors")]
            cors_config,
        }
    }

    /// Create with shared stream manager (for advanced use cases).
    ///
    /// Unlike [`LambdaMcpHandler::new`], the dispatcher is already `Arc`-wrapped
    /// and CORS defaults to `None` (set it afterwards via `with_cors`).
    #[allow(clippy::too_many_arguments)]
    pub fn with_shared_stream_manager(
        config: ServerConfig,
        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
        session_storage: Arc<BoxedSessionStorage>,
        stream_manager: Arc<StreamManager>,
        stream_config: StreamConfig,
        _implementation: turul_mcp_protocol::Implementation,
        capabilities: ServerCapabilities,
        sse_enabled: bool,
    ) -> Self {
        // Create empty middleware stack (shared by both handlers)
        let middleware_stack = Arc::new(turul_http_mcp_server::middleware::MiddlewareStack::new());

        // Create SessionMcpHandler for legacy protocol support
        let session_handler = SessionMcpHandler::with_shared_stream_manager(
            config.clone(),
            dispatcher.clone(),
            session_storage.clone(),
            stream_config.clone(),
            stream_manager.clone(),
            middleware_stack.clone(),
        );

        // Create StreamableHttpHandler for MCP 2025-11-25 support
        let streamable_handler = StreamableHttpHandler::new(
            Arc::new(config),
            dispatcher,
            session_storage,
            stream_manager,
            capabilities,
            middleware_stack,
        );

        Self {
            session_handler,
            streamable_handler,
            sse_enabled,
            route_registry: Arc::new(turul_http_mcp_server::RouteRegistry::new()),
            #[cfg(feature = "cors")]
            cors_config: None,
        }
    }

    /// Create with custom middleware stack (for testing and examples).
    ///
    /// Also accepts a pre-populated route registry instead of creating an
    /// empty one; CORS defaults to `None`.
    #[allow(clippy::too_many_arguments)]
    pub fn with_middleware(
        config: ServerConfig,
        dispatcher: Arc<JsonRpcDispatcher<McpError>>,
        session_storage: Arc<BoxedSessionStorage>,
        stream_manager: Arc<StreamManager>,
        stream_config: StreamConfig,
        capabilities: ServerCapabilities,
        middleware_stack: Arc<turul_http_mcp_server::middleware::MiddlewareStack>,
        sse_enabled: bool,
        route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
    ) -> Self {
        // Create SessionMcpHandler with custom middleware
        let session_handler = SessionMcpHandler::with_shared_stream_manager(
            config.clone(),
            dispatcher.clone(),
            session_storage.clone(),
            stream_config.clone(),
            stream_manager.clone(),
            middleware_stack.clone(),
        );

        // Create StreamableHttpHandler with custom middleware
        let streamable_handler = StreamableHttpHandler::new(
            Arc::new(config),
            dispatcher,
            session_storage,
            stream_manager,
            capabilities,
            middleware_stack,
        );

        Self {
            session_handler,
            streamable_handler,
            sse_enabled,
            route_registry,
            #[cfg(feature = "cors")]
            cors_config: None,
        }
    }

    /// Set CORS configuration (builder-style; consumes and returns `self`).
    #[cfg(feature = "cors")]
    pub fn with_cors(mut self, cors_config: CorsConfig) -> Self {
        self.cors_config = Some(cors_config);
        self
    }

    /// Get access to the underlying stream manager for notifications.
    ///
    /// Both inner handlers share the same manager, so reading it from the
    /// session handler is sufficient.
    pub fn get_stream_manager(&self) -> &Arc<StreamManager> {
        self.session_handler.get_stream_manager()
    }

    /// Handle a Lambda HTTP request (snapshot mode - no real-time SSE)
    ///
    /// This method performs delegation to SessionMcpHandler for all business logic.
    /// It only handles Lambda-specific concerns: CORS and type conversion.
    ///
    /// Note: If SSE is enabled (.sse(true)), SSE responses may not stream properly
    /// with regular Lambda runtime. For proper SSE streaming, use handle_streaming()
    /// with run_with_streaming_response().
    pub async fn handle(&self, req: LambdaRequest) -> Result<LambdaResponse<LambdaBody>> {
        let method = req.method().clone();
        let uri = req.uri().clone();

        // Captured before `req` is consumed by the adapter below.
        let request_origin = req
            .headers()
            .get("origin")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        info!(
            "🌐 Lambda MCP request: {} {} (origin: {:?})",
            method, uri, request_origin
        );

        // Handle CORS preflight requests first (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if method == http::Method::OPTIONS
            && let Some(ref cors_config) = self.cors_config
        {
            debug!("Handling CORS preflight request");
            return create_preflight_response(cors_config, request_origin.as_deref());
        }

        // 🚀 DELEGATION: Convert Lambda request to hyper request
        let hyper_req = crate::adapter::lambda_to_hyper_request(req)?;

        // Check custom routes (e.g., .well-known) before MCP delegation
        let path = hyper_req.uri().path().to_string();
        if !self.route_registry.is_empty() {
            match self.route_registry.match_route(&path) {
                Ok(Some(route_handler)) => {
                    debug!("Custom route matched: {}", path);
                    use http_body_util::BodyExt;
                    // Route handlers take a type-erased body, so box it here.
                    let (parts, body) = hyper_req.into_parts();
                    let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
                    let route_resp = route_handler.handle(boxed_req).await;
                    let mut lambda_resp =
                        crate::adapter::hyper_to_lambda_response(route_resp).await?;
                    #[cfg(feature = "cors")]
                    if let Some(ref cors_config) = self.cors_config {
                        inject_cors_headers(
                            &mut lambda_resp,
                            cors_config,
                            request_origin.as_deref(),
                        )?;
                    }
                    return Ok(lambda_resp);
                }
                Ok(None) => {} // No match, continue to MCP handler
                Err(e) => {
                    // Route matched but failed validation: surface the error
                    // response instead of falling through to the MCP handler.
                    debug!("Route validation error: {}", e);
                    let route_resp = e.into_response();
                    let mut lambda_resp =
                        crate::adapter::hyper_to_lambda_response(route_resp).await?;
                    #[cfg(feature = "cors")]
                    if let Some(ref cors_config) = self.cors_config {
                        inject_cors_headers(
                            &mut lambda_resp,
                            cors_config,
                            request_origin.as_deref(),
                        )?;
                    }
                    return Ok(lambda_resp);
                }
            }
        }

        // 🚀 DELEGATION: Use SessionMcpHandler for all business logic
        let hyper_resp = self
            .session_handler
            .handle_mcp_request(hyper_req)
            .await
            .map_err(|e| crate::error::LambdaError::McpFramework(e.to_string()))?;

        // 🚀 DELEGATION: Convert hyper response back to Lambda response
        let mut lambda_resp = crate::adapter::hyper_to_lambda_response(hyper_resp).await?;

        // Apply CORS headers if configured (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if let Some(ref cors_config) = self.cors_config {
            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())?;
        }

        Ok(lambda_resp)
    }

    /// Handle Lambda streaming request (real SSE streaming)
    ///
    /// This method enables real-time SSE streaming using Lambda's streaming response capability.
    /// It delegates all business logic to SessionMcpHandler.
    ///
    /// Unlike [`LambdaMcpHandler::handle`], errors are returned as boxed trait
    /// objects because that is what the streaming Lambda runtime expects.
    pub async fn handle_streaming(
        &self,
        req: LambdaRequest,
    ) -> std::result::Result<
        lambda_http::Response<
            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
        >,
        Box<dyn std::error::Error + Send + Sync>,
    > {
        let method = req.method().clone();
        let uri = req.uri().clone();
        let request_origin = req
            .headers()
            .get("origin")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        debug!(
            "🌊 Lambda streaming MCP request: {} {} (origin: {:?})",
            method, uri, request_origin
        );

        // Handle CORS preflight requests first (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if method == http::Method::OPTIONS
            && let Some(ref cors_config) = self.cors_config
        {
            debug!("Handling CORS preflight request (streaming)");
            let preflight_response =
                create_preflight_response(cors_config, request_origin.as_deref())
                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;

            // Convert LambdaResponse<LambdaBody> to streaming response
            return Ok(self.convert_lambda_response_to_streaming(preflight_response));
        }

        // 🚀 DELEGATION: Convert Lambda request to hyper request
        let hyper_req = crate::adapter::lambda_to_hyper_request(req)
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;

        // Check custom routes (e.g., .well-known) before MCP delegation
        let path = hyper_req.uri().path().to_string();
        if !self.route_registry.is_empty() {
            match self.route_registry.match_route(&path) {
                Ok(Some(route_handler)) => {
                    debug!("Custom route matched (streaming): {}", path);
                    use http_body_util::BodyExt;
                    let (parts, body) = hyper_req.into_parts();
                    let boxed_req = hyper::Request::from_parts(parts, body.boxed_unsync());
                    // Route responses are already in streaming form; no CORS
                    // injection here — NOTE(review): asymmetric with handle(),
                    // confirm whether that is intentional.
                    return Ok(route_handler.handle(boxed_req).await);
                }
                Ok(None) => {} // No match, continue to MCP handler
                Err(e) => {
                    debug!("Route validation error (streaming): {}", e);
                    return Ok(e.into_response());
                }
            }
        }

        // 🚀 PROTOCOL ROUTING: Check protocol version and route to appropriate handler
        use turul_http_mcp_server::protocol::McpProtocolVersion;
        // Absent or unparseable header falls back to the 2025-06-18 version.
        let protocol_version = hyper_req
            .headers()
            .get("MCP-Protocol-Version")
            .and_then(|h| h.to_str().ok())
            .and_then(McpProtocolVersion::parse_version)
            .unwrap_or(McpProtocolVersion::V2025_06_18);

        // Route based on protocol version
        let hyper_resp = if protocol_version.supports_streamable_http() {
            // Use StreamableHttpHandler for MCP 2025-11-25 (proper headers, chunked SSE)
            debug!(
                "Using StreamableHttpHandler for protocol {}",
                protocol_version.to_string()
            );
            self.streamable_handler.handle_request(hyper_req).await
        } else {
            // Legacy protocol: use SessionMcpHandler
            debug!(
                "Using SessionMcpHandler for legacy protocol {}",
                protocol_version.to_string()
            );
            self.session_handler
                .handle_mcp_request(hyper_req)
                .await
                .map_err(|e| {
                    Box::new(crate::error::LambdaError::McpFramework(e.to_string()))
                        as Box<dyn std::error::Error + Send + Sync>
                })?
        };

        // 🚀 DELEGATION: Convert hyper response to Lambda streaming response (preserves streaming!)
        let mut lambda_resp = crate::adapter::hyper_to_lambda_streaming(hyper_resp);

        // Apply CORS headers if configured (Lambda-specific logic)
        #[cfg(feature = "cors")]
        if let Some(ref cors_config) = self.cors_config {
            inject_cors_headers(&mut lambda_resp, cors_config, request_origin.as_deref())
                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
        }

        Ok(lambda_resp)
    }

    /// Convert Lambda response to streaming format (helper for CORS preflight)
    ///
    /// Buffers the (small) preflight body into a single `Full` frame and
    /// type-erases it to match the streaming response body type.
    fn convert_lambda_response_to_streaming(
        &self,
        lambda_response: LambdaResponse<LambdaBody>,
    ) -> lambda_http::Response<http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>>
    {
        use bytes::Bytes;
        use http_body_util::{BodyExt, Full};

        let (parts, body) = lambda_response.into_parts();
        let body_bytes = match body {
            LambdaBody::Empty => Bytes::new(),
            LambdaBody::Text(text) => Bytes::from(text),
            LambdaBody::Binary(bytes) => Bytes::from(bytes),
            // Catch-all for possible future Body variants — treated as empty.
            // NOTE(review): assumes no other variant carries payload; confirm
            // against the lambda_http version in use.
            _ => Bytes::new(),
        };

        // Map error type from Infallible to hyper::Error
        // (`Full` can never fail, so the match on Infallible is vacuous).
        let streaming_body = Full::new(body_bytes)
            .map_err(|e: std::convert::Infallible| match e {})
            .boxed_unsync();

        lambda_http::Response::from_parts(parts, streaming_body)
    }
}
430
#[cfg(test)]
mod tests {
    use super::*;
    use http::Request;
    use turul_mcp_session_storage::InMemorySessionStorage;
437    #[tokio::test]
438    async fn test_handler_creation() {
439        let session_storage = Arc::new(InMemorySessionStorage::new());
440        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
441        let dispatcher = JsonRpcDispatcher::new();
442        let config = ServerConfig::default();
443        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
444        let capabilities = ServerCapabilities::default();
445
446        let handler = LambdaMcpHandler::new(
447            dispatcher,
448            session_storage,
449            stream_manager,
450            config,
451            StreamConfig::default(),
452            implementation,
453            capabilities,
454            false, // SSE disabled for test
455            #[cfg(feature = "cors")]
456            None,
457        );
458
459        // Test that handler was created successfully
460        assert!(!handler.sse_enabled);
461    }
462
    /// With SSE enabled, the non-streaming `handle()` entry point must still
    /// succeed — it serves snapshot-based SSE rather than real-time streams.
    #[tokio::test]
    async fn test_sse_enabled_with_handle_works() {
        let session_storage = Arc::new(InMemorySessionStorage::new());
        let stream_manager = Arc::new(StreamManager::new(session_storage.clone()));
        let dispatcher = JsonRpcDispatcher::new();
        let config = ServerConfig::default();
        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
        let capabilities = ServerCapabilities::default();

        // Create handler with SSE enabled
        let handler = LambdaMcpHandler::new(
            dispatcher,
            session_storage,
            stream_manager,
            config,
            StreamConfig::default(),
            implementation,
            capabilities,
            true, // SSE enabled - should work with handle() for snapshot-based SSE
            #[cfg(feature = "cors")]
            None,
        );

        // Create a test Lambda request (minimal JSON-RPC initialize call)
        let lambda_req = Request::builder()
            .method("POST")
            .uri("/mcp")
            .body(LambdaBody::Text(
                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
            ))
            .unwrap();

        // handle() should work (provides snapshot-based SSE rather than real-time streaming)
        let result = handler.handle(lambda_req).await;
        assert!(
            result.is_ok(),
            "handle() should work with SSE enabled for snapshot-based responses"
        );
    }
502
    /// Test that verifies StreamConfig is properly threaded through the delegation
    /// (handler constructor → SessionMcpHandler → StreamManager).
    #[tokio::test]
    async fn test_stream_config_preservation() {
        let session_storage = Arc::new(InMemorySessionStorage::new());
        let dispatcher = JsonRpcDispatcher::new();
        let config = ServerConfig::default();
        let implementation = turul_mcp_protocol::Implementation::new("test", "1.0.0");
        let capabilities = ServerCapabilities::default();

        // Create a custom StreamConfig with non-default values so that any
        // accidental fallback to defaults is detectable below.
        let custom_stream_config = StreamConfig {
            channel_buffer_size: 1024,      // Non-default value (default is 1000)
            max_replay_events: 200,         // Non-default value (default is 100)
            keepalive_interval_seconds: 10, // Non-default value (default is 30)
            cors_origin: "https://custom-test.example.com".to_string(), // Non-default value
        };

        // Create stream manager with the custom config
        let stream_manager = Arc::new(StreamManager::with_config(
            session_storage.clone(),
            custom_stream_config.clone(),
        ));

        let handler = LambdaMcpHandler::new(
            dispatcher,
            session_storage,
            stream_manager,
            config,
            custom_stream_config.clone(),
            implementation,
            capabilities,
            false, // SSE disabled for test
            #[cfg(feature = "cors")]
            None,
        );

        // The handler should be created successfully, proving the StreamConfig was accepted
        assert!(!handler.sse_enabled);

        // Verify that the stream manager has the custom configuration
        let stream_manager = handler.get_stream_manager();

        // Verify the StreamConfig values were propagated correctly
        let actual_config = stream_manager.get_config();

        assert_eq!(
            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
            "Custom channel_buffer_size was not propagated correctly"
        );
        assert_eq!(
            actual_config.max_replay_events, custom_stream_config.max_replay_events,
            "Custom max_replay_events was not propagated correctly"
        );
        assert_eq!(
            actual_config.keepalive_interval_seconds,
            custom_stream_config.keepalive_interval_seconds,
            "Custom keepalive_interval_seconds was not propagated correctly"
        );
        assert_eq!(
            actual_config.cors_origin, custom_stream_config.cors_origin,
            "Custom cors_origin was not propagated correctly"
        );

        // Verify the stream manager is accessible (proves delegation worked)
        assert!(Arc::strong_count(stream_manager) >= 1);
    }
569
    /// Test the full builder → server → handler chain with StreamConfig
    #[tokio::test]
    async fn test_full_builder_chain_stream_config() {
        use crate::LambdaMcpServerBuilder;
        use turul_mcp_session_storage::InMemorySessionStorage;

        // Create a custom StreamConfig with non-default values
        let custom_stream_config = turul_http_mcp_server::StreamConfig {
            channel_buffer_size: 2048,      // Non-default value
            max_replay_events: 500,         // Non-default value
            keepalive_interval_seconds: 15, // Non-default value
            cors_origin: "https://full-chain-test.example.com".to_string(),
        };

        // Test the complete builder → server → handler chain
        let server = LambdaMcpServerBuilder::new()
            .name("full-chain-test")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(true) // Enable SSE to test streaming functionality
            .stream_config(custom_stream_config.clone())
            .build()
            .await
            .expect("Server should build successfully");

        // Create handler from server (this is the critical chain step)
        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        // Verify the handler was created successfully
        assert!(handler.sse_enabled, "SSE should be enabled");

        // Verify that the custom StreamConfig was preserved through the entire chain
        let stream_manager = handler.get_stream_manager();
        let actual_config = stream_manager.get_config();

        assert_eq!(
            actual_config.channel_buffer_size, custom_stream_config.channel_buffer_size,
            "Custom channel_buffer_size should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            actual_config.max_replay_events, custom_stream_config.max_replay_events,
            "Custom max_replay_events should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            actual_config.keepalive_interval_seconds,
            custom_stream_config.keepalive_interval_seconds,
            "Custom keepalive_interval_seconds should be preserved through builder → server → handler chain"
        );
        assert_eq!(
            actual_config.cors_origin, custom_stream_config.cors_origin,
            "Custom cors_origin should be preserved through builder → server → handler chain"
        );

        // Verify the stream manager is functional
        assert!(
            Arc::strong_count(stream_manager) >= 1,
            "Stream manager should be properly initialized"
        );

        // Additional verification: Test that the configuration is actually used functionally
        // by verifying the stream manager can be used with the custom configuration
        let test_session_id = uuid::Uuid::now_v7().as_simple().to_string();

        // The stream manager should be able to handle session operations with the custom config
        // This verifies the config isn't just preserved but actually used
        let subscriptions = stream_manager.get_subscriptions(&test_session_id).await;
        assert!(
            subscriptions.is_empty(),
            "New session should have no subscriptions initially"
        );

        // Verify the stream manager was constructed with our custom config values
        // This confirms the config propagated through the entire builder → server → handler chain
        assert_eq!(
            stream_manager.get_config().channel_buffer_size,
            2048,
            "Stream manager should be using the custom buffer size functionally"
        );
    }
652
    /// Test matrix: 4 combinations of streaming runtime vs SSE configuration
    /// This ensures we don't have runtime hangs or configuration conflicts
    ///
    /// Test 1: Non-streaming runtime + sse(false) - This should work (snapshot mode)
    #[tokio::test]
    async fn test_non_streaming_runtime_sse_false() {
        use crate::LambdaMcpServerBuilder;
        use turul_mcp_session_storage::InMemorySessionStorage;

        let server = LambdaMcpServerBuilder::new()
            .name("test-non-streaming-sse-false")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(false) // Disable SSE for non-streaming runtime
            .build()
            .await
            .expect("Server should build successfully");

        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        // Verify configuration
        assert!(!handler.sse_enabled, "SSE should be disabled");

        // Create a test request (POST /mcp works in all configs)
        let lambda_req = Request::builder()
            .method("POST")
            .uri("/mcp")
            .body(LambdaBody::Text(
                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
            ))
            .unwrap();

        // This should work without hanging
        let result = handler.handle(lambda_req).await;
        assert!(
            result.is_ok(),
            "POST /mcp should work with non-streaming + sse(false)"
        );
    }
695
    /// Test 2: Non-streaming runtime + sse(true) - This should work (snapshot-based SSE)
    #[tokio::test]
    async fn test_non_streaming_runtime_sse_true() {
        use crate::LambdaMcpServerBuilder;
        use turul_mcp_session_storage::InMemorySessionStorage;

        let server = LambdaMcpServerBuilder::new()
            .name("test-non-streaming-sse-true")
            .version("1.0.0")
            .storage(Arc::new(InMemorySessionStorage::new()))
            .sse(true) // Enable SSE for snapshot-based responses
            .build()
            .await
            .expect("Server should build successfully");

        let handler = server
            .handler()
            .await
            .expect("Handler should be created from server");

        // Verify configuration
        assert!(handler.sse_enabled, "SSE should be enabled");

        // Create a test request (POST /mcp works in all configs)
        let lambda_req = Request::builder()
            .method("POST")
            .uri("/mcp")
            .body(LambdaBody::Text(
                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
            ))
            .unwrap();

        // This should work without hanging (provides snapshot-based SSE)
        let result = handler.handle(lambda_req).await;
        assert!(
            result.is_ok(),
            "POST /mcp should work with non-streaming + sse(true)"
        );

        // Note: GET /mcp would provide snapshot events, not real-time streaming
        // This is the key difference from handle_streaming()
    }
738
739    /// Test 3: Streaming runtime + sse(false) - This should work (SSE disabled)
740    #[tokio::test]
741    async fn test_streaming_runtime_sse_false() {
742        use crate::LambdaMcpServerBuilder;
743        use turul_mcp_session_storage::InMemorySessionStorage;
744
745        let server = LambdaMcpServerBuilder::new()
746            .name("test-streaming-sse-false")
747            .version("1.0.0")
748            .storage(Arc::new(InMemorySessionStorage::new()))
749            .sse(false) // Disable SSE even with streaming runtime
750            .build()
751            .await
752            .expect("Server should build successfully");
753
754        let handler = server
755            .handler()
756            .await
757            .expect("Handler should be created from server");
758
759        // Verify configuration
760        assert!(!handler.sse_enabled, "SSE should be disabled");
761
762        // Create a test request for streaming handler
763        let lambda_req = Request::builder()
764            .method("POST")
765            .uri("/mcp")
766            .body(LambdaBody::Text(
767                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
768            ))
769            .unwrap();
770
771        // This should work with streaming runtime even when SSE is disabled
772        let result = handler.handle_streaming(lambda_req).await;
773        assert!(
774            result.is_ok(),
775            "Streaming runtime should work with sse(false)"
776        );
777    }
778
779    /// Test 4: Streaming runtime + sse(true) - This should work (real-time SSE streaming)
780    #[tokio::test]
781    async fn test_streaming_runtime_sse_true() {
782        use crate::LambdaMcpServerBuilder;
783        use turul_mcp_session_storage::InMemorySessionStorage;
784
785        let server = LambdaMcpServerBuilder::new()
786            .name("test-streaming-sse-true")
787            .version("1.0.0")
788            .storage(Arc::new(InMemorySessionStorage::new()))
789            .sse(true) // Enable SSE with streaming runtime for real-time streaming
790            .build()
791            .await
792            .expect("Server should build successfully");
793
794        let handler = server
795            .handler()
796            .await
797            .expect("Handler should be created from server");
798
799        // Verify configuration
800        assert!(handler.sse_enabled, "SSE should be enabled");
801
802        // Create a test request for streaming handler
803        let lambda_req = Request::builder()
804            .method("POST")
805            .uri("/mcp")
806            .body(LambdaBody::Text(
807                r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#.to_string(),
808            ))
809            .unwrap();
810
811        // This should work and provide real-time SSE streaming
812        let result = handler.handle_streaming(lambda_req).await;
813        assert!(
814            result.is_ok(),
815            "Streaming runtime should work with sse(true) for real-time streaming"
816        );
817
818        // Note: GET /mcp would provide real-time streaming events
819        // This is the optimal configuration for real-time notifications
820    }
821
822    // ── Strict lifecycle tests over handle_streaming() ────────────────
823
824    /// Helper: build a Lambda handler with strict lifecycle and a test tool via the builder.
825    async fn build_strict_streaming_handler() -> LambdaMcpHandler {
826        use crate::LambdaMcpServerBuilder;
827        use turul_mcp_session_storage::InMemorySessionStorage;
828
829        let server = LambdaMcpServerBuilder::new()
830            .name("lifecycle-test")
831            .version("1.0.0")
832            .tool(LifecycleTestTool)
833            .storage(Arc::new(InMemorySessionStorage::new()))
834            .strict_lifecycle(true) // explicit — survives default changes
835            .sse(true)
836            .build()
837            .await
838            .expect("build should succeed");
839
840        server.handler().await.expect("handler should succeed")
841    }
842
    // Test tool for lifecycle tests — satisfies all required traits.
    // Zero-sized unit struct: the tool is stateless, so Clone/Default are
    // trivially derivable for use with the builder's `.tool(...)` call.
    #[derive(Clone, Default)]
    struct LifecycleTestTool;
846
    // Tool name — this is the identifier tools/call resolves ("ping_tool").
    impl turul_mcp_builders::traits::HasBaseMetadata for LifecycleTestTool {
        fn name(&self) -> &str {
            "ping_tool"
        }
    }
    // Human-readable description surfaced by tools/list.
    impl turul_mcp_builders::traits::HasDescription for LifecycleTestTool {
        fn description(&self) -> Option<&str> {
            Some("test tool")
        }
    }
    // Input schema. The trait returns a reference, so the schema is built once
    // and cached for the process lifetime in a OnceLock. `ToolSchema::object`
    // presumably yields an object schema with no declared properties (the tool
    // takes no arguments) — confirm against turul_mcp_protocol if it matters.
    impl turul_mcp_builders::traits::HasInputSchema for LifecycleTestTool {
        fn input_schema(&self) -> &turul_mcp_protocol::ToolSchema {
            static SCHEMA: std::sync::OnceLock<turul_mcp_protocol::ToolSchema> =
                std::sync::OnceLock::new();
            SCHEMA.get_or_init(turul_mcp_protocol::ToolSchema::object)
        }
    }
    // No output schema, annotations, or extra metadata — None keeps the test
    // tool minimal while still satisfying every required trait.
    impl turul_mcp_builders::traits::HasOutputSchema for LifecycleTestTool {
        fn output_schema(&self) -> Option<&turul_mcp_protocol::ToolSchema> {
            None
        }
    }
    impl turul_mcp_builders::traits::HasAnnotations for LifecycleTestTool {
        fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> {
            None
        }
    }
    impl turul_mcp_builders::traits::HasToolMeta for LifecycleTestTool {
        fn tool_meta(&self) -> Option<&std::collections::HashMap<String, serde_json::Value>> {
            None
        }
    }
    // Marker traits with default implementations — nothing to override.
    impl turul_mcp_builders::traits::HasIcons for LifecycleTestTool {}
    impl turul_mcp_builders::traits::HasExecution for LifecycleTestTool {}
881
882    #[async_trait::async_trait]
883    impl turul_mcp_server::McpTool for LifecycleTestTool {
884        async fn call(
885            &self,
886            _args: serde_json::Value,
887            _session: Option<turul_mcp_server::SessionContext>,
888        ) -> turul_mcp_server::McpResult<turul_mcp_protocol::tools::CallToolResult> {
889            Ok(turul_mcp_protocol::tools::CallToolResult::success(vec![
890                turul_mcp_protocol::tools::ToolResult::text("pong"),
891            ]))
892        }
893    }
894
895    /// Helper: create a Lambda POST request for handle_streaming()
896    fn streaming_mcp_request(body: &str, session_id: Option<&str>) -> LambdaRequest {
897        let mut builder = Request::builder()
898            .method("POST")
899            .uri("/mcp")
900            .header("Content-Type", "application/json")
901            .header("Accept", "application/json, text/event-stream")
902            .header("MCP-Protocol-Version", "2025-11-25");
903
904        if let Some(sid) = session_id {
905            builder = builder.header("Mcp-Session-Id", sid);
906        }
907
908        builder.body(LambdaBody::Text(body.to_string())).unwrap()
909    }
910
911    /// Helper: collect streaming response body into a string
912    async fn collect_streaming_body(
913        response: lambda_http::Response<
914            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
915        >,
916    ) -> (http::StatusCode, String) {
917        use http_body_util::BodyExt;
918        let status = response.status();
919        let session_id = response
920            .headers()
921            .get("Mcp-Session-Id")
922            .and_then(|v| v.to_str().ok())
923            .map(String::from);
924        let body_bytes = response
925            .into_body()
926            .collect()
927            .await
928            .map(|c| c.to_bytes())
929            .unwrap_or_default();
930        let body_str = String::from_utf8_lossy(&body_bytes).to_string();
931        let _ = session_id; // available if needed
932        (status, body_str)
933    }
934
935    /// Helper: extract session ID from a streaming response
936    fn extract_session_id(
937        response: &lambda_http::Response<
938            http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, hyper::Error>,
939        >,
940    ) -> Option<String> {
941        response
942            .headers()
943            .get("Mcp-Session-Id")
944            .and_then(|v| v.to_str().ok())
945            .map(String::from)
946    }
947
948    /// Helper: parse JSON from a response body (handles SSE "data: " prefix)
949    fn parse_response_json(body: &str) -> serde_json::Value {
950        // Strip SSE framing if present
951        let json_str = body
952            .lines()
953            .find(|line| line.starts_with("data: "))
954            .map(|line| &line[6..])
955            .unwrap_or(body.trim());
956        serde_json::from_str(json_str)
957            .unwrap_or_else(|e| panic!("Failed to parse JSON from body: {e}\nBody: {body}"))
958    }
959
    /// P0: Full strict lifecycle handshake succeeds on handle_streaming()
    ///
    /// Exercises the complete happy path in order: initialize →
    /// notifications/initialized → tools/list → tools/call, asserting status
    /// codes and response shape at each step.
    #[tokio::test]
    async fn test_lambda_streaming_strict_handshake_succeeds() {
        let handler = build_strict_streaming_handler().await;

        // Step 1: initialize
        let init_req = streaming_mcp_request(
            &serde_json::json!({
                "jsonrpc": "2.0", "method": "initialize", "id": 1,
                "params": {
                    "protocolVersion": "2025-11-25",
                    "capabilities": {},
                    "clientInfo": { "name": "test", "version": "1.0.0" }
                }
            })
            .to_string(),
            None,
        );
        let init_resp = handler
            .handle_streaming(init_req)
            .await
            .expect("initialize should succeed");
        // Capture the session ID before the body is consumed below.
        let session_id = extract_session_id(&init_resp).expect("must return session ID");
        let (status, _body) = collect_streaming_body(init_resp).await;
        assert_eq!(status, 200, "initialize should return 200");

        // Step 2: notifications/initialized (a notification — no "id" field)
        let notif_req = streaming_mcp_request(
            &serde_json::json!({
                "jsonrpc": "2.0",
                "method": "notifications/initialized",
                "params": {}
            })
            .to_string(),
            Some(&session_id),
        );
        let notif_resp = handler
            .handle_streaming(notif_req)
            .await
            .expect("notification should succeed");
        let (status, _) = collect_streaming_body(notif_resp).await;
        // Notifications produce no response payload; 202 Accepted is expected.
        assert_eq!(status, 202, "notifications/initialized should return 202");

        // Step 3: tools/list — now permitted under strict lifecycle
        let list_req = streaming_mcp_request(
            &serde_json::json!({
                "jsonrpc": "2.0", "method": "tools/list", "id": 2
            })
            .to_string(),
            Some(&session_id),
        );
        let list_resp = handler
            .handle_streaming(list_req)
            .await
            .expect("tools/list should succeed");
        let (status, body) = collect_streaming_body(list_resp).await;
        assert_eq!(status, 200, "tools/list should return 200");
        let json = parse_response_json(&body);
        assert!(
            json["result"]["tools"].is_array(),
            "tools/list should return tools array: {json}"
        );

        // Step 4: tools/call — invoke the registered LifecycleTestTool
        let call_req = streaming_mcp_request(
            &serde_json::json!({
                "jsonrpc": "2.0", "method": "tools/call", "id": 3,
                "params": { "name": "ping_tool", "arguments": {} }
            })
            .to_string(),
            Some(&session_id),
        );
        let call_resp = handler
            .handle_streaming(call_req)
            .await
            .expect("tools/call should succeed");
        let (status, body) = collect_streaming_body(call_resp).await;
        assert_eq!(status, 200, "tools/call should return 200");
        let json = parse_response_json(&body);
        assert!(
            json["result"].is_object(),
            "tools/call should return result: {json}"
        );
    }
1044
1045    /// P0: Strict lifecycle rejects both tools/list and tools/call before notifications/initialized
1046    #[tokio::test]
1047    async fn test_lambda_streaming_strict_rejects_before_initialized() {
1048        let handler = build_strict_streaming_handler().await;
1049
1050        // Initialize to get session
1051        let init_req = streaming_mcp_request(
1052            &serde_json::json!({
1053                "jsonrpc": "2.0", "method": "initialize", "id": 1,
1054                "params": {
1055                    "protocolVersion": "2025-11-25",
1056                    "capabilities": {},
1057                    "clientInfo": { "name": "test", "version": "1.0.0" }
1058                }
1059            })
1060            .to_string(),
1061            None,
1062        );
1063        let init_resp = handler.handle_streaming(init_req).await.unwrap();
1064        let session_id = extract_session_id(&init_resp).unwrap();
1065        let _ = collect_streaming_body(init_resp).await;
1066
1067        // tools/list without notifications/initialized — must fail
1068        let list_req = streaming_mcp_request(
1069            &serde_json::json!({
1070                "jsonrpc": "2.0", "method": "tools/list", "id": 2
1071            })
1072            .to_string(),
1073            Some(&session_id),
1074        );
1075        let list_resp = handler.handle_streaming(list_req).await.unwrap();
1076        let (_, body) = collect_streaming_body(list_resp).await;
1077        let json = parse_response_json(&body);
1078        assert!(
1079            json["error"].is_object(),
1080            "tools/list should return JSON-RPC error: {json}"
1081        );
1082        assert_eq!(
1083            json["error"]["code"].as_i64().unwrap(),
1084            -32031,
1085            "tools/list must return SessionError code -32031, got: {json}"
1086        );
1087        assert!(
1088            json["error"]["message"]
1089                .as_str()
1090                .unwrap()
1091                .contains("notifications/initialized"),
1092            "Error must mention notifications/initialized: {}",
1093            json["error"]["message"]
1094        );
1095
1096        // tools/call without notifications/initialized — must also fail
1097        let call_req = streaming_mcp_request(
1098            &serde_json::json!({
1099                "jsonrpc": "2.0", "method": "tools/call", "id": 3,
1100                "params": { "name": "ping_tool", "arguments": {} }
1101            })
1102            .to_string(),
1103            Some(&session_id),
1104        );
1105        let call_resp = handler.handle_streaming(call_req).await.unwrap();
1106        let (_, body) = collect_streaming_body(call_resp).await;
1107        let json = parse_response_json(&body);
1108        assert!(
1109            json["error"].is_object(),
1110            "tools/call should return JSON-RPC error: {json}"
1111        );
1112        assert_eq!(
1113            json["error"]["code"].as_i64().unwrap(),
1114            -32031,
1115            "tools/call must return SessionError code -32031, got: {json}"
1116        );
1117        assert!(
1118            json["error"]["message"]
1119                .as_str()
1120                .unwrap()
1121                .contains("notifications/initialized"),
1122            "Error must mention notifications/initialized: {}",
1123            json["error"]["message"]
1124        );
1125    }
1126
1127    /// P0: tools/list succeeds immediately after notifications/initialized (race fix proof)
1128    #[tokio::test]
1129    async fn test_lambda_streaming_initialized_is_effective_immediately() {
1130        let handler = build_strict_streaming_handler().await;
1131
1132        // Initialize
1133        let init_req = streaming_mcp_request(
1134            &serde_json::json!({
1135                "jsonrpc": "2.0", "method": "initialize", "id": 1,
1136                "params": {
1137                    "protocolVersion": "2025-11-25",
1138                    "capabilities": {},
1139                    "clientInfo": { "name": "test", "version": "1.0.0" }
1140                }
1141            })
1142            .to_string(),
1143            None,
1144        );
1145        let init_resp = handler.handle_streaming(init_req).await.unwrap();
1146        let session_id = extract_session_id(&init_resp).unwrap();
1147        let _ = collect_streaming_body(init_resp).await;
1148
1149        // notifications/initialized
1150        let notif_req = streaming_mcp_request(
1151            &serde_json::json!({
1152                "jsonrpc": "2.0",
1153                "method": "notifications/initialized",
1154                "params": {}
1155            })
1156            .to_string(),
1157            Some(&session_id),
1158        );
1159        let notif_resp = handler.handle_streaming(notif_req).await.unwrap();
1160        let (status, _) = collect_streaming_body(notif_resp).await;
1161        assert_eq!(status, 202);
1162
1163        // Immediately — no delay — send tools/list
1164        let list_req = streaming_mcp_request(
1165            &serde_json::json!({
1166                "jsonrpc": "2.0", "method": "tools/list", "id": 2
1167            })
1168            .to_string(),
1169            Some(&session_id),
1170        );
1171        let list_resp = handler.handle_streaming(list_req).await.unwrap();
1172        let (status, body) = collect_streaming_body(list_resp).await;
1173        assert_eq!(
1174            status, 200,
1175            "tools/list must succeed immediately after initialized"
1176        );
1177        let json = parse_response_json(&body);
1178        assert!(
1179            json["result"]["tools"].is_array(),
1180            "Must return tools list, not error: {json}"
1181        );
1182    }
1183
    /// P1: Lenient mode allows operations without notifications/initialized
    ///
    /// With `strict_lifecycle(false)` the server must serve tools/list right
    /// after initialize, even though the client never sent
    /// notifications/initialized.
    #[tokio::test]
    async fn test_lambda_streaming_lenient_mode_allows_without_initialized() {
        use crate::LambdaMcpServerBuilder;
        use turul_mcp_session_storage::InMemorySessionStorage;

        // Mirrors build_strict_streaming_handler except for the lifecycle flag.
        let server = LambdaMcpServerBuilder::new()
            .name("lenient-test")
            .version("1.0.0")
            .tool(LifecycleTestTool)
            .storage(Arc::new(InMemorySessionStorage::new()))
            .strict_lifecycle(false) // lenient mode
            .sse(true)
            .build()
            .await
            .unwrap();

        let handler = server.handler().await.unwrap();

        // Initialize (no notifications/initialized)
        let init_req = streaming_mcp_request(
            &serde_json::json!({
                "jsonrpc": "2.0", "method": "initialize", "id": 1,
                "params": {
                    "protocolVersion": "2025-11-25",
                    "capabilities": {},
                    "clientInfo": { "name": "test", "version": "1.0.0" }
                }
            })
            .to_string(),
            None,
        );
        let init_resp = handler.handle_streaming(init_req).await.unwrap();
        // Session ID must be captured before the response is consumed.
        let session_id = extract_session_id(&init_resp).unwrap();
        let _ = collect_streaming_body(init_resp).await;

        // Skip notifications/initialized — go straight to tools/list
        let list_req = streaming_mcp_request(
            &serde_json::json!({
                "jsonrpc": "2.0", "method": "tools/list", "id": 2
            })
            .to_string(),
            Some(&session_id),
        );
        let list_resp = handler.handle_streaming(list_req).await.unwrap();
        let (status, body) = collect_streaming_body(list_resp).await;
        assert_eq!(
            status, 200,
            "Lenient mode should allow tools/list without initialized"
        );
        let json = parse_response_json(&body);
        assert!(
            json["result"]["tools"].is_array(),
            "Must return tools list in lenient mode: {json}"
        );
    }
1240}