1use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use tracing::{debug, error, info, warn};
14
15use crate::handlers::McpHandler;
16use crate::session::{SessionContext, SessionManager};
17use crate::{McpServerBuilder, McpTool, Result, tool::tool_to_descriptor};
18use turul_mcp_json_rpc_server::JsonRpcHandler;
19
20use turul_mcp_protocol::McpError;
21use turul_mcp_protocol::*;
22
23pub struct McpServer {
25 pub implementation: Implementation,
27 pub capabilities: ServerCapabilities,
29 tools: HashMap<String, Arc<dyn McpTool>>,
31 handlers: HashMap<String, Arc<dyn McpHandler>>,
33 session_manager: Arc<SessionManager>,
35 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
37 instructions: Option<String>,
39 strict_lifecycle: bool,
41 middleware_stack: crate::middleware::MiddlewareStack,
43
44 #[cfg(feature = "http")]
46 bind_address: SocketAddr,
47 #[cfg(feature = "http")]
48 mcp_path: String,
49 #[cfg(feature = "http")]
50 enable_cors: bool,
51 #[cfg(feature = "http")]
52 enable_sse: bool,
53}
54
55impl McpServer {
56 #[allow(clippy::too_many_arguments)]
58 pub(crate) fn new(
59 implementation: Implementation,
60 capabilities: ServerCapabilities,
61 tools: HashMap<String, Arc<dyn McpTool>>,
62 handlers: HashMap<String, Arc<dyn McpHandler>>,
63 instructions: Option<String>,
64 session_timeout_minutes: Option<u64>,
65 session_cleanup_interval_seconds: Option<u64>,
66 session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
67 strict_lifecycle: bool,
68 middleware_stack: crate::middleware::MiddlewareStack,
69 #[cfg(feature = "http")] bind_address: SocketAddr,
70 #[cfg(feature = "http")] mcp_path: String,
71 #[cfg(feature = "http")] enable_cors: bool,
72 #[cfg(feature = "http")] enable_sse: bool,
73 ) -> Self {
74 let session_manager = match &session_storage {
76 Some(storage) => {
77 if let (Some(timeout_mins), Some(cleanup_secs)) =
78 (session_timeout_minutes, session_cleanup_interval_seconds)
79 {
80 Arc::new(SessionManager::with_storage_and_timeouts(
81 Arc::clone(storage),
82 capabilities.clone(),
83 std::time::Duration::from_secs(timeout_mins * 60),
84 std::time::Duration::from_secs(cleanup_secs),
85 ))
86 } else {
87 Arc::new(SessionManager::with_storage_and_timeouts(
88 Arc::clone(storage),
89 capabilities.clone(),
90 std::time::Duration::from_secs(30 * 60), std::time::Duration::from_secs(60), ))
93 }
94 }
95 None => {
96 if let (Some(timeout_mins), Some(cleanup_secs)) =
98 (session_timeout_minutes, session_cleanup_interval_seconds)
99 {
100 Arc::new(SessionManager::with_timeouts(
101 capabilities.clone(),
102 std::time::Duration::from_secs(timeout_mins * 60),
103 std::time::Duration::from_secs(cleanup_secs),
104 ))
105 } else {
106 Arc::new(SessionManager::new(capabilities.clone()))
107 }
108 }
109 };
110
111 if let Some(storage) = &session_storage {
113 debug!(
114 "McpServer configured with session storage backend: {:p}",
115 storage
116 );
117 } else {
118 debug!("McpServer configured without session storage");
119 }
120
121 Self {
122 implementation,
123 capabilities,
124 tools,
125 handlers,
126 session_manager,
127 session_storage,
128 instructions,
129 strict_lifecycle,
130 middleware_stack,
131 #[cfg(feature = "http")]
132 bind_address,
133 #[cfg(feature = "http")]
134 mcp_path,
135 #[cfg(feature = "http")]
136 enable_cors,
137 #[cfg(feature = "http")]
138 enable_sse,
139 }
140 }
141
142 pub fn builder() -> McpServerBuilder {
153 McpServerBuilder::new()
154 }
155
156 pub fn capabilities(&self) -> &turul_mcp_protocol::ServerCapabilities {
158 &self.capabilities
159 }
160
161 pub async fn run(&self) -> Result<()> {
163 #[cfg(feature = "http")]
164 {
165 self.run_http().await
166 }
167 #[cfg(not(feature = "http"))]
168 {
169 Err(McpError::configuration(
171 "No transport available. Enable the 'http' feature to use HTTP transport.",
172 ))
173 }
174 }
175
176 #[cfg(feature = "http")]
178 pub async fn run_http(&self) -> Result<()> {
179 info!(
180 "Starting MCP server: {} v{}",
181 self.implementation.name, self.implementation.version
182 );
183 info!("Session management: enabled with automatic cleanup");
184
185 if self.enable_sse {
186 info!("SSE notifications: enabled at GET {}", self.mcp_path);
187 }
188
189 let _cleanup_task = self.session_manager.clone().start_cleanup_task();
191
192 let tool_handler = SessionAwareToolHandler::new(
194 self.tools.clone(),
195 self.session_manager.clone(),
196 self.strict_lifecycle,
197 );
198
199 let init_handler = SessionAwareInitializeHandler::new(
201 self.implementation.clone(),
202 self.capabilities.clone(),
203 self.instructions.clone(),
204 self.session_manager.clone(),
205 self.strict_lifecycle,
206 );
207
208 let session_storage = self.session_manager.get_storage();
210 debug!("Configuring HTTP MCP server with session storage backend");
211 let mut builder =
212 turul_http_mcp_server::HttpMcpServer::builder_with_storage(session_storage)
213 .bind_address(self.bind_address)
214 .mcp_path(&self.mcp_path)
215 .cors(self.enable_cors)
216 .get_sse(self.enable_sse) .server_capabilities(self.capabilities.clone()) .with_middleware_stack(Arc::new(self.middleware_stack.clone())) .register_handler(vec!["initialize".to_string()], init_handler)
221 .register_handler(
222 vec!["tools/list".to_string()],
223 ListToolsHandler::new_with_session_manager(
224 self.tools.clone(),
225 self.session_manager.clone(),
226 self.strict_lifecycle,
227 ),
228 )
229 .register_handler(vec!["tools/call".to_string()], tool_handler);
230
231 for (method, handler) in &self.handlers {
233 let bridge_handler = SessionAwareMcpHandlerBridge::new(
234 handler.clone(),
235 self.session_manager.clone(),
236 self.strict_lifecycle,
237 );
238 builder = builder.register_handler(vec![method.clone()], bridge_handler);
239 }
240
241 use crate::handlers::InitializedNotificationHandler;
243 let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
244 let initialized_bridge = SessionAwareMcpHandlerBridge::new(
245 Arc::new(initialized_handler),
246 self.session_manager.clone(),
247 self.strict_lifecycle,
248 );
249 builder = builder.register_handler(
250 vec!["notifications/initialized".to_string()],
251 initialized_bridge,
252 );
253
254 let http_server = builder.build();
255
256 if self.enable_sse {
258 debug!("SSE support enabled with integrated session management");
259
260 self.setup_sse_event_bridge(&http_server).await;
262 }
263
264 http_server.run().await.map_err(|http_err| match http_err {
265 turul_http_mcp_server::HttpMcpError::Mcp(mcp_err) => mcp_err,
266 turul_http_mcp_server::HttpMcpError::Http(http_err) => {
267 McpError::transport(&http_err.to_string())
268 }
269 turul_http_mcp_server::HttpMcpError::JsonRpc(rpc_err) => {
270 McpError::json_rpc_protocol(&rpc_err.to_string())
271 }
272 turul_http_mcp_server::HttpMcpError::Serialization(ser_err) => {
273 McpError::SerializationError(ser_err)
274 }
275 turul_http_mcp_server::HttpMcpError::Io(io_err) => McpError::IoError(io_err),
276 turul_http_mcp_server::HttpMcpError::InvalidRequest(msg) => {
277 McpError::InvalidParameters(msg)
278 }
279 })?;
280 Ok(())
281 }
282
283 async fn setup_sse_event_bridge(&self, http_server: &turul_http_mcp_server::HttpMcpServer) {
285 debug!("🌉 Setting up SSE event bridge between SessionManager and StreamManager");
286
287 let stream_manager = http_server.get_stream_manager();
288 let mut global_events = self.session_manager.subscribe_all_session_events();
289
290 tokio::spawn(async move {
291 debug!("🌐 SSE Event Bridge: Started listening for session events");
292
293 while let Ok((session_id, event)) = global_events.recv().await {
294 debug!(
295 "📡 SSE Bridge: Received event from session {}: {:?}",
296 session_id, event
297 );
298
299 match event {
301 crate::session::SessionEvent::Custom { event_type, data } => {
302 debug!(
303 "📤 SSE Bridge: Broadcasting custom event '{}' to StreamManager",
304 event_type
305 );
306
307 if let Err(e) = stream_manager
308 .broadcast_to_session(&session_id, event_type, data)
309 .await
310 {
311 error!(
312 "❌ SSE Bridge: Failed to broadcast to session {}: {}",
313 session_id, e
314 );
315 } else {
316 debug!(
317 "✅ SSE Bridge: Successfully broadcast to session {}",
318 session_id
319 );
320 }
321 }
322 other_event => {
323 debug!("⏭ SSE Bridge: Skipping non-custom event: {:?}", other_event);
324 }
325 }
326 }
327
328 debug!("🚫 SSE Event Bridge: Global event receiver closed");
329 });
330
331 info!("✅ SSE event bridge established successfully");
332 }
333
334 #[cfg(feature = "http")]
336 pub async fn run_with_sse_access(
337 &self,
338 ) -> Result<(
339 turul_http_mcp_server::HttpMcpServer,
340 tokio::task::JoinHandle<turul_http_mcp_server::Result<()>>,
341 )> {
342 info!(
343 "Starting MCP server: {} v{}",
344 self.implementation.name, self.implementation.version
345 );
346 info!("Session management: enabled with automatic cleanup");
347
348 if self.enable_sse {
349 info!("SSE notifications: enabled - SSE manager available for notifications");
350 }
351
352 let _cleanup_task = self.session_manager.clone().start_cleanup_task();
354
355 let tool_handler = SessionAwareToolHandler::new(
357 self.tools.clone(),
358 self.session_manager.clone(),
359 self.strict_lifecycle,
360 );
361
362 let init_handler = SessionAwareInitializeHandler::new(
364 self.implementation.clone(),
365 self.capabilities.clone(),
366 self.instructions.clone(),
367 self.session_manager.clone(),
368 self.strict_lifecycle,
369 );
370
371 let session_storage = self.session_manager.get_storage();
373 debug!("Configuring HTTP MCP server with session storage backend");
374 let mut builder =
375 turul_http_mcp_server::HttpMcpServer::builder_with_storage(session_storage)
376 .bind_address(self.bind_address)
377 .mcp_path(&self.mcp_path)
378 .cors(self.enable_cors)
379 .get_sse(self.enable_sse) .server_capabilities(self.capabilities.clone()) .with_middleware_stack(Arc::new(self.middleware_stack.clone())) .register_handler(vec!["initialize".to_string()], init_handler)
384 .register_handler(
385 vec!["tools/list".to_string()],
386 ListToolsHandler::new_with_session_manager(
387 self.tools.clone(),
388 self.session_manager.clone(),
389 self.strict_lifecycle,
390 ),
391 )
392 .register_handler(vec!["tools/call".to_string()], tool_handler);
393
394 for (method, handler) in &self.handlers {
397 let bridge_handler = SessionAwareMcpHandlerBridge::new(
398 handler.clone(),
399 self.session_manager.clone(),
400 self.strict_lifecycle,
401 );
402 builder = builder.register_handler(vec![method.clone()], bridge_handler);
403 }
404
405 use crate::handlers::InitializedNotificationHandler;
407 let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
408 let initialized_bridge = SessionAwareMcpHandlerBridge::new(
409 Arc::new(initialized_handler),
410 self.session_manager.clone(),
411 self.strict_lifecycle,
412 );
413 builder = builder.register_handler(
414 vec!["notifications/initialized".to_string()],
415 initialized_bridge,
416 );
417
418 let http_server = builder.build();
419
420 let server_task = {
422 let server = http_server.clone();
423 tokio::spawn(async move { server.run().await })
424 };
425
426 Ok((http_server, server_task))
427 }
428
429 pub fn session_storage_info(&self) -> &str {
431 if let Some(storage) = &self.session_storage {
432 debug!(
433 "Accessing session storage for info - backend is configured: {:p}",
434 storage
435 );
436 "Backend configured"
437 } else {
438 "No backend configured"
439 }
440 }
441}
442
443pub struct SessionAwareMcpHandlerBridge {
448 handler: Arc<dyn McpHandler>,
449 session_manager: Arc<SessionManager>,
450 strict_lifecycle: bool,
451}
452
453impl SessionAwareMcpHandlerBridge {
454 pub fn new(
456 handler: Arc<dyn McpHandler>,
457 session_manager: Arc<SessionManager>,
458 strict_lifecycle: bool,
459 ) -> Self {
460 Self {
461 handler,
462 session_manager,
463 strict_lifecycle,
464 }
465 }
466}
467
468#[async_trait]
469impl JsonRpcHandler for SessionAwareMcpHandlerBridge {
470 type Error = McpError;
471
472 async fn handle(
473 &self,
474 method: &str,
475 params: Option<turul_mcp_json_rpc_server::RequestParams>,
476 session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
477 ) -> std::result::Result<serde_json::Value, McpError> {
478 debug!("Handling {} request via session-aware bridge", method);
479
480 let mcp_session_context = if let Some(json_rpc_ctx) = session_context {
482 debug!(
483 "Converting JSON-RPC session context: session_id={}",
484 json_rpc_ctx.session_id
485 );
486 Some(SessionContext::from_json_rpc_with_broadcaster(
487 json_rpc_ctx,
488 self.session_manager.get_storage(),
489 ))
490 } else {
491 let session_id = extract_session_id_from_params(¶ms);
493 if let Some(sid) = session_id {
494 debug!("Fallback: extracted session_id from params: {}", sid);
495 self.session_manager.create_session_context(&sid)
496 } else {
497 None
498 }
499 };
500
501 if self.strict_lifecycle
503 && method != "initialize"
504 && method != "notifications/initialized"
505 && let Some(ref session_ctx) = mcp_session_context
506 {
507 let session_initialized = self
508 .session_manager
509 .is_session_initialized(&session_ctx.session_id)
510 .await;
511 if !session_initialized {
512 debug!(
513 "🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
514 method, session_ctx.session_id
515 );
516 return Err(McpError::SessionError(
517 "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
518 ));
519 }
520 }
521
522 let mcp_params = params.map(|p| p.to_value());
524
525 match self
527 .handler
528 .handle_with_session(mcp_params, mcp_session_context)
529 .await
530 {
531 Ok(result) => Ok(result),
532 Err(error) => {
533 error!("MCP handler error: {}", error);
534 Err(error) }
536 }
537 }
538
539 async fn handle_notification(
540 &self,
541 method: &str,
542 params: Option<turul_mcp_json_rpc_server::RequestParams>,
543 session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
544 ) -> std::result::Result<(), McpError> {
545 debug!("Handling {} notification via session-aware bridge", method);
546
547 let mcp_session_context = session_context.map(|json_rpc_ctx| {
549 SessionContext::from_json_rpc_with_broadcaster(
550 json_rpc_ctx,
551 self.session_manager.get_storage(),
552 )
553 });
554
555 if self.strict_lifecycle
558 && method != "notifications/initialized"
559 && let Some(ref session_ctx) = mcp_session_context
560 {
561 let session_initialized = self
562 .session_manager
563 .is_session_initialized(&session_ctx.session_id)
564 .await;
565 if !session_initialized {
566 tracing::debug!(
567 "🚫 STRICT MODE: Rejecting notification {} for session {} - session not yet initialized",
568 method,
569 session_ctx.session_id
570 );
571 return Err(McpError::SessionError(
572 "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
573 ));
574 }
575 }
576
577 let mcp_params = params.map(|p| p.to_value());
579
580 match self
582 .handler
583 .handle_with_session(mcp_params, mcp_session_context)
584 .await
585 {
586 Ok(_result) => Ok(()), Err(error) => {
588 tracing::error!("MCP notification handler error: {}", error);
589 Err(error)
590 }
591 }
592 }
593
594 fn supported_methods(&self) -> Vec<String> {
595 self.handler.supported_methods()
596 }
597}
598
599fn extract_session_id_from_params(
601 _params: &Option<turul_mcp_json_rpc_server::RequestParams>,
602) -> Option<String> {
603 None
606}
607
608pub struct SessionAwareInitializeHandler {
610 implementation: Implementation,
611 capabilities: ServerCapabilities,
612 instructions: Option<String>,
613 session_manager: Arc<SessionManager>,
614 strict_lifecycle: bool,
615}
616
617impl SessionAwareInitializeHandler {
618 pub fn new(
619 implementation: Implementation,
620 capabilities: ServerCapabilities,
621 instructions: Option<String>,
622 session_manager: Arc<SessionManager>,
623 strict_lifecycle: bool,
624 ) -> Self {
625 Self {
626 implementation,
627 capabilities,
628 instructions,
629 session_manager,
630 strict_lifecycle,
631 }
632 }
633
634 fn negotiate_version(&self, client_version: &str) -> std::result::Result<McpVersion, String> {
642 use turul_mcp_protocol::version::McpVersion;
643
644 let requested_version = match client_version.parse::<McpVersion>() {
646 Ok(version) => version,
647 Err(_) => {
648 if client_version.matches('-').count() == 2 {
652 if client_version > "2025-06-18" {
654 McpVersion::LATEST
656 } else if client_version < "2024-11-05" {
657 return Err(format!(
659 "Cannot negotiate compatible version with client version {} (server requires at least {})",
660 client_version, "2024-11-05"
661 ));
662 } else {
663 return Err(format!("Unknown protocol version: {}", client_version));
665 }
666 } else {
667 return Err(format!(
668 "Invalid protocol version format: {}",
669 client_version
670 ));
671 }
672 }
673 };
674
675 let supported_versions = [
677 McpVersion::V2024_11_05,
678 McpVersion::V2025_03_26,
679 McpVersion::V2025_06_18,
680 ];
681
682 if supported_versions.contains(&requested_version) {
684 return Ok(requested_version);
685 }
686
687 let compatible_versions: Vec<_> = supported_versions
690 .iter()
691 .filter(|&&v| v <= requested_version)
692 .collect();
693
694 if let Some(&&best_version) = compatible_versions.iter().max() {
695 Ok(best_version)
696 } else {
697 Err(format!(
699 "Cannot negotiate compatible version with client version {} (server requires at least {})",
700 client_version,
701 supported_versions.iter().min().unwrap()
702 ))
703 }
704 }
705
706 fn adjust_capabilities_for_version(&self, version: McpVersion) -> ServerCapabilities {
711 let adjusted = self.capabilities.clone();
712
713 info!(
725 "Server capabilities adjusted for protocol version {}",
726 version
727 );
728 debug!(
729 "Capabilities: logging={}, tools={}, resources={}, prompts={}",
730 adjusted.logging.is_some(),
731 adjusted.tools.is_some(),
732 adjusted.resources.is_some(),
733 adjusted.prompts.is_some()
734 );
735
736 adjusted
737 }
738}
739
740#[async_trait]
741impl JsonRpcHandler for SessionAwareInitializeHandler {
742 type Error = McpError;
743
744 async fn handle(
745 &self,
746 method: &str,
747 params: Option<turul_mcp_json_rpc_server::RequestParams>,
748 session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
749 ) -> std::result::Result<serde_json::Value, McpError> {
750 debug!("Handling {} request with session support", method);
751
752 if method != "initialize" {
753 return Err(McpError::InvalidParameters(format!(
754 "Method not supported: {}",
755 method
756 )));
757 }
758
759 let request = if let Some(params) = params {
761 let params_value = params.to_value();
762 serde_json::from_value::<InitializeRequest>(params_value).map_err(|e| {
763 McpError::InvalidParameters(format!("Invalid initialize request: {}", e))
764 })?
765 } else {
766 return Err(McpError::MissingParameter(
767 "Missing parameters for initialize".to_string(),
768 ));
769 };
770
771 let negotiated_version = match self.negotiate_version(&request.protocol_version) {
773 Ok(version) => {
774 info!(
775 "Protocol version negotiated: {} (client requested: {})",
776 version, request.protocol_version
777 );
778 version
779 }
780 Err(e) => {
781 error!("Protocol version negotiation failed: {}", e);
782 return Err(McpError::ConfigurationError(format!(
783 "Version negotiation failed: {}",
784 e
785 )));
786 }
787 };
788
789 let session_id = if let Some(ctx) = &session_context {
791 debug!("Using session from context: {}", ctx.session_id);
792
793 let cache_exists = self
796 .session_manager
797 .session_exists_in_cache(&ctx.session_id)
798 .await;
799 debug!(
800 "Session {} exists in cache: {}",
801 ctx.session_id, cache_exists
802 );
803
804 if !cache_exists {
805 debug!("Session {} not in cache, checking storage", ctx.session_id);
806
807 match self
809 .session_manager
810 .load_session_from_storage(&ctx.session_id)
811 .await
812 {
813 Ok(true) => {
814 debug!(
815 "Session {} loaded from storage with preserved capabilities",
816 ctx.session_id
817 );
818 }
819 Ok(false) => {
820 warn!(
823 "Session {} not found in storage, creating with defaults",
824 ctx.session_id
825 );
826 self.session_manager
827 .add_session_to_cache(
828 ctx.session_id.clone(),
829 self.session_manager.get_default_capabilities(),
830 )
831 .await;
832 }
833 Err(e) => {
834 error!(
835 "Failed to load session {} from storage: {}",
836 ctx.session_id, e
837 );
838 self.session_manager
840 .add_session_to_cache(
841 ctx.session_id.clone(),
842 self.session_manager.get_default_capabilities(),
843 )
844 .await;
845 }
846 }
847 } else {
848 debug!("Session {} already exists in cache", ctx.session_id);
849 }
850
851 ctx.session_id.clone()
852 } else {
853 debug!("No session context provided, creating new session");
854 self.session_manager.create_session().await
855 };
856
857 self.session_manager
860 .set_session_state(
861 &session_id,
862 "client_info",
863 serde_json::to_value(&request.client_info).map_err(McpError::SerializationError)?,
864 )
865 .await;
866
867 self.session_manager
868 .set_session_state(
869 &session_id,
870 "client_capabilities",
871 serde_json::to_value(&request.capabilities)
872 .map_err(McpError::SerializationError)?,
873 )
874 .await;
875
876 self.session_manager
877 .set_session_state(
878 &session_id,
879 "negotiated_version",
880 serde_json::to_value(negotiated_version).map_err(McpError::SerializationError)?,
881 )
882 .await;
883
884 self.session_manager
888 .set_session_state(
889 &session_id,
890 "mcp_version",
891 serde_json::json!(negotiated_version.as_str()),
892 )
893 .await;
894
895 if !self.strict_lifecycle {
898 debug!(
899 "📝 LENIENT MODE: Immediately initializing session {} (strict_lifecycle=false)",
900 session_id
901 );
902 if let Err(e) = self
903 .session_manager
904 .initialize_session_with_version(
905 &session_id,
906 request.client_info,
907 request.capabilities,
908 negotiated_version,
909 )
910 .await
911 {
912 error!("❌ Failed to initialize session {}: {}", session_id, e);
913 return Err(McpError::SessionError(format!(
914 "Failed to initialize session: {}",
915 e
916 )));
917 }
918 info!(
919 "✅ Session {} created and immediately initialized with protocol version {} (lenient mode)",
920 session_id, negotiated_version
921 );
922 } else {
923 info!(
924 "⏳ Session {} created and ready for client with protocol version {} (strict mode - waiting for notifications/initialized)",
925 session_id, negotiated_version
926 );
927 }
928
929 let adjusted_capabilities = self.adjust_capabilities_for_version(negotiated_version);
931 let mut response = InitializeResult::new(
932 negotiated_version,
933 adjusted_capabilities,
934 self.implementation.clone(),
935 );
936
937 if let Some(instructions) = &self.instructions {
938 response = response.with_instructions(instructions.clone());
939 }
940
941 serde_json::to_value(response).map_err(McpError::SerializationError)
944 }
945
946 fn supported_methods(&self) -> Vec<String> {
947 vec!["initialize".to_string()]
948 }
949}
950
951pub struct ListToolsHandler {
953 tools: HashMap<String, Arc<dyn McpTool>>,
954 session_manager: Option<Arc<SessionManager>>,
955 strict_lifecycle: bool,
956}
957
958impl ListToolsHandler {
959 pub fn new(tools: HashMap<String, Arc<dyn McpTool>>) -> Self {
960 Self {
961 tools,
962 session_manager: None,
963 strict_lifecycle: false,
964 }
965 }
966
967 pub fn new_with_session_manager(
968 tools: HashMap<String, Arc<dyn McpTool>>,
969 session_manager: Arc<SessionManager>,
970 strict_lifecycle: bool,
971 ) -> Self {
972 Self {
973 tools,
974 session_manager: Some(session_manager),
975 strict_lifecycle,
976 }
977 }
978}
979
980#[async_trait]
981impl JsonRpcHandler for ListToolsHandler {
982 type Error = McpError;
983
984 async fn handle(
985 &self,
986 method: &str,
987 params: Option<turul_mcp_json_rpc_server::RequestParams>,
988 session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
989 ) -> std::result::Result<serde_json::Value, McpError> {
990 use turul_mcp_protocol::meta::{Cursor, PaginatedResponse};
991
992 debug!("Handling {} request", method);
993
994 if self.strict_lifecycle
996 && let (Some(session_manager), Some(session_ctx)) =
997 (&self.session_manager, &session_context)
998 {
999 let session_initialized = session_manager
1000 .is_session_initialized(&session_ctx.session_id)
1001 .await;
1002 if !session_initialized {
1003 debug!(
1004 "🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
1005 method, session_ctx.session_id
1006 );
1007 return Err(McpError::SessionError(
1008 "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
1009 ));
1010 }
1011 }
1012
1013 if method != "tools/list" {
1014 return Err(McpError::InvalidParameters(format!(
1015 "Method '{}' not supported by tools/list handler",
1016 method
1017 )));
1018 }
1019
1020 use turul_mcp_protocol::tools::{ListToolsParams, ListToolsResult};
1022 let list_params = if let Some(params_value) = params {
1023 serde_json::from_value::<ListToolsParams>(params_value.to_value()).map_err(|e| {
1024 McpError::InvalidParameters(format!("Invalid parameters for tools/list: {}", e))
1025 })?
1026 } else {
1027 ListToolsParams::new()
1028 };
1029
1030 let cursor = list_params.cursor;
1031 debug!("Listing tools with cursor: {:?}", cursor);
1032
1033 let mut tools: Vec<Tool> = self
1035 .tools
1036 .values()
1037 .map(|tool| tool_to_descriptor(tool.as_ref()))
1038 .collect();
1039
1040 tools.sort_by(|a, b| a.name.cmp(&b.name));
1042
1043 const DEFAULT_PAGE_SIZE: usize = 50; const MAX_LIMIT: u32 = 100; if let Some(limit) = list_params.limit
1049 && limit == 0
1050 {
1051 return Err(McpError::InvalidParameters(
1052 "limit must be a positive integer (> 0)".to_string(),
1053 ));
1054 }
1055
1056 let page_size = list_params
1058 .limit
1059 .map(|l| std::cmp::min(l, MAX_LIMIT) as usize)
1060 .unwrap_or(DEFAULT_PAGE_SIZE);
1061
1062 let start_index = if let Some(cursor) = &cursor {
1064 let cursor_name = cursor.as_str();
1066 tools
1068 .iter()
1069 .position(|t| t.name.as_str() > cursor_name)
1070 .unwrap_or(tools.len())
1071 } else {
1072 0 };
1074
1075 let end_index = std::cmp::min(start_index + page_size, tools.len());
1077
1078 let page_tools: Vec<Tool> = tools[start_index..end_index].to_vec();
1080
1081 let has_more = end_index < tools.len();
1083
1084 let next_cursor = if has_more {
1086 page_tools.last().map(|t| Cursor::new(&t.name))
1088 } else {
1089 None
1090 };
1091
1092 debug!(
1093 "Tool pagination: start={}, end={}, page_size={}, has_more={}, next_cursor={:?}",
1094 start_index,
1095 end_index,
1096 page_tools.len(),
1097 has_more,
1098 next_cursor
1099 );
1100
1101 let mut base_response = ListToolsResult::new(page_tools);
1102 let total = Some(tools.len() as u64);
1103
1104 if let Some(ref cursor) = next_cursor {
1106 base_response = base_response.with_next_cursor(cursor.clone());
1107 }
1108
1109 let next_cursor_clone = next_cursor.clone();
1110 let mut paginated_response =
1111 PaginatedResponse::with_pagination(base_response, next_cursor, total, has_more);
1112
1113 if let Some(request_meta) = list_params.meta {
1115 let mut response_meta = paginated_response.meta().cloned().unwrap_or_else(|| {
1117 turul_mcp_protocol::meta::Meta::with_pagination(next_cursor_clone, total, has_more)
1118 });
1119
1120 for (key, value) in request_meta {
1122 response_meta.extra.insert(key, value);
1123 }
1124
1125 paginated_response = paginated_response.with_meta(response_meta);
1126 }
1127
1128 serde_json::to_value(paginated_response).map_err(McpError::SerializationError)
1129 }
1130
1131 fn supported_methods(&self) -> Vec<String> {
1132 vec!["tools/list".to_string()]
1133 }
1134}
1135
1136pub struct SessionAwareToolHandler {
1138 tools: HashMap<String, Arc<dyn McpTool>>,
1139 session_manager: Arc<SessionManager>,
1140 strict_lifecycle: bool,
1141}
1142
1143impl SessionAwareToolHandler {
1144 pub fn new(
1145 tools: HashMap<String, Arc<dyn McpTool>>,
1146 session_manager: Arc<SessionManager>,
1147 strict_lifecycle: bool,
1148 ) -> Self {
1149 Self {
1150 tools,
1151 session_manager,
1152 strict_lifecycle,
1153 }
1154 }
1155}
1156
1157#[async_trait]
1158impl JsonRpcHandler for SessionAwareToolHandler {
1159 type Error = McpError;
1160
1161 async fn handle(
1162 &self,
1163 method: &str,
1164 params: Option<turul_mcp_json_rpc_server::RequestParams>,
1165 session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
1166 ) -> std::result::Result<serde_json::Value, McpError> {
1167 debug!("Handling {} request with session support", method);
1168
1169 if method != "tools/call" {
1170 return Err(McpError::InvalidParameters(format!(
1171 "Method '{}' not supported by tools/call handler",
1172 method
1173 )));
1174 }
1175
1176 if self.strict_lifecycle {
1178 if let Some(ref session_ctx) = session_context {
1179 let session_initialized = self
1180 .session_manager
1181 .is_session_initialized(&session_ctx.session_id)
1182 .await;
1183 if !session_initialized {
1184 debug!(
1185 "🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
1186 method, session_ctx.session_id
1187 );
1188 return Err(McpError::SessionError(
1189 "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string(),
1190 ));
1191 }
1192 debug!(
1193 "✅ STRICT MODE: Session {} is initialized - allowing {} request",
1194 session_ctx.session_id, method
1195 );
1196 }
1197 } else {
1198 debug!(
1199 "📝 LENIENT MODE: Allowing {} request without lifecycle check (strict_lifecycle=false)",
1200 method
1201 );
1202 }
1203
1204 let params =
1205 params.ok_or_else(|| McpError::MissingParameter("CallToolRequest".to_string()))?;
1206
1207 use turul_mcp_protocol::param_extraction::extract_params;
1209
1210 let call_params: turul_mcp_protocol::tools::CallToolParams = extract_params(params)?;
1211
1212 let tool = self
1214 .tools
1215 .get(&call_params.name)
1216 .ok_or_else(|| McpError::ToolNotFound(call_params.name.clone()))?;
1217
1218 let mcp_session_context = if let Some(json_rpc_ctx) = session_context {
1220 debug!(
1221 "Converting JSON-RPC session context for tool call: session_id={}",
1222 json_rpc_ctx.session_id
1223 );
1224 Some(SessionContext::from_json_rpc_with_broadcaster(
1225 json_rpc_ctx,
1226 self.session_manager.get_storage(),
1227 ))
1228 } else {
1229 debug!("No session context provided for tool call");
1230 None
1231 };
1232
1233 let args = call_params
1235 .arguments
1236 .map(|hashmap| {
1237 serde_json::to_value(hashmap)
1238 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
1239 })
1240 .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
1241 match tool.call(args, mcp_session_context).await {
1242 Ok(response) => serde_json::to_value(response).map_err(McpError::SerializationError),
1243 Err(error_msg) => {
1244 error!("Tool execution error: {}", error_msg);
1245 Err(error_msg) }
1247 }
1248 }
1249
1250 fn supported_methods(&self) -> Vec<String> {
1251 vec!["tools/call".to_string()]
1252 }
1253}
1254
1255impl std::fmt::Debug for McpServer {
1256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1257 f.debug_struct("McpServer")
1258 .field("implementation", &self.implementation)
1259 .field("capabilities", &self.capabilities)
1260 .field("tools", &format!("HashMap with {} tools", self.tools.len()))
1261 .field("instructions", &self.instructions)
1262 .finish()
1263 }
1264}
1265
1266#[cfg(test)]
1267mod tests {
1268 use super::*;
1269 use crate::McpTool;
1270 use async_trait::async_trait;
1271 use serde_json::Value;
1272 use std::collections::HashMap;
1273 use turul_mcp_protocol::ToolSchema;
1274 use turul_mcp_protocol::tools::{CallToolResult, ToolResult};
1275 use turul_mcp_builders::prelude::*; struct TestTool {
1278 input_schema: ToolSchema,
1279 }
1280
1281 impl TestTool {
1282 fn new() -> Self {
1283 Self {
1284 input_schema: ToolSchema::object(),
1285 }
1286 }
1287 }
1288
1289 impl HasBaseMetadata for TestTool {
1290 fn name(&self) -> &str {
1291 "test"
1292 }
1293 fn title(&self) -> Option<&str> {
1294 Some("Test Tool")
1295 }
1296 }
1297
1298 impl HasDescription for TestTool {
1299 fn description(&self) -> Option<&str> {
1300 Some("Test tool for unit tests")
1301 }
1302 }
1303
1304 impl HasInputSchema for TestTool {
1305 fn input_schema(&self) -> &ToolSchema {
1306 &self.input_schema
1307 }
1308 }
1309
1310 impl HasOutputSchema for TestTool {
1311 fn output_schema(&self) -> Option<&ToolSchema> {
1312 None
1313 }
1314 }
1315
1316 impl HasAnnotations for TestTool {
1317 fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> {
1318 None
1319 }
1320 }
1321
1322 impl HasToolMeta for TestTool {
1323 fn tool_meta(&self) -> Option<&HashMap<String, Value>> {
1324 None
1325 }
1326 }
1327
1328 #[async_trait]
1329 impl McpTool for TestTool {
1330 async fn call(
1331 &self,
1332 _args: Value,
1333 _session: Option<crate::SessionContext>,
1334 ) -> crate::McpResult<CallToolResult> {
1335 Ok(CallToolResult::success(vec![ToolResult::text(
1336 "test result",
1337 )]))
1338 }
1339 }
1340
1341 #[test]
1342 fn test_server_creation() {
1343 let server = McpServer::builder()
1344 .name("test-server")
1345 .version("1.0.0")
1346 .tool(TestTool::new())
1347 .build()
1348 .unwrap();
1349
1350 assert_eq!(server.implementation.name, "test-server");
1351 assert_eq!(server.implementation.version, "1.0.0");
1352 assert_eq!(server.tools.len(), 1);
1353 }
1354
1355 #[tokio::test]
1356 async fn test_list_tools_handler() {
1357 let mut tools: HashMap<String, Arc<dyn McpTool>> = HashMap::new();
1358 tools.insert("test".to_string(), Arc::new(TestTool::new()));
1359
1360 let handler = ListToolsHandler::new(tools);
1361 let result = handler.handle("tools/list", None, None).await.unwrap();
1362
1363 let response: ListToolsResult = serde_json::from_value(result).unwrap();
1364 assert_eq!(response.tools.len(), 1);
1365 assert_eq!(response.tools[0].name, "test");
1366 }
1367
1368 #[tokio::test]
1369 async fn test_tool_handler() {
1370 let mut tools: HashMap<String, Arc<dyn McpTool>> = HashMap::new();
1371 tools.insert("test".to_string(), Arc::new(TestTool::new()));
1372
1373 let session_manager = Arc::new(SessionManager::new(ServerCapabilities::default()));
1374 let handler = SessionAwareToolHandler::new(tools, session_manager, false);
1375 let params = turul_mcp_json_rpc_server::RequestParams::Object(
1377 [
1378 ("name".to_string(), serde_json::json!("test")),
1379 ("arguments".to_string(), serde_json::json!({})),
1380 ]
1381 .into_iter()
1382 .collect(),
1383 );
1384
1385 let result = handler
1386 .handle("tools/call", Some(params), None)
1387 .await
1388 .unwrap();
1389 let response: CallToolResult = serde_json::from_value(result).unwrap();
1390
1391 assert_eq!(response.content.len(), 1);
1392 if let ToolResult::Text { text, .. } = &response.content[0] {
1393 assert_eq!(text, "test result");
1394 }
1395 }
1396}