1use std::collections::HashMap;
7use std::sync::Arc;
8
9use tracing::info;
10
11use turul_http_mcp_server::{ServerConfig, StreamConfig, StreamManager};
12use turul_mcp_protocol::{Implementation, ServerCapabilities};
13use turul_mcp_server::{
14 McpCompletion, McpElicitation, McpLogger, McpNotification, McpPrompt, McpResource, McpRoot,
15 McpSampling, McpTool, handlers::McpHandler, session::SessionManager,
16};
17use turul_mcp_session_storage::BoxedSessionStorage;
18
19use crate::error::Result;
20use crate::handler::LambdaMcpHandler;
21
22#[cfg(feature = "cors")]
23use crate::cors::CorsConfig;
24
/// Server-side state from which [`LambdaMcpHandler`] instances are built.
///
/// An instance is assembled by the crate-internal `new` constructor and turned into a
/// request handler by `handler`. Only `implementation` and `capabilities` are public;
/// every other field is private to this module.
// NOTE(review): `dead_code` is presumably allowed because several registries below are
// stored by `new` but never read by the methods visible in this file — confirm before
// removing the attribute.
#[allow(dead_code)]
pub struct LambdaMcpServer {
    /// Server implementation info. `handler` logs its `name` and `version` and passes a
    /// clone to the `initialize` handler.
    pub implementation: Implementation,
    /// Server capabilities. Cloned into the session manager, the `initialize` handler and
    /// the returned `LambdaMcpHandler`.
    pub capabilities: ServerCapabilities,
    /// Tool registry. Cloned into the `tools/list` and `tools/call` handlers.
    /// Keys are presumably tool names — TODO confirm against the code that fills the map.
    tools: HashMap<String, Arc<dyn McpTool>>,
    /// Resource registry. Stored by `new`; not read by the methods visible in this file.
    resources: HashMap<String, Arc<dyn McpResource>>,
    /// Prompt registry. Stored by `new`; not read by the methods visible in this file.
    prompts: HashMap<String, Arc<dyn McpPrompt>>,
    /// Elicitation registry. Stored by `new`; not read by the methods visible in this file.
    elicitations: HashMap<String, Arc<dyn McpElicitation>>,
    /// Sampling registry. Stored by `new`; not read by the methods visible in this file.
    sampling: HashMap<String, Arc<dyn McpSampling>>,
    /// Completion registry. Stored by `new`; not read by the methods visible in this file.
    completions: HashMap<String, Arc<dyn McpCompletion>>,
    /// Logger registry. Stored by `new`; not read by the methods visible in this file.
    loggers: HashMap<String, Arc<dyn McpLogger>>,
    /// Root-provider registry. Stored by `new`; not read by the methods visible in this file.
    root_providers: HashMap<String, Arc<dyn McpRoot>>,
    /// Notification registry. Stored by `new`; not read by the methods visible in this file.
    notifications: HashMap<String, Arc<dyn McpNotification>>,
    /// Generic handlers. `handler` registers each entry on the JSON-RPC dispatcher, using
    /// the map key as the method name.
    handlers: HashMap<String, Arc<dyn McpHandler>>,
    /// Root entries. Stored by `new`; not read by the methods visible in this file.
    roots: Vec<turul_mcp_protocol::roots::Root>,
    /// Optional instructions, passed to the `initialize` handler.
    instructions: Option<String>,
    /// Session manager created in `new` from `session_storage` and `capabilities`; shared
    /// with the session-aware handlers and used to start the cleanup task.
    session_manager: Arc<SessionManager>,
    /// Session storage backend; shared with the session manager, the stream manager and
    /// the returned `LambdaMcpHandler`.
    session_storage: Arc<BoxedSessionStorage>,
    /// Flag forwarded to the `initialize`, `tools/call` and bridged handlers.
    /// NOTE(review): presumably toggles strict MCP lifecycle enforcement — confirm.
    strict_lifecycle: bool,
    /// Server configuration, cloned into the returned `LambdaMcpHandler`.
    server_config: ServerConfig,
    /// Whether SSE is enabled; drives the startup log messages and is forwarded to the
    /// returned `LambdaMcpHandler`.
    enable_sse: bool,
    /// Stream configuration, cloned into the `StreamManager` and the `LambdaMcpHandler`.
    stream_config: StreamConfig,
    /// CORS configuration; the field only exists when the `cors` feature is enabled.
    /// Not read by the methods visible in this file.
    #[cfg(feature = "cors")]
    cors_config: Option<CorsConfig>,
    /// Middleware stack; cloned and wrapped in an `Arc` for the returned `LambdaMcpHandler`.
    middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
    /// Optional task runtime. When present, `handler` attempts stuck-task recovery and
    /// attaches the runtime to the `tools/call` handler; its presence is also reported to
    /// the `tools/list` handler.
    task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
}
79
80impl LambdaMcpServer {
81 #[allow(clippy::too_many_arguments)]
83 pub(crate) fn new(
84 implementation: Implementation,
85 capabilities: ServerCapabilities,
86 tools: HashMap<String, Arc<dyn McpTool>>,
87 resources: HashMap<String, Arc<dyn McpResource>>,
88 prompts: HashMap<String, Arc<dyn McpPrompt>>,
89 elicitations: HashMap<String, Arc<dyn McpElicitation>>,
90 sampling: HashMap<String, Arc<dyn McpSampling>>,
91 completions: HashMap<String, Arc<dyn McpCompletion>>,
92 loggers: HashMap<String, Arc<dyn McpLogger>>,
93 root_providers: HashMap<String, Arc<dyn McpRoot>>,
94 notifications: HashMap<String, Arc<dyn McpNotification>>,
95 handlers: HashMap<String, Arc<dyn McpHandler>>,
96 roots: Vec<turul_mcp_protocol::roots::Root>,
97 instructions: Option<String>,
98 session_storage: Arc<BoxedSessionStorage>,
99 strict_lifecycle: bool,
100 server_config: ServerConfig,
101 enable_sse: bool,
102 stream_config: StreamConfig,
103 #[cfg(feature = "cors")] cors_config: Option<CorsConfig>,
104 middleware_stack: turul_http_mcp_server::middleware::MiddlewareStack,
105 task_runtime: Option<Arc<turul_mcp_server::TaskRuntime>>,
106 ) -> Self {
107 let session_manager = Arc::new(SessionManager::with_storage_and_timeouts(
109 Arc::clone(&session_storage),
110 capabilities.clone(),
111 std::time::Duration::from_secs(30 * 60), std::time::Duration::from_secs(60), ));
114
115 Self {
116 implementation,
117 capabilities,
118 tools,
119 resources,
120 prompts,
121 elicitations,
122 sampling,
123 completions,
124 loggers,
125 root_providers,
126 notifications,
127 handlers,
128 roots,
129 instructions,
130 session_manager,
131 session_storage,
132 strict_lifecycle,
133 server_config,
134 enable_sse,
135 stream_config,
136 #[cfg(feature = "cors")]
137 cors_config,
138 middleware_stack,
139 task_runtime,
140 }
141 }
142
143 pub fn capabilities(&self) -> &ServerCapabilities {
145 &self.capabilities
146 }
147
148 pub async fn handler(&self) -> Result<LambdaMcpHandler> {
152 info!(
153 "Creating Lambda MCP handler: {} v{}",
154 self.implementation.name, self.implementation.version
155 );
156 info!("Session management: enabled with automatic cleanup");
157
158 if self.enable_sse {
159 info!("SSE notifications: enabled for Lambda responses");
160
161 #[cfg(not(feature = "streaming"))]
163 {
164 use tracing::warn;
165 warn!("⚠️ SSE is enabled but 'streaming' feature is not available!");
166 warn!(
167 " For real SSE streaming, use handle_streaming() with run_with_streaming_response"
168 );
169 warn!(
170 " Current handle() method will return SSE snapshots, not real-time streams"
171 );
172 warn!(" To enable streaming: add 'streaming' feature to turul-mcp-aws-lambda");
173 }
174 }
175
176 let _cleanup_task = self.session_manager.clone().start_cleanup_task();
178
179 if let Some(ref runtime) = self.task_runtime {
182 match runtime.recover_stuck_tasks().await {
183 Ok(recovered) if !recovered.is_empty() => {
184 info!(
185 count = recovered.len(),
186 "Recovered stuck tasks from previous invocations"
187 );
188 }
189 Err(e) => {
190 use tracing::warn;
191 warn!(error = %e, "Failed to recover stuck tasks on startup");
192 }
193 _ => {}
194 }
195 }
196
197 let stream_manager = Arc::new(StreamManager::with_config(
199 self.session_storage.clone(),
200 self.stream_config.clone(),
201 ));
202
203 use turul_mcp_json_rpc_server::JsonRpcDispatcher;
205 let mut dispatcher = JsonRpcDispatcher::new();
206
207 use turul_mcp_server::SessionAwareInitializeHandler;
209 let init_handler = SessionAwareInitializeHandler::new(
210 self.implementation.clone(),
211 self.capabilities.clone(),
212 self.instructions.clone(),
213 self.session_manager.clone(),
214 self.strict_lifecycle,
215 );
216 dispatcher.register_method("initialize".to_string(), init_handler);
217
218 use turul_mcp_server::ListToolsHandler;
220 let list_handler = ListToolsHandler::new(self.tools.clone(), self.task_runtime.is_some());
221 dispatcher.register_method("tools/list".to_string(), list_handler);
222
223 use turul_mcp_server::SessionAwareToolHandler;
225 let mut tool_handler = SessionAwareToolHandler::new(
226 self.tools.clone(),
227 self.session_manager.clone(),
228 self.strict_lifecycle,
229 );
230 if let Some(ref runtime) = self.task_runtime {
231 tool_handler = tool_handler.with_task_runtime(Arc::clone(runtime));
232 }
233 dispatcher.register_method("tools/call".to_string(), tool_handler);
234
235 use turul_mcp_server::SessionAwareMcpHandlerBridge;
237 for (method, handler) in &self.handlers {
238 let bridge_handler = SessionAwareMcpHandlerBridge::new(
239 handler.clone(),
240 self.session_manager.clone(),
241 self.strict_lifecycle,
242 );
243 dispatcher.register_method(method.clone(), bridge_handler);
244 }
245
246 let middleware_stack = Arc::new(self.middleware_stack.clone());
248
249 Ok(LambdaMcpHandler::with_middleware(
250 self.server_config.clone(),
251 Arc::new(dispatcher),
252 self.session_storage.clone(),
253 stream_manager,
254 self.stream_config.clone(),
255 self.capabilities.clone(),
256 middleware_stack,
257 self.enable_sse,
258 ))
259 }
260
261 pub fn session_storage_info(&self) -> &str {
263 "Session storage configured"
264 }
265}