Skip to main content

turul_mcp_aws_lambda/
server.rs

1//! Lambda MCP Server Implementation
2//!
3//! This module provides the main Lambda MCP server implementation that mirrors
4//! the architecture of turul-mcp-server but adapted for AWS Lambda deployment.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tracing::info;
10
11use turul_http_mcp_server::{ServerConfig, StreamConfig, StreamManager};
12use turul_mcp_protocol::{Implementation, ServerCapabilities};
13use turul_mcp_server::{
14    McpCompletion, McpElicitation, McpLogger, McpNotification, McpPrompt, McpResource, McpRoot,
15    McpSampling, McpTool, handlers::McpHandler, session::SessionManager,
16};
17use turul_mcp_session_storage::BoxedSessionStorage;
18
19use crate::error::Result;
20use crate::handler::LambdaMcpHandler;
21
22#[cfg(feature = "cors")]
23use crate::cors::CorsConfig;
24
25/// Main Lambda MCP server
26///
27/// This server stores all configuration and can create Lambda handlers when needed.
28/// It mirrors the architecture of McpServer but is designed for Lambda deployment.
29#[allow(dead_code)]
30pub struct LambdaMcpServer {
31    /// Server implementation information
32    pub implementation: Implementation,
33    /// Server capabilities
34    pub capabilities: ServerCapabilities,
35    /// Registered tools
36    tools: HashMap<String, Arc<dyn McpTool>>,
37    /// Registered resources
38    resources: HashMap<String, Arc<dyn McpResource>>,
39    /// Registered prompts
40    prompts: HashMap<String, Arc<dyn McpPrompt>>,
41    /// Registered elicitations
42    elicitations: HashMap<String, Arc<dyn McpElicitation>>,
43    /// Registered sampling providers
44    sampling: HashMap<String, Arc<dyn McpSampling>>,
45    /// Registered completion providers
46    completions: HashMap<String, Arc<dyn McpCompletion>>,
47    /// Registered loggers
48    loggers: HashMap<String, Arc<dyn McpLogger>>,
49    /// Registered root providers
50    root_providers: HashMap<String, Arc<dyn McpRoot>>,
51    /// Registered notification providers
52    notifications: HashMap<String, Arc<dyn McpNotification>>,
53    /// Registered handlers
54    handlers: HashMap<String, Arc<dyn McpHandler>>,
55    /// Configured roots
56    roots: Vec<turul_mcp_protocol::roots::Root>,
57    /// Optional client instructions
58    instructions: Option<String>,
59    /// Session manager for state persistence
60    session_manager: Arc<SessionManager>,
61    /// Session storage backend (shared between SessionManager and handler)
62    session_storage: Arc<BoxedSessionStorage>,
63    /// Strict MCP lifecycle enforcement
64    strict_lifecycle: bool,
65    /// Server configuration
66    server_config: ServerConfig,
67    /// Enable SSE streaming
68    enable_sse: bool,
69    /// Stream configuration
70    stream_config: StreamConfig,
71    /// CORS configuration (if enabled)
72    #[cfg(feature = "cors")]
73    cors_config: Option<CorsConfig>,
74    /// Middleware stack for request/response interception
75    middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
76    /// Custom route registry (e.g., .well-known endpoints)
77    route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
78    /// Optional task runtime for MCP task support
79    task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
80}
81
82impl LambdaMcpServer {
83    /// Create a new Lambda MCP server (use builder instead)
84    #[allow(clippy::too_many_arguments)]
85    pub(crate) fn new(
86        implementation: Implementation,
87        capabilities: ServerCapabilities,
88        tools: HashMap<String, Arc<dyn McpTool>>,
89        resources: HashMap<String, Arc<dyn McpResource>>,
90        prompts: HashMap<String, Arc<dyn McpPrompt>>,
91        elicitations: HashMap<String, Arc<dyn McpElicitation>>,
92        sampling: HashMap<String, Arc<dyn McpSampling>>,
93        completions: HashMap<String, Arc<dyn McpCompletion>>,
94        loggers: HashMap<String, Arc<dyn McpLogger>>,
95        root_providers: HashMap<String, Arc<dyn McpRoot>>,
96        notifications: HashMap<String, Arc<dyn McpNotification>>,
97        handlers: HashMap<String, Arc<dyn McpHandler>>,
98        roots: Vec<turul_mcp_protocol::roots::Root>,
99        instructions: Option<String>,
100        session_storage: Arc<BoxedSessionStorage>,
101        strict_lifecycle: bool,
102        server_config: ServerConfig,
103        enable_sse: bool,
104        stream_config: StreamConfig,
105        #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
106        middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
107        route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
108        task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
109    ) -> Self {
110        // Create session manager with server capabilities
111        let session_manager = Arc::new(SessionManager::with_storage_and_timeouts(
112            Arc::clone(&session_storage),
113            capabilities.clone(),
114            std::time::Duration::from_secs(30 * 60), // 30 minutes default
115            std::time::Duration::from_secs(60),      // 1 minute cleanup interval
116        ));
117
118        Self {
119            implementation,
120            capabilities,
121            tools,
122            resources,
123            prompts,
124            elicitations,
125            sampling,
126            completions,
127            loggers,
128            root_providers,
129            notifications,
130            handlers,
131            roots,
132            instructions,
133            session_manager,
134            session_storage,
135            strict_lifecycle,
136            server_config,
137            enable_sse,
138            stream_config,
139            #[cfg(feature = "cors")]
140            cors_config,
141            middleware_stack,
142            route_registry,
143            task_runtime,
144        }
145    }
146
147    /// Get a reference to the server capabilities.
148    pub fn capabilities(&self) -> &ServerCapabilities {
149        &self.capabilities
150    }
151
152    /// Create a Lambda handler ready for use with Lambda runtime
153    ///
154    /// This is equivalent to McpServer::run_http() but creates a handler instead of running a server.
155    pub async fn handler(&self) -> Result<LambdaMcpHandler> {
156        info!(
157            "Creating Lambda MCP handler: {} v{}",
158            self.implementation.name, self.implementation.version
159        );
160        info!("Session management: enabled with automatic cleanup");
161
162        if self.enable_sse {
163            info!("SSE notifications: enabled for Lambda responses");
164
165            // ⚠️ GUARDRAIL: SSE enabled without streaming feature
166            #[cfg(not(feature = "streaming"))]
167            {
168                use tracing::warn;
169                warn!("⚠️  SSE is enabled but 'streaming' feature is not available!");
170                warn!(
171                    "   For real SSE streaming, use handle_streaming() with run_with_streaming_response"
172                );
173                warn!(
174                    "   Current handle() method will return SSE snapshots, not real-time streams"
175                );
176                warn!("   To enable streaming: add 'streaming' feature to turul-mcp-aws-lambda");
177            }
178        }
179
180        // Start session cleanup task (same as MCP server)
181        let _cleanup_task = self.session_manager.clone().start_cleanup_task();
182
183        // Cold-start recovery: handler() is called once per Lambda cold start from main().
184        // The returned LambdaMcpHandler is Clone'd for each request — recovery runs exactly once.
185        if let Some(ref runtime) = self.task_runtime {
186            match runtime.recover_stuck_tasks().await {
187                Ok(recovered) if !recovered.is_empty() => {
188                    info!(
189                        count = recovered.len(),
190                        "Recovered stuck tasks from previous invocations"
191                    );
192                }
193                Err(e) => {
194                    use tracing::warn;
195                    warn!(error = %e, "Failed to recover stuck tasks on startup");
196                }
197                _ => {}
198            }
199        }
200
201        // Create stream manager for SSE
202        let stream_manager = Arc::new(StreamManager::with_config(
203            self.session_storage.clone(),
204            self.stream_config.clone(),
205        ));
206
207        // Create JSON-RPC dispatcher
208        use turul_mcp_json_rpc_server::JsonRpcDispatcher;
209        let mut dispatcher = JsonRpcDispatcher::new();
210
211        // Create session-aware initialize handler (reuse MCP server handler)
212        use turul_mcp_server::SessionAwareInitializeHandler;
213        let init_handler = SessionAwareInitializeHandler::new(
214            self.implementation.clone(),
215            self.capabilities.clone(),
216            self.instructions.clone(),
217            self.session_manager.clone(),
218            self.strict_lifecycle,
219        );
220        dispatcher.register_method("initialize".to_string(), init_handler);
221
222        // Create session-aware tools/list handler (reuse MCP server handler)
223        use turul_mcp_server::ListToolsHandler;
224        let list_handler = ListToolsHandler::new_with_session_manager(
225            self.tools.clone(),
226            self.session_manager.clone(),
227            self.strict_lifecycle,
228            self.task_runtime.is_some(),
229        );
230        dispatcher.register_method("tools/list".to_string(), list_handler);
231
232        // Create session-aware tool handler for tools/call (reuse MCP server handler)
233        use turul_mcp_server::SessionAwareToolHandler;
234        let mut tool_handler = SessionAwareToolHandler::new(
235            self.tools.clone(),
236            self.session_manager.clone(),
237            self.strict_lifecycle,
238        );
239        if let Some(ref runtime) = self.task_runtime {
240            tool_handler = tool_handler.with_task_runtime(Arc::clone(runtime));
241        }
242        dispatcher.register_method("tools/call".to_string(), tool_handler);
243
244        // Register all MCP handlers with session awareness (reuse MCP server bridge)
245        use turul_mcp_server::SessionAwareMcpHandlerBridge;
246        for (method, handler) in &self.handlers {
247            let bridge_handler = SessionAwareMcpHandlerBridge::new(
248                handler.clone(),
249                self.session_manager.clone(),
250                self.strict_lifecycle,
251            );
252            dispatcher.register_method(method.clone(), bridge_handler);
253        }
254
255        // Register notifications/initialized handler — required for strict lifecycle.
256        // Without this, clients can never complete the MCP handshake.
257        use turul_mcp_server::handlers::InitializedNotificationHandler;
258        let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
259        let initialized_bridge = SessionAwareMcpHandlerBridge::new(
260            Arc::new(initialized_handler),
261            self.session_manager.clone(),
262            self.strict_lifecycle,
263        );
264        dispatcher.register_method("notifications/initialized".to_string(), initialized_bridge);
265
266        // Create the Lambda handler with all components and middleware
267        let middleware_stack = Arc::new(self.middleware_stack.clone());
268
269        Ok(LambdaMcpHandler::with_middleware(
270            self.server_config.clone(),
271            Arc::new(dispatcher),
272            self.session_storage.clone(),
273            stream_manager,
274            self.stream_config.clone(),
275            self.capabilities.clone(),
276            middleware_stack,
277            self.enable_sse,
278            Arc::clone(&self.route_registry),
279        ))
280    }
281
282    /// Get information about the session storage backend
283    pub fn session_storage_info(&self) -> &str {
284        "Session storage configured"
285    }
286}