ultrafast_mcp_server/
server.rs

1//! UltraFastServer implementation module
2//!
3//! This module contains the main server implementation with all the core functionality.
4
5use std::{borrow::Cow, collections::HashMap, sync::Arc};
6use tokio::sync::{RwLock, broadcast};
7use tracing::{error, info, warn};
8
9use ultrafast_mcp_core::{
10    config::TimeoutConfig,
11    error::{MCPError, MCPResult},
12    protocol::{
13        capabilities::ServerCapabilities,
14        jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse},
15    },
16    schema::validation::validate_tool_schema,
17    types::{
18        notifications::{LogLevel, LogLevelSetRequest, LogLevelSetResponse},
19        prompts::Prompt,
20        resources::{Resource, SubscribeResponse},
21        roots::{RootsListChangedNotification, SetRootsRequest, SetRootsResponse},
22        server::ServerInfo,
23        tools::Tool,
24    },
25    utils::{CancellationManager, PingManager},
26};
27#[cfg(feature = "http")]
28use ultrafast_mcp_transport::streamable_http::server::{HttpTransportConfig, HttpTransportServer};
29use ultrafast_mcp_transport::{Transport, TransportConfig, create_transport};
30
31use crate::context::{Context, LoggerConfig};
32use crate::handlers::*;
33
34/// MCP Server state
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum ServerState {
37    Uninitialized,
38    Initializing,
39    Initialized,
40    Operating,
41    ShuttingDown,
42    Shutdown,
43}
44
45impl ServerState {
46    /// Check if the server can accept operations
47    /// According to MCP 2025-06-18 specification, operations are allowed
48    /// once the server is initialized (after initialize response)
49    pub fn can_operate(&self) -> bool {
50        matches!(self, ServerState::Initialized | ServerState::Operating)
51    }
52
53    /// Check if the server is initialized
54    pub fn is_initialized(&self) -> bool {
55        matches!(self, ServerState::Initialized | ServerState::Operating)
56    }
57
58    /// Check if the server is shutting down
59    pub fn is_shutting_down(&self) -> bool {
60        matches!(self, ServerState::ShuttingDown | ServerState::Shutdown)
61    }
62}
63
64/// Tool registration error
65#[derive(Debug, thiserror::Error)]
66pub enum ToolRegistrationError {
67    #[error("Tool with name '{0}' already exists")]
68    ToolAlreadyExists(String),
69    #[error("Invalid tool schema: {0}")]
70    InvalidSchema(String),
71    #[error("Tool name '{0}' is reserved")]
72    ReservedName(String),
73    #[error("Tool description is required")]
74    MissingDescription,
75    #[error("Tool input schema is required")]
76    MissingInputSchema,
77    #[error("Tool output schema is required")]
78    MissingOutputSchema,
79}
80
81/// Server logging configuration
82#[derive(Debug, Clone)]
83pub struct ServerLoggingConfig {
84    /// Current minimum log level
85    pub current_level: LogLevel,
86    /// Whether clients can change the log level
87    pub allow_level_changes: bool,
88    /// Default logger configuration for new contexts
89    pub default_logger_config: LoggerConfig,
90}
91
92impl Default for ServerLoggingConfig {
93    fn default() -> Self {
94        Self {
95            current_level: LogLevel::Info,
96            allow_level_changes: true,
97            default_logger_config: LoggerConfig::default(),
98        }
99    }
100}
101
102/// MCP Server implementation
103#[derive(Clone)]
104pub struct UltraFastServer {
105    info: ServerInfo,
106    capabilities: ServerCapabilities,
107    state: Arc<RwLock<ServerState>>,
108    tools: Arc<RwLock<HashMap<String, Tool>>>,
109    resources: Arc<RwLock<HashMap<String, Resource>>>,
110    prompts: Arc<RwLock<HashMap<String, Prompt>>>,
111    tool_handler: Option<Arc<dyn ToolHandler>>,
112    resource_handler: Option<Arc<dyn ResourceHandler>>,
113    prompt_handler: Option<Arc<dyn PromptHandler>>,
114    sampling_handler: Option<Arc<dyn SamplingHandler>>,
115    completion_handler: Option<Arc<dyn CompletionHandler>>,
116    roots_handler: Option<Arc<dyn RootsHandler>>,
117    elicitation_handler: Option<Arc<dyn ElicitationHandler>>,
118    subscription_handler: Option<Arc<dyn ResourceSubscriptionHandler>>,
119    #[allow(dead_code)]
120    resource_subscriptions: Arc<RwLock<HashMap<String, Vec<String>>>>,
121    cancellation_manager: Arc<CancellationManager>,
122    ping_manager: Arc<PingManager>,
123    // Enhanced logging configuration
124    logging_config: Arc<RwLock<ServerLoggingConfig>>,
125
126    #[cfg(feature = "monitoring")]
127    monitoring_system: Option<Arc<crate::MonitoringSystem>>,
128
129    // Advanced handlers
130    advanced_sampling_handler: Option<Arc<dyn AdvancedSamplingHandler>>,
131
132    // Timeout configuration (MCP 2025-06-18 compliance)
133    timeout_config: Arc<TimeoutConfig>,
134    // Authentication middleware (removed oauth feature)
135}
136
137impl std::fmt::Debug for UltraFastServer {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("UltraFastServer")
140            .field("info", &self.info)
141            .field("capabilities", &self.capabilities)
142            .finish()
143    }
144}
145
146impl UltraFastServer {
147    /// Create a new UltraFastServer with the given info and capabilities
148    pub fn new(info: ServerInfo, capabilities: ServerCapabilities) -> Self {
149        Self {
150            info,
151            capabilities,
152            state: Arc::new(RwLock::new(ServerState::Uninitialized)),
153            tools: Arc::new(RwLock::new(HashMap::new())),
154            resources: Arc::new(RwLock::new(HashMap::new())),
155            prompts: Arc::new(RwLock::new(HashMap::new())),
156            tool_handler: None,
157            resource_handler: None,
158            prompt_handler: None,
159            sampling_handler: None,
160            completion_handler: None,
161            roots_handler: None,
162            elicitation_handler: None,
163            subscription_handler: None,
164            resource_subscriptions: Arc::new(RwLock::new(HashMap::new())),
165            cancellation_manager: Arc::new(CancellationManager::new()),
166            ping_manager: Arc::new(PingManager::default()),
167            logging_config: Arc::new(RwLock::new(ServerLoggingConfig::default())),
168
169            #[cfg(feature = "monitoring")]
170            monitoring_system: None,
171
172            // Advanced handlers
173            advanced_sampling_handler: None,
174
175            // Timeout configuration (MCP 2025-06-18 compliance)
176            timeout_config: Arc::new(TimeoutConfig::default()),
177        }
178    }
179
180    /// Configure server logging
181    pub async fn set_logging_config(&self, config: ServerLoggingConfig) {
182        let mut logging_config = self.logging_config.write().await;
183        *logging_config = config;
184        info!("Server logging configuration updated");
185    }
186
187    /// Get current server logging configuration
188    pub async fn get_logging_config(&self) -> ServerLoggingConfig {
189        self.logging_config.read().await.clone()
190    }
191
192    /// Set timeout configuration
193    pub fn with_timeout_config(mut self, config: TimeoutConfig) -> Self {
194        self.timeout_config = Arc::new(config);
195        self
196    }
197
198    /// Get current timeout configuration
199    pub fn get_timeout_config(&self) -> TimeoutConfig {
200        (*self.timeout_config).clone()
201    }
202
203    /// Set timeout configuration for high-performance scenarios
204    pub fn with_high_performance_timeouts(mut self) -> Self {
205        self.timeout_config = Arc::new(TimeoutConfig::high_performance());
206        self
207    }
208
209    /// Set timeout configuration for long-running operations
210    pub fn with_long_running_timeouts(mut self) -> Self {
211        self.timeout_config = Arc::new(TimeoutConfig::long_running());
212        self
213    }
214
215    /// Get timeout for a specific operation
216    pub fn get_operation_timeout(&self, operation: &str) -> std::time::Duration {
217        self.timeout_config.get_timeout_for_operation(operation)
218    }
219
220    /// Validate timeout configuration
221    pub fn validate_timeout_config(&self) -> Result<(), String> {
222        let config = &self.timeout_config;
223
224        // Validate all timeouts are within bounds
225        if !config.validate_timeout(config.connect_timeout) {
226            return Err("Connect timeout is out of bounds".to_string());
227        }
228        if !config.validate_timeout(config.request_timeout) {
229            return Err("Request timeout is out of bounds".to_string());
230        }
231        if !config.validate_timeout(config.response_timeout) {
232            return Err("Response timeout is out of bounds".to_string());
233        }
234        if !config.validate_timeout(config.tool_execution_timeout) {
235            return Err("Tool execution timeout is out of bounds".to_string());
236        }
237        if !config.validate_timeout(config.resource_read_timeout) {
238            return Err("Resource read timeout is out of bounds".to_string());
239        }
240        if !config.validate_timeout(config.prompt_generation_timeout) {
241            return Err("Prompt generation timeout is out of bounds".to_string());
242        }
243        if !config.validate_timeout(config.sampling_timeout) {
244            return Err("Sampling timeout is out of bounds".to_string());
245        }
246        if !config.validate_timeout(config.completion_timeout) {
247            return Err("Completion timeout is out of bounds".to_string());
248        }
249        if !config.validate_timeout(config.shutdown_timeout) {
250            return Err("Shutdown timeout is out of bounds".to_string());
251        }
252        if !config.validate_timeout(config.heartbeat_interval) {
253            return Err("Heartbeat interval is out of bounds".to_string());
254        }
255
256        Ok(())
257    }
258
259    /// Set the current log level
260    pub async fn set_log_level(&self, level: LogLevel) -> MCPResult<()> {
261        let mut logging_config = self.logging_config.write().await;
262
263        if !logging_config.allow_level_changes {
264            return Err(MCPError::invalid_request(
265                "Log level changes are not allowed on this server".to_string(),
266            ));
267        }
268
269        logging_config.current_level = level.clone();
270        logging_config.default_logger_config.min_level = level.clone();
271
272        info!("Server log level changed to: {:?}", level);
273        Ok(())
274    }
275
276    /// Get the current log level
277    pub async fn get_log_level(&self) -> LogLevel {
278        self.logging_config.read().await.current_level.clone()
279    }
280
281    // ===== FLUENT BUILDER METHODS =====
282
283    /// Enable monitoring with custom configuration
284    #[cfg(feature = "monitoring")]
285    pub fn with_monitoring_config(mut self, config: crate::MonitoringConfig) -> Self {
286        let monitoring = crate::MonitoringSystem::new(config);
287        self.monitoring_system = Some(Arc::new(monitoring));
288        info!("Monitoring enabled with custom configuration");
289        self
290    }
291    #[cfg(not(feature = "monitoring"))]
292    pub fn with_monitoring_config(self, _config: ()) -> Self {
293        warn!("Monitoring feature not enabled. Add 'monitoring' feature to enable monitoring.");
294        self
295    }
296
297    /// Enable monitoring with default configuration
298    #[cfg(feature = "monitoring")]
299    pub fn with_monitoring(mut self) -> Self {
300        let monitoring = crate::MonitoringSystem::new(crate::MonitoringConfig::default());
301        self.monitoring_system = Some(Arc::new(monitoring));
302        info!("Monitoring enabled with default configuration");
303        self
304    }
305    #[cfg(not(feature = "monitoring"))]
306    pub fn with_monitoring(self) -> Self {
307        warn!("Monitoring feature not enabled. Add 'monitoring' feature to enable monitoring.");
308        self
309    }
310
311    /// Enable health checks with default configuration
312    #[cfg(feature = "monitoring")]
313    pub fn with_health_checks(mut self) -> Self {
314        if let Some(_monitoring) = &self.monitoring_system {
315            // Health checks are automatically initialized when monitoring system is created
316            info!("Health checks enabled");
317        } else {
318            // Create monitoring system if not already present
319            let mut config = crate::MonitoringConfig::default();
320            config.health.enabled = true;
321            let monitoring = crate::MonitoringSystem::new(config);
322            self.monitoring_system = Some(Arc::new(monitoring));
323            info!("Health checks enabled with new monitoring system");
324        }
325        self
326    }
327    #[cfg(not(feature = "monitoring"))]
328    pub fn with_health_checks(self) -> Self {
329        warn!(
330            "Health checks require monitoring feature. Add 'monitoring' feature to enable health checks."
331        );
332        self
333    }
334
335    /// Enable metrics collection with default configuration
336    #[cfg(feature = "monitoring")]
337    pub fn with_metrics(mut self) -> Self {
338        if let Some(_monitoring) = &self.monitoring_system {
339            // Metrics are automatically available via monitoring.metrics()
340            info!("Metrics collection enabled");
341        } else {
342            // Create monitoring system if not already present
343            let mut config = crate::MonitoringConfig::default();
344            config.metrics.enabled = true;
345            let monitoring = crate::MonitoringSystem::new(config);
346            self.monitoring_system = Some(Arc::new(monitoring));
347            info!("Metrics collection enabled with new monitoring system");
348        }
349        self
350    }
351    #[cfg(not(feature = "monitoring"))]
352    pub fn with_metrics(self) -> Self {
353        warn!("Metrics require monitoring feature. Add 'monitoring' feature to enable metrics.");
354        self
355    }
356
357    /// Enable tracing with default configuration
358    #[cfg(feature = "monitoring")]
359    pub fn with_tracing(mut self) -> Self {
360        if let Some(_monitoring) = &self.monitoring_system {
361            // Tracing is configured via the monitoring config
362            info!("Tracing enabled");
363        } else {
364            // Create monitoring system if not already present
365            let mut config = crate::MonitoringConfig::default();
366            config.tracing.enabled = true;
367            let monitoring = crate::MonitoringSystem::new(config);
368            self.monitoring_system = Some(Arc::new(monitoring));
369            info!("Tracing enabled with new monitoring system");
370        }
371        self
372    }
373    #[cfg(not(feature = "monitoring"))]
374    pub fn with_tracing(self) -> Self {
375        warn!("Tracing requires monitoring feature. Add 'monitoring' feature to enable tracing.");
376        self
377    }
378
379    /// Enable all monitoring features (health checks, metrics, tracing)
380    #[cfg(feature = "monitoring")]
381    pub fn with_full_monitoring(mut self) -> Self {
382        let mut config = crate::MonitoringConfig::default();
383        config.health.enabled = true;
384        config.metrics.enabled = true;
385        config.tracing.enabled = true;
386        let monitoring = crate::MonitoringSystem::new(config);
387        self.monitoring_system = Some(Arc::new(monitoring));
388        info!("Full monitoring enabled (health checks, metrics, tracing)");
389        self
390    }
391    #[cfg(not(feature = "monitoring"))]
392    pub fn with_full_monitoring(self) -> Self {
393        warn!(
394            "Full monitoring requires monitoring feature. Add 'monitoring' feature to enable all monitoring features."
395        );
396        self
397    }
398
399    /// Enable middleware support
400    pub fn with_middleware(self) -> Self {
401        // This would integrate with the transport middleware system
402        info!("Middleware support enabled");
403        self
404    }
405
406    /// Enable recovery mechanisms
407    pub fn with_recovery(self) -> Self {
408        info!("Recovery mechanisms enabled");
409        self
410    }
411
412    /// Enable OAuth authentication
413    pub fn with_oauth(self) -> Self {
414        info!("OAuth authentication enabled");
415        self
416    }
417
418    /// Enable authentication with custom configuration (feature removed)
419    pub fn with_authentication(self, _token_validator: (), _required_scopes: Vec<String>) -> Self {
420        warn!("Authentication feature has been removed. Use ultrafast-mcp-auth crate directly.");
421        self
422    }
423
424    /// Enable Bearer token authentication (feature removed)
425    pub fn with_bearer_auth(self, _secret: String, _required_scopes: Vec<String>) -> Self {
426        warn!(
427            "Bearer authentication feature has been removed. Use ultrafast-mcp-auth crate directly."
428        );
429        self
430    }
431
432    /// Enable API key authentication
433    pub fn with_api_key_auth(self) -> Self {
434        info!("API key authentication enabled");
435        self
436    }
437
438    /// Enable Basic authentication
439    pub fn with_basic_auth(self) -> Self {
440        info!("Basic authentication enabled");
441        self
442    }
443
444    /// Enable rate limiting
445    pub fn with_rate_limiting(self, requests_per_minute: u32) -> Self {
446        info!(
447            "Rate limiting enabled: {} requests per minute",
448            requests_per_minute
449        );
450        self
451    }
452
453    /// Enable request validation
454    pub fn with_request_validation(self) -> Self {
455        info!("Request validation enabled");
456        self
457    }
458
459    /// Enable response caching
460    pub fn with_response_caching(self) -> Self {
461        info!("Response caching enabled");
462        self
463    }
464
465    /// Get the monitoring system if available
466    #[cfg(feature = "monitoring")]
467    pub fn monitoring(&self) -> Option<Arc<crate::MonitoringSystem>> {
468        self.monitoring_system.clone()
469    }
470    #[cfg(not(feature = "monitoring"))]
471    pub fn monitoring(&self) -> Option<()> {
472        None
473    }
474
475    /// Create a context with the current server logging configuration
476    pub async fn create_context(&self) -> Context {
477        let logging_config = self.logging_config.read().await;
478        let logger_config = logging_config.default_logger_config.clone();
479
480        Context::new().with_logger_config(logger_config)
481    }
482
483    /// Create a context with custom request and session IDs
484    pub async fn create_context_with_ids(
485        &self,
486        request_id: String,
487        session_id: Option<String>,
488    ) -> Context {
489        let logging_config = self.logging_config.read().await;
490        let logger_config = logging_config.default_logger_config.clone();
491
492        let mut context = Context::new()
493            .with_request_id(request_id)
494            .with_logger_config(logger_config);
495
496        if let Some(session_id) = session_id {
497            context = context.with_session_id(session_id);
498        }
499
500        context
501    }
502
503    /// Register a tool with validation
504    pub async fn register_tool(&self, tool: Tool) -> Result<(), ToolRegistrationError> {
505        // Validate tool name
506        if tool.name.is_empty() {
507            return Err(ToolRegistrationError::MissingDescription);
508        }
509
510        if self.is_reserved_name(&tool.name) {
511            return Err(ToolRegistrationError::ReservedName(tool.name.clone()));
512        }
513
514        // Validate required fields
515        if tool.description.is_empty() {
516            return Err(ToolRegistrationError::MissingDescription);
517        }
518
519        // Validate tool schema
520        if let Err(e) = validate_tool_schema(&tool.input_schema) {
521            return Err(ToolRegistrationError::InvalidSchema(format!(
522                "Input schema: {e}"
523            )));
524        }
525
526        if let Some(output_schema) = &tool.output_schema {
527            if let Err(e) = validate_tool_schema(output_schema) {
528                return Err(ToolRegistrationError::InvalidSchema(format!(
529                    "Output schema: {e}"
530                )));
531            }
532        } else {
533            return Err(ToolRegistrationError::MissingOutputSchema);
534        }
535
536        // Check for existing tool
537        let mut tools = self.tools.write().await;
538        if tools.contains_key(&tool.name) {
539            return Err(ToolRegistrationError::ToolAlreadyExists(tool.name.clone()));
540        }
541
542        // Register the tool
543        let tool_name = tool.name.clone();
544        tools.insert(tool_name.clone(), tool);
545        info!("Registered tool: {}", tool_name);
546
547        Ok(())
548    }
549
550    /// Register multiple tools
551    pub async fn register_tools(&self, tools: Vec<Tool>) -> Result<(), ToolRegistrationError> {
552        for tool in tools {
553            self.register_tool(tool).await?;
554        }
555        Ok(())
556    }
557
558    /// Unregister a tool by name
559    pub async fn unregister_tool(&self, name: &str) -> bool {
560        let mut tools = self.tools.write().await;
561        tools.remove(name).is_some()
562    }
563
564    /// Get a tool by name
565    pub async fn get_tool(&self, name: &str) -> Option<Tool> {
566        let tools = self.tools.read().await;
567        tools.get(name).cloned()
568    }
569
570    /// List all registered tools
571    pub async fn list_tools(&self) -> Vec<Tool> {
572        let tools = self.tools.read().await;
573        tools.values().cloned().collect()
574    }
575
576    /// Check if a tool exists
577    pub async fn has_tool(&self, name: &str) -> bool {
578        let tools = self.tools.read().await;
579        tools.contains_key(name)
580    }
581
582    /// Get tool count
583    pub async fn tool_count(&self) -> usize {
584        let tools = self.tools.read().await;
585        tools.len()
586    }
587
588    /// Clear all tools
589    pub async fn clear_tools(&self) {
590        let mut tools = self.tools.write().await;
591        let count = tools.len();
592        tools.clear();
593        info!("Cleared {} tools", count);
594    }
595
596    /// Check if a name is reserved
597    fn is_reserved_name(&self, name: &str) -> bool {
598        // MCP reserved method names
599        let reserved_names = [
600            "initialize",
601            "initialized",
602            "shutdown",
603            "exit",
604            "ping",
605            "tools/list",
606            "tools/call",
607            "resources/list",
608            "resources/read",
609            "resources/subscribe",
610            "resources/unsubscribe",
611            "prompts/list",
612            "prompts/get",
613            "sampling/create",
614            "completion/complete",
615            "roots/list",
616            "elicitation/request",
617            "logging/setLevel",
618        ];
619
620        reserved_names.contains(&name)
621    }
622
623    /// Validate tool call arguments against tool schema
624    pub async fn validate_tool_call(
625        &self,
626        tool_name: &str,
627        arguments: &serde_json::Value,
628    ) -> Result<(), MCPError> {
629        let tool = self.get_tool(tool_name).await;
630        let tool =
631            tool.ok_or_else(|| MCPError::invalid_request(format!("Tool '{tool_name}' not found")))?;
632
633        ultrafast_mcp_core::schema::validation::validate_tool_input(arguments, &tool.input_schema)
634            .map_err(|e| {
635                MCPError::invalid_request(format!(
636                    "Tool '{tool_name}' input validation failed: {e}"
637                ))
638            })?;
639
640        Ok(())
641    }
642
643    /// Execute a tool call with validation
644    pub async fn execute_tool_call(
645        &self,
646        tool_name: &str,
647        arguments: serde_json::Value,
648    ) -> Result<ultrafast_mcp_core::types::tools::ToolResult, MCPError> {
649        // Validate the tool call
650        self.validate_tool_call(tool_name, &arguments).await?;
651
652        // Get the tool handler
653        let tool_handler = self
654            .tool_handler
655            .as_ref()
656            .ok_or_else(|| MCPError::internal_error("No tool handler configured".to_string()))?;
657
658        // Create the tool call
659        let tool_call = ultrafast_mcp_core::types::tools::ToolCall {
660            name: tool_name.to_string(),
661            arguments: Some(arguments),
662        };
663
664        // Execute the tool call
665        tool_handler
666            .handle_tool_call(tool_call)
667            .await
668            .map_err(|e| MCPError::internal_error(format!("Tool execution failed: {e}")))
669    }
670
671    /// Add a tool handler to the server
672    pub fn with_tool_handler(mut self, handler: Arc<dyn ToolHandler>) -> Self {
673        self.tool_handler = Some(handler);
674        self
675    }
676
677    /// Add a resource handler to the server
678    pub fn with_resource_handler(mut self, handler: Arc<dyn ResourceHandler>) -> Self {
679        self.resource_handler = Some(handler);
680        self
681    }
682
683    /// Add a prompt handler to the server
684    pub fn with_prompt_handler(mut self, handler: Arc<dyn PromptHandler>) -> Self {
685        self.prompt_handler = Some(handler);
686        self
687    }
688
689    /// Add a sampling handler to the server
690    pub fn with_sampling_handler(mut self, handler: Arc<dyn SamplingHandler>) -> Self {
691        self.sampling_handler = Some(handler);
692        self
693    }
694
695    /// Add a completion handler to the server
696    pub fn with_completion_handler(mut self, handler: Arc<dyn CompletionHandler>) -> Self {
697        self.completion_handler = Some(handler);
698        self
699    }
700
701    /// Add a roots handler to the server
702    pub fn with_roots_handler(mut self, handler: Arc<dyn RootsHandler>) -> Self {
703        self.roots_handler = Some(handler);
704        // Note: Roots is a client capability, not server capability
705        // The server responds to roots requests but doesn't advertise it
706        self
707    }
708
709    /// Add an elicitation handler to the server
710    pub fn with_elicitation_handler(mut self, handler: Arc<dyn ElicitationHandler>) -> Self {
711        self.elicitation_handler = Some(handler);
712        // Note: Elicitation is a client capability, not server capability
713        // The server responds to elicitation requests but doesn't advertise it
714        self
715    }
716
717    /// Add a subscription handler to the server
718    pub fn with_subscription_handler(
719        mut self,
720        handler: Arc<dyn ResourceSubscriptionHandler>,
721    ) -> Self {
722        self.subscription_handler = Some(handler);
723        self
724    }
725
726    /// Configure logging with a custom configuration
727    pub fn with_logging_config(mut self, config: ServerLoggingConfig) -> Self {
728        let logging_config = Arc::get_mut(&mut self.logging_config)
729            .expect("Cannot modify logging config after server has been cloned");
730        *logging_config.get_mut() = config;
731        self
732    }
733
734    /// Run the server with stdio transport
735    pub async fn run_stdio(&self) -> MCPResult<()> {
736        let transport = create_transport(TransportConfig::Stdio)
737            .await
738            .map_err(|e| MCPError::internal_error(format!("Transport creation failed: {e}")))?;
739        self.run_with_transport(transport).await
740    }
741
742    /// Run the server with a custom transport
743    pub async fn run_with_transport(&self, mut transport: Box<dyn Transport>) -> MCPResult<()> {
744        info!("Starting UltraFastServer with transport");
745
746        // Initialize the server
747        *self.state.write().await = ServerState::Initializing;
748
749        // Start message handling loop
750        loop {
751            match transport.receive_message().await {
752                Ok(message) => {
753                    if let Err(e) = self.handle_message(message, &mut transport).await {
754                        error!("Error handling message: {}", e);
755                    }
756                }
757                Err(e) => {
758                    error!("Transport error: {}", e);
759                    break;
760                }
761            }
762        }
763
764        Ok(())
765    }
766
767    /// Run the server with Streamable HTTP transport
768    #[cfg(feature = "http")]
769    pub async fn run_streamable_http(&self, host: &str, port: u16) -> MCPResult<()> {
770        info!(
771            "Starting UltraFastServer with Streamable HTTP on {}:{}",
772            host, port
773        );
774
775        let config = HttpTransportConfig {
776            host: host.to_string(),
777            port,
778            ..Default::default()
779        };
780
781        self.run_http(config).await
782    }
783
784    /// Run the server with HTTP transport
785    #[cfg(feature = "http")]
786    pub async fn run_http(&self, config: HttpTransportConfig) -> MCPResult<()> {
787        info!("Starting HTTP transport server with config: {:?}", config);
788
789        let transport_server = HttpTransportServer::new(config);
790        let message_receiver = transport_server.get_message_receiver();
791        let message_sender = transport_server.get_message_sender();
792        let response_sender = transport_server.get_response_sender();
793
794        // Start message processing task
795        let server_clone = self.clone();
796        let _message_processor = tokio::spawn(async move {
797            server_clone
798                .process_http_messages(message_receiver, message_sender, response_sender)
799                .await;
800        });
801
802        // Start the HTTP server
803        transport_server
804            .run()
805            .await
806            .map_err(|e| MCPError::internal_error(format!("HTTP server failed: {e}")))
807    }
808
809    /// Run the server with custom Streamable HTTP transport configuration
810    /// This provides clearer naming for advanced Streamable HTTP configuration
811    #[cfg(feature = "http")]
812    pub async fn run_streamable_http_with_config(
813        &self,
814        config: HttpTransportConfig,
815    ) -> MCPResult<()> {
816        self.run_http(config).await
817    }
818
819    /// Process HTTP messages from the transport layer
820    #[allow(dead_code)]
821    async fn process_http_messages(
822        &self,
823        mut message_receiver: broadcast::Receiver<(String, JsonRpcMessage)>,
824        _message_sender: broadcast::Sender<(String, JsonRpcMessage)>,
825        response_sender: broadcast::Sender<(String, JsonRpcMessage)>,
826    ) {
827        info!("HTTP message processor started");
828
829        while let Ok((session_id, message)) = message_receiver.recv().await {
830            let session_id_clone = session_id.clone();
831            match message {
832                JsonRpcMessage::Request(request) => {
833                    info!(
834                        "Processing HTTP request: {} (session: {})",
835                        request.method, session_id
836                    );
837
838                    let response = self.handle_request(request).await;
839                    let response_message = JsonRpcMessage::Response(response);
840
841                    info!(
842                        "Sending response for session {}: {:?}",
843                        session_id, response_message
844                    );
845
846                    // Send the response back through the response sender
847                    if let Err(e) = response_sender.send((session_id_clone, response_message)) {
848                        error!("Failed to send response for session {}: {}", session_id, e);
849                    } else {
850                        info!("Successfully sent response for session {}", session_id);
851                    }
852                }
853                JsonRpcMessage::Notification(notification) => {
854                    info!(
855                        "Processing HTTP notification: {} (session: {})",
856                        notification.method, session_id
857                    );
858
859                    if let Err(e) = self.handle_notification(notification).await {
860                        error!(
861                            "Failed to handle notification for session {}: {}",
862                            session_id, e
863                        );
864                    }
865                    // Notifications don't have responses, so no need to send anything back
866                }
867                JsonRpcMessage::Response(_) => {
868                    warn!(
869                        "Received unexpected response message for session: {}",
870                        session_id
871                    );
872                }
873            }
874        }
875
876        info!("HTTP message processor stopped");
877    }
878
879    /// Get server info
880    pub fn info(&self) -> &ServerInfo {
881        &self.info
882    }
883
884    /// Get cancellation manager
885    pub fn cancellation_manager(&self) -> Arc<CancellationManager> {
886        self.cancellation_manager.clone()
887    }
888
889    /// Get ping manager
890    pub fn ping_manager(&self) -> Arc<PingManager> {
891        self.ping_manager.clone()
892    }
893
894    /// Start periodic ping monitoring (optional, for connection health)
895    /// This method should be called after the server is running with a transport
896    pub async fn start_ping_monitoring(&self, ping_interval: std::time::Duration) -> MCPResult<()> {
897        info!(
898            "Starting periodic ping monitoring with interval: {:?}",
899            ping_interval
900        );
901
902        // Note: This is a placeholder for future implementation
903        // The actual ping monitoring would need to be integrated with the transport layer
904        // For now, we log that ping monitoring is enabled
905        info!("Ping monitoring enabled (interval: {:?})", ping_interval);
906
907        // The PingManager is already configured with default intervals
908        // Future implementation would integrate with the transport layer
909        // to send periodic pings to clients
910
911        Ok(())
912    }
913
914    /// Stop ping monitoring
915    pub async fn stop_ping_monitoring(&self) -> MCPResult<()> {
916        info!("Stopping periodic ping monitoring");
917        // The ping monitoring task will naturally stop when the transport is closed
918        Ok(())
919    }
920
921    /// Handle MCP initialize request
922    async fn handle_initialize(
923        &self,
924        request: ultrafast_mcp_core::protocol::InitializeRequest,
925    ) -> Result<ultrafast_mcp_core::protocol::InitializeResponse, MCPError> {
926        info!(
927            "Handling initialize request from client: {} (version: {})",
928            request.client_info.name, request.protocol_version
929        );
930
931        // Negotiate protocol version
932        let negotiated_version = match ultrafast_mcp_core::protocol::version::negotiate_version(
933            &request.protocol_version,
934        ) {
935            Ok(version) => {
936                info!(
937                    "Protocol version negotiated: {} -> {}",
938                    request.protocol_version, version
939                );
940                version
941            }
942            Err(e) => {
943                error!("Protocol version negotiation failed: {}", e);
944                return Err(MCPError::invalid_request(format!(
945                    "Protocol version negotiation failed: {}. Supported versions: {:?}",
946                    e,
947                    ultrafast_mcp_core::protocol::version::SUPPORTED_VERSIONS
948                )));
949            }
950        };
951
952        // Validate the initialize request
953        if let Err(e) = request.validate_protocol_version() {
954            warn!("Initialize request validation warning: {}", e);
955            // Continue with warning but don't fail
956        }
957
958        // Validate compatibility
959        if let Err(e) = ultrafast_mcp_core::protocol::capabilities::validate_compatibility(
960            &request.capabilities,
961            &self.capabilities,
962        ) {
963            error!("Capability validation failed: {}", e);
964            return Err(MCPError::Protocol(
965                ultrafast_mcp_core::error::ProtocolError::CapabilityNotSupported(e),
966            ));
967        }
968
969        info!("Capabilities validated successfully");
970
971        // Update server state to Initialized (not Operating yet)
972        // This follows MCP 2025-06-18 specification: server should wait for initialized notification
973        {
974            let mut state = self.state.write().await;
975            *state = ServerState::Initialized;
976        }
977
978        info!(
979            "Server initialized with protocol version: {} (waiting for initialized notification)",
980            negotiated_version
981        );
982
983        Ok(ultrafast_mcp_core::protocol::InitializeResponse {
984            protocol_version: negotiated_version,
985            capabilities: self.capabilities.clone(),
986            server_info: self.info.clone(),
987            instructions: None,
988        })
989    }
990
991    /// Handle MCP initialized notification
992    async fn handle_initialized(
993        &self,
994        _notification: ultrafast_mcp_core::protocol::InitializedNotification,
995    ) -> MCPResult<()> {
996        info!("Received initialized notification from client");
997
998        // Ensure server state is operating (it should already be from initialize)
999        {
1000            let mut state = self.state.write().await;
1001            *state = ServerState::Operating;
1002        }
1003
1004        info!("Server confirmed operating state via initialized notification");
1005        Ok(())
1006    }
1007
1008    /// Handle MCP shutdown request
1009    async fn handle_shutdown(
1010        &self,
1011        request: ultrafast_mcp_core::protocol::ShutdownRequest,
1012    ) -> MCPResult<()> {
1013        info!("Handling shutdown request: {:?}", request.reason);
1014
1015        // Update server state
1016        {
1017            let mut state = self.state.write().await;
1018            *state = ServerState::ShuttingDown;
1019        }
1020
1021        // Perform cleanup
1022        self.perform_shutdown_cleanup().await;
1023
1024        // Update state to shutdown
1025        {
1026            let mut state = self.state.write().await;
1027            *state = ServerState::Shutdown;
1028        }
1029
1030        info!("Server shutdown completed");
1031        Ok(())
1032    }
1033
1034    /// Perform shutdown cleanup
1035    async fn perform_shutdown_cleanup(&self) {
1036        info!("Performing shutdown cleanup");
1037
1038        // Clear all tools
1039        self.clear_tools().await;
1040
1041        // Clear all resources
1042        {
1043            let mut resources = self.resources.write().await;
1044            resources.clear();
1045        }
1046
1047        // Clear all prompts
1048        {
1049            let mut prompts = self.prompts.write().await;
1050            prompts.clear();
1051        }
1052
1053        // Clear resource subscriptions
1054        {
1055            let mut subscriptions = self.resource_subscriptions.write().await;
1056            subscriptions.clear();
1057        }
1058
1059        info!("Shutdown cleanup completed");
1060    }
1061
1062    /// Get current server state
1063    pub async fn get_state(&self) -> ServerState {
1064        self.state.read().await.clone()
1065    }
1066
1067    /// Check if server can accept operations
1068    pub async fn can_operate(&self) -> bool {
1069        self.state.read().await.can_operate()
1070    }
1071
1072    /// Helper function to deserialize request parameters with proper defaults
1073    fn deserialize_list_tools_request(
1074        &self,
1075        params: Option<serde_json::Value>,
1076    ) -> ultrafast_mcp_core::types::tools::ListToolsRequest {
1077        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1078    }
1079
1080    fn deserialize_list_resources_request(
1081        &self,
1082        params: Option<serde_json::Value>,
1083    ) -> ultrafast_mcp_core::types::resources::ListResourcesRequest {
1084        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1085    }
1086
1087    fn deserialize_list_prompts_request(
1088        &self,
1089        params: Option<serde_json::Value>,
1090    ) -> ultrafast_mcp_core::types::prompts::ListPromptsRequest {
1091        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1092    }
1093
1094    fn deserialize_get_prompt_request(
1095        &self,
1096        params: Option<serde_json::Value>,
1097    ) -> ultrafast_mcp_core::types::prompts::GetPromptRequest {
1098        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1099    }
1100
1101    fn deserialize_read_resource_request(
1102        &self,
1103        params: Option<serde_json::Value>,
1104    ) -> ultrafast_mcp_core::types::resources::ReadResourceRequest {
1105        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1106    }
1107
1108    fn deserialize_list_resource_templates_request(
1109        &self,
1110        params: Option<serde_json::Value>,
1111    ) -> ultrafast_mcp_core::types::resources::ListResourceTemplatesRequest {
1112        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1113    }
1114
1115    fn deserialize_subscribe_request(
1116        &self,
1117        params: Option<serde_json::Value>,
1118    ) -> ultrafast_mcp_core::types::resources::SubscribeRequest {
1119        serde_json::from_value(params.unwrap_or_default()).unwrap_or_else(|_| {
1120            ultrafast_mcp_core::types::resources::SubscribeRequest { uri: String::new() }
1121        })
1122    }
1123
1124    fn deserialize_unsubscribe_request(
1125        &self,
1126        params: Option<serde_json::Value>,
1127    ) -> ultrafast_mcp_core::types::resources::UnsubscribeRequest {
1128        serde_json::from_value(params.unwrap_or_default()).unwrap_or_else(|_| {
1129            ultrafast_mcp_core::types::resources::UnsubscribeRequest { uri: String::new() }
1130        })
1131    }
1132
1133    fn deserialize_create_message_request(
1134        &self,
1135        params: Option<serde_json::Value>,
1136    ) -> ultrafast_mcp_core::types::sampling::CreateMessageRequest {
1137        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1138    }
1139
1140    fn deserialize_elicitation_request(
1141        &self,
1142        params: Option<serde_json::Value>,
1143    ) -> ultrafast_mcp_core::types::elicitation::ElicitationRequest {
1144        serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1145    }
1146
1147    fn deserialize_complete_request(
1148        &self,
1149        params: Option<serde_json::Value>,
1150    ) -> ultrafast_mcp_core::types::completion::CompleteRequest {
1151        match params {
1152            Some(params) => serde_json::from_value(params).unwrap_or_else(|_| {
1153                ultrafast_mcp_core::types::completion::CompleteRequest {
1154                    reference: ultrafast_mcp_core::types::completion::CompletionReference {
1155                        ref_type: "ref/prompt".to_string(),
1156                        name: "".to_string(),
1157                    },
1158                    argument: ultrafast_mcp_core::types::completion::CompletionArgument {
1159                        name: "".to_string(),
1160                        value: "".to_string(),
1161                    },
1162                    context: None,
1163                }
1164            }),
1165            None => ultrafast_mcp_core::types::completion::CompleteRequest {
1166                reference: ultrafast_mcp_core::types::completion::CompletionReference {
1167                    ref_type: "ref/prompt".to_string(),
1168                    name: "".to_string(),
1169                },
1170                argument: ultrafast_mcp_core::types::completion::CompletionArgument {
1171                    name: "".to_string(),
1172                    value: "".to_string(),
1173                },
1174                context: None,
1175            },
1176        }
1177    }
1178
1179    /// Handle incoming messages
1180    async fn handle_message(
1181        &self,
1182        message: JsonRpcMessage,
1183        transport: &mut Box<dyn Transport>,
1184    ) -> MCPResult<()> {
1185        match message {
1186            JsonRpcMessage::Request(request) => {
1187                // Check if this is actually a notification (no ID)
1188                if request.id.is_none() {
1189                    // This is a notification, handle it as such
1190                    self.handle_notification(request).await?;
1191                } else {
1192                    // This is a request, handle it with timeout
1193                    let operation_timeout = self.get_operation_timeout(&request.method);
1194                    let request_id = request.id.clone(); // Clone before moving request
1195                    let response =
1196                        tokio::time::timeout(operation_timeout, self.handle_request(request)).await;
1197
1198                    match response {
1199                        Ok(response) => {
1200                            transport
1201                                .send_message(JsonRpcMessage::Response(response))
1202                                .await
1203                                .map_err(|e| {
1204                                    MCPError::internal_error(format!("Failed to send message: {e}"))
1205                                })?;
1206                        }
1207                        Err(_) => {
1208                            // Request timed out, send timeout error
1209                            let timeout_error = JsonRpcResponse::error(
1210                                JsonRpcError::new(-32000, "Request timeout".to_string()),
1211                                request_id.clone(),
1212                            );
1213                            transport
1214                                .send_message(JsonRpcMessage::Response(timeout_error))
1215                                .await
1216                                .map_err(|e| {
1217                                    MCPError::internal_error(format!(
1218                                        "Failed to send timeout error: {e}"
1219                                    ))
1220                                })?;
1221
1222                            // Send cancellation notification
1223                            if let Some(request_id) = &request_id {
1224                                self.notify_cancelled(
1225                                    serde_json::Value::String(request_id.to_string()),
1226                                    Some("Request timed out".to_string()),
1227                                    transport,
1228                                )
1229                                .await?;
1230                            }
1231                        }
1232                    }
1233                }
1234            }
1235            JsonRpcMessage::Notification(notification) => {
1236                self.handle_notification(notification).await?;
1237            }
1238            JsonRpcMessage::Response(_) => {
1239                warn!("Received unexpected response message");
1240            }
1241        }
1242        Ok(())
1243    }
1244
1245    /// Handle incoming requests
1246    async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
1247        info!(
1248            "Handling request: {} (id: {:?})",
1249            request.method, request.id
1250        );
1251
1252        match request.method.as_str() {
1253            // MCP Lifecycle methods
1254            "initialize" => {
1255                match serde_json::from_value::<ultrafast_mcp_core::protocol::InitializeRequest>(
1256                    request.params.unwrap_or_default(),
1257                ) {
1258                    Ok(init_request) => match self.handle_initialize(init_request).await {
1259                        Ok(response) => match serde_json::to_value(response) {
1260                            Ok(value) => JsonRpcResponse::success(value, request.id),
1261                            Err(e) => JsonRpcResponse::error(
1262                                JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1263                                request.id,
1264                            ),
1265                        },
1266                        Err(e) => JsonRpcResponse::error(
1267                            JsonRpcError::new(-32603, e.to_string()),
1268                            request.id,
1269                        ),
1270                    },
1271                    Err(e) => JsonRpcResponse::error(
1272                        JsonRpcError::invalid_params(Some(format!(
1273                            "Invalid initialize request: {e}"
1274                        ))),
1275                        request.id,
1276                    ),
1277                }
1278            }
1279            "shutdown" => {
1280                let shutdown_request = match serde_json::from_value::<
1281                    ultrafast_mcp_core::protocol::ShutdownRequest,
1282                >(request.params.unwrap_or_default())
1283                {
1284                    Ok(req) => req,
1285                    Err(_) => ultrafast_mcp_core::protocol::ShutdownRequest { reason: None },
1286                };
1287
1288                match self.handle_shutdown(shutdown_request).await {
1289                    Ok(_) => JsonRpcResponse::success(serde_json::json!({}), request.id),
1290                    Err(e) => {
1291                        JsonRpcResponse::error(JsonRpcError::new(-32603, e.to_string()), request.id)
1292                    }
1293                }
1294            }
1295
1296            // Tools methods
1297            "tools/list" => {
1298                if !self.can_operate().await {
1299                    return JsonRpcResponse::error(
1300                        JsonRpcError::internal_error(Some("Server not ready".to_string())),
1301                        request.id,
1302                    );
1303                }
1304
1305                let list_request = self.deserialize_list_tools_request(request.params.clone());
1306
1307                if let Some(handler) = &self.tool_handler {
1308                    match handler.list_tools(list_request).await {
1309                        Ok(response) => {
1310                            // If handler returns empty tools, fallback to registered tools
1311                            if response.tools.is_empty() {
1312                                let tools = self.list_tools().await;
1313                                let response =
1314                                    ultrafast_mcp_core::types::tools::ListToolsResponse {
1315                                        tools,
1316                                        next_cursor: None,
1317                                    };
1318                                match serde_json::to_value(response) {
1319                                    Ok(value) => JsonRpcResponse::success(value, request.id),
1320                                    Err(e) => JsonRpcResponse::error(
1321                                        JsonRpcError::new(
1322                                            -32603,
1323                                            format!("Serialization error: {e}"),
1324                                        ),
1325                                        request.id,
1326                                    ),
1327                                }
1328                            } else {
1329                                match serde_json::to_value(response) {
1330                                    Ok(value) => JsonRpcResponse::success(value, request.id),
1331                                    Err(e) => JsonRpcResponse::error(
1332                                        JsonRpcError::new(
1333                                            -32603,
1334                                            format!("Serialization error: {e}"),
1335                                        ),
1336                                        request.id,
1337                                    ),
1338                                }
1339                            }
1340                        }
1341                        Err(e) => JsonRpcResponse::error(
1342                            JsonRpcError::new(-32603, format!("Tools list failed: {e}")),
1343                            request.id,
1344                        ),
1345                    }
1346                } else {
1347                    // Fallback to registered tools
1348                    let tools = self.list_tools().await;
1349                    let response = ultrafast_mcp_core::types::tools::ListToolsResponse {
1350                        tools,
1351                        next_cursor: None,
1352                    };
1353                    match serde_json::to_value(response) {
1354                        Ok(value) => JsonRpcResponse::success(value, request.id),
1355                        Err(e) => JsonRpcResponse::error(
1356                            JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1357                            request.id,
1358                        ),
1359                    }
1360                }
1361            }
1362            "tools/call" => {
1363                if !self.can_operate().await {
1364                    return JsonRpcResponse::error(
1365                        JsonRpcError::internal_error(Some("Server not ready".to_string())),
1366                        request.id,
1367                    );
1368                }
1369
1370                let params = match &request.params {
1371                    Some(params) => params,
1372                    None => {
1373                        return JsonRpcResponse::error(
1374                            JsonRpcError::new(
1375                                -32602,
1376                                "Tool call failed: Missing parameters".to_string(),
1377                            ),
1378                            request.id,
1379                        );
1380                    }
1381                };
1382
1383                let tool_name = params.get("name").and_then(|v| v.as_str());
1384                let arguments = params
1385                    .get("arguments")
1386                    .cloned()
1387                    .unwrap_or(serde_json::json!({}));
1388
1389                if let Some(tool_name) = tool_name {
1390                    if let Some(handler) = &self.tool_handler {
1391                        let tool_call = ultrafast_mcp_core::types::tools::ToolCall {
1392                            name: tool_name.to_string(),
1393                            arguments: Some(arguments.clone()),
1394                        };
1395                        // Arguments validation will be handled by the tool handler
1396                        match handler.handle_tool_call(tool_call).await {
1397                            Ok(result) => match serde_json::to_value(result) {
1398                                Ok(value) => JsonRpcResponse::success(value, request.id),
1399                                Err(e) => JsonRpcResponse::error(
1400                                    JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1401                                    request.id,
1402                                ),
1403                            },
1404                            Err(e) => {
1405                                use ultrafast_mcp_core::error::{MCPError, ProtocolError};
1406                                let (code, msg) = match &e {
1407                                    MCPError::Protocol(ProtocolError::InvalidParams(_))
1408                                    | MCPError::Protocol(ProtocolError::NotFound(_)) => {
1409                                        (-32602, format!("Tool call failed: {e}"))
1410                                    }
1411                                    _ => (-32603, format!("Tool call failed: {e}")),
1412                                };
1413                                JsonRpcResponse::error(JsonRpcError::new(code, msg), request.id)
1414                            }
1415                        }
1416                    } else {
1417                        // Fallback to registered tools
1418                        if !self.has_tool(tool_name).await {
1419                            return JsonRpcResponse::error(
1420                                JsonRpcError::new(
1421                                    -32602,
1422                                    format!("Tool call failed: Tool not found: {tool_name}"),
1423                                ),
1424                                request.id,
1425                            );
1426                        }
1427                        // Arguments validation will be handled by the tool handler
1428                        match self.execute_tool_call(tool_name, arguments).await {
1429                            Ok(result) => match serde_json::to_value(result) {
1430                                Ok(value) => JsonRpcResponse::success(value, request.id),
1431                                Err(e) => JsonRpcResponse::error(
1432                                    JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1433                                    request.id,
1434                                ),
1435                            },
1436                            Err(e) => {
1437                                use ultrafast_mcp_core::error::{MCPError, ProtocolError};
1438                                let (code, msg) = match &e {
1439                                    MCPError::Protocol(ProtocolError::InvalidParams(_))
1440                                    | MCPError::Protocol(ProtocolError::NotFound(_)) => {
1441                                        (-32602, format!("Tool call failed: {e}"))
1442                                    }
1443                                    _ => (-32603, format!("Tool call failed: {e}")),
1444                                };
1445                                JsonRpcResponse::error(JsonRpcError::new(code, msg), request.id)
1446                            }
1447                        }
1448                    }
1449                } else {
1450                    JsonRpcResponse::error(
1451                        JsonRpcError::new(
1452                            -32602,
1453                            "Tool call failed: Missing or invalid tool name".to_string(),
1454                        ),
1455                        request.id,
1456                    )
1457                }
1458            }
1459
1460            // Resources methods
1461            "resources/list" => {
1462                if !self.can_operate().await {
1463                    return JsonRpcResponse::error(
1464                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1465                        request.id,
1466                    );
1467                }
1468
1469                let list_request = self.deserialize_list_resources_request(request.params.clone());
1470
1471                if let Some(handler) = &self.resource_handler {
1472                    // For resources/list, we don't validate against roots since it's a general listing
1473                    // Root validation will be done when individual resources are accessed
1474
1475                    match handler.list_resources(list_request).await {
1476                        Ok(response) => match serde_json::to_value(response) {
1477                            Ok(value) => JsonRpcResponse::success(value, request.id),
1478                            Err(e) => JsonRpcResponse::error(
1479                                JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1480                                request.id,
1481                            ),
1482                        },
1483                        Err(e) => JsonRpcResponse::error(
1484                            JsonRpcError::new(-32603, format!("Resources list failed: {e}")),
1485                            request.id,
1486                        ),
1487                    }
1488                } else {
1489                    JsonRpcResponse::error(
1490                        JsonRpcError::new(-32601, "Resources not supported".to_string()),
1491                        request.id,
1492                    )
1493                }
1494            }
1495            "resources/read" => {
1496                if !self.can_operate().await {
1497                    return JsonRpcResponse::error(
1498                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1499                        request.id,
1500                    );
1501                }
1502
1503                let read_request = self.deserialize_read_resource_request(request.params.clone());
1504
1505                if let Some(handler) = &self.resource_handler {
1506                    // Validate against roots if roots handler is available
1507                    if let Some(roots_handler) = &self.roots_handler {
1508                        match roots_handler.list_roots().await {
1509                            Ok(roots) => {
1510                                if let Err(e) = handler
1511                                    .validate_resource_access(
1512                                        &read_request.uri,
1513                                        ultrafast_mcp_core::types::roots::RootOperation::Read,
1514                                        &roots,
1515                                    )
1516                                    .await
1517                                {
1518                                    return JsonRpcResponse::error(
1519                                        JsonRpcError::new(
1520                                            -32603,
1521                                            format!("Root validation failed: {e}"),
1522                                        ),
1523                                        request.id,
1524                                    );
1525                                }
1526                            }
1527                            Err(e) => {
1528                                return JsonRpcResponse::error(
1529                                    JsonRpcError::new(-32603, format!("Failed to get roots: {e}")),
1530                                    request.id,
1531                                );
1532                            }
1533                        }
1534                    }
1535
1536                    match handler.read_resource(read_request).await {
1537                        Ok(response) => match serde_json::to_value(response) {
1538                            Ok(value) => JsonRpcResponse::success(value, request.id),
1539                            Err(e) => JsonRpcResponse::error(
1540                                JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1541                                request.id,
1542                            ),
1543                        },
1544                        Err(e) => JsonRpcResponse::error(
1545                            JsonRpcError::new(-32603, format!("Resource read failed: {e}")),
1546                            request.id,
1547                        ),
1548                    }
1549                } else {
1550                    JsonRpcResponse::error(
1551                        JsonRpcError::new(-32601, "Resources not supported".to_string()),
1552                        request.id,
1553                    )
1554                }
1555            }
1556            "resources/templates/list" => {
1557                if !self.can_operate().await {
1558                    return JsonRpcResponse::error(
1559                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1560                        request.id,
1561                    );
1562                }
1563
1564                let list_request =
1565                    self.deserialize_list_resource_templates_request(request.params.clone());
1566
1567                if let Some(handler) = &self.resource_handler {
1568                    match handler.list_resource_templates(list_request).await {
1569                        Ok(response) => JsonRpcResponse::success(
1570                            serde_json::to_value(response).unwrap(),
1571                            request.id,
1572                        ),
1573                        Err(e) => JsonRpcResponse::error(
1574                            JsonRpcError::new(
1575                                -32603,
1576                                format!("Resource templates list failed: {e}"),
1577                            ),
1578                            request.id,
1579                        ),
1580                    }
1581                } else {
1582                    JsonRpcResponse::error(
1583                        JsonRpcError::new(-32601, "Resources not supported".to_string()),
1584                        request.id,
1585                    )
1586                }
1587            }
1588            "resources/subscribe" => {
1589                if !self.can_operate().await {
1590                    return JsonRpcResponse::error(
1591                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1592                        request.id,
1593                    );
1594                }
1595
1596                let subscribe_request = self.deserialize_subscribe_request(request.params.clone());
1597
1598                // Validate against roots if roots handler is available
1599                if let Some(roots_handler) = &self.roots_handler {
1600                    if let Some(resource_handler) = &self.resource_handler {
1601                        match roots_handler.list_roots().await {
1602                            Ok(roots) => {
1603                                if let Err(e) = resource_handler
1604                                    .validate_resource_access(
1605                                        &subscribe_request.uri,
1606                                        ultrafast_mcp_core::types::roots::RootOperation::Read,
1607                                        &roots,
1608                                    )
1609                                    .await
1610                                {
1611                                    return JsonRpcResponse::error(
1612                                        JsonRpcError::new(
1613                                            -32603,
1614                                            format!("Root validation failed: {e}"),
1615                                        ),
1616                                        request.id,
1617                                    );
1618                                }
1619                            }
1620                            Err(e) => {
1621                                return JsonRpcResponse::error(
1622                                    JsonRpcError::new(-32603, format!("Failed to get roots: {e}")),
1623                                    request.id,
1624                                );
1625                            }
1626                        }
1627                    }
1628                }
1629
1630                if let Some(handler) = &self.subscription_handler {
1631                    match handler.subscribe(subscribe_request.uri.clone()).await {
1632                        Ok(_) => {
1633                            // Subscription successful - return success response
1634                            // Note: The client may timeout if it expects immediate notifications
1635                            // This is a limitation of the current MCP architecture
1636                            JsonRpcResponse::success(
1637                                serde_json::to_value(SubscribeResponse::new()).unwrap(),
1638                                request.id,
1639                            )
1640                        }
1641                        Err(e) => JsonRpcResponse::error(
1642                            JsonRpcError::new(-32603, format!("Resource subscribe failed: {e}")),
1643                            request.id,
1644                        ),
1645                    }
1646                } else {
1647                    JsonRpcResponse::error(
1648                        JsonRpcError::new(
1649                            -32601,
1650                            "Resource subscriptions not supported".to_string(),
1651                        ),
1652                        request.id,
1653                    )
1654                }
1655            }
1656            "resources/unsubscribe" => {
1657                if !self.can_operate().await {
1658                    return JsonRpcResponse::error(
1659                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1660                        request.id,
1661                    );
1662                }
1663
1664                let unsubscribe_request =
1665                    self.deserialize_unsubscribe_request(request.params.clone());
1666
1667                if let Some(handler) = &self.subscription_handler {
1668                    match handler.unsubscribe(unsubscribe_request.uri).await {
1669                        Ok(_) => JsonRpcResponse::success(serde_json::Value::Null, request.id),
1670                        Err(e) => JsonRpcResponse::error(
1671                            JsonRpcError::new(-32603, format!("Resource unsubscribe failed: {e}")),
1672                            request.id,
1673                        ),
1674                    }
1675                } else {
1676                    JsonRpcResponse::error(
1677                        JsonRpcError::new(
1678                            -32601,
1679                            "Resource subscriptions not supported".to_string(),
1680                        ),
1681                        request.id,
1682                    )
1683                }
1684            }
1685
1686            // Prompts methods
1687            "prompts/list" => {
1688                if !self.can_operate().await {
1689                    return JsonRpcResponse::error(
1690                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1691                        request.id,
1692                    );
1693                }
1694
1695                let list_request = self.deserialize_list_prompts_request(request.params.clone());
1696
1697                if let Some(handler) = &self.prompt_handler {
1698                    match handler.list_prompts(list_request).await {
1699                        Ok(response) => JsonRpcResponse::success(
1700                            serde_json::to_value(response).unwrap(),
1701                            request.id,
1702                        ),
1703                        Err(e) => JsonRpcResponse::error(
1704                            JsonRpcError::new(-32603, format!("Prompts list failed: {e}")),
1705                            request.id,
1706                        ),
1707                    }
1708                } else {
1709                    JsonRpcResponse::error(
1710                        JsonRpcError::new(-32601, "Prompts not supported".to_string()),
1711                        request.id,
1712                    )
1713                }
1714            }
1715            "prompts/get" => {
1716                if !self.can_operate().await {
1717                    return JsonRpcResponse::error(
1718                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1719                        request.id,
1720                    );
1721                }
1722
1723                let get_request = self.deserialize_get_prompt_request(request.params.clone());
1724
1725                if let Some(handler) = &self.prompt_handler {
1726                    match handler.get_prompt(get_request).await {
1727                        Ok(response) => JsonRpcResponse::success(
1728                            serde_json::to_value(response).unwrap(),
1729                            request.id,
1730                        ),
1731                        Err(e) => JsonRpcResponse::error(
1732                            JsonRpcError::new(-32603, format!("Prompt get failed: {e}")),
1733                            request.id,
1734                        ),
1735                    }
1736                } else {
1737                    JsonRpcResponse::error(
1738                        JsonRpcError::new(-32601, "Prompts not supported".to_string()),
1739                        request.id,
1740                    )
1741                }
1742            }
1743
1744            // Completion methods
1745            "completion/complete" => {
1746                if !self.can_operate().await {
1747                    return JsonRpcResponse::error(
1748                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1749                        request.id,
1750                    );
1751                }
1752
1753                let complete_request = self.deserialize_complete_request(request.params.clone());
1754
1755                if let Some(handler) = &self.completion_handler {
1756                    match handler.complete(complete_request).await {
1757                        Ok(response) => JsonRpcResponse::success(
1758                            serde_json::to_value(response).unwrap(),
1759                            request.id,
1760                        ),
1761                        Err(e) => JsonRpcResponse::error(
1762                            JsonRpcError::new(-32603, format!("Completion failed: {e}")),
1763                            request.id,
1764                        ),
1765                    }
1766                } else {
1767                    JsonRpcResponse::error(
1768                        JsonRpcError::new(-32601, "Completion not supported".to_string()),
1769                        request.id,
1770                    )
1771                }
1772            }
1773
1774            // Sampling methods
1775            "sampling/createMessage" => {
1776                if !self.can_operate().await {
1777                    return JsonRpcResponse::error(
1778                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1779                        request.id,
1780                    );
1781                }
1782
1783                let create_request =
1784                    self.deserialize_create_message_request(request.params.clone());
1785
1786                if let Some(handler) = &self.sampling_handler {
1787                    match handler.create_message(create_request).await {
1788                        Ok(response) => JsonRpcResponse::success(
1789                            serde_json::to_value(response).unwrap(),
1790                            request.id,
1791                        ),
1792                        Err(e) => JsonRpcResponse::error(
1793                            JsonRpcError::new(-32603, format!("Message creation failed: {e}")),
1794                            request.id,
1795                        ),
1796                    }
1797                } else {
1798                    JsonRpcResponse::error(
1799                        JsonRpcError::new(-32601, "Sampling not supported".to_string()),
1800                        request.id,
1801                    )
1802                }
1803            }
1804
1805            // Roots methods
1806            "roots/list" => {
1807                if !self.can_operate().await {
1808                    return JsonRpcResponse::error(
1809                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1810                        request.id,
1811                    );
1812                }
1813
1814                if let Some(handler) = &self.roots_handler {
1815                    match handler.list_roots().await {
1816                        Ok(response) => JsonRpcResponse::success(
1817                            serde_json::to_value(response).unwrap(),
1818                            request.id,
1819                        ),
1820                        Err(e) => JsonRpcResponse::error(
1821                            JsonRpcError::new(-32603, format!("Roots list failed: {e}")),
1822                            request.id,
1823                        ),
1824                    }
1825                } else {
1826                    JsonRpcResponse::error(
1827                        JsonRpcError::new(-32601, "Roots not supported".to_string()),
1828                        request.id,
1829                    )
1830                }
1831            }
1832
1833            // Elicitation methods
1834            "elicitation/create" => {
1835                if !self.can_operate().await {
1836                    return JsonRpcResponse::error(
1837                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1838                        request.id,
1839                    );
1840                }
1841
1842                let elicitation_request =
1843                    self.deserialize_elicitation_request(request.params.clone());
1844
1845                if let Some(handler) = &self.elicitation_handler {
1846                    match handler.handle_elicitation(elicitation_request).await {
1847                        Ok(response) => JsonRpcResponse::success(
1848                            serde_json::to_value(response).unwrap(),
1849                            request.id,
1850                        ),
1851                        Err(e) => JsonRpcResponse::error(
1852                            JsonRpcError::new(-32603, format!("Elicitation failed: {e}")),
1853                            request.id,
1854                        ),
1855                    }
1856                } else {
1857                    JsonRpcResponse::error(
1858                        JsonRpcError::new(-32601, "Elicitation not supported".to_string()),
1859                        request.id,
1860                    )
1861                }
1862            }
1863
1864            "elicitation/respond" => {
1865                if !self.can_operate().await {
1866                    return JsonRpcResponse::error(
1867                        JsonRpcError::new(-32000, "Server not ready".to_string()),
1868                        request.id,
1869                    );
1870                }
1871
1872                let elicitation_response = match serde_json::from_value::<
1873                    ultrafast_mcp_core::types::elicitation::ElicitationResponse,
1874                >(
1875                    request.params.unwrap_or_default()
1876                ) {
1877                    Ok(response) => response,
1878                    Err(e) => {
1879                        return JsonRpcResponse::error(
1880                            JsonRpcError::new(-32602, format!("Invalid elicitation response: {e}")),
1881                            request.id,
1882                        );
1883                    }
1884                };
1885
1886                // Log the elicitation response
1887                info!(
1888                    "Received elicitation response: {:?}",
1889                    elicitation_response.action
1890                );
1891
1892                // In a real implementation, this would be handled by the server's elicitation flow
1893                // For now, we'll just return success
1894                JsonRpcResponse::success(serde_json::json!({}), request.id)
1895            }
1896
1897            // Logging methods
1898            "logging/setLevel" => {
1899                let params = match &request.params {
1900                    Some(params) => params,
1901                    None => {
1902                        return JsonRpcResponse::error(
1903                            JsonRpcError::new(-32602, "Missing parameters".to_string()),
1904                            request.id,
1905                        );
1906                    }
1907                };
1908
1909                match serde_json::from_value::<LogLevelSetRequest>(params.clone()) {
1910                    Ok(set_request) => match self.set_log_level(set_request.level).await {
1911                        Ok(()) => {
1912                            let response = LogLevelSetResponse::new();
1913                            JsonRpcResponse::success(
1914                                serde_json::to_value(response).unwrap(),
1915                                request.id,
1916                            )
1917                        }
1918                        Err(e) => JsonRpcResponse::error(
1919                            JsonRpcError::new(-32603, format!("Failed to set log level: {e}")),
1920                            request.id,
1921                        ),
1922                    },
1923                    Err(e) => JsonRpcResponse::error(
1924                        JsonRpcError::new(-32602, format!("Invalid log level set request: {e}")),
1925                        request.id,
1926                    ),
1927                }
1928            }
1929
1930            // Ping method for connection health monitoring
1931            "ping" => {
1932                let ping_request = match serde_json::from_value::<
1933                    ultrafast_mcp_core::types::notifications::PingRequest,
1934                >(request.params.unwrap_or_default())
1935                {
1936                    Ok(req) => req,
1937                    Err(_) => ultrafast_mcp_core::types::notifications::PingRequest { data: None },
1938                };
1939
1940                match self.ping_manager.handle_ping(ping_request).await {
1941                    Ok(response) => JsonRpcResponse::success(
1942                        serde_json::to_value(response).unwrap(),
1943                        request.id,
1944                    ),
1945                    Err(e) => JsonRpcResponse::error(
1946                        JsonRpcError::new(-32603, format!("Ping failed: {e}")),
1947                        request.id,
1948                    ),
1949                }
1950            }
1951
1952            // Roots methods
1953            "roots/set" => {
1954                let params = match &request.params {
1955                    Some(params) => params,
1956                    None => {
1957                        return JsonRpcResponse::error(
1958                            JsonRpcError::new(-32602, "Missing parameters".to_string()),
1959                            request.id,
1960                        );
1961                    }
1962                };
1963
1964                match serde_json::from_value::<SetRootsRequest>(params.clone()) {
1965                    Ok(set_request) => {
1966                        let response = self
1967                            .handle_set_roots(
1968                                set_request.roots,
1969                                &mut Box::new(
1970                                    create_transport(TransportConfig::Stdio).await.unwrap(),
1971                                ),
1972                            )
1973                            .await;
1974                        JsonRpcResponse::success(
1975                            serde_json::to_value(response).unwrap(),
1976                            request.id,
1977                        )
1978                    }
1979                    Err(e) => JsonRpcResponse::error(
1980                        JsonRpcError::new(-32602, format!("Invalid roots set request: {e}")),
1981                        request.id,
1982                    ),
1983                }
1984            }
1985
1986            // Unknown method
1987            _ => JsonRpcResponse::error(
1988                JsonRpcError::new(
1989                    -32601,
1990                    format!("Method not implemented: {}", request.method),
1991                ),
1992                request.id,
1993            ),
1994        }
1995    }
1996
1997    /// Handle incoming notifications
1998    async fn handle_notification(&self, notification: JsonRpcRequest) -> MCPResult<()> {
1999        info!("Handling notification: {}", notification.method);
2000
2001        match notification.method.as_str() {
2002            "initialized" => {
2003                let notification = ultrafast_mcp_core::protocol::InitializedNotification {};
2004                self.handle_initialized(notification).await?;
2005                Ok(())
2006            }
2007            "notifications/cancelled" => {
2008                // Handle cancellation notification
2009                if let Some(params) = notification.params {
2010                    let cancellation_notification: ultrafast_mcp_core::types::notifications::CancelledNotification =
2011                        serde_json::from_value(params)?;
2012
2013                    // Use the cancellation manager to handle the cancellation
2014                    let _cancelled = self
2015                        .cancellation_manager
2016                        .handle_cancellation(cancellation_notification)
2017                        .await?;
2018                    info!("Cancellation notification processed");
2019                }
2020                Ok(())
2021            }
2022            _ => {
2023                warn!("Unknown notification method: {}", notification.method);
2024                Ok(())
2025            }
2026        }
2027    }
2028
2029    // ===== NOTIFICATION METHODS =====
2030
2031    /// Send tools list changed notification
2032    pub async fn notify_tools_changed(&self, transport: &mut Box<dyn Transport>) -> MCPResult<()> {
2033        let notification =
2034            ultrafast_mcp_core::types::notifications::ToolsListChangedNotification::new();
2035        self.send_notification(
2036            "notifications/tools/listChanged",
2037            Some(serde_json::to_value(notification)?),
2038            transport,
2039        )
2040        .await
2041    }
2042
2043    /// Send resources list changed notification
2044    pub async fn notify_resources_changed(
2045        &self,
2046        transport: &mut Box<dyn Transport>,
2047    ) -> MCPResult<()> {
2048        let notification =
2049            ultrafast_mcp_core::types::notifications::ResourcesListChangedNotification::new();
2050        self.send_notification(
2051            "notifications/resources/listChanged",
2052            Some(serde_json::to_value(notification)?),
2053            transport,
2054        )
2055        .await
2056    }
2057
2058    /// Send prompts list changed notification
2059    pub async fn notify_prompts_changed(
2060        &self,
2061        transport: &mut Box<dyn Transport>,
2062    ) -> MCPResult<()> {
2063        let notification =
2064            ultrafast_mcp_core::types::notifications::PromptsListChangedNotification::new();
2065        self.send_notification(
2066            "notifications/prompts/listChanged",
2067            Some(serde_json::to_value(notification)?),
2068            transport,
2069        )
2070        .await
2071    }
2072
2073    /// Send resource updated notification
2074    pub async fn notify_resource_updated(
2075        &self,
2076        uri: String,
2077        transport: &mut Box<dyn Transport>,
2078    ) -> MCPResult<()> {
2079        let notification =
2080            ultrafast_mcp_core::types::resources::ResourceUpdatedNotification { uri };
2081        self.send_notification(
2082            "notifications/resources/updated",
2083            Some(serde_json::to_value(notification)?),
2084            transport,
2085        )
2086        .await
2087    }
2088
2089    /// Send progress notification
2090    pub async fn notify_progress(
2091        &self,
2092        progress_token: serde_json::Value,
2093        progress: f64,
2094        total: Option<f64>,
2095        message: Option<String>,
2096        transport: &mut Box<dyn Transport>,
2097    ) -> MCPResult<()> {
2098        let mut notification = ultrafast_mcp_core::types::notifications::ProgressNotification::new(
2099            progress_token,
2100            progress,
2101        );
2102        if let Some(total) = total {
2103            notification = notification.with_total(total);
2104        }
2105        if let Some(message) = message {
2106            notification = notification.with_message(message);
2107        }
2108        self.send_notification(
2109            "notifications/progress",
2110            Some(serde_json::to_value(notification)?),
2111            transport,
2112        )
2113        .await
2114    }
2115
2116    /// Send logging message notification
2117    pub async fn notify_logging_message(
2118        &self,
2119        level: ultrafast_mcp_core::types::notifications::LogLevel,
2120        data: serde_json::Value,
2121        logger: Option<String>,
2122        transport: &mut Box<dyn Transport>,
2123    ) -> MCPResult<()> {
2124        let mut notification =
2125            ultrafast_mcp_core::types::notifications::LoggingMessageNotification::new(level, data);
2126        if let Some(logger) = logger {
2127            notification = notification.with_logger(logger);
2128        }
2129        self.send_notification(
2130            "notifications/logging/message",
2131            Some(serde_json::to_value(notification)?),
2132            transport,
2133        )
2134        .await
2135    }
2136
2137    /// Send cancellation notification
2138    pub async fn notify_cancelled(
2139        &self,
2140        request_id: serde_json::Value,
2141        reason: Option<String>,
2142        transport: &mut Box<dyn Transport>,
2143    ) -> MCPResult<()> {
2144        let mut notification =
2145            ultrafast_mcp_core::types::notifications::CancelledNotification::new(request_id);
2146        if let Some(reason) = reason {
2147            notification = notification.with_reason(reason);
2148        }
2149        self.send_notification(
2150            "notifications/cancelled",
2151            Some(serde_json::to_value(notification)?),
2152            transport,
2153        )
2154        .await
2155    }
2156
2157    /// Send roots list changed notification
2158    pub async fn notify_roots_changed(&self, transport: &mut Box<dyn Transport>) -> MCPResult<()> {
2159        let notification =
2160            ultrafast_mcp_core::types::notifications::RootsListChangedNotification::new();
2161        self.send_notification(
2162            "notifications/roots/listChanged",
2163            Some(serde_json::to_value(notification)?),
2164            transport,
2165        )
2166        .await
2167    }
2168
2169    /// Generic method to send notifications
2170    async fn send_notification(
2171        &self,
2172        method: &str,
2173        params: Option<serde_json::Value>,
2174        transport: &mut Box<dyn Transport>,
2175    ) -> MCPResult<()> {
2176        let notification = JsonRpcRequest {
2177            jsonrpc: Cow::Borrowed("2.0"),
2178            id: None, // Notifications have no ID
2179            method: method.to_string(),
2180            params,
2181            meta: std::collections::HashMap::new(),
2182        };
2183
2184        transport
2185            .send_message(JsonRpcMessage::Request(notification))
2186            .await
2187            .map_err(|e| MCPError::internal_error(format!("Failed to send notification: {e}")))?;
2188
2189        info!("Sent notification: {}", method);
2190        Ok(())
2191    }
2192
2193    /// Set the advanced sampling handler for context collection and human-in-the-loop features
2194    pub fn with_advanced_sampling_handler(
2195        mut self,
2196        handler: Arc<dyn AdvancedSamplingHandler>,
2197    ) -> Self {
2198        self.advanced_sampling_handler = Some(handler);
2199        self
2200    }
2201
2202    /// Set the advanced sampling handler with default implementation
2203    pub fn with_default_advanced_sampling(mut self) -> Self {
2204        let default_handler = Arc::new(DefaultAdvancedSamplingHandler::new(self.info.clone()));
2205        self.advanced_sampling_handler = Some(default_handler);
2206        self
2207    }
2208
2209    /// Handle a roots/set request
2210    pub async fn handle_set_roots(
2211        &self,
2212        roots: Vec<ultrafast_mcp_core::types::roots::Root>,
2213        transport: &mut Box<dyn Transport>,
2214    ) -> SetRootsResponse {
2215        if let Some(handler) = &self.roots_handler {
2216            match handler.set_roots(roots.clone()).await {
2217                Ok(_) => {
2218                    // Send notification to all clients (for demo, just send to this transport)
2219                    let notification = RootsListChangedNotification { roots };
2220                    let params = serde_json::to_value(notification).ok();
2221                    let _ = self
2222                        .send_notification("roots/listChanged", params, transport)
2223                        .await;
2224                    SetRootsResponse {
2225                        success: true,
2226                        error: None,
2227                    }
2228                }
2229                Err(e) => SetRootsResponse {
2230                    success: false,
2231                    error: Some(e.to_string()),
2232                },
2233            }
2234        } else {
2235            SetRootsResponse {
2236                success: false,
2237                error: Some("Roots handler not available".to_string()),
2238            }
2239        }
2240    }
2241}
2242
2243#[cfg(test)]
2244mod tests {
2245    use super::*;
2246    use serde_json::json;
2247    use ultrafast_mcp_core::types::{
2248        server::ServerInfo,
2249        tools::{Tool, ToolContent},
2250    };
2251
2252    // Mock tool handler for testing
2253    struct MockToolHandler;
2254
2255    #[async_trait::async_trait]
2256    impl ToolHandler for MockToolHandler {
2257        async fn handle_tool_call(
2258            &self,
2259            call: ultrafast_mcp_core::types::tools::ToolCall,
2260        ) -> MCPResult<ultrafast_mcp_core::types::tools::ToolResult> {
2261            // Simulate error for nonexistent tool or invalid arguments
2262            if call.name == "nonexistent_tool" {
2263                return Err(ultrafast_mcp_core::error::MCPError::not_found(
2264                    "Tool not found".to_string(),
2265                ));
2266            }
2267            if let Some(args) = &call.arguments {
2268                if args.get("input").is_none() {
2269                    return Err(ultrafast_mcp_core::error::MCPError::invalid_params(
2270                        "Invalid parameters".to_string(),
2271                    ));
2272                }
2273            } else {
2274                return Err(ultrafast_mcp_core::error::MCPError::invalid_params(
2275                    "Missing arguments".to_string(),
2276                ));
2277            }
2278            Ok(ultrafast_mcp_core::types::tools::ToolResult {
2279                content: vec![ToolContent::text(format!("Mock result for {}", call.name))],
2280                is_error: None,
2281            })
2282        }
2283
2284        async fn list_tools(
2285            &self,
2286            _request: ultrafast_mcp_core::types::tools::ListToolsRequest,
2287        ) -> MCPResult<ultrafast_mcp_core::types::tools::ListToolsResponse> {
2288            // This will be overridden by the server's fallback to registered tools
2289            Ok(ultrafast_mcp_core::types::tools::ListToolsResponse {
2290                tools: vec![],
2291                next_cursor: None,
2292            })
2293        }
2294    }
2295
2296    fn create_test_server() -> UltraFastServer {
2297        let info = ServerInfo {
2298            name: "test-server".to_string(),
2299            version: "1.0.0".to_string(),
2300            description: Some("Test server".to_string()),
2301            homepage: None,
2302            repository: None,
2303            authors: Some(vec!["test".to_string()]),
2304            license: Some("MIT".to_string()),
2305        };
2306        let capabilities = ServerCapabilities::default();
2307        UltraFastServer::new(info, capabilities).with_tool_handler(Arc::new(MockToolHandler))
2308    }
2309
2310    async fn create_initialized_test_server() -> UltraFastServer {
2311        let server = create_test_server();
2312
2313        // Initialize the server to operating state
2314        let init_request = ultrafast_mcp_core::protocol::InitializeRequest {
2315            protocol_version: "2025-06-18".to_string(),
2316            capabilities: ultrafast_mcp_core::protocol::ClientCapabilities::default(),
2317            client_info: ultrafast_mcp_core::types::client::ClientInfo {
2318                name: "test-client".to_string(),
2319                version: "1.0.0".to_string(),
2320                description: Some("Test client".to_string()),
2321                homepage: None,
2322                repository: None,
2323                authors: Some(vec!["test".to_string()]),
2324                license: Some("MIT".to_string()),
2325            },
2326        };
2327
2328        let _response = server.handle_initialize(init_request).await;
2329
2330        // Send initialized notification
2331        let notification = ultrafast_mcp_core::protocol::InitializedNotification {};
2332        let _ = server.handle_initialized(notification).await;
2333
2334        server
2335    }
2336
2337    fn create_valid_tool(name: &str) -> Tool {
2338        Tool {
2339            name: name.to_string(),
2340            description: "A test tool".to_string(),
2341            input_schema: json!({
2342                "type": "object",
2343                "properties": {
2344                    "input": {"type": "string"}
2345                },
2346                "required": ["input"]
2347            }),
2348            output_schema: Some(json!({
2349                "type": "object",
2350                "properties": {
2351                    "output": {"type": "string"}
2352                }
2353            })),
2354            annotations: None,
2355        }
2356    }
2357
2358    #[tokio::test]
2359    async fn test_register_valid_tool() {
2360        let server = create_test_server();
2361        let tool = create_valid_tool("test_tool");
2362
2363        let result = server.register_tool(tool).await;
2364        assert!(result.is_ok());
2365
2366        assert!(server.has_tool("test_tool").await);
2367        assert_eq!(server.tool_count().await, 1);
2368    }
2369
2370    #[tokio::test]
2371    async fn test_register_duplicate_tool() {
2372        let server = create_test_server();
2373        let tool1 = create_valid_tool("test_tool");
2374        let tool2 = create_valid_tool("test_tool");
2375
2376        server.register_tool(tool1).await.unwrap();
2377        let result = server.register_tool(tool2).await;
2378
2379        assert!(matches!(
2380            result,
2381            Err(ToolRegistrationError::ToolAlreadyExists(_))
2382        ));
2383        assert_eq!(server.tool_count().await, 1);
2384    }
2385
2386    #[tokio::test]
2387    async fn test_register_reserved_name() {
2388        let server = create_test_server();
2389        let tool = create_valid_tool("initialize");
2390
2391        let result = server.register_tool(tool).await;
2392        assert!(matches!(
2393            result,
2394            Err(ToolRegistrationError::ReservedName(_))
2395        ));
2396        assert_eq!(server.tool_count().await, 0);
2397    }
2398
2399    #[tokio::test]
2400    async fn test_register_tool_without_description() {
2401        let server = create_test_server();
2402        let mut tool = create_valid_tool("test_tool");
2403        tool.description = "".to_string();
2404
2405        let result = server.register_tool(tool).await;
2406        assert!(matches!(
2407            result,
2408            Err(ToolRegistrationError::MissingDescription)
2409        ));
2410    }
2411
2412    #[tokio::test]
2413    async fn test_register_tool_with_invalid_input_schema() {
2414        let server = create_test_server();
2415        let mut tool = create_valid_tool("test_tool");
2416        tool.input_schema = json!("invalid schema");
2417
2418        let result = server.register_tool(tool).await;
2419        assert!(matches!(
2420            result,
2421            Err(ToolRegistrationError::InvalidSchema(_))
2422        ));
2423    }
2424
2425    #[tokio::test]
2426    async fn test_register_tool_without_output_schema() {
2427        let server = create_test_server();
2428        let mut tool = create_valid_tool("test_tool");
2429        tool.output_schema = None;
2430
2431        let result = server.register_tool(tool).await;
2432        assert!(matches!(
2433            result,
2434            Err(ToolRegistrationError::MissingOutputSchema)
2435        ));
2436    }
2437
2438    #[tokio::test]
2439    async fn test_register_tool_with_invalid_schema() {
2440        let server = create_test_server();
2441        let mut tool = create_valid_tool("test_tool");
2442        tool.input_schema = json!("invalid schema");
2443
2444        let result = server.register_tool(tool).await;
2445        assert!(matches!(
2446            result,
2447            Err(ToolRegistrationError::InvalidSchema(_))
2448        ));
2449    }
2450
2451    #[tokio::test]
2452    async fn test_unregister_tool() {
2453        let server = create_test_server();
2454        let tool = create_valid_tool("test_tool");
2455
2456        server.register_tool(tool).await.unwrap();
2457        assert!(server.has_tool("test_tool").await);
2458
2459        let result = server.unregister_tool("test_tool");
2460        assert!(result.await);
2461        assert!(!server.has_tool("test_tool").await);
2462        assert_eq!(server.tool_count().await, 0);
2463    }
2464
2465    #[tokio::test]
2466    async fn test_unregister_nonexistent_tool() {
2467        let server = create_test_server();
2468        let result = server.unregister_tool("nonexistent");
2469        assert!(!result.await);
2470    }
2471
2472    #[tokio::test]
2473    async fn test_register_multiple_tools() {
2474        let server = create_test_server();
2475        let tools = vec![
2476            create_valid_tool("tool1"),
2477            create_valid_tool("tool2"),
2478            create_valid_tool("tool3"),
2479        ];
2480
2481        let result = server.register_tools(tools).await;
2482        assert!(result.is_ok());
2483        assert_eq!(server.tool_count().await, 3);
2484        assert!(server.has_tool("tool1").await);
2485        assert!(server.has_tool("tool2").await);
2486        assert!(server.has_tool("tool3").await);
2487    }
2488
2489    #[tokio::test]
2490    async fn test_register_multiple_tools_with_duplicate() {
2491        let server = create_test_server();
2492        let tools = vec![
2493            create_valid_tool("tool1"),
2494            create_valid_tool("tool1"), // Duplicate
2495            create_valid_tool("tool2"),
2496        ];
2497
2498        let result = server.register_tools(tools).await;
2499        assert!(matches!(
2500            result,
2501            Err(ToolRegistrationError::ToolAlreadyExists(_))
2502        ));
2503        assert_eq!(server.tool_count().await, 1); // Only the first one should be registered
2504    }
2505
2506    #[tokio::test]
2507    async fn test_get_tool() {
2508        let server = create_test_server();
2509        let tool = create_valid_tool("test_tool");
2510
2511        server.register_tool(tool.clone()).await.unwrap();
2512
2513        let retrieved = server.get_tool("test_tool").await;
2514        assert!(retrieved.is_some());
2515        assert_eq!(retrieved.unwrap().name, tool.name);
2516    }
2517
2518    #[tokio::test]
2519    async fn test_get_nonexistent_tool() {
2520        let server = create_test_server();
2521        let retrieved = server.get_tool("nonexistent").await;
2522        assert!(retrieved.is_none());
2523    }
2524
2525    #[tokio::test]
2526    async fn test_list_tools() {
2527        let server = create_test_server();
2528        let tools = vec![create_valid_tool("tool1"), create_valid_tool("tool2")];
2529
2530        server.register_tools(tools).await.unwrap();
2531
2532        let listed = server.list_tools().await;
2533        assert_eq!(listed.len(), 2);
2534        assert!(listed.iter().any(|t| t.name == "tool1"));
2535        assert!(listed.iter().any(|t| t.name == "tool2"));
2536    }
2537
2538    #[tokio::test]
2539    async fn test_clear_tools() {
2540        let server = create_test_server();
2541        let tools = vec![create_valid_tool("tool1"), create_valid_tool("tool2")];
2542
2543        server.register_tools(tools).await.unwrap();
2544        assert_eq!(server.tool_count().await, 2);
2545
2546        server.clear_tools().await;
2547        assert_eq!(server.tool_count().await, 0);
2548        assert!(!server.has_tool("tool1").await);
2549        assert!(!server.has_tool("tool2").await);
2550    }
2551
2552    #[tokio::test]
2553    async fn test_validate_tool_call() {
2554        let server = create_test_server();
2555        let tool = create_valid_tool("test_tool");
2556        server.register_tool(tool).await.unwrap();
2557
2558        let valid_args = json!({"input": "test input"});
2559        let result = server.validate_tool_call("test_tool", &valid_args).await;
2560        assert!(result.is_ok());
2561    }
2562
2563    #[tokio::test]
2564    async fn test_validate_tool_call_invalid_args() {
2565        let server = create_test_server();
2566        let tool = create_valid_tool("test_tool");
2567        server.register_tool(tool).await.unwrap();
2568
2569        let invalid_args = json!({"wrong_field": "test input"});
2570        let result = server.validate_tool_call("test_tool", &invalid_args).await;
2571        assert!(result.is_err());
2572    }
2573
2574    #[tokio::test]
2575    async fn test_validate_nonexistent_tool_call() {
2576        let server = create_test_server();
2577        let args = json!({"input": "test input"});
2578        let result = server.validate_tool_call("nonexistent", &args).await;
2579        assert!(result.is_err());
2580    }
2581
2582    #[tokio::test]
2583    async fn test_execute_tool_call() {
2584        let server = create_test_server();
2585        let tool = create_valid_tool("test_tool");
2586        server.register_tool(tool).await.unwrap();
2587
2588        let args = json!({"input": "test input"});
2589        let result = server.execute_tool_call("test_tool", args).await;
2590        assert!(result.is_ok());
2591
2592        let tool_result = result.unwrap();
2593        assert_eq!(tool_result.content.len(), 1);
2594        assert!(!tool_result.is_error.unwrap_or(false));
2595    }
2596
2597    #[tokio::test]
2598    async fn test_execute_tool_call_without_handler() {
2599        let server = UltraFastServer::new(
2600            ServerInfo {
2601                name: "test-server".to_string(),
2602                version: "1.0.0".to_string(),
2603                description: Some("Test server".to_string()),
2604                homepage: None,
2605                repository: None,
2606                authors: Some(vec!["test".to_string()]),
2607                license: Some("MIT".to_string()),
2608            },
2609            ServerCapabilities::default(),
2610        );
2611        let tool = create_valid_tool("test_tool");
2612        server.register_tool(tool).await.unwrap();
2613
2614        let args = json!({"input": "test input"});
2615        let result = server.execute_tool_call("test_tool", args).await;
2616        assert!(result.is_err());
2617    }
2618
2619    #[tokio::test]
2620    async fn test_reserved_names() {
2621        let server = create_test_server();
2622        let reserved_names = [
2623            "initialize",
2624            "initialized",
2625            "shutdown",
2626            "exit",
2627            "ping",
2628            "tools/list",
2629            "tools/call",
2630            "resources/list",
2631            "resources/read",
2632            "resources/subscribe",
2633            "resources/unsubscribe",
2634            "prompts/list",
2635            "prompts/get",
2636            "sampling/create",
2637            "completion/complete",
2638            "roots/list",
2639            "elicitation/request",
2640        ];
2641
2642        for name in &reserved_names {
2643            let tool = create_valid_tool(name);
2644            let result = server.register_tool(tool).await;
2645            assert!(matches!(
2646                result,
2647                Err(ToolRegistrationError::ReservedName(_))
2648            ));
2649        }
2650    }
2651
2652    #[tokio::test]
2653    async fn test_tools_list_jsonrpc() {
2654        let server = create_initialized_test_server().await;
2655
2656        // Register some tools
2657        let tools = vec![create_valid_tool("tool1"), create_valid_tool("tool2")];
2658        server.register_tools(tools).await.unwrap();
2659
2660        // Create tools/list request
2661        let request = JsonRpcRequest {
2662            jsonrpc: Cow::Borrowed("2.0"),
2663            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2664                "test-id",
2665            )),
2666            method: "tools/list".to_string(),
2667            params: None,
2668            meta: std::collections::HashMap::new(),
2669        };
2670
2671        let response = server.handle_request(request).await;
2672
2673        // Verify response
2674        if let Some(result) = &response.result {
2675            assert_eq!(
2676                response.id,
2677                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2678                    "test-id"
2679                ))
2680            );
2681            let tools_array = result.get("tools").and_then(|t| t.as_array()).unwrap();
2682            assert_eq!(tools_array.len(), 2);
2683
2684            let tool_names: Vec<&str> = tools_array
2685                .iter()
2686                .filter_map(|t| t.get("name").and_then(|n| n.as_str()))
2687                .collect();
2688            assert!(tool_names.contains(&"tool1"));
2689            assert!(tool_names.contains(&"tool2"));
2690        } else {
2691            panic!("Expected success response");
2692        }
2693    }
2694
2695    #[tokio::test]
2696    async fn test_tools_call_jsonrpc_success() {
2697        let server = create_initialized_test_server().await;
2698
2699        // Register a tool
2700        let tool = create_valid_tool("test_tool");
2701        server.register_tool(tool).await.unwrap();
2702
2703        // Create tools/call request
2704        let request = JsonRpcRequest {
2705            jsonrpc: Cow::Borrowed("2.0"),
2706            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2707                "test-id",
2708            )),
2709            method: "tools/call".to_string(),
2710            params: Some(json!({
2711                "name": "test_tool",
2712                "arguments": {
2713                    "input": "test input"
2714                }
2715            })),
2716            meta: std::collections::HashMap::new(),
2717        };
2718
2719        let response = server.handle_request(request).await;
2720
2721        // Verify response
2722        if let Some(result) = &response.result {
2723            assert_eq!(
2724                response.id,
2725                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2726                    "test-id"
2727                ))
2728            );
2729
2730            // Check that result contains content
2731            let content = result.get("content").and_then(|c| c.as_array()).unwrap();
2732            assert_eq!(content.len(), 1);
2733
2734            // The ToolContent::text creates a structure with "type": "text" and "text" field
2735            let text_content = content[0].get("text").and_then(|t| t.as_str()).unwrap();
2736            assert!(text_content.contains("Mock result for test_tool"));
2737        } else {
2738            panic!("Expected success response");
2739        }
2740    }
2741
2742    #[tokio::test]
2743    async fn test_tools_call_jsonrpc_missing_params() {
2744        let server = create_initialized_test_server().await;
2745
2746        // Create tools/call request without parameters
2747        let request = JsonRpcRequest {
2748            jsonrpc: Cow::Borrowed("2.0"),
2749            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2750                "test-id",
2751            )),
2752            method: "tools/call".to_string(),
2753            params: None,
2754            meta: std::collections::HashMap::new(),
2755        };
2756
2757        let response = server.handle_request(request).await;
2758
2759        // Verify error response
2760        if let Some(error) = &response.error {
2761            assert_eq!(
2762                response.id,
2763                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2764                    "test-id"
2765                ))
2766            );
2767            assert_eq!(error.code, -32602); // Invalid params
2768            assert!(error.message.contains("Missing parameters"));
2769        } else {
2770            panic!("Expected error response");
2771        }
2772    }
2773
2774    #[tokio::test]
2775    async fn test_tools_call_jsonrpc_missing_name() {
2776        let server = create_initialized_test_server().await;
2777
2778        // Create tools/call request without tool name
2779        let request = JsonRpcRequest {
2780            jsonrpc: Cow::Borrowed("2.0"),
2781            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2782                "test-id",
2783            )),
2784            method: "tools/call".to_string(),
2785            params: Some(json!({
2786                "arguments": {
2787                    "input": "test input"
2788                }
2789            })),
2790            meta: std::collections::HashMap::new(),
2791        };
2792
2793        let response = server.handle_request(request).await;
2794
2795        // Verify error response
2796        if let Some(error) = &response.error {
2797            assert_eq!(
2798                response.id,
2799                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2800                    "test-id"
2801                ))
2802            );
2803            assert_eq!(error.code, -32602); // Invalid params
2804            assert!(error.message.contains("Missing or invalid tool name"));
2805        } else {
2806            panic!("Expected error response");
2807        }
2808    }
2809
2810    #[tokio::test]
2811    async fn test_tools_call_jsonrpc_nonexistent_tool() {
2812        let server = create_initialized_test_server().await;
2813
2814        // Create tools/call request for non-existent tool
2815        let request = JsonRpcRequest {
2816            jsonrpc: Cow::Borrowed("2.0"),
2817            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2818                "test-id",
2819            )),
2820            method: "tools/call".to_string(),
2821            params: Some(json!({
2822                "name": "nonexistent_tool",
2823                "arguments": {
2824                    "input": "test input"
2825                }
2826            })),
2827            meta: std::collections::HashMap::new(),
2828        };
2829
2830        let response = server.handle_request(request).await;
2831
2832        // Verify error response
2833        if let Some(error) = &response.error {
2834            assert_eq!(
2835                response.id,
2836                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2837                    "test-id"
2838                ))
2839            );
2840            assert_eq!(error.code, -32602); // Invalid params
2841            assert!(error.message.contains("Tool call failed:"));
2842            assert!(error.message.contains("Tool not found"));
2843        } else {
2844            panic!("Expected error response");
2845        }
2846    }
2847
2848    #[tokio::test]
2849    async fn test_tools_call_jsonrpc_invalid_arguments() {
2850        let server = create_initialized_test_server().await;
2851
2852        // Register a tool
2853        let tool = create_valid_tool("test_tool");
2854        server.register_tool(tool).await.unwrap();
2855
2856        // Create tools/call request with invalid arguments
2857        let request = JsonRpcRequest {
2858            jsonrpc: Cow::Borrowed("2.0"),
2859            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2860                "test-id",
2861            )),
2862            method: "tools/call".to_string(),
2863            params: Some(json!({
2864                "name": "test_tool",
2865                "arguments": {
2866                    "wrong_field": "test input"
2867                }
2868            })),
2869            meta: std::collections::HashMap::new(),
2870        };
2871
2872        let response = server.handle_request(request).await;
2873
2874        // Verify error response
2875        if let Some(error) = &response.error {
2876            assert_eq!(
2877                response.id,
2878                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2879                    "test-id"
2880                ))
2881            );
2882            assert_eq!(error.code, -32602); // Invalid params
2883            assert!(error.message.contains("Invalid parameters"));
2884        } else {
2885            panic!("Expected error response");
2886        }
2887    }
2888
2889    #[tokio::test]
2890    async fn test_tools_call_jsonrpc_empty_arguments() {
2891        let server = create_initialized_test_server().await;
2892
2893        // Register a tool
2894        let tool = create_valid_tool("test_tool");
2895        server.register_tool(tool).await.unwrap();
2896
2897        // Create tools/call request with empty arguments
2898        let request = JsonRpcRequest {
2899            jsonrpc: Cow::Borrowed("2.0"),
2900            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2901                "test-id",
2902            )),
2903            method: "tools/call".to_string(),
2904            params: Some(json!({
2905                "name": "test_tool",
2906                "arguments": {}
2907            })),
2908            meta: std::collections::HashMap::new(),
2909        };
2910
2911        let response = server.handle_request(request).await;
2912
2913        // Verify error response
2914        if let Some(error) = &response.error {
2915            assert_eq!(
2916                response.id,
2917                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2918                    "test-id"
2919                ))
2920            );
2921            assert_eq!(error.code, -32602); // Invalid params
2922            // The actual error message format has changed to include more context
2923            assert!(error.message.contains("Invalid parameters"));
2924        } else {
2925            panic!("Expected error response");
2926        }
2927    }
2928
2929    #[tokio::test]
2930    async fn test_unknown_method() {
2931        let server = create_test_server();
2932
2933        // Create request for unknown method
2934        let request = JsonRpcRequest {
2935            jsonrpc: Cow::Borrowed("2.0"),
2936            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2937                "test-id",
2938            )),
2939            method: "unknown/method".to_string(),
2940            params: None,
2941            meta: std::collections::HashMap::new(),
2942        };
2943
2944        let response = server.handle_request(request).await;
2945
2946        // Verify error response
2947        if let Some(error) = &response.error {
2948            assert_eq!(
2949                response.id,
2950                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2951                    "test-id"
2952                ))
2953            );
2954            assert_eq!(error.code, -32601); // Method not found
2955            assert!(error.message.contains("Method not implemented"));
2956        } else {
2957            panic!("Expected error response");
2958        }
2959    }
2960
2961    #[tokio::test]
2962    async fn test_tools_integration_workflow() {
2963        let server = create_initialized_test_server().await;
2964
2965        // Step 1: Register multiple tools
2966        let tools = vec![
2967            create_valid_tool("calculator"),
2968            create_valid_tool("file_reader"),
2969        ];
2970        server.register_tools(tools).await.unwrap();
2971        assert_eq!(server.tool_count().await, 2);
2972
2973        // Step 2: List tools via JSON-RPC
2974        let list_request = JsonRpcRequest {
2975            jsonrpc: Cow::Borrowed("2.0"),
2976            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2977                "list-id",
2978            )),
2979            method: "tools/list".to_string(),
2980            params: None,
2981            meta: std::collections::HashMap::new(),
2982        };
2983
2984        let list_response = server.handle_request(list_request).await;
2985        if let Some(result) = &list_response.result {
2986            assert_eq!(
2987                list_response.id,
2988                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2989                    "list-id"
2990                ))
2991            );
2992            let tools_array = result.get("tools").and_then(|t| t.as_array()).unwrap();
2993            assert_eq!(tools_array.len(), 2);
2994        } else {
2995            panic!("Expected success response for tools/list");
2996        }
2997
2998        // Step 3: Call a tool via JSON-RPC
2999        let call_request = JsonRpcRequest {
3000            jsonrpc: Cow::Borrowed("2.0"),
3001            id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
3002                "call-id",
3003            )),
3004            method: "tools/call".to_string(),
3005            params: Some(json!({
3006                "name": "calculator",
3007                "arguments": {
3008                    "input": "2 + 2"
3009                }
3010            })),
3011            meta: std::collections::HashMap::new(),
3012        };
3013
3014        let call_response = server.handle_request(call_request).await;
3015        if let Some(result) = &call_response.result {
3016            assert_eq!(
3017                call_response.id,
3018                Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
3019                    "call-id"
3020                ))
3021            );
3022            let content = result
3023                .get("content")
3024                .and_then(|c| c.as_array())
3025                .expect("Expected content array");
3026            assert_eq!(content.len(), 1);
3027        } else {
3028            panic!("Expected success response for tools/call");
3029        }
3030
3031        // Step 4: Verify tool still exists in registry
3032        assert!(server.has_tool("calculator").await);
3033        assert!(server.has_tool("file_reader").await);
3034    }
3035}