1use std::collections::HashMap;
7use std::sync::Arc;
8
9use tracing::info;
10
11use turul_http_mcp_server::{ServerConfig, StreamConfig, StreamManager};
12use turul_mcp_protocol::{Implementation, ServerCapabilities};
13use turul_mcp_server::{
14 McpCompletion, McpElicitation, McpLogger, McpNotification, McpPrompt, McpResource, McpRoot,
15 McpSampling, McpTool, handlers::McpHandler, session::SessionManager,
16};
17use turul_mcp_session_storage::BoxedSessionStorage;
18
19use crate::error::Result;
20use crate::handler::LambdaMcpHandler;
21
22#[cfg(feature = "cors")]
23use crate::cors::CorsConfig;
24
/// MCP server configuration for AWS Lambda.
///
/// The struct holds every component handed to `new` plus a `SessionManager`
/// derived from the storage backend and capabilities. `handler()` reads this
/// state to build a `LambdaMcpHandler`.
// Several fields are stored by `new` but not read by the methods of this type
// (see the per-field notes), hence the lint opt-out.
#[allow(dead_code)]
pub struct LambdaMcpServer {
    /// Server implementation metadata. `handler()` logs its `name` and
    /// `version` and passes a clone to the `initialize` handler.
    pub implementation: Implementation,
    /// Server capabilities. Returned by `capabilities()` and cloned into the
    /// session manager, the `initialize` handler and the Lambda handler.
    pub capabilities: ServerCapabilities,
    /// Registered tools, cloned into the `tools/list` and `tools/call`
    /// handlers. Presumably keyed by tool name — confirm against the builder.
    tools: HashMap<String, Arc<dyn McpTool>>,
    /// Registered resources. Stored by `new`; not read by `handler()`.
    resources: HashMap<String, Arc<dyn McpResource>>,
    /// Registered prompts. Stored by `new`; not read by `handler()`.
    prompts: HashMap<String, Arc<dyn McpPrompt>>,
    /// Registered elicitation providers. Stored by `new`; not read by `handler()`.
    elicitations: HashMap<String, Arc<dyn McpElicitation>>,
    /// Registered sampling providers. Stored by `new`; not read by `handler()`.
    sampling: HashMap<String, Arc<dyn McpSampling>>,
    /// Registered completion providers. Stored by `new`; not read by `handler()`.
    completions: HashMap<String, Arc<dyn McpCompletion>>,
    /// Registered loggers. Stored by `new`; not read by `handler()`.
    loggers: HashMap<String, Arc<dyn McpLogger>>,
    /// Registered root providers. Stored by `new`; not read by `handler()`.
    root_providers: HashMap<String, Arc<dyn McpRoot>>,
    /// Registered notification providers. Stored by `new`; not read by `handler()`.
    notifications: HashMap<String, Arc<dyn McpNotification>>,
    /// Generic MCP handlers keyed by JSON-RPC method name. `handler()` wraps
    /// each one in a `SessionAwareMcpHandlerBridge` and registers it with the
    /// dispatcher under its key.
    handlers: HashMap<String, Arc<dyn McpHandler>>,
    /// Root entries. Stored by `new`; not read by `handler()`.
    roots: Vec<turul_mcp_protocol::roots::Root>,
    /// Optional instructions text forwarded to the `initialize` handler.
    instructions: Option<String>,
    /// Session manager built in `new` from `session_storage` and
    /// `capabilities`, with fixed 30-minute and 60-second durations.
    session_manager: Arc<SessionManager>,
    /// Session storage backend, shared with the session manager, the stream
    /// manager and the Lambda handler.
    session_storage: Arc<BoxedSessionStorage>,
    /// Flag forwarded unchanged to every session-aware handler.
    /// NOTE(review): presumably enforces MCP lifecycle ordering — confirm.
    strict_lifecycle: bool,
    /// Server configuration cloned into the Lambda handler.
    server_config: ServerConfig,
    /// Whether SSE is enabled. Controls the startup log output in `handler()`
    /// and is forwarded to the Lambda handler.
    enable_sse: bool,
    /// Stream configuration, used to build the `StreamManager` and also passed
    /// to the Lambda handler.
    stream_config: StreamConfig,
    /// Optional CORS configuration; the field exists only when the `cors`
    /// feature is enabled. Stored by `new`; not read by `handler()`.
    #[cfg(feature = "cors")]
    cors_config: Option<CorsConfig>,
    /// Middleware stack; `handler()` clones it into an `Arc` for the Lambda handler.
    middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
    /// Shared route registry passed to the Lambda handler.
    route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
    /// Optional task runtime. When present, `handler()` recovers stuck tasks
    /// through it and attaches it to the `tools/call` handler; its presence is
    /// also passed to the `tools/list` handler.
    task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
}
81
82impl LambdaMcpServer {
83 #[allow(clippy::too_many_arguments)]
85 pub(crate) fn new(
86 implementation: Implementation,
87 capabilities: ServerCapabilities,
88 tools: HashMap<String, Arc<dyn McpTool>>,
89 resources: HashMap<String, Arc<dyn McpResource>>,
90 prompts: HashMap<String, Arc<dyn McpPrompt>>,
91 elicitations: HashMap<String, Arc<dyn McpElicitation>>,
92 sampling: HashMap<String, Arc<dyn McpSampling>>,
93 completions: HashMap<String, Arc<dyn McpCompletion>>,
94 loggers: HashMap<String, Arc<dyn McpLogger>>,
95 root_providers: HashMap<String, Arc<dyn McpRoot>>,
96 notifications: HashMap<String, Arc<dyn McpNotification>>,
97 handlers: HashMap<String, Arc<dyn McpHandler>>,
98 roots: Vec<turul_mcp_protocol::roots::Root>,
99 instructions: Option<String>,
100 session_storage: Arc<BoxedSessionStorage>,
101 strict_lifecycle: bool,
102 server_config: ServerConfig,
103 enable_sse: bool,
104 stream_config: StreamConfig,
105 #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
106 middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
107 route_registry: Arc<turul_http_mcp_server::RouteRegistry>,
108 task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
109 ) -> Self {
110 let session_manager = Arc::new(SessionManager::with_storage_and_timeouts(
112 Arc::clone(&session_storage),
113 capabilities.clone(),
114 std::time::Duration::from_secs(30 * 60), std::time::Duration::from_secs(60), ));
117
118 Self {
119 implementation,
120 capabilities,
121 tools,
122 resources,
123 prompts,
124 elicitations,
125 sampling,
126 completions,
127 loggers,
128 root_providers,
129 notifications,
130 handlers,
131 roots,
132 instructions,
133 session_manager,
134 session_storage,
135 strict_lifecycle,
136 server_config,
137 enable_sse,
138 stream_config,
139 #[cfg(feature = "cors")]
140 cors_config,
141 middleware_stack,
142 route_registry,
143 task_runtime,
144 }
145 }
146
147 pub fn capabilities(&self) -> &ServerCapabilities {
149 &self.capabilities
150 }
151
152 pub async fn handler(&self) -> Result<LambdaMcpHandler> {
156 info!(
157 "Creating Lambda MCP handler: {} v{}",
158 self.implementation.name, self.implementation.version
159 );
160 info!("Session management: enabled with automatic cleanup");
161
162 if self.enable_sse {
163 info!("SSE notifications: enabled for Lambda responses");
164
165 #[cfg(not(feature = "streaming"))]
167 {
168 use tracing::warn;
169 warn!("⚠️ SSE is enabled but 'streaming' feature is not available!");
170 warn!(
171 " For real SSE streaming, use handle_streaming() with run_with_streaming_response"
172 );
173 warn!(
174 " Current handle() method will return SSE snapshots, not real-time streams"
175 );
176 warn!(" To enable streaming: add 'streaming' feature to turul-mcp-aws-lambda");
177 }
178 }
179
180 let _cleanup_task = self.session_manager.clone().start_cleanup_task();
182
183 if let Some(ref runtime) = self.task_runtime {
186 match runtime.recover_stuck_tasks().await {
187 Ok(recovered) if !recovered.is_empty() => {
188 info!(
189 count = recovered.len(),
190 "Recovered stuck tasks from previous invocations"
191 );
192 }
193 Err(e) => {
194 use tracing::warn;
195 warn!(error = %e, "Failed to recover stuck tasks on startup");
196 }
197 _ => {}
198 }
199 }
200
201 let stream_manager = Arc::new(StreamManager::with_config(
203 self.session_storage.clone(),
204 self.stream_config.clone(),
205 ));
206
207 use turul_mcp_json_rpc_server::JsonRpcDispatcher;
209 let mut dispatcher = JsonRpcDispatcher::new();
210
211 use turul_mcp_server::SessionAwareInitializeHandler;
213 let init_handler = SessionAwareInitializeHandler::new(
214 self.implementation.clone(),
215 self.capabilities.clone(),
216 self.instructions.clone(),
217 self.session_manager.clone(),
218 self.strict_lifecycle,
219 );
220 dispatcher.register_method("initialize".to_string(), init_handler);
221
222 use turul_mcp_server::ListToolsHandler;
224 let list_handler = ListToolsHandler::new_with_session_manager(
225 self.tools.clone(),
226 self.session_manager.clone(),
227 self.strict_lifecycle,
228 self.task_runtime.is_some(),
229 );
230 dispatcher.register_method("tools/list".to_string(), list_handler);
231
232 use turul_mcp_server::SessionAwareToolHandler;
234 let mut tool_handler = SessionAwareToolHandler::new(
235 self.tools.clone(),
236 self.session_manager.clone(),
237 self.strict_lifecycle,
238 );
239 if let Some(ref runtime) = self.task_runtime {
240 tool_handler = tool_handler.with_task_runtime(Arc::clone(runtime));
241 }
242 dispatcher.register_method("tools/call".to_string(), tool_handler);
243
244 use turul_mcp_server::SessionAwareMcpHandlerBridge;
246 for (method, handler) in &self.handlers {
247 let bridge_handler = SessionAwareMcpHandlerBridge::new(
248 handler.clone(),
249 self.session_manager.clone(),
250 self.strict_lifecycle,
251 );
252 dispatcher.register_method(method.clone(), bridge_handler);
253 }
254
255 use turul_mcp_server::handlers::InitializedNotificationHandler;
258 let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
259 let initialized_bridge = SessionAwareMcpHandlerBridge::new(
260 Arc::new(initialized_handler),
261 self.session_manager.clone(),
262 self.strict_lifecycle,
263 );
264 dispatcher.register_method("notifications/initialized".to_string(), initialized_bridge);
265
266 let middleware_stack = Arc::new(self.middleware_stack.clone());
268
269 Ok(LambdaMcpHandler::with_middleware(
270 self.server_config.clone(),
271 Arc::new(dispatcher),
272 self.session_storage.clone(),
273 stream_manager,
274 self.stream_config.clone(),
275 self.capabilities.clone(),
276 middleware_stack,
277 self.enable_sse,
278 Arc::clone(&self.route_registry),
279 ))
280 }
281
282 pub fn session_storage_info(&self) -> &str {
284 "Session storage configured"
285 }
286}