turul_mcp_server/
server.rs

1//! MCP Server Implementation and Session-Aware Handlers
2//!
3//! This module contains the core MCP server implementation (`McpServer`) and
4//! session-aware handlers that bridge MCP protocol requests with business logic.
5//! Includes handlers for initialization, tool execution, and tool listing with
6//! automatic session management and protocol compliance.
7
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use tracing::{debug, error, info, warn};
14
15use crate::handlers::McpHandler;
16use crate::session::{SessionContext, SessionManager};
17use crate::{McpServerBuilder, McpTool, Result, tool::tool_to_descriptor};
18use turul_mcp_json_rpc_server::JsonRpcHandler;
19
20use turul_mcp_protocol::McpError;
21use turul_mcp_protocol::*;
22
23/// Main MCP server
24pub struct McpServer {
25    /// Server implementation information
26    pub implementation: Implementation,
27    /// Server capabilities
28    pub capabilities: ServerCapabilities,
29    /// Registered tools
30    tools: HashMap<String, Arc<dyn McpTool>>,
31    /// Registered handlers
32    handlers: HashMap<String, Arc<dyn McpHandler>>,
33    /// Session manager for state persistence
34    session_manager: Arc<SessionManager>,
35    /// Session storage backend (shared between SessionManager and HTTP layer)
36    session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
37    /// Optional client instructions
38    instructions: Option<String>,
39    /// Strict MCP lifecycle enforcement
40    strict_lifecycle: bool,
41    /// Middleware stack for request/response processing
42    middleware_stack: crate::middleware::MiddlewareStack,
43
44    // HTTP configuration (if enabled)
45    #[cfg(feature = "http")]
46    bind_address: SocketAddr,
47    #[cfg(feature = "http")]
48    mcp_path: String,
49    #[cfg(feature = "http")]
50    enable_cors: bool,
51    #[cfg(feature = "http")]
52    enable_sse: bool,
53}
54
55impl McpServer {
56    /// Create a new MCP server (use builder instead)
57    #[allow(clippy::too_many_arguments)]
58    pub(crate) fn new(
59        implementation: Implementation,
60        capabilities: ServerCapabilities,
61        tools: HashMap<String, Arc<dyn McpTool>>,
62        handlers: HashMap<String, Arc<dyn McpHandler>>,
63        instructions: Option<String>,
64        session_timeout_minutes: Option<u64>,
65        session_cleanup_interval_seconds: Option<u64>,
66        session_storage: Option<Arc<turul_mcp_session_storage::BoxedSessionStorage>>,
67        strict_lifecycle: bool,
68        middleware_stack: crate::middleware::MiddlewareStack,
69        #[cfg(feature = "http")] bind_address: SocketAddr,
70        #[cfg(feature = "http")] mcp_path: String,
71        #[cfg(feature = "http")] enable_cors: bool,
72        #[cfg(feature = "http")] enable_sse: bool,
73    ) -> Self {
74        // Create session manager with server capabilities, custom timeouts, and storage
75        let session_manager = match &session_storage {
76            Some(storage) => {
77                if let (Some(timeout_mins), Some(cleanup_secs)) =
78                    (session_timeout_minutes, session_cleanup_interval_seconds)
79                {
80                    Arc::new(SessionManager::with_storage_and_timeouts(
81                        Arc::clone(storage),
82                        capabilities.clone(),
83                        std::time::Duration::from_secs(timeout_mins * 60),
84                        std::time::Duration::from_secs(cleanup_secs),
85                    ))
86                } else {
87                    Arc::new(SessionManager::with_storage_and_timeouts(
88                        Arc::clone(storage),
89                        capabilities.clone(),
90                        std::time::Duration::from_secs(30 * 60), // Default 30 minutes
91                        std::time::Duration::from_secs(60),      // Default 1 minute
92                    ))
93                }
94            }
95            None => {
96                // Default to InMemory storage
97                if let (Some(timeout_mins), Some(cleanup_secs)) =
98                    (session_timeout_minutes, session_cleanup_interval_seconds)
99                {
100                    Arc::new(SessionManager::with_timeouts(
101                        capabilities.clone(),
102                        std::time::Duration::from_secs(timeout_mins * 60),
103                        std::time::Duration::from_secs(cleanup_secs),
104                    ))
105                } else {
106                    Arc::new(SessionManager::new(capabilities.clone()))
107                }
108            }
109        };
110
111        // Debug: Log session storage configuration
112        if let Some(storage) = &session_storage {
113            debug!(
114                "McpServer configured with session storage backend: {:p}",
115                storage
116            );
117        } else {
118            debug!("McpServer configured without session storage");
119        }
120
121        Self {
122            implementation,
123            capabilities,
124            tools,
125            handlers,
126            session_manager,
127            session_storage,
128            instructions,
129            strict_lifecycle,
130            middleware_stack,
131            #[cfg(feature = "http")]
132            bind_address,
133            #[cfg(feature = "http")]
134            mcp_path,
135            #[cfg(feature = "http")]
136            enable_cors,
137            #[cfg(feature = "http")]
138            enable_sse,
139        }
140    }
141
142    /// Create a new builder
143    ///
144    /// # Example
145    /// ```rust
146    /// use turul_mcp_server::McpServer;
147    ///
148    /// let builder = McpServer::builder()
149    ///     .name("my-server")
150    ///     .version("1.0.0");
151    /// ```
152    pub fn builder() -> McpServerBuilder {
153        McpServerBuilder::new()
154    }
155
156    /// Get the server's configured capabilities
157    pub fn capabilities(&self) -> &turul_mcp_protocol::ServerCapabilities {
158        &self.capabilities
159    }
160
161    /// Run the server with the default transport (HTTP if available)
162    pub async fn run(&self) -> Result<()> {
163        #[cfg(feature = "http")]
164        {
165            self.run_http().await
166        }
167        #[cfg(not(feature = "http"))]
168        {
169            // If no HTTP feature, we can't run without transport
170            Err(McpError::configuration(
171                "No transport available. Enable the 'http' feature to use HTTP transport.",
172            ))
173        }
174    }
175
176    /// Run the server with HTTP transport (requires "http" feature)
177    #[cfg(feature = "http")]
178    pub async fn run_http(&self) -> Result<()> {
179        info!(
180            "Starting MCP server: {} v{}",
181            self.implementation.name, self.implementation.version
182        );
183        info!("Session management: enabled with automatic cleanup");
184
185        if self.enable_sse {
186            info!("SSE notifications: enabled at GET {}", self.mcp_path);
187        }
188
189        // Start session cleanup task
190        let _cleanup_task = self.session_manager.clone().start_cleanup_task();
191
192        // Create session-aware tool handler
193        let tool_handler = SessionAwareToolHandler::new(
194            self.tools.clone(),
195            self.session_manager.clone(),
196            self.strict_lifecycle,
197        );
198
199        // Create session-aware initialize handler
200        let init_handler = SessionAwareInitializeHandler::new(
201            self.implementation.clone(),
202            self.capabilities.clone(),
203            self.instructions.clone(),
204            self.session_manager.clone(),
205            self.strict_lifecycle,
206        );
207
208        // Build HTTP server with shared session storage from SessionManager
209        let session_storage = self.session_manager.get_storage();
210        debug!("Configuring HTTP MCP server with session storage backend");
211        let mut builder =
212            turul_http_mcp_server::HttpMcpServer::builder_with_storage(session_storage)
213                .bind_address(self.bind_address)
214                .mcp_path(&self.mcp_path)
215                .cors(self.enable_cors)
216                .get_sse(self.enable_sse) // GET SSE controlled by main server enable_sse flag
217                // POST SSE remains at default (false) for compatibility
218                .server_capabilities(self.capabilities.clone()) // Pass server capabilities
219                .with_middleware_stack(Arc::new(self.middleware_stack.clone())) // Pass middleware stack
220                .register_handler(vec!["initialize".to_string()], init_handler)
221                .register_handler(
222                    vec!["tools/list".to_string()],
223                    ListToolsHandler::new_with_session_manager(
224                        self.tools.clone(),
225                        self.session_manager.clone(),
226                        self.strict_lifecycle,
227                    ),
228                )
229                .register_handler(vec!["tools/call".to_string()], tool_handler);
230
231        // Register all MCP handlers with session awareness
232        for (method, handler) in &self.handlers {
233            let bridge_handler = SessionAwareMcpHandlerBridge::new(
234                handler.clone(),
235                self.session_manager.clone(),
236                self.strict_lifecycle,
237            );
238            builder = builder.register_handler(vec![method.clone()], bridge_handler);
239        }
240
241        // Register special initialized notification handler that can mark sessions as initialized
242        use crate::handlers::InitializedNotificationHandler;
243        let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
244        let initialized_bridge = SessionAwareMcpHandlerBridge::new(
245            Arc::new(initialized_handler),
246            self.session_manager.clone(),
247            self.strict_lifecycle,
248        );
249        builder = builder.register_handler(
250            vec!["notifications/initialized".to_string()],
251            initialized_bridge,
252        );
253
254        let http_server = builder.build();
255
256        // SSE is now integrated directly into the session management
257        if self.enable_sse {
258            debug!("SSE support enabled with integrated session management");
259
260            // Set up event forwarding bridge between SessionManager and StreamManager
261            self.setup_sse_event_bridge(&http_server).await;
262        }
263
264        http_server.run().await.map_err(|http_err| match http_err {
265            turul_http_mcp_server::HttpMcpError::Mcp(mcp_err) => mcp_err,
266            turul_http_mcp_server::HttpMcpError::Http(http_err) => {
267                McpError::transport(&http_err.to_string())
268            }
269            turul_http_mcp_server::HttpMcpError::JsonRpc(rpc_err) => {
270                McpError::json_rpc_protocol(&rpc_err.to_string())
271            }
272            turul_http_mcp_server::HttpMcpError::Serialization(ser_err) => {
273                McpError::SerializationError(ser_err)
274            }
275            turul_http_mcp_server::HttpMcpError::Io(io_err) => McpError::IoError(io_err),
276            turul_http_mcp_server::HttpMcpError::InvalidRequest(msg) => {
277                McpError::InvalidParameters(msg)
278            }
279        })?;
280        Ok(())
281    }
282
283    /// Set up event forwarding bridge between SessionManager and StreamManager
284    async fn setup_sse_event_bridge(&self, http_server: &turul_http_mcp_server::HttpMcpServer) {
285        debug!("🌉 Setting up SSE event bridge between SessionManager and StreamManager");
286
287        let stream_manager = http_server.get_stream_manager();
288        let mut global_events = self.session_manager.subscribe_all_session_events();
289
290        tokio::spawn(async move {
291            debug!("🌐 SSE Event Bridge: Started listening for session events");
292
293            while let Ok((session_id, event)) = global_events.recv().await {
294                debug!(
295                    "📡 SSE Bridge: Received event from session {}: {:?}",
296                    session_id, event
297                );
298
299                // Convert SessionEvent to StreamManager event format
300                match event {
301                    crate::session::SessionEvent::Custom { event_type, data } => {
302                        debug!(
303                            "📤 SSE Bridge: Broadcasting custom event '{}' to StreamManager",
304                            event_type
305                        );
306
307                        if let Err(e) = stream_manager
308                            .broadcast_to_session(&session_id, event_type, data)
309                            .await
310                        {
311                            error!(
312                                "❌ SSE Bridge: Failed to broadcast to session {}: {}",
313                                session_id, e
314                            );
315                        } else {
316                            debug!(
317                                "✅ SSE Bridge: Successfully broadcast to session {}",
318                                session_id
319                            );
320                        }
321                    }
322                    other_event => {
323                        debug!("⏭ SSE Bridge: Skipping non-custom event: {:?}", other_event);
324                    }
325                }
326            }
327
328            debug!("🚫 SSE Event Bridge: Global event receiver closed");
329        });
330
331        info!("✅ SSE event bridge established successfully");
332    }
333
334    /// Run the server and return the HTTP server handle for SSE access (requires "http" feature)
335    #[cfg(feature = "http")]
336    pub async fn run_with_sse_access(
337        &self,
338    ) -> Result<(
339        turul_http_mcp_server::HttpMcpServer,
340        tokio::task::JoinHandle<turul_http_mcp_server::Result<()>>,
341    )> {
342        info!(
343            "Starting MCP server: {} v{}",
344            self.implementation.name, self.implementation.version
345        );
346        info!("Session management: enabled with automatic cleanup");
347
348        if self.enable_sse {
349            info!("SSE notifications: enabled - SSE manager available for notifications");
350        }
351
352        // Start session cleanup task
353        let _cleanup_task = self.session_manager.clone().start_cleanup_task();
354
355        // Create session-aware tool handler
356        let tool_handler = SessionAwareToolHandler::new(
357            self.tools.clone(),
358            self.session_manager.clone(),
359            self.strict_lifecycle,
360        );
361
362        // Create session-aware initialize handler
363        let init_handler = SessionAwareInitializeHandler::new(
364            self.implementation.clone(),
365            self.capabilities.clone(),
366            self.instructions.clone(),
367            self.session_manager.clone(),
368            self.strict_lifecycle,
369        );
370
371        // Build HTTP server with shared session storage from SessionManager
372        let session_storage = self.session_manager.get_storage();
373        debug!("Configuring HTTP MCP server with session storage backend");
374        let mut builder =
375            turul_http_mcp_server::HttpMcpServer::builder_with_storage(session_storage)
376                .bind_address(self.bind_address)
377                .mcp_path(&self.mcp_path)
378                .cors(self.enable_cors)
379                .get_sse(self.enable_sse) // GET SSE controlled by main server enable_sse flag
380                // POST SSE remains at default (false) for compatibility
381                .server_capabilities(self.capabilities.clone()) // Pass server capabilities
382                .with_middleware_stack(Arc::new(self.middleware_stack.clone())) // Pass middleware stack
383                .register_handler(vec!["initialize".to_string()], init_handler)
384                .register_handler(
385                    vec!["tools/list".to_string()],
386                    ListToolsHandler::new_with_session_manager(
387                        self.tools.clone(),
388                        self.session_manager.clone(),
389                        self.strict_lifecycle,
390                    ),
391                )
392                .register_handler(vec!["tools/call".to_string()], tool_handler);
393
394        // TODO investigate if this also adds the tools/list and tools/call handlers
395        // Register all MCP handlers with session awareness
396        for (method, handler) in &self.handlers {
397            let bridge_handler = SessionAwareMcpHandlerBridge::new(
398                handler.clone(),
399                self.session_manager.clone(),
400                self.strict_lifecycle,
401            );
402            builder = builder.register_handler(vec![method.clone()], bridge_handler);
403        }
404
405        // Register special initialized notification handler that can mark sessions as initialized
406        use crate::handlers::InitializedNotificationHandler;
407        let initialized_handler = InitializedNotificationHandler::new(self.session_manager.clone());
408        let initialized_bridge = SessionAwareMcpHandlerBridge::new(
409            Arc::new(initialized_handler),
410            self.session_manager.clone(),
411            self.strict_lifecycle,
412        );
413        builder = builder.register_handler(
414            vec!["notifications/initialized".to_string()],
415            initialized_bridge,
416        );
417
418        let http_server = builder.build();
419
420        // Run server in background task
421        let server_task = {
422            let server = http_server.clone();
423            tokio::spawn(async move { server.run().await })
424        };
425
426        Ok((http_server, server_task))
427    }
428
429    /// Get session storage configuration info
430    pub fn session_storage_info(&self) -> &str {
431        if let Some(storage) = &self.session_storage {
432            debug!(
433                "Accessing session storage for info - backend is configured: {:p}",
434                storage
435            );
436            "Backend configured"
437        } else {
438            "No backend configured"
439        }
440    }
441}
442
443/// Session-aware bridge handler that adapts McpHandler to JsonRpcHandler
444///
445/// Provides session management and lifecycle enforcement for custom MCP handlers,
446/// automatically injecting session context and enforcing initialization requirements.
447pub struct SessionAwareMcpHandlerBridge {
448    handler: Arc<dyn McpHandler>,
449    session_manager: Arc<SessionManager>,
450    strict_lifecycle: bool,
451}
452
453impl SessionAwareMcpHandlerBridge {
454    /// Creates a new session-aware handler bridge
455    pub fn new(
456        handler: Arc<dyn McpHandler>,
457        session_manager: Arc<SessionManager>,
458        strict_lifecycle: bool,
459    ) -> Self {
460        Self {
461            handler,
462            session_manager,
463            strict_lifecycle,
464        }
465    }
466}
467
468#[async_trait]
469impl JsonRpcHandler for SessionAwareMcpHandlerBridge {
470    type Error = McpError;
471
472    async fn handle(
473        &self,
474        method: &str,
475        params: Option<turul_mcp_json_rpc_server::RequestParams>,
476        session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
477    ) -> std::result::Result<serde_json::Value, McpError> {
478        debug!("Handling {} request via session-aware bridge", method);
479
480        // Convert JSON-RPC SessionContext to MCP SessionContext
481        let mcp_session_context = if let Some(json_rpc_ctx) = session_context {
482            debug!(
483                "Converting JSON-RPC session context: session_id={}",
484                json_rpc_ctx.session_id
485            );
486            Some(SessionContext::from_json_rpc_with_broadcaster(
487                json_rpc_ctx,
488                self.session_manager.get_storage(),
489            ))
490        } else {
491            // Fallback: extract session ID from params (legacy behavior)
492            let session_id = extract_session_id_from_params(&params);
493            if let Some(sid) = session_id {
494                debug!("Fallback: extracted session_id from params: {}", sid);
495                self.session_manager.create_session_context(&sid)
496            } else {
497                None
498            }
499        };
500
501        // MCP Lifecycle Guard: Ensure session is initialized before allowing operations (if strict mode enabled)
502        if self.strict_lifecycle
503            && method != "initialize"
504            && method != "notifications/initialized"
505            && let Some(ref session_ctx) = mcp_session_context
506        {
507            let session_initialized = self
508                .session_manager
509                .is_session_initialized(&session_ctx.session_id)
510                .await;
511            if !session_initialized {
512                debug!(
513                    "🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
514                    method, session_ctx.session_id
515                );
516                return Err(McpError::SessionError(
517                        "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
518                    ));
519            }
520        }
521
522        // Convert JSON-RPC params to Value
523        let mcp_params = params.map(|p| p.to_value());
524
525        // Call the MCP handler with session context - propagate errors directly
526        match self
527            .handler
528            .handle_with_session(mcp_params, mcp_session_context)
529            .await
530        {
531            Ok(result) => Ok(result),
532            Err(error) => {
533                error!("MCP handler error: {}", error);
534                Err(error) // Propagate McpError directly, no double-wrapping!
535            }
536        }
537    }
538
539    async fn handle_notification(
540        &self,
541        method: &str,
542        params: Option<turul_mcp_json_rpc_server::RequestParams>,
543        session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
544    ) -> std::result::Result<(), McpError> {
545        debug!("Handling {} notification via session-aware bridge", method);
546
547        // Convert JSON-RPC SessionContext to MCP SessionContext
548        let mcp_session_context = session_context.map(|json_rpc_ctx| {
549            SessionContext::from_json_rpc_with_broadcaster(
550                json_rpc_ctx,
551                self.session_manager.get_storage(),
552            )
553        });
554
555        // MCP Lifecycle Guard for notifications: Allow notifications/initialized to pass through
556        // but enforce lifecycle for other notifications if strict mode is enabled
557        if self.strict_lifecycle
558            && method != "notifications/initialized"
559            && let Some(ref session_ctx) = mcp_session_context
560        {
561            let session_initialized = self
562                .session_manager
563                .is_session_initialized(&session_ctx.session_id)
564                .await;
565            if !session_initialized {
566                tracing::debug!(
567                    "🚫 STRICT MODE: Rejecting notification {} for session {} - session not yet initialized",
568                    method,
569                    session_ctx.session_id
570                );
571                return Err(McpError::SessionError(
572                    "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
573                ));
574            }
575        }
576
577        // Convert JSON-RPC params to Value
578        let mcp_params = params.map(|p| p.to_value());
579
580        // Call the MCP handler's handle_with_session method for notifications
581        match self
582            .handler
583            .handle_with_session(mcp_params, mcp_session_context)
584            .await
585        {
586            Ok(_result) => Ok(()), // Notifications don't return values
587            Err(error) => {
588                tracing::error!("MCP notification handler error: {}", error);
589                Err(error)
590            }
591        }
592    }
593
594    fn supported_methods(&self) -> Vec<String> {
595        self.handler.supported_methods()
596    }
597}
598
599/// Extract session ID from request parameters (placeholder implementation)
600fn extract_session_id_from_params(
601    _params: &Option<turul_mcp_json_rpc_server::RequestParams>,
602) -> Option<String> {
603    // In a real implementation, this would extract session ID from HTTP headers
604    // For now, return None as we'll implement proper session extraction later
605    None
606}
607
608/// Session-aware handler for initialize requests
609pub struct SessionAwareInitializeHandler {
610    implementation: Implementation,
611    capabilities: ServerCapabilities,
612    instructions: Option<String>,
613    session_manager: Arc<SessionManager>,
614    strict_lifecycle: bool,
615}
616
617impl SessionAwareInitializeHandler {
618    pub fn new(
619        implementation: Implementation,
620        capabilities: ServerCapabilities,
621        instructions: Option<String>,
622        session_manager: Arc<SessionManager>,
623        strict_lifecycle: bool,
624    ) -> Self {
625        Self {
626            implementation,
627            capabilities,
628            instructions,
629            session_manager,
630            strict_lifecycle,
631        }
632    }
633
634    /// Negotiate protocol version with client
635    ///
636    /// Server supports backward compatibility with older protocol versions.
637    /// The negotiation follows this priority:
638    /// 1. Use client's requested version if server supports it
639    /// 2. Use the highest version both client and server support
640    /// 3. Fall back to minimum compatible version
641    fn negotiate_version(&self, client_version: &str) -> std::result::Result<McpVersion, String> {
642        use turul_mcp_protocol::version::McpVersion;
643
644        // Try to parse client's requested version
645        let requested_version = match client_version.parse::<McpVersion>() {
646            Ok(version) => version,
647            Err(_) => {
648                // Unknown version - check if it's newer than what we support
649                // If it looks like a valid date format but we don't know it,
650                // we need to check if it's likely newer or older than our range
651                if client_version.matches('-').count() == 2 {
652                    // Check if it's likely newer than our latest version (2025-06-18)
653                    if client_version > "2025-06-18" {
654                        // Assume it's newer, use latest as fallback
655                        McpVersion::LATEST
656                    } else if client_version < "2024-11-05" {
657                        // Too old, we don't support versions before our minimum
658                        return Err(format!(
659                            "Cannot negotiate compatible version with client version {} (server requires at least {})",
660                            client_version, "2024-11-05"
661                        ));
662                    } else {
663                        // Unknown version between our supported range - shouldn't happen
664                        return Err(format!("Unknown protocol version: {}", client_version));
665                    }
666                } else {
667                    return Err(format!(
668                        "Invalid protocol version format: {}",
669                        client_version
670                    ));
671                }
672            }
673        };
674
675        // Define server's supported versions (all versions from 2024-11-05 to current)
676        let supported_versions = [
677            McpVersion::V2024_11_05,
678            McpVersion::V2025_03_26,
679            McpVersion::V2025_06_18,
680        ];
681
682        // Strategy 1: If server supports client's requested version, use it
683        if supported_versions.contains(&requested_version) {
684            return Ok(requested_version);
685        }
686
687        // Strategy 2: Find highest supported version ≤ client requested version
688        // This allows clients to request newer versions while falling back gracefully
689        let compatible_versions: Vec<_> = supported_versions
690            .iter()
691            .filter(|&&v| v <= requested_version)
692            .collect();
693
694        if let Some(&&best_version) = compatible_versions.iter().max() {
695            Ok(best_version)
696        } else {
697            // Strategy 3: No compatible version found - client too old
698            Err(format!(
699                "Cannot negotiate compatible version with client version {} (server requires at least {})",
700                client_version,
701                supported_versions.iter().min().unwrap()
702            ))
703        }
704    }
705
706    /// Adjust server capabilities based on negotiated protocol version
707    ///
708    /// Some capabilities are only available in newer protocol versions.
709    /// This method filters capabilities to match what the negotiated version supports.
710    fn adjust_capabilities_for_version(&self, version: McpVersion) -> ServerCapabilities {
711        let adjusted = self.capabilities.clone();
712
713        // Before version 2025-06-18, _meta field support wasn't available
714        // So we don't need to adjust capabilities for that specifically since it's
715        // handled at the protocol level.
716
717        // Before version 2025-03-26, streamable HTTP wasn't available
718        // But HTTP transport capability isn't explicitly declared in ServerCapabilities,
719        // so no adjustment needed here.
720
721        // All other capabilities (tools, resources, prompts, etc.) are version-independent
722        // in terms of their basic functionality.
723
724        info!(
725            "Server capabilities adjusted for protocol version {}",
726            version
727        );
728        debug!(
729            "Capabilities: logging={}, tools={}, resources={}, prompts={}",
730            adjusted.logging.is_some(),
731            adjusted.tools.is_some(),
732            adjusted.resources.is_some(),
733            adjusted.prompts.is_some()
734        );
735
736        adjusted
737    }
738}
739
740#[async_trait]
741impl JsonRpcHandler for SessionAwareInitializeHandler {
742    type Error = McpError;
743
744    async fn handle(
745        &self,
746        method: &str,
747        params: Option<turul_mcp_json_rpc_server::RequestParams>,
748        session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
749    ) -> std::result::Result<serde_json::Value, McpError> {
750        debug!("Handling {} request with session support", method);
751
752        if method != "initialize" {
753            return Err(McpError::InvalidParameters(format!(
754                "Method not supported: {}",
755                method
756            )));
757        }
758
759        // Parse initialize request
760        let request = if let Some(params) = params {
761            let params_value = params.to_value();
762            serde_json::from_value::<InitializeRequest>(params_value).map_err(|e| {
763                McpError::InvalidParameters(format!("Invalid initialize request: {}", e))
764            })?
765        } else {
766            return Err(McpError::MissingParameter(
767                "Missing parameters for initialize".to_string(),
768            ));
769        };
770
771        // Perform protocol version negotiation
772        let negotiated_version = match self.negotiate_version(&request.protocol_version) {
773            Ok(version) => {
774                info!(
775                    "Protocol version negotiated: {} (client requested: {})",
776                    version, request.protocol_version
777                );
778                version
779            }
780            Err(e) => {
781                error!("Protocol version negotiation failed: {}", e);
782                return Err(McpError::ConfigurationError(format!(
783                    "Version negotiation failed: {}",
784                    e
785                )));
786            }
787        };
788
789        // Use session ID provided by HTTP layer, or create new one if not provided
790        let session_id = if let Some(ctx) = &session_context {
791            debug!("Using session from context: {}", ctx.session_id);
792
793            // Add session to cache if it doesn't exist there
794            // This handles sessions created directly in storage by session_handler
795            let cache_exists = self
796                .session_manager
797                .session_exists_in_cache(&ctx.session_id)
798                .await;
799            debug!(
800                "Session {} exists in cache: {}",
801                ctx.session_id, cache_exists
802            );
803
804            if !cache_exists {
805                debug!("Session {} not in cache, checking storage", ctx.session_id);
806
807                // Try to load session from storage with its actual capabilities
808                match self
809                    .session_manager
810                    .load_session_from_storage(&ctx.session_id)
811                    .await
812                {
813                    Ok(true) => {
814                        debug!(
815                            "Session {} loaded from storage with preserved capabilities",
816                            ctx.session_id
817                        );
818                    }
819                    Ok(false) => {
820                        // Session doesn't exist in storage either - this shouldn't happen
821                        // in normal flow but handle gracefully
822                        warn!(
823                            "Session {} not found in storage, creating with defaults",
824                            ctx.session_id
825                        );
826                        self.session_manager
827                            .add_session_to_cache(
828                                ctx.session_id.clone(),
829                                self.session_manager.get_default_capabilities(),
830                            )
831                            .await;
832                    }
833                    Err(e) => {
834                        error!(
835                            "Failed to load session {} from storage: {}",
836                            ctx.session_id, e
837                        );
838                        // Fallback to defaults only on error
839                        self.session_manager
840                            .add_session_to_cache(
841                                ctx.session_id.clone(),
842                                self.session_manager.get_default_capabilities(),
843                            )
844                            .await;
845                    }
846                }
847            } else {
848                debug!("Session {} already exists in cache", ctx.session_id);
849            }
850
851            ctx.session_id.clone()
852        } else {
853            debug!("No session context provided, creating new session");
854            self.session_manager.create_session().await
855        };
856
857        // Store client info and capabilities in session state for later initialization
858        // Per MCP spec, session is NOT initialized until client sends notifications/initialized
859        self.session_manager
860            .set_session_state(
861                &session_id,
862                "client_info",
863                serde_json::to_value(&request.client_info).map_err(McpError::SerializationError)?,
864            )
865            .await;
866
867        self.session_manager
868            .set_session_state(
869                &session_id,
870                "client_capabilities",
871                serde_json::to_value(&request.capabilities)
872                    .map_err(McpError::SerializationError)?,
873            )
874            .await;
875
876        self.session_manager
877            .set_session_state(
878                &session_id,
879                "negotiated_version",
880                serde_json::to_value(negotiated_version).map_err(McpError::SerializationError)?,
881            )
882            .await;
883
884        // Store negotiated version before initialization (differs by mode)
885
886        // Store the negotiated version in session state for tools to access
887        self.session_manager
888            .set_session_state(
889                &session_id,
890                "mcp_version",
891                serde_json::json!(negotiated_version.as_str()),
892            )
893            .await;
894
895        // In lenient mode, immediately mark session as initialized
896        // In strict mode, wait for notifications/initialized from client
897        if !self.strict_lifecycle {
898            debug!(
899                "📝 LENIENT MODE: Immediately initializing session {} (strict_lifecycle=false)",
900                session_id
901            );
902            if let Err(e) = self
903                .session_manager
904                .initialize_session_with_version(
905                    &session_id,
906                    request.client_info,
907                    request.capabilities,
908                    negotiated_version,
909                )
910                .await
911            {
912                error!("❌ Failed to initialize session {}: {}", session_id, e);
913                return Err(McpError::SessionError(format!(
914                    "Failed to initialize session: {}",
915                    e
916                )));
917            }
918            info!(
919                "✅ Session {} created and immediately initialized with protocol version {} (lenient mode)",
920                session_id, negotiated_version
921            );
922        } else {
923            info!(
924                "⏳ Session {} created and ready for client with protocol version {} (strict mode - waiting for notifications/initialized)",
925                session_id, negotiated_version
926            );
927        }
928
929        // Create response with negotiated version and adjusted capabilities
930        let adjusted_capabilities = self.adjust_capabilities_for_version(negotiated_version);
931        let mut response = InitializeResult::new(
932            negotiated_version,
933            adjusted_capabilities,
934            self.implementation.clone(),
935        );
936
937        if let Some(instructions) = &self.instructions {
938            response = response.with_instructions(instructions.clone());
939        }
940
941        // Session ID is communicated to HTTP layer via session manager
942
943        serde_json::to_value(response).map_err(McpError::SerializationError)
944    }
945
946    fn supported_methods(&self) -> Vec<String> {
947        vec!["initialize".to_string()]
948    }
949}
950
951/// Handler for tools/list requests
952pub struct ListToolsHandler {
953    tools: HashMap<String, Arc<dyn McpTool>>,
954    session_manager: Option<Arc<SessionManager>>,
955    strict_lifecycle: bool,
956}
957
958impl ListToolsHandler {
959    pub fn new(tools: HashMap<String, Arc<dyn McpTool>>) -> Self {
960        Self {
961            tools,
962            session_manager: None,
963            strict_lifecycle: false,
964        }
965    }
966
967    pub fn new_with_session_manager(
968        tools: HashMap<String, Arc<dyn McpTool>>,
969        session_manager: Arc<SessionManager>,
970        strict_lifecycle: bool,
971    ) -> Self {
972        Self {
973            tools,
974            session_manager: Some(session_manager),
975            strict_lifecycle,
976        }
977    }
978}
979
980#[async_trait]
981impl JsonRpcHandler for ListToolsHandler {
982    type Error = McpError;
983
984    async fn handle(
985        &self,
986        method: &str,
987        params: Option<turul_mcp_json_rpc_server::RequestParams>,
988        session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
989    ) -> std::result::Result<serde_json::Value, McpError> {
990        use turul_mcp_protocol::meta::{Cursor, PaginatedResponse};
991
992        debug!("Handling {} request", method);
993
994        // MCP Lifecycle Guard: Ensure session is initialized before allowing operations (if strict mode enabled)
995        if self.strict_lifecycle
996            && let (Some(session_manager), Some(session_ctx)) =
997                (&self.session_manager, &session_context)
998        {
999            let session_initialized = session_manager
1000                .is_session_initialized(&session_ctx.session_id)
1001                .await;
1002            if !session_initialized {
1003                debug!(
1004                    "🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
1005                    method, session_ctx.session_id
1006                );
1007                return Err(McpError::SessionError(
1008                    "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string()
1009                ));
1010            }
1011        }
1012
1013        if method != "tools/list" {
1014            return Err(McpError::InvalidParameters(format!(
1015                "Method '{}' not supported by tools/list handler",
1016                method
1017            )));
1018        }
1019
1020        // Parse typed parameters for cursor and meta propagation
1021        use turul_mcp_protocol::tools::{ListToolsParams, ListToolsResult};
1022        let list_params = if let Some(params_value) = params {
1023            serde_json::from_value::<ListToolsParams>(params_value.to_value()).map_err(|e| {
1024                McpError::InvalidParameters(format!("Invalid parameters for tools/list: {}", e))
1025            })?
1026        } else {
1027            ListToolsParams::new()
1028        };
1029
1030        let cursor = list_params.cursor;
1031        debug!("Listing tools with cursor: {:?}", cursor);
1032
1033        // Convert tools to descriptors and sort by name for stable pagination
1034        let mut tools: Vec<Tool> = self
1035            .tools
1036            .values()
1037            .map(|tool| tool_to_descriptor(tool.as_ref()))
1038            .collect();
1039
1040        // Sort by tool name to ensure stable ordering for pagination
1041        tools.sort_by(|a, b| a.name.cmp(&b.name));
1042
1043        // Implement cursor-based pagination
1044        const DEFAULT_PAGE_SIZE: usize = 50; // MCP suggested default
1045        const MAX_LIMIT: u32 = 100; // Framework-specific DoS protection
1046
1047        // Validate limit parameter - MCP spec requires positive integer
1048        if let Some(limit) = list_params.limit
1049            && limit == 0
1050        {
1051            return Err(McpError::InvalidParameters(
1052                "limit must be a positive integer (> 0)".to_string(),
1053            ));
1054        }
1055
1056        // Apply limit clamping for DoS protection (framework extension)
1057        let page_size = list_params
1058            .limit
1059            .map(|l| std::cmp::min(l, MAX_LIMIT) as usize)
1060            .unwrap_or(DEFAULT_PAGE_SIZE);
1061
1062        // Find starting index based on cursor
1063        let start_index = if let Some(cursor) = &cursor {
1064            // Cursor contains the last tool name from previous page
1065            let cursor_name = cursor.as_str();
1066            // Find the position after the cursor name (first tool > cursor)
1067            tools
1068                .iter()
1069                .position(|t| t.name.as_str() > cursor_name)
1070                .unwrap_or(tools.len())
1071        } else {
1072            0 // No cursor = start from beginning
1073        };
1074
1075        // Calculate end index for this page
1076        let end_index = std::cmp::min(start_index + page_size, tools.len());
1077
1078        // Extract page of tools
1079        let page_tools: Vec<Tool> = tools[start_index..end_index].to_vec();
1080
1081        // Determine if there are more tools after this page
1082        let has_more = end_index < tools.len();
1083
1084        // Generate next cursor if there are more tools
1085        let next_cursor = if has_more {
1086            // Cursor should be the name of the last item in current page
1087            page_tools.last().map(|t| Cursor::new(&t.name))
1088        } else {
1089            None
1090        };
1091
1092        debug!(
1093            "Tool pagination: start={}, end={}, page_size={}, has_more={}, next_cursor={:?}",
1094            start_index,
1095            end_index,
1096            page_tools.len(),
1097            has_more,
1098            next_cursor
1099        );
1100
1101        let mut base_response = ListToolsResult::new(page_tools);
1102        let total = Some(tools.len() as u64);
1103
1104        // Set top-level nextCursor field on the result before wrapping
1105        if let Some(ref cursor) = next_cursor {
1106            base_response = base_response.with_next_cursor(cursor.clone());
1107        }
1108
1109        let next_cursor_clone = next_cursor.clone();
1110        let mut paginated_response =
1111            PaginatedResponse::with_pagination(base_response, next_cursor, total, has_more);
1112
1113        // Propagate optional _meta from request to response (MCP 2025-06-18 compliance)
1114        if let Some(request_meta) = list_params.meta {
1115            // Get existing meta from PaginatedResponse or use pagination defaults
1116            let mut response_meta = paginated_response.meta().cloned().unwrap_or_else(|| {
1117                turul_mcp_protocol::meta::Meta::with_pagination(next_cursor_clone, total, has_more)
1118            });
1119
1120            // Merge request's _meta fields into extra without clobbering pagination
1121            for (key, value) in request_meta {
1122                response_meta.extra.insert(key, value);
1123            }
1124
1125            paginated_response = paginated_response.with_meta(response_meta);
1126        }
1127
1128        serde_json::to_value(paginated_response).map_err(McpError::SerializationError)
1129    }
1130
1131    fn supported_methods(&self) -> Vec<String> {
1132        vec!["tools/list".to_string()]
1133    }
1134}
1135
1136/// Session-aware handler for tool execution
1137pub struct SessionAwareToolHandler {
1138    tools: HashMap<String, Arc<dyn McpTool>>,
1139    session_manager: Arc<SessionManager>,
1140    strict_lifecycle: bool,
1141}
1142
1143impl SessionAwareToolHandler {
1144    pub fn new(
1145        tools: HashMap<String, Arc<dyn McpTool>>,
1146        session_manager: Arc<SessionManager>,
1147        strict_lifecycle: bool,
1148    ) -> Self {
1149        Self {
1150            tools,
1151            session_manager,
1152            strict_lifecycle,
1153        }
1154    }
1155}
1156
1157#[async_trait]
1158impl JsonRpcHandler for SessionAwareToolHandler {
1159    type Error = McpError;
1160
1161    async fn handle(
1162        &self,
1163        method: &str,
1164        params: Option<turul_mcp_json_rpc_server::RequestParams>,
1165        session_context: Option<turul_mcp_json_rpc_server::r#async::SessionContext>,
1166    ) -> std::result::Result<serde_json::Value, McpError> {
1167        debug!("Handling {} request with session support", method);
1168
1169        if method != "tools/call" {
1170            return Err(McpError::InvalidParameters(format!(
1171                "Method '{}' not supported by tools/call handler",
1172                method
1173            )));
1174        }
1175
1176        // MCP Lifecycle Guard: Ensure session is initialized before allowing tool operations (if strict mode enabled)
1177        if self.strict_lifecycle {
1178            if let Some(ref session_ctx) = session_context {
1179                let session_initialized = self
1180                    .session_manager
1181                    .is_session_initialized(&session_ctx.session_id)
1182                    .await;
1183                if !session_initialized {
1184                    debug!(
1185                        "🚫 STRICT MODE: Rejecting {} request for session {} - session not yet initialized (waiting for notifications/initialized)",
1186                        method, session_ctx.session_id
1187                    );
1188                    return Err(McpError::SessionError(
1189                        "Session not initialized - client must send notifications/initialized first (strict lifecycle mode)".to_string(),
1190                    ));
1191                }
1192                debug!(
1193                    "✅ STRICT MODE: Session {} is initialized - allowing {} request",
1194                    session_ctx.session_id, method
1195                );
1196            }
1197        } else {
1198            debug!(
1199                "📝 LENIENT MODE: Allowing {} request without lifecycle check (strict_lifecycle=false)",
1200                method
1201            );
1202        }
1203
1204        let params =
1205            params.ok_or_else(|| McpError::MissingParameter("CallToolRequest".to_string()))?;
1206
1207        // Use the parameter extraction pattern from the other project
1208        use turul_mcp_protocol::param_extraction::extract_params;
1209
1210        let call_params: turul_mcp_protocol::tools::CallToolParams = extract_params(params)?;
1211
1212        // Find the tool
1213        let tool = self
1214            .tools
1215            .get(&call_params.name)
1216            .ok_or_else(|| McpError::ToolNotFound(call_params.name.clone()))?;
1217
1218        // Convert JSON-RPC SessionContext to MCP SessionContext for tool execution
1219        let mcp_session_context = if let Some(json_rpc_ctx) = session_context {
1220            debug!(
1221                "Converting JSON-RPC session context for tool call: session_id={}",
1222                json_rpc_ctx.session_id
1223            );
1224            Some(SessionContext::from_json_rpc_with_broadcaster(
1225                json_rpc_ctx,
1226                self.session_manager.get_storage(),
1227            ))
1228        } else {
1229            debug!("No session context provided for tool call");
1230            None
1231        };
1232
1233        // Execute the tool with session context
1234        let args = call_params
1235            .arguments
1236            .map(|hashmap| {
1237                serde_json::to_value(hashmap)
1238                    .unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
1239            })
1240            .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
1241        match tool.call(args, mcp_session_context).await {
1242            Ok(response) => serde_json::to_value(response).map_err(McpError::SerializationError),
1243            Err(error_msg) => {
1244                error!("Tool execution error: {}", error_msg);
1245                Err(error_msg) // Propagate tool error directly (already proper McpError type)
1246            }
1247        }
1248    }
1249
1250    fn supported_methods(&self) -> Vec<String> {
1251        vec!["tools/call".to_string()]
1252    }
1253}
1254
1255impl std::fmt::Debug for McpServer {
1256    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1257        f.debug_struct("McpServer")
1258            .field("implementation", &self.implementation)
1259            .field("capabilities", &self.capabilities)
1260            .field("tools", &format!("HashMap with {} tools", self.tools.len()))
1261            .field("instructions", &self.instructions)
1262            .finish()
1263    }
1264}
1265
1266#[cfg(test)]
1267mod tests {
1268    use super::*;
1269    use crate::McpTool;
1270    use async_trait::async_trait;
1271    use serde_json::Value;
1272    use std::collections::HashMap;
1273    use turul_mcp_protocol::ToolSchema;
1274    use turul_mcp_protocol::tools::{CallToolResult, ToolResult};
1275    use turul_mcp_builders::prelude::*;  // HasBaseMetadata, HasDescription, etc.
1276
1277    struct TestTool {
1278        input_schema: ToolSchema,
1279    }
1280
1281    impl TestTool {
1282        fn new() -> Self {
1283            Self {
1284                input_schema: ToolSchema::object(),
1285            }
1286        }
1287    }
1288
1289    impl HasBaseMetadata for TestTool {
1290        fn name(&self) -> &str {
1291            "test"
1292        }
1293        fn title(&self) -> Option<&str> {
1294            Some("Test Tool")
1295        }
1296    }
1297
1298    impl HasDescription for TestTool {
1299        fn description(&self) -> Option<&str> {
1300            Some("Test tool for unit tests")
1301        }
1302    }
1303
1304    impl HasInputSchema for TestTool {
1305        fn input_schema(&self) -> &ToolSchema {
1306            &self.input_schema
1307        }
1308    }
1309
1310    impl HasOutputSchema for TestTool {
1311        fn output_schema(&self) -> Option<&ToolSchema> {
1312            None
1313        }
1314    }
1315
1316    impl HasAnnotations for TestTool {
1317        fn annotations(&self) -> Option<&turul_mcp_protocol::tools::ToolAnnotations> {
1318            None
1319        }
1320    }
1321
1322    impl HasToolMeta for TestTool {
1323        fn tool_meta(&self) -> Option<&HashMap<String, Value>> {
1324            None
1325        }
1326    }
1327
1328    #[async_trait]
1329    impl McpTool for TestTool {
1330        async fn call(
1331            &self,
1332            _args: Value,
1333            _session: Option<crate::SessionContext>,
1334        ) -> crate::McpResult<CallToolResult> {
1335            Ok(CallToolResult::success(vec![ToolResult::text(
1336                "test result",
1337            )]))
1338        }
1339    }
1340
1341    #[test]
1342    fn test_server_creation() {
1343        let server = McpServer::builder()
1344            .name("test-server")
1345            .version("1.0.0")
1346            .tool(TestTool::new())
1347            .build()
1348            .unwrap();
1349
1350        assert_eq!(server.implementation.name, "test-server");
1351        assert_eq!(server.implementation.version, "1.0.0");
1352        assert_eq!(server.tools.len(), 1);
1353    }
1354
1355    #[tokio::test]
1356    async fn test_list_tools_handler() {
1357        let mut tools: HashMap<String, Arc<dyn McpTool>> = HashMap::new();
1358        tools.insert("test".to_string(), Arc::new(TestTool::new()));
1359
1360        let handler = ListToolsHandler::new(tools);
1361        let result = handler.handle("tools/list", None, None).await.unwrap();
1362
1363        let response: ListToolsResult = serde_json::from_value(result).unwrap();
1364        assert_eq!(response.tools.len(), 1);
1365        assert_eq!(response.tools[0].name, "test");
1366    }
1367
1368    #[tokio::test]
1369    async fn test_tool_handler() {
1370        let mut tools: HashMap<String, Arc<dyn McpTool>> = HashMap::new();
1371        tools.insert("test".to_string(), Arc::new(TestTool::new()));
1372
1373        let session_manager = Arc::new(SessionManager::new(ServerCapabilities::default()));
1374        let handler = SessionAwareToolHandler::new(tools, session_manager, false);
1375        // Create params matching the CallToolParams structure
1376        let params = turul_mcp_json_rpc_server::RequestParams::Object(
1377            [
1378                ("name".to_string(), serde_json::json!("test")),
1379                ("arguments".to_string(), serde_json::json!({})),
1380            ]
1381            .into_iter()
1382            .collect(),
1383        );
1384
1385        let result = handler
1386            .handle("tools/call", Some(params), None)
1387            .await
1388            .unwrap();
1389        let response: CallToolResult = serde_json::from_value(result).unwrap();
1390
1391        assert_eq!(response.content.len(), 1);
1392        if let ToolResult::Text { text, .. } = &response.content[0] {
1393            assert_eq!(text, "test result");
1394        }
1395    }
1396}