Skip to main content

turul_mcp_aws_lambda/
server.rs

1//! Lambda MCP Server Implementation
2//!
3//! This module provides the main Lambda MCP server implementation that mirrors
4//! the architecture of turul-mcp-server but adapted for AWS Lambda deployment.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tracing::info;
10
11use turul_http_mcp_server::{ServerConfig, StreamConfig, StreamManager};
12use turul_mcp_protocol::{Implementation, ServerCapabilities};
13use turul_mcp_server::{
14    McpCompletion, McpElicitation, McpLogger, McpNotification, McpPrompt, McpResource, McpRoot,
15    McpSampling, McpTool, handlers::McpHandler, session::SessionManager,
16};
17use turul_mcp_session_storage::BoxedSessionStorage;
18
19use crate::error::Result;
20use crate::handler::LambdaMcpHandler;
21
22#[cfg(feature = "cors")]
23use crate::cors::CorsConfig;
24
25/// Main Lambda MCP server
26///
27/// This server stores all configuration and can create Lambda handlers when needed.
28/// It mirrors the architecture of McpServer but is designed for Lambda deployment.
29#[allow(dead_code)]
30pub struct LambdaMcpServer {
31    /// Server implementation information
32    pub implementation: Implementation,
33    /// Server capabilities
34    pub capabilities: ServerCapabilities,
35    /// Registered tools
36    tools: HashMap<String, Arc<dyn McpTool>>,
37    /// Registered resources
38    resources: HashMap<String, Arc<dyn McpResource>>,
39    /// Registered prompts
40    prompts: HashMap<String, Arc<dyn McpPrompt>>,
41    /// Registered elicitations
42    elicitations: HashMap<String, Arc<dyn McpElicitation>>,
43    /// Registered sampling providers
44    sampling: HashMap<String, Arc<dyn McpSampling>>,
45    /// Registered completion providers
46    completions: HashMap<String, Arc<dyn McpCompletion>>,
47    /// Registered loggers
48    loggers: HashMap<String, Arc<dyn McpLogger>>,
49    /// Registered root providers
50    root_providers: HashMap<String, Arc<dyn McpRoot>>,
51    /// Registered notification providers
52    notifications: HashMap<String, Arc<dyn McpNotification>>,
53    /// Registered handlers
54    handlers: HashMap<String, Arc<dyn McpHandler>>,
55    /// Configured roots
56    roots: Vec<turul_mcp_protocol::roots::Root>,
57    /// Optional client instructions
58    instructions: Option<String>,
59    /// Session manager for state persistence
60    session_manager: Arc<SessionManager>,
61    /// Session storage backend (shared between SessionManager and handler)
62    session_storage: Arc<BoxedSessionStorage>,
63    /// Strict MCP lifecycle enforcement
64    strict_lifecycle: bool,
65    /// Server configuration
66    server_config: ServerConfig,
67    /// Enable SSE streaming
68    enable_sse: bool,
69    /// Stream configuration
70    stream_config: StreamConfig,
71    /// CORS configuration (if enabled)
72    #[cfg(feature = "cors")]
73    cors_config: Option<CorsConfig>,
74    /// Middleware stack for request/response interception
75    middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
76    /// Optional task runtime for MCP task support
77    task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
78}
79
80impl LambdaMcpServer {
81    /// Create a new Lambda MCP server (use builder instead)
82    #[allow(clippy::too_many_arguments)]
83    pub(crate) fn new(
84        implementation: Implementation,
85        capabilities: ServerCapabilities,
86        tools: HashMap<String, Arc<dyn McpTool>>,
87        resources: HashMap<String, Arc<dyn McpResource>>,
88        prompts: HashMap<String, Arc<dyn McpPrompt>>,
89        elicitations: HashMap<String, Arc<dyn McpElicitation>>,
90        sampling: HashMap<String, Arc<dyn McpSampling>>,
91        completions: HashMap<String, Arc<dyn McpCompletion>>,
92        loggers: HashMap<String, Arc<dyn McpLogger>>,
93        root_providers: HashMap<String, Arc<dyn McpRoot>>,
94        notifications: HashMap<String, Arc<dyn McpNotification>>,
95        handlers: HashMap<String, Arc<dyn McpHandler>>,
96        roots: Vec<turul_mcp_protocol::roots::Root>,
97        instructions: Option<String>,
98        session_storage: Arc<BoxedSessionStorage>,
99        strict_lifecycle: bool,
100        server_config: ServerConfig,
101        enable_sse: bool,
102        stream_config: StreamConfig,
103        #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
104        middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
105        task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
106    ) -> Self {
107        // Create session manager with server capabilities
108        let session_manager = Arc::new(SessionManager::with_storage_and_timeouts(
109            Arc::clone(&session_storage),
110            capabilities.clone(),
111            std::time::Duration::from_secs(30 * 60), // 30 minutes default
112            std::time::Duration::from_secs(60),      // 1 minute cleanup interval
113        ));
114
115        Self {
116            implementation,
117            capabilities,
118            tools,
119            resources,
120            prompts,
121            elicitations,
122            sampling,
123            completions,
124            loggers,
125            root_providers,
126            notifications,
127            handlers,
128            roots,
129            instructions,
130            session_manager,
131            session_storage,
132            strict_lifecycle,
133            server_config,
134            enable_sse,
135            stream_config,
136            #[cfg(feature = "cors")]
137            cors_config,
138            middleware_stack,
139            task_runtime,
140        }
141    }
142
143    /// Get a reference to the server capabilities.
144    pub fn capabilities(&self) -> &ServerCapabilities {
145        &self.capabilities
146    }
147
148    /// Create a Lambda handler ready for use with Lambda runtime
149    ///
150    /// This is equivalent to McpServer::run_http() but creates a handler instead of running a server.
151    pub async fn handler(&self) -> Result<LambdaMcpHandler> {
152        info!(
153            "Creating Lambda MCP handler: {} v{}",
154            self.implementation.name, self.implementation.version
155        );
156        info!("Session management: enabled with automatic cleanup");
157
158        if self.enable_sse {
159            info!("SSE notifications: enabled for Lambda responses");
160
161            // ⚠️ GUARDRAIL: SSE enabled without streaming feature
162            #[cfg(not(feature = "streaming"))]
163            {
164                use tracing::warn;
165                warn!("⚠️  SSE is enabled but 'streaming' feature is not available!");
166                warn!(
167                    "   For real SSE streaming, use handle_streaming() with run_with_streaming_response"
168                );
169                warn!(
170                    "   Current handle() method will return SSE snapshots, not real-time streams"
171                );
172                warn!("   To enable streaming: add 'streaming' feature to turul-mcp-aws-lambda");
173            }
174        }
175
176        // Start session cleanup task (same as MCP server)
177        let _cleanup_task = self.session_manager.clone().start_cleanup_task();
178
179        // Cold-start recovery: handler() is called once per Lambda cold start from main().
180        // The returned LambdaMcpHandler is Clone'd for each request — recovery runs exactly once.
181        if let Some(ref runtime) = self.task_runtime {
182            match runtime.recover_stuck_tasks().await {
183                Ok(recovered) if !recovered.is_empty() => {
184                    info!(
185                        count = recovered.len(),
186                        "Recovered stuck tasks from previous invocations"
187                    );
188                }
189                Err(e) => {
190                    use tracing::warn;
191                    warn!(error = %e, "Failed to recover stuck tasks on startup");
192                }
193                _ => {}
194            }
195        }
196
197        // Create stream manager for SSE
198        let stream_manager = Arc::new(StreamManager::with_config(
199            self.session_storage.clone(),
200            self.stream_config.clone(),
201        ));
202
203        // Create JSON-RPC dispatcher
204        use turul_mcp_json_rpc_server::JsonRpcDispatcher;
205        let mut dispatcher = JsonRpcDispatcher::new();
206
207        // Create session-aware initialize handler (reuse MCP server handler)
208        use turul_mcp_server::SessionAwareInitializeHandler;
209        let init_handler = SessionAwareInitializeHandler::new(
210            self.implementation.clone(),
211            self.capabilities.clone(),
212            self.instructions.clone(),
213            self.session_manager.clone(),
214            self.strict_lifecycle,
215        );
216        dispatcher.register_method("initialize".to_string(), init_handler);
217
218        // Create tools/list handler (reuse MCP server handler)
219        use turul_mcp_server::ListToolsHandler;
220        let list_handler = ListToolsHandler::new(self.tools.clone());
221        dispatcher.register_method("tools/list".to_string(), list_handler);
222
223        // Create session-aware tool handler for tools/call (reuse MCP server handler)
224        use turul_mcp_server::SessionAwareToolHandler;
225        let mut tool_handler = SessionAwareToolHandler::new(
226            self.tools.clone(),
227            self.session_manager.clone(),
228            self.strict_lifecycle,
229        );
230        if let Some(ref runtime) = self.task_runtime {
231            tool_handler = tool_handler.with_task_runtime(Arc::clone(runtime));
232        }
233        dispatcher.register_method("tools/call".to_string(), tool_handler);
234
235        // Register all MCP handlers with session awareness (reuse MCP server bridge)
236        use turul_mcp_server::SessionAwareMcpHandlerBridge;
237        for (method, handler) in &self.handlers {
238            let bridge_handler = SessionAwareMcpHandlerBridge::new(
239                handler.clone(),
240                self.session_manager.clone(),
241                self.strict_lifecycle,
242            );
243            dispatcher.register_method(method.clone(), bridge_handler);
244        }
245
246        // Create the Lambda handler with all components and middleware
247        let middleware_stack = Arc::new(self.middleware_stack.clone());
248
249        Ok(LambdaMcpHandler::with_middleware(
250            self.server_config.clone(),
251            Arc::new(dispatcher),
252            self.session_storage.clone(),
253            stream_manager,
254            self.stream_config.clone(),
255            self.capabilities.clone(),
256            middleware_stack,
257            self.enable_sse,
258        ))
259    }
260
261    /// Get information about the session storage backend
262    pub fn session_storage_info(&self) -> &str {
263        "Session storage configured"
264    }
265}