1use std::{borrow::Cow, collections::HashMap, sync::Arc};
6use tokio::sync::{RwLock, broadcast};
7use tracing::{error, info, warn};
8
9use ultrafast_mcp_core::{
10 config::TimeoutConfig,
11 error::{MCPError, MCPResult},
12 protocol::{
13 capabilities::ServerCapabilities,
14 jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse},
15 },
16 schema::validation::validate_tool_schema,
17 types::{
18 notifications::{LogLevel, LogLevelSetRequest, LogLevelSetResponse},
19 prompts::Prompt,
20 resources::{Resource, SubscribeResponse},
21 roots::{RootsListChangedNotification, SetRootsRequest, SetRootsResponse},
22 server::ServerInfo,
23 tools::Tool,
24 },
25 utils::{CancellationManager, PingManager},
26};
27#[cfg(feature = "http")]
28use ultrafast_mcp_transport::streamable_http::server::{HttpTransportConfig, HttpTransportServer};
29use ultrafast_mcp_transport::{Transport, TransportConfig, create_transport};
30
31use crate::context::{Context, LoggerConfig};
32use crate::handlers::*;
33
/// Lifecycle phase of the server.
///
/// The variants are listed in the order the server is expected to move
/// through them, from construction to shutdown.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServerState {
    Uninitialized,
    Initializing,
    Initialized,
    Operating,
    ShuttingDown,
    Shutdown,
}

impl ServerState {
    /// Returns `true` for `Initialized` and `Operating`, `false` otherwise.
    pub fn can_operate(&self) -> bool {
        match self {
            Self::Initialized | Self::Operating => true,
            Self::Uninitialized | Self::Initializing | Self::ShuttingDown | Self::Shutdown => false,
        }
    }

    /// Returns `true` for `Initialized` and `Operating`, `false` otherwise.
    ///
    /// Currently answers exactly the same question as [`ServerState::can_operate`].
    pub fn is_initialized(&self) -> bool {
        match self {
            Self::Initialized | Self::Operating => true,
            Self::Uninitialized | Self::Initializing | Self::ShuttingDown | Self::Shutdown => false,
        }
    }

    /// Returns `true` for `ShuttingDown` and `Shutdown`, `false` otherwise.
    pub fn is_shutting_down(&self) -> bool {
        match self {
            Self::ShuttingDown | Self::Shutdown => true,
            Self::Uninitialized | Self::Initializing | Self::Initialized | Self::Operating => false,
        }
    }
}
63
64#[derive(Debug, thiserror::Error)]
66pub enum ToolRegistrationError {
67 #[error("Tool with name '{0}' already exists")]
68 ToolAlreadyExists(String),
69 #[error("Invalid tool schema: {0}")]
70 InvalidSchema(String),
71 #[error("Tool name '{0}' is reserved")]
72 ReservedName(String),
73 #[error("Tool description is required")]
74 MissingDescription,
75 #[error("Tool input schema is required")]
76 MissingInputSchema,
77 #[error("Tool output schema is required")]
78 MissingOutputSchema,
79}
80
81#[derive(Debug, Clone)]
83pub struct ServerLoggingConfig {
84 pub current_level: LogLevel,
86 pub allow_level_changes: bool,
88 pub default_logger_config: LoggerConfig,
90}
91
92impl Default for ServerLoggingConfig {
93 fn default() -> Self {
94 Self {
95 current_level: LogLevel::Info,
96 allow_level_changes: true,
97 default_logger_config: LoggerConfig::default(),
98 }
99 }
100}
101
102#[derive(Clone)]
104pub struct UltraFastServer {
105 info: ServerInfo,
106 capabilities: ServerCapabilities,
107 state: Arc<RwLock<ServerState>>,
108 tools: Arc<RwLock<HashMap<String, Tool>>>,
109 resources: Arc<RwLock<HashMap<String, Resource>>>,
110 prompts: Arc<RwLock<HashMap<String, Prompt>>>,
111 tool_handler: Option<Arc<dyn ToolHandler>>,
112 resource_handler: Option<Arc<dyn ResourceHandler>>,
113 prompt_handler: Option<Arc<dyn PromptHandler>>,
114 sampling_handler: Option<Arc<dyn SamplingHandler>>,
115 completion_handler: Option<Arc<dyn CompletionHandler>>,
116 roots_handler: Option<Arc<dyn RootsHandler>>,
117 elicitation_handler: Option<Arc<dyn ElicitationHandler>>,
118 subscription_handler: Option<Arc<dyn ResourceSubscriptionHandler>>,
119 #[allow(dead_code)]
120 resource_subscriptions: Arc<RwLock<HashMap<String, Vec<String>>>>,
121 cancellation_manager: Arc<CancellationManager>,
122 ping_manager: Arc<PingManager>,
123 logging_config: Arc<RwLock<ServerLoggingConfig>>,
125
126 #[cfg(feature = "monitoring")]
127 monitoring_system: Option<Arc<crate::MonitoringSystem>>,
128
129 advanced_sampling_handler: Option<Arc<dyn AdvancedSamplingHandler>>,
131
132 timeout_config: Arc<TimeoutConfig>,
134 }
136
137impl std::fmt::Debug for UltraFastServer {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("UltraFastServer")
140 .field("info", &self.info)
141 .field("capabilities", &self.capabilities)
142 .finish()
143 }
144}
145
146impl UltraFastServer {
147 pub fn new(info: ServerInfo, capabilities: ServerCapabilities) -> Self {
149 Self {
150 info,
151 capabilities,
152 state: Arc::new(RwLock::new(ServerState::Uninitialized)),
153 tools: Arc::new(RwLock::new(HashMap::new())),
154 resources: Arc::new(RwLock::new(HashMap::new())),
155 prompts: Arc::new(RwLock::new(HashMap::new())),
156 tool_handler: None,
157 resource_handler: None,
158 prompt_handler: None,
159 sampling_handler: None,
160 completion_handler: None,
161 roots_handler: None,
162 elicitation_handler: None,
163 subscription_handler: None,
164 resource_subscriptions: Arc::new(RwLock::new(HashMap::new())),
165 cancellation_manager: Arc::new(CancellationManager::new()),
166 ping_manager: Arc::new(PingManager::default()),
167 logging_config: Arc::new(RwLock::new(ServerLoggingConfig::default())),
168
169 #[cfg(feature = "monitoring")]
170 monitoring_system: None,
171
172 advanced_sampling_handler: None,
174
175 timeout_config: Arc::new(TimeoutConfig::default()),
177 }
178 }
179
180 pub async fn set_logging_config(&self, config: ServerLoggingConfig) {
182 let mut logging_config = self.logging_config.write().await;
183 *logging_config = config;
184 info!("Server logging configuration updated");
185 }
186
187 pub async fn get_logging_config(&self) -> ServerLoggingConfig {
189 self.logging_config.read().await.clone()
190 }
191
192 pub fn with_timeout_config(mut self, config: TimeoutConfig) -> Self {
194 self.timeout_config = Arc::new(config);
195 self
196 }
197
198 pub fn get_timeout_config(&self) -> TimeoutConfig {
200 (*self.timeout_config).clone()
201 }
202
203 pub fn with_high_performance_timeouts(mut self) -> Self {
205 self.timeout_config = Arc::new(TimeoutConfig::high_performance());
206 self
207 }
208
209 pub fn with_long_running_timeouts(mut self) -> Self {
211 self.timeout_config = Arc::new(TimeoutConfig::long_running());
212 self
213 }
214
215 pub fn get_operation_timeout(&self, operation: &str) -> std::time::Duration {
217 self.timeout_config.get_timeout_for_operation(operation)
218 }
219
220 pub fn validate_timeout_config(&self) -> Result<(), String> {
222 let config = &self.timeout_config;
223
224 if !config.validate_timeout(config.connect_timeout) {
226 return Err("Connect timeout is out of bounds".to_string());
227 }
228 if !config.validate_timeout(config.request_timeout) {
229 return Err("Request timeout is out of bounds".to_string());
230 }
231 if !config.validate_timeout(config.response_timeout) {
232 return Err("Response timeout is out of bounds".to_string());
233 }
234 if !config.validate_timeout(config.tool_execution_timeout) {
235 return Err("Tool execution timeout is out of bounds".to_string());
236 }
237 if !config.validate_timeout(config.resource_read_timeout) {
238 return Err("Resource read timeout is out of bounds".to_string());
239 }
240 if !config.validate_timeout(config.prompt_generation_timeout) {
241 return Err("Prompt generation timeout is out of bounds".to_string());
242 }
243 if !config.validate_timeout(config.sampling_timeout) {
244 return Err("Sampling timeout is out of bounds".to_string());
245 }
246 if !config.validate_timeout(config.completion_timeout) {
247 return Err("Completion timeout is out of bounds".to_string());
248 }
249 if !config.validate_timeout(config.shutdown_timeout) {
250 return Err("Shutdown timeout is out of bounds".to_string());
251 }
252 if !config.validate_timeout(config.heartbeat_interval) {
253 return Err("Heartbeat interval is out of bounds".to_string());
254 }
255
256 Ok(())
257 }
258
259 pub async fn set_log_level(&self, level: LogLevel) -> MCPResult<()> {
261 let mut logging_config = self.logging_config.write().await;
262
263 if !logging_config.allow_level_changes {
264 return Err(MCPError::invalid_request(
265 "Log level changes are not allowed on this server".to_string(),
266 ));
267 }
268
269 logging_config.current_level = level.clone();
270 logging_config.default_logger_config.min_level = level.clone();
271
272 info!("Server log level changed to: {:?}", level);
273 Ok(())
274 }
275
276 pub async fn get_log_level(&self) -> LogLevel {
278 self.logging_config.read().await.current_level.clone()
279 }
280
281 #[cfg(feature = "monitoring")]
285 pub fn with_monitoring_config(mut self, config: crate::MonitoringConfig) -> Self {
286 let monitoring = crate::MonitoringSystem::new(config);
287 self.monitoring_system = Some(Arc::new(monitoring));
288 info!("Monitoring enabled with custom configuration");
289 self
290 }
291 #[cfg(not(feature = "monitoring"))]
292 pub fn with_monitoring_config(self, _config: ()) -> Self {
293 warn!("Monitoring feature not enabled. Add 'monitoring' feature to enable monitoring.");
294 self
295 }
296
297 #[cfg(feature = "monitoring")]
299 pub fn with_monitoring(mut self) -> Self {
300 let monitoring = crate::MonitoringSystem::new(crate::MonitoringConfig::default());
301 self.monitoring_system = Some(Arc::new(monitoring));
302 info!("Monitoring enabled with default configuration");
303 self
304 }
305 #[cfg(not(feature = "monitoring"))]
306 pub fn with_monitoring(self) -> Self {
307 warn!("Monitoring feature not enabled. Add 'monitoring' feature to enable monitoring.");
308 self
309 }
310
311 #[cfg(feature = "monitoring")]
313 pub fn with_health_checks(mut self) -> Self {
314 if let Some(_monitoring) = &self.monitoring_system {
315 info!("Health checks enabled");
317 } else {
318 let mut config = crate::MonitoringConfig::default();
320 config.health.enabled = true;
321 let monitoring = crate::MonitoringSystem::new(config);
322 self.monitoring_system = Some(Arc::new(monitoring));
323 info!("Health checks enabled with new monitoring system");
324 }
325 self
326 }
327 #[cfg(not(feature = "monitoring"))]
328 pub fn with_health_checks(self) -> Self {
329 warn!(
330 "Health checks require monitoring feature. Add 'monitoring' feature to enable health checks."
331 );
332 self
333 }
334
335 #[cfg(feature = "monitoring")]
337 pub fn with_metrics(mut self) -> Self {
338 if let Some(_monitoring) = &self.monitoring_system {
339 info!("Metrics collection enabled");
341 } else {
342 let mut config = crate::MonitoringConfig::default();
344 config.metrics.enabled = true;
345 let monitoring = crate::MonitoringSystem::new(config);
346 self.monitoring_system = Some(Arc::new(monitoring));
347 info!("Metrics collection enabled with new monitoring system");
348 }
349 self
350 }
351 #[cfg(not(feature = "monitoring"))]
352 pub fn with_metrics(self) -> Self {
353 warn!("Metrics require monitoring feature. Add 'monitoring' feature to enable metrics.");
354 self
355 }
356
357 #[cfg(feature = "monitoring")]
359 pub fn with_tracing(mut self) -> Self {
360 if let Some(_monitoring) = &self.monitoring_system {
361 info!("Tracing enabled");
363 } else {
364 let mut config = crate::MonitoringConfig::default();
366 config.tracing.enabled = true;
367 let monitoring = crate::MonitoringSystem::new(config);
368 self.monitoring_system = Some(Arc::new(monitoring));
369 info!("Tracing enabled with new monitoring system");
370 }
371 self
372 }
373 #[cfg(not(feature = "monitoring"))]
374 pub fn with_tracing(self) -> Self {
375 warn!("Tracing requires monitoring feature. Add 'monitoring' feature to enable tracing.");
376 self
377 }
378
379 #[cfg(feature = "monitoring")]
381 pub fn with_full_monitoring(mut self) -> Self {
382 let mut config = crate::MonitoringConfig::default();
383 config.health.enabled = true;
384 config.metrics.enabled = true;
385 config.tracing.enabled = true;
386 let monitoring = crate::MonitoringSystem::new(config);
387 self.monitoring_system = Some(Arc::new(monitoring));
388 info!("Full monitoring enabled (health checks, metrics, tracing)");
389 self
390 }
391 #[cfg(not(feature = "monitoring"))]
392 pub fn with_full_monitoring(self) -> Self {
393 warn!(
394 "Full monitoring requires monitoring feature. Add 'monitoring' feature to enable all monitoring features."
395 );
396 self
397 }
398
399 pub fn with_middleware(self) -> Self {
401 info!("Middleware support enabled");
403 self
404 }
405
406 pub fn with_recovery(self) -> Self {
408 info!("Recovery mechanisms enabled");
409 self
410 }
411
412 pub fn with_oauth(self) -> Self {
414 info!("OAuth authentication enabled");
415 self
416 }
417
418 pub fn with_authentication(self, _token_validator: (), _required_scopes: Vec<String>) -> Self {
420 warn!("Authentication feature has been removed. Use ultrafast-mcp-auth crate directly.");
421 self
422 }
423
424 pub fn with_bearer_auth(self, _secret: String, _required_scopes: Vec<String>) -> Self {
426 warn!(
427 "Bearer authentication feature has been removed. Use ultrafast-mcp-auth crate directly."
428 );
429 self
430 }
431
432 pub fn with_api_key_auth(self) -> Self {
434 info!("API key authentication enabled");
435 self
436 }
437
438 pub fn with_basic_auth(self) -> Self {
440 info!("Basic authentication enabled");
441 self
442 }
443
444 pub fn with_rate_limiting(self, requests_per_minute: u32) -> Self {
446 info!(
447 "Rate limiting enabled: {} requests per minute",
448 requests_per_minute
449 );
450 self
451 }
452
453 pub fn with_request_validation(self) -> Self {
455 info!("Request validation enabled");
456 self
457 }
458
459 pub fn with_response_caching(self) -> Self {
461 info!("Response caching enabled");
462 self
463 }
464
465 #[cfg(feature = "monitoring")]
467 pub fn monitoring(&self) -> Option<Arc<crate::MonitoringSystem>> {
468 self.monitoring_system.clone()
469 }
470 #[cfg(not(feature = "monitoring"))]
471 pub fn monitoring(&self) -> Option<()> {
472 None
473 }
474
475 pub async fn create_context(&self) -> Context {
477 let logging_config = self.logging_config.read().await;
478 let logger_config = logging_config.default_logger_config.clone();
479
480 Context::new().with_logger_config(logger_config)
481 }
482
483 pub async fn create_context_with_ids(
485 &self,
486 request_id: String,
487 session_id: Option<String>,
488 ) -> Context {
489 let logging_config = self.logging_config.read().await;
490 let logger_config = logging_config.default_logger_config.clone();
491
492 let mut context = Context::new()
493 .with_request_id(request_id)
494 .with_logger_config(logger_config);
495
496 if let Some(session_id) = session_id {
497 context = context.with_session_id(session_id);
498 }
499
500 context
501 }
502
503 pub async fn register_tool(&self, tool: Tool) -> Result<(), ToolRegistrationError> {
505 if tool.name.is_empty() {
507 return Err(ToolRegistrationError::MissingDescription);
508 }
509
510 if self.is_reserved_name(&tool.name) {
511 return Err(ToolRegistrationError::ReservedName(tool.name.clone()));
512 }
513
514 if tool.description.is_empty() {
516 return Err(ToolRegistrationError::MissingDescription);
517 }
518
519 if let Err(e) = validate_tool_schema(&tool.input_schema) {
521 return Err(ToolRegistrationError::InvalidSchema(format!(
522 "Input schema: {e}"
523 )));
524 }
525
526 if let Some(output_schema) = &tool.output_schema {
527 if let Err(e) = validate_tool_schema(output_schema) {
528 return Err(ToolRegistrationError::InvalidSchema(format!(
529 "Output schema: {e}"
530 )));
531 }
532 } else {
533 return Err(ToolRegistrationError::MissingOutputSchema);
534 }
535
536 let mut tools = self.tools.write().await;
538 if tools.contains_key(&tool.name) {
539 return Err(ToolRegistrationError::ToolAlreadyExists(tool.name.clone()));
540 }
541
542 let tool_name = tool.name.clone();
544 tools.insert(tool_name.clone(), tool);
545 info!("Registered tool: {}", tool_name);
546
547 Ok(())
548 }
549
550 pub async fn register_tools(&self, tools: Vec<Tool>) -> Result<(), ToolRegistrationError> {
552 for tool in tools {
553 self.register_tool(tool).await?;
554 }
555 Ok(())
556 }
557
558 pub async fn unregister_tool(&self, name: &str) -> bool {
560 let mut tools = self.tools.write().await;
561 tools.remove(name).is_some()
562 }
563
564 pub async fn get_tool(&self, name: &str) -> Option<Tool> {
566 let tools = self.tools.read().await;
567 tools.get(name).cloned()
568 }
569
570 pub async fn list_tools(&self) -> Vec<Tool> {
572 let tools = self.tools.read().await;
573 tools.values().cloned().collect()
574 }
575
576 pub async fn has_tool(&self, name: &str) -> bool {
578 let tools = self.tools.read().await;
579 tools.contains_key(name)
580 }
581
582 pub async fn tool_count(&self) -> usize {
584 let tools = self.tools.read().await;
585 tools.len()
586 }
587
588 pub async fn clear_tools(&self) {
590 let mut tools = self.tools.write().await;
591 let count = tools.len();
592 tools.clear();
593 info!("Cleared {} tools", count);
594 }
595
596 fn is_reserved_name(&self, name: &str) -> bool {
598 let reserved_names = [
600 "initialize",
601 "initialized",
602 "shutdown",
603 "exit",
604 "ping",
605 "tools/list",
606 "tools/call",
607 "resources/list",
608 "resources/read",
609 "resources/subscribe",
610 "resources/unsubscribe",
611 "prompts/list",
612 "prompts/get",
613 "sampling/create",
614 "completion/complete",
615 "roots/list",
616 "elicitation/request",
617 "logging/setLevel",
618 ];
619
620 reserved_names.contains(&name)
621 }
622
623 pub async fn validate_tool_call(
625 &self,
626 tool_name: &str,
627 arguments: &serde_json::Value,
628 ) -> Result<(), MCPError> {
629 let tool = self.get_tool(tool_name).await;
630 let tool =
631 tool.ok_or_else(|| MCPError::invalid_request(format!("Tool '{tool_name}' not found")))?;
632
633 ultrafast_mcp_core::schema::validation::validate_tool_input(arguments, &tool.input_schema)
634 .map_err(|e| {
635 MCPError::invalid_request(format!(
636 "Tool '{tool_name}' input validation failed: {e}"
637 ))
638 })?;
639
640 Ok(())
641 }
642
643 pub async fn execute_tool_call(
645 &self,
646 tool_name: &str,
647 arguments: serde_json::Value,
648 ) -> Result<ultrafast_mcp_core::types::tools::ToolResult, MCPError> {
649 self.validate_tool_call(tool_name, &arguments).await?;
651
652 let tool_handler = self
654 .tool_handler
655 .as_ref()
656 .ok_or_else(|| MCPError::internal_error("No tool handler configured".to_string()))?;
657
658 let tool_call = ultrafast_mcp_core::types::tools::ToolCall {
660 name: tool_name.to_string(),
661 arguments: Some(arguments),
662 };
663
664 tool_handler
666 .handle_tool_call(tool_call)
667 .await
668 .map_err(|e| MCPError::internal_error(format!("Tool execution failed: {e}")))
669 }
670
671 pub fn with_tool_handler(mut self, handler: Arc<dyn ToolHandler>) -> Self {
673 self.tool_handler = Some(handler);
674 self
675 }
676
677 pub fn with_resource_handler(mut self, handler: Arc<dyn ResourceHandler>) -> Self {
679 self.resource_handler = Some(handler);
680 self
681 }
682
683 pub fn with_prompt_handler(mut self, handler: Arc<dyn PromptHandler>) -> Self {
685 self.prompt_handler = Some(handler);
686 self
687 }
688
689 pub fn with_sampling_handler(mut self, handler: Arc<dyn SamplingHandler>) -> Self {
691 self.sampling_handler = Some(handler);
692 self
693 }
694
695 pub fn with_completion_handler(mut self, handler: Arc<dyn CompletionHandler>) -> Self {
697 self.completion_handler = Some(handler);
698 self
699 }
700
701 pub fn with_roots_handler(mut self, handler: Arc<dyn RootsHandler>) -> Self {
703 self.roots_handler = Some(handler);
704 self
707 }
708
709 pub fn with_elicitation_handler(mut self, handler: Arc<dyn ElicitationHandler>) -> Self {
711 self.elicitation_handler = Some(handler);
712 self
715 }
716
717 pub fn with_subscription_handler(
719 mut self,
720 handler: Arc<dyn ResourceSubscriptionHandler>,
721 ) -> Self {
722 self.subscription_handler = Some(handler);
723 self
724 }
725
726 pub fn with_logging_config(mut self, config: ServerLoggingConfig) -> Self {
728 let logging_config = Arc::get_mut(&mut self.logging_config)
729 .expect("Cannot modify logging config after server has been cloned");
730 *logging_config.get_mut() = config;
731 self
732 }
733
734 pub async fn run_stdio(&self) -> MCPResult<()> {
736 let transport = create_transport(TransportConfig::Stdio)
737 .await
738 .map_err(|e| MCPError::internal_error(format!("Transport creation failed: {e}")))?;
739 self.run_with_transport(transport).await
740 }
741
742 pub async fn run_with_transport(&self, mut transport: Box<dyn Transport>) -> MCPResult<()> {
744 info!("Starting UltraFastServer with transport");
745
746 *self.state.write().await = ServerState::Initializing;
748
749 loop {
751 match transport.receive_message().await {
752 Ok(message) => {
753 if let Err(e) = self.handle_message(message, &mut transport).await {
754 error!("Error handling message: {}", e);
755 }
756 }
757 Err(e) => {
758 error!("Transport error: {}", e);
759 break;
760 }
761 }
762 }
763
764 Ok(())
765 }
766
/// Serves over Streamable HTTP on `host:port`, using defaults for every other
/// transport setting.
#[cfg(feature = "http")]
pub async fn run_streamable_http(&self, host: &str, port: u16) -> MCPResult<()> {
    info!(
        "Starting UltraFastServer with Streamable HTTP on {}:{}",
        host, port
    );

    let mut config = HttpTransportConfig::default();
    config.host = host.to_owned();
    config.port = port;

    self.run_http(config).await
}
783
/// Starts the HTTP transport server described by `config`.
///
/// A clone of this server is moved into a spawned task that runs
/// `process_http_messages` on the transport's channels; the task handle is not
/// awaited. The call then drives the transport server until it stops.
///
/// # Errors
///
/// Returns an internal error when the transport server's `run` fails.
#[cfg(feature = "http")]
pub async fn run_http(&self, config: HttpTransportConfig) -> MCPResult<()> {
    info!("Starting HTTP transport server with config: {:?}", config);

    let transport_server = HttpTransportServer::new(config);
    let incoming = transport_server.get_message_receiver();
    let outgoing = transport_server.get_message_sender();
    let responses = transport_server.get_response_sender();

    let worker = self.clone();
    let _message_processor = tokio::spawn(async move {
        worker
            .process_http_messages(incoming, outgoing, responses)
            .await;
    });

    match transport_server.run().await {
        Ok(done) => Ok(done),
        Err(e) => Err(MCPError::internal_error(format!(
            "HTTP server failed: {e}"
        ))),
    }
}
808
/// Alias for [`Self::run_http`]: serves over HTTP with an explicit transport
/// configuration.
#[cfg(feature = "http")]
pub async fn run_streamable_http_with_config(
    &self,
    config: HttpTransportConfig,
) -> MCPResult<()> {
    let outcome = self.run_http(config).await;
    outcome
}
818
819 #[allow(dead_code)]
821 async fn process_http_messages(
822 &self,
823 mut message_receiver: broadcast::Receiver<(String, JsonRpcMessage)>,
824 _message_sender: broadcast::Sender<(String, JsonRpcMessage)>,
825 response_sender: broadcast::Sender<(String, JsonRpcMessage)>,
826 ) {
827 info!("HTTP message processor started");
828
829 while let Ok((session_id, message)) = message_receiver.recv().await {
830 let session_id_clone = session_id.clone();
831 match message {
832 JsonRpcMessage::Request(request) => {
833 info!(
834 "Processing HTTP request: {} (session: {})",
835 request.method, session_id
836 );
837
838 let response = self.handle_request(request).await;
839 let response_message = JsonRpcMessage::Response(response);
840
841 info!(
842 "Sending response for session {}: {:?}",
843 session_id, response_message
844 );
845
846 if let Err(e) = response_sender.send((session_id_clone, response_message)) {
848 error!("Failed to send response for session {}: {}", session_id, e);
849 } else {
850 info!("Successfully sent response for session {}", session_id);
851 }
852 }
853 JsonRpcMessage::Notification(notification) => {
854 info!(
855 "Processing HTTP notification: {} (session: {})",
856 notification.method, session_id
857 );
858
859 if let Err(e) = self.handle_notification(notification).await {
860 error!(
861 "Failed to handle notification for session {}: {}",
862 session_id, e
863 );
864 }
865 }
867 JsonRpcMessage::Response(_) => {
868 warn!(
869 "Received unexpected response message for session: {}",
870 session_id
871 );
872 }
873 }
874 }
875
876 info!("HTTP message processor stopped");
877 }
878
879 pub fn info(&self) -> &ServerInfo {
881 &self.info
882 }
883
884 pub fn cancellation_manager(&self) -> Arc<CancellationManager> {
886 self.cancellation_manager.clone()
887 }
888
889 pub fn ping_manager(&self) -> Arc<PingManager> {
891 self.ping_manager.clone()
892 }
893
894 pub async fn start_ping_monitoring(&self, ping_interval: std::time::Duration) -> MCPResult<()> {
897 info!(
898 "Starting periodic ping monitoring with interval: {:?}",
899 ping_interval
900 );
901
902 info!("Ping monitoring enabled (interval: {:?})", ping_interval);
906
907 Ok(())
912 }
913
914 pub async fn stop_ping_monitoring(&self) -> MCPResult<()> {
916 info!("Stopping periodic ping monitoring");
917 Ok(())
919 }
920
921 async fn handle_initialize(
923 &self,
924 request: ultrafast_mcp_core::protocol::InitializeRequest,
925 ) -> Result<ultrafast_mcp_core::protocol::InitializeResponse, MCPError> {
926 info!(
927 "Handling initialize request from client: {} (version: {})",
928 request.client_info.name, request.protocol_version
929 );
930
931 let negotiated_version = match ultrafast_mcp_core::protocol::version::negotiate_version(
933 &request.protocol_version,
934 ) {
935 Ok(version) => {
936 info!(
937 "Protocol version negotiated: {} -> {}",
938 request.protocol_version, version
939 );
940 version
941 }
942 Err(e) => {
943 error!("Protocol version negotiation failed: {}", e);
944 return Err(MCPError::invalid_request(format!(
945 "Protocol version negotiation failed: {}. Supported versions: {:?}",
946 e,
947 ultrafast_mcp_core::protocol::version::SUPPORTED_VERSIONS
948 )));
949 }
950 };
951
952 if let Err(e) = request.validate_protocol_version() {
954 warn!("Initialize request validation warning: {}", e);
955 }
957
958 if let Err(e) = ultrafast_mcp_core::protocol::capabilities::validate_compatibility(
960 &request.capabilities,
961 &self.capabilities,
962 ) {
963 error!("Capability validation failed: {}", e);
964 return Err(MCPError::Protocol(
965 ultrafast_mcp_core::error::ProtocolError::CapabilityNotSupported(e),
966 ));
967 }
968
969 info!("Capabilities validated successfully");
970
971 {
974 let mut state = self.state.write().await;
975 *state = ServerState::Initialized;
976 }
977
978 info!(
979 "Server initialized with protocol version: {} (waiting for initialized notification)",
980 negotiated_version
981 );
982
983 Ok(ultrafast_mcp_core::protocol::InitializeResponse {
984 protocol_version: negotiated_version,
985 capabilities: self.capabilities.clone(),
986 server_info: self.info.clone(),
987 instructions: None,
988 })
989 }
990
991 async fn handle_initialized(
993 &self,
994 _notification: ultrafast_mcp_core::protocol::InitializedNotification,
995 ) -> MCPResult<()> {
996 info!("Received initialized notification from client");
997
998 {
1000 let mut state = self.state.write().await;
1001 *state = ServerState::Operating;
1002 }
1003
1004 info!("Server confirmed operating state via initialized notification");
1005 Ok(())
1006 }
1007
1008 async fn handle_shutdown(
1010 &self,
1011 request: ultrafast_mcp_core::protocol::ShutdownRequest,
1012 ) -> MCPResult<()> {
1013 info!("Handling shutdown request: {:?}", request.reason);
1014
1015 {
1017 let mut state = self.state.write().await;
1018 *state = ServerState::ShuttingDown;
1019 }
1020
1021 self.perform_shutdown_cleanup().await;
1023
1024 {
1026 let mut state = self.state.write().await;
1027 *state = ServerState::Shutdown;
1028 }
1029
1030 info!("Server shutdown completed");
1031 Ok(())
1032 }
1033
1034 async fn perform_shutdown_cleanup(&self) {
1036 info!("Performing shutdown cleanup");
1037
1038 self.clear_tools().await;
1040
1041 {
1043 let mut resources = self.resources.write().await;
1044 resources.clear();
1045 }
1046
1047 {
1049 let mut prompts = self.prompts.write().await;
1050 prompts.clear();
1051 }
1052
1053 {
1055 let mut subscriptions = self.resource_subscriptions.write().await;
1056 subscriptions.clear();
1057 }
1058
1059 info!("Shutdown cleanup completed");
1060 }
1061
1062 pub async fn get_state(&self) -> ServerState {
1064 self.state.read().await.clone()
1065 }
1066
1067 pub async fn can_operate(&self) -> bool {
1069 self.state.read().await.can_operate()
1070 }
1071
1072 fn deserialize_list_tools_request(
1074 &self,
1075 params: Option<serde_json::Value>,
1076 ) -> ultrafast_mcp_core::types::tools::ListToolsRequest {
1077 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1078 }
1079
1080 fn deserialize_list_resources_request(
1081 &self,
1082 params: Option<serde_json::Value>,
1083 ) -> ultrafast_mcp_core::types::resources::ListResourcesRequest {
1084 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1085 }
1086
1087 fn deserialize_list_prompts_request(
1088 &self,
1089 params: Option<serde_json::Value>,
1090 ) -> ultrafast_mcp_core::types::prompts::ListPromptsRequest {
1091 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1092 }
1093
1094 fn deserialize_get_prompt_request(
1095 &self,
1096 params: Option<serde_json::Value>,
1097 ) -> ultrafast_mcp_core::types::prompts::GetPromptRequest {
1098 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1099 }
1100
1101 fn deserialize_read_resource_request(
1102 &self,
1103 params: Option<serde_json::Value>,
1104 ) -> ultrafast_mcp_core::types::resources::ReadResourceRequest {
1105 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1106 }
1107
1108 fn deserialize_list_resource_templates_request(
1109 &self,
1110 params: Option<serde_json::Value>,
1111 ) -> ultrafast_mcp_core::types::resources::ListResourceTemplatesRequest {
1112 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1113 }
1114
1115 fn deserialize_subscribe_request(
1116 &self,
1117 params: Option<serde_json::Value>,
1118 ) -> ultrafast_mcp_core::types::resources::SubscribeRequest {
1119 serde_json::from_value(params.unwrap_or_default()).unwrap_or_else(|_| {
1120 ultrafast_mcp_core::types::resources::SubscribeRequest { uri: String::new() }
1121 })
1122 }
1123
1124 fn deserialize_unsubscribe_request(
1125 &self,
1126 params: Option<serde_json::Value>,
1127 ) -> ultrafast_mcp_core::types::resources::UnsubscribeRequest {
1128 serde_json::from_value(params.unwrap_or_default()).unwrap_or_else(|_| {
1129 ultrafast_mcp_core::types::resources::UnsubscribeRequest { uri: String::new() }
1130 })
1131 }
1132
1133 fn deserialize_create_message_request(
1134 &self,
1135 params: Option<serde_json::Value>,
1136 ) -> ultrafast_mcp_core::types::sampling::CreateMessageRequest {
1137 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1138 }
1139
1140 fn deserialize_elicitation_request(
1141 &self,
1142 params: Option<serde_json::Value>,
1143 ) -> ultrafast_mcp_core::types::elicitation::ElicitationRequest {
1144 serde_json::from_value(params.unwrap_or_default()).unwrap_or_default()
1145 }
1146
1147 fn deserialize_complete_request(
1148 &self,
1149 params: Option<serde_json::Value>,
1150 ) -> ultrafast_mcp_core::types::completion::CompleteRequest {
1151 match params {
1152 Some(params) => serde_json::from_value(params).unwrap_or_else(|_| {
1153 ultrafast_mcp_core::types::completion::CompleteRequest {
1154 reference: ultrafast_mcp_core::types::completion::CompletionReference {
1155 ref_type: "ref/prompt".to_string(),
1156 name: "".to_string(),
1157 },
1158 argument: ultrafast_mcp_core::types::completion::CompletionArgument {
1159 name: "".to_string(),
1160 value: "".to_string(),
1161 },
1162 context: None,
1163 }
1164 }),
1165 None => ultrafast_mcp_core::types::completion::CompleteRequest {
1166 reference: ultrafast_mcp_core::types::completion::CompletionReference {
1167 ref_type: "ref/prompt".to_string(),
1168 name: "".to_string(),
1169 },
1170 argument: ultrafast_mcp_core::types::completion::CompletionArgument {
1171 name: "".to_string(),
1172 value: "".to_string(),
1173 },
1174 context: None,
1175 },
1176 }
1177 }
1178
1179 async fn handle_message(
1181 &self,
1182 message: JsonRpcMessage,
1183 transport: &mut Box<dyn Transport>,
1184 ) -> MCPResult<()> {
1185 match message {
1186 JsonRpcMessage::Request(request) => {
1187 if request.id.is_none() {
1189 self.handle_notification(request).await?;
1191 } else {
1192 let operation_timeout = self.get_operation_timeout(&request.method);
1194 let request_id = request.id.clone(); let response =
1196 tokio::time::timeout(operation_timeout, self.handle_request(request)).await;
1197
1198 match response {
1199 Ok(response) => {
1200 transport
1201 .send_message(JsonRpcMessage::Response(response))
1202 .await
1203 .map_err(|e| {
1204 MCPError::internal_error(format!("Failed to send message: {e}"))
1205 })?;
1206 }
1207 Err(_) => {
1208 let timeout_error = JsonRpcResponse::error(
1210 JsonRpcError::new(-32000, "Request timeout".to_string()),
1211 request_id.clone(),
1212 );
1213 transport
1214 .send_message(JsonRpcMessage::Response(timeout_error))
1215 .await
1216 .map_err(|e| {
1217 MCPError::internal_error(format!(
1218 "Failed to send timeout error: {e}"
1219 ))
1220 })?;
1221
1222 if let Some(request_id) = &request_id {
1224 self.notify_cancelled(
1225 serde_json::Value::String(request_id.to_string()),
1226 Some("Request timed out".to_string()),
1227 transport,
1228 )
1229 .await?;
1230 }
1231 }
1232 }
1233 }
1234 }
1235 JsonRpcMessage::Notification(notification) => {
1236 self.handle_notification(notification).await?;
1237 }
1238 JsonRpcMessage::Response(_) => {
1239 warn!("Received unexpected response message");
1240 }
1241 }
1242 Ok(())
1243 }
1244
1245 async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
1247 info!(
1248 "Handling request: {} (id: {:?})",
1249 request.method, request.id
1250 );
1251
1252 match request.method.as_str() {
1253 "initialize" => {
1255 match serde_json::from_value::<ultrafast_mcp_core::protocol::InitializeRequest>(
1256 request.params.unwrap_or_default(),
1257 ) {
1258 Ok(init_request) => match self.handle_initialize(init_request).await {
1259 Ok(response) => match serde_json::to_value(response) {
1260 Ok(value) => JsonRpcResponse::success(value, request.id),
1261 Err(e) => JsonRpcResponse::error(
1262 JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1263 request.id,
1264 ),
1265 },
1266 Err(e) => JsonRpcResponse::error(
1267 JsonRpcError::new(-32603, e.to_string()),
1268 request.id,
1269 ),
1270 },
1271 Err(e) => JsonRpcResponse::error(
1272 JsonRpcError::invalid_params(Some(format!(
1273 "Invalid initialize request: {e}"
1274 ))),
1275 request.id,
1276 ),
1277 }
1278 }
1279 "shutdown" => {
1280 let shutdown_request = match serde_json::from_value::<
1281 ultrafast_mcp_core::protocol::ShutdownRequest,
1282 >(request.params.unwrap_or_default())
1283 {
1284 Ok(req) => req,
1285 Err(_) => ultrafast_mcp_core::protocol::ShutdownRequest { reason: None },
1286 };
1287
1288 match self.handle_shutdown(shutdown_request).await {
1289 Ok(_) => JsonRpcResponse::success(serde_json::json!({}), request.id),
1290 Err(e) => {
1291 JsonRpcResponse::error(JsonRpcError::new(-32603, e.to_string()), request.id)
1292 }
1293 }
1294 }
1295
1296 "tools/list" => {
1298 if !self.can_operate().await {
1299 return JsonRpcResponse::error(
1300 JsonRpcError::internal_error(Some("Server not ready".to_string())),
1301 request.id,
1302 );
1303 }
1304
1305 let list_request = self.deserialize_list_tools_request(request.params.clone());
1306
1307 if let Some(handler) = &self.tool_handler {
1308 match handler.list_tools(list_request).await {
1309 Ok(response) => {
1310 if response.tools.is_empty() {
1312 let tools = self.list_tools().await;
1313 let response =
1314 ultrafast_mcp_core::types::tools::ListToolsResponse {
1315 tools,
1316 next_cursor: None,
1317 };
1318 match serde_json::to_value(response) {
1319 Ok(value) => JsonRpcResponse::success(value, request.id),
1320 Err(e) => JsonRpcResponse::error(
1321 JsonRpcError::new(
1322 -32603,
1323 format!("Serialization error: {e}"),
1324 ),
1325 request.id,
1326 ),
1327 }
1328 } else {
1329 match serde_json::to_value(response) {
1330 Ok(value) => JsonRpcResponse::success(value, request.id),
1331 Err(e) => JsonRpcResponse::error(
1332 JsonRpcError::new(
1333 -32603,
1334 format!("Serialization error: {e}"),
1335 ),
1336 request.id,
1337 ),
1338 }
1339 }
1340 }
1341 Err(e) => JsonRpcResponse::error(
1342 JsonRpcError::new(-32603, format!("Tools list failed: {e}")),
1343 request.id,
1344 ),
1345 }
1346 } else {
1347 let tools = self.list_tools().await;
1349 let response = ultrafast_mcp_core::types::tools::ListToolsResponse {
1350 tools,
1351 next_cursor: None,
1352 };
1353 match serde_json::to_value(response) {
1354 Ok(value) => JsonRpcResponse::success(value, request.id),
1355 Err(e) => JsonRpcResponse::error(
1356 JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1357 request.id,
1358 ),
1359 }
1360 }
1361 }
1362 "tools/call" => {
1363 if !self.can_operate().await {
1364 return JsonRpcResponse::error(
1365 JsonRpcError::internal_error(Some("Server not ready".to_string())),
1366 request.id,
1367 );
1368 }
1369
1370 let params = match &request.params {
1371 Some(params) => params,
1372 None => {
1373 return JsonRpcResponse::error(
1374 JsonRpcError::new(
1375 -32602,
1376 "Tool call failed: Missing parameters".to_string(),
1377 ),
1378 request.id,
1379 );
1380 }
1381 };
1382
1383 let tool_name = params.get("name").and_then(|v| v.as_str());
1384 let arguments = params
1385 .get("arguments")
1386 .cloned()
1387 .unwrap_or(serde_json::json!({}));
1388
1389 if let Some(tool_name) = tool_name {
1390 if let Some(handler) = &self.tool_handler {
1391 let tool_call = ultrafast_mcp_core::types::tools::ToolCall {
1392 name: tool_name.to_string(),
1393 arguments: Some(arguments.clone()),
1394 };
1395 match handler.handle_tool_call(tool_call).await {
1397 Ok(result) => match serde_json::to_value(result) {
1398 Ok(value) => JsonRpcResponse::success(value, request.id),
1399 Err(e) => JsonRpcResponse::error(
1400 JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1401 request.id,
1402 ),
1403 },
1404 Err(e) => {
1405 use ultrafast_mcp_core::error::{MCPError, ProtocolError};
1406 let (code, msg) = match &e {
1407 MCPError::Protocol(ProtocolError::InvalidParams(_))
1408 | MCPError::Protocol(ProtocolError::NotFound(_)) => {
1409 (-32602, format!("Tool call failed: {e}"))
1410 }
1411 _ => (-32603, format!("Tool call failed: {e}")),
1412 };
1413 JsonRpcResponse::error(JsonRpcError::new(code, msg), request.id)
1414 }
1415 }
1416 } else {
1417 if !self.has_tool(tool_name).await {
1419 return JsonRpcResponse::error(
1420 JsonRpcError::new(
1421 -32602,
1422 format!("Tool call failed: Tool not found: {tool_name}"),
1423 ),
1424 request.id,
1425 );
1426 }
1427 match self.execute_tool_call(tool_name, arguments).await {
1429 Ok(result) => match serde_json::to_value(result) {
1430 Ok(value) => JsonRpcResponse::success(value, request.id),
1431 Err(e) => JsonRpcResponse::error(
1432 JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1433 request.id,
1434 ),
1435 },
1436 Err(e) => {
1437 use ultrafast_mcp_core::error::{MCPError, ProtocolError};
1438 let (code, msg) = match &e {
1439 MCPError::Protocol(ProtocolError::InvalidParams(_))
1440 | MCPError::Protocol(ProtocolError::NotFound(_)) => {
1441 (-32602, format!("Tool call failed: {e}"))
1442 }
1443 _ => (-32603, format!("Tool call failed: {e}")),
1444 };
1445 JsonRpcResponse::error(JsonRpcError::new(code, msg), request.id)
1446 }
1447 }
1448 }
1449 } else {
1450 JsonRpcResponse::error(
1451 JsonRpcError::new(
1452 -32602,
1453 "Tool call failed: Missing or invalid tool name".to_string(),
1454 ),
1455 request.id,
1456 )
1457 }
1458 }
1459
1460 "resources/list" => {
1462 if !self.can_operate().await {
1463 return JsonRpcResponse::error(
1464 JsonRpcError::new(-32000, "Server not ready".to_string()),
1465 request.id,
1466 );
1467 }
1468
1469 let list_request = self.deserialize_list_resources_request(request.params.clone());
1470
1471 if let Some(handler) = &self.resource_handler {
1472 match handler.list_resources(list_request).await {
1476 Ok(response) => match serde_json::to_value(response) {
1477 Ok(value) => JsonRpcResponse::success(value, request.id),
1478 Err(e) => JsonRpcResponse::error(
1479 JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1480 request.id,
1481 ),
1482 },
1483 Err(e) => JsonRpcResponse::error(
1484 JsonRpcError::new(-32603, format!("Resources list failed: {e}")),
1485 request.id,
1486 ),
1487 }
1488 } else {
1489 JsonRpcResponse::error(
1490 JsonRpcError::new(-32601, "Resources not supported".to_string()),
1491 request.id,
1492 )
1493 }
1494 }
1495 "resources/read" => {
1496 if !self.can_operate().await {
1497 return JsonRpcResponse::error(
1498 JsonRpcError::new(-32000, "Server not ready".to_string()),
1499 request.id,
1500 );
1501 }
1502
1503 let read_request = self.deserialize_read_resource_request(request.params.clone());
1504
1505 if let Some(handler) = &self.resource_handler {
1506 if let Some(roots_handler) = &self.roots_handler {
1508 match roots_handler.list_roots().await {
1509 Ok(roots) => {
1510 if let Err(e) = handler
1511 .validate_resource_access(
1512 &read_request.uri,
1513 ultrafast_mcp_core::types::roots::RootOperation::Read,
1514 &roots,
1515 )
1516 .await
1517 {
1518 return JsonRpcResponse::error(
1519 JsonRpcError::new(
1520 -32603,
1521 format!("Root validation failed: {e}"),
1522 ),
1523 request.id,
1524 );
1525 }
1526 }
1527 Err(e) => {
1528 return JsonRpcResponse::error(
1529 JsonRpcError::new(-32603, format!("Failed to get roots: {e}")),
1530 request.id,
1531 );
1532 }
1533 }
1534 }
1535
1536 match handler.read_resource(read_request).await {
1537 Ok(response) => match serde_json::to_value(response) {
1538 Ok(value) => JsonRpcResponse::success(value, request.id),
1539 Err(e) => JsonRpcResponse::error(
1540 JsonRpcError::new(-32603, format!("Serialization error: {e}")),
1541 request.id,
1542 ),
1543 },
1544 Err(e) => JsonRpcResponse::error(
1545 JsonRpcError::new(-32603, format!("Resource read failed: {e}")),
1546 request.id,
1547 ),
1548 }
1549 } else {
1550 JsonRpcResponse::error(
1551 JsonRpcError::new(-32601, "Resources not supported".to_string()),
1552 request.id,
1553 )
1554 }
1555 }
1556 "resources/templates/list" => {
1557 if !self.can_operate().await {
1558 return JsonRpcResponse::error(
1559 JsonRpcError::new(-32000, "Server not ready".to_string()),
1560 request.id,
1561 );
1562 }
1563
1564 let list_request =
1565 self.deserialize_list_resource_templates_request(request.params.clone());
1566
1567 if let Some(handler) = &self.resource_handler {
1568 match handler.list_resource_templates(list_request).await {
1569 Ok(response) => JsonRpcResponse::success(
1570 serde_json::to_value(response).unwrap(),
1571 request.id,
1572 ),
1573 Err(e) => JsonRpcResponse::error(
1574 JsonRpcError::new(
1575 -32603,
1576 format!("Resource templates list failed: {e}"),
1577 ),
1578 request.id,
1579 ),
1580 }
1581 } else {
1582 JsonRpcResponse::error(
1583 JsonRpcError::new(-32601, "Resources not supported".to_string()),
1584 request.id,
1585 )
1586 }
1587 }
1588 "resources/subscribe" => {
1589 if !self.can_operate().await {
1590 return JsonRpcResponse::error(
1591 JsonRpcError::new(-32000, "Server not ready".to_string()),
1592 request.id,
1593 );
1594 }
1595
1596 let subscribe_request = self.deserialize_subscribe_request(request.params.clone());
1597
1598 if let Some(roots_handler) = &self.roots_handler {
1600 if let Some(resource_handler) = &self.resource_handler {
1601 match roots_handler.list_roots().await {
1602 Ok(roots) => {
1603 if let Err(e) = resource_handler
1604 .validate_resource_access(
1605 &subscribe_request.uri,
1606 ultrafast_mcp_core::types::roots::RootOperation::Read,
1607 &roots,
1608 )
1609 .await
1610 {
1611 return JsonRpcResponse::error(
1612 JsonRpcError::new(
1613 -32603,
1614 format!("Root validation failed: {e}"),
1615 ),
1616 request.id,
1617 );
1618 }
1619 }
1620 Err(e) => {
1621 return JsonRpcResponse::error(
1622 JsonRpcError::new(-32603, format!("Failed to get roots: {e}")),
1623 request.id,
1624 );
1625 }
1626 }
1627 }
1628 }
1629
1630 if let Some(handler) = &self.subscription_handler {
1631 match handler.subscribe(subscribe_request.uri.clone()).await {
1632 Ok(_) => {
1633 JsonRpcResponse::success(
1637 serde_json::to_value(SubscribeResponse::new()).unwrap(),
1638 request.id,
1639 )
1640 }
1641 Err(e) => JsonRpcResponse::error(
1642 JsonRpcError::new(-32603, format!("Resource subscribe failed: {e}")),
1643 request.id,
1644 ),
1645 }
1646 } else {
1647 JsonRpcResponse::error(
1648 JsonRpcError::new(
1649 -32601,
1650 "Resource subscriptions not supported".to_string(),
1651 ),
1652 request.id,
1653 )
1654 }
1655 }
1656 "resources/unsubscribe" => {
1657 if !self.can_operate().await {
1658 return JsonRpcResponse::error(
1659 JsonRpcError::new(-32000, "Server not ready".to_string()),
1660 request.id,
1661 );
1662 }
1663
1664 let unsubscribe_request =
1665 self.deserialize_unsubscribe_request(request.params.clone());
1666
1667 if let Some(handler) = &self.subscription_handler {
1668 match handler.unsubscribe(unsubscribe_request.uri).await {
1669 Ok(_) => JsonRpcResponse::success(serde_json::Value::Null, request.id),
1670 Err(e) => JsonRpcResponse::error(
1671 JsonRpcError::new(-32603, format!("Resource unsubscribe failed: {e}")),
1672 request.id,
1673 ),
1674 }
1675 } else {
1676 JsonRpcResponse::error(
1677 JsonRpcError::new(
1678 -32601,
1679 "Resource subscriptions not supported".to_string(),
1680 ),
1681 request.id,
1682 )
1683 }
1684 }
1685
1686 "prompts/list" => {
1688 if !self.can_operate().await {
1689 return JsonRpcResponse::error(
1690 JsonRpcError::new(-32000, "Server not ready".to_string()),
1691 request.id,
1692 );
1693 }
1694
1695 let list_request = self.deserialize_list_prompts_request(request.params.clone());
1696
1697 if let Some(handler) = &self.prompt_handler {
1698 match handler.list_prompts(list_request).await {
1699 Ok(response) => JsonRpcResponse::success(
1700 serde_json::to_value(response).unwrap(),
1701 request.id,
1702 ),
1703 Err(e) => JsonRpcResponse::error(
1704 JsonRpcError::new(-32603, format!("Prompts list failed: {e}")),
1705 request.id,
1706 ),
1707 }
1708 } else {
1709 JsonRpcResponse::error(
1710 JsonRpcError::new(-32601, "Prompts not supported".to_string()),
1711 request.id,
1712 )
1713 }
1714 }
1715 "prompts/get" => {
1716 if !self.can_operate().await {
1717 return JsonRpcResponse::error(
1718 JsonRpcError::new(-32000, "Server not ready".to_string()),
1719 request.id,
1720 );
1721 }
1722
1723 let get_request = self.deserialize_get_prompt_request(request.params.clone());
1724
1725 if let Some(handler) = &self.prompt_handler {
1726 match handler.get_prompt(get_request).await {
1727 Ok(response) => JsonRpcResponse::success(
1728 serde_json::to_value(response).unwrap(),
1729 request.id,
1730 ),
1731 Err(e) => JsonRpcResponse::error(
1732 JsonRpcError::new(-32603, format!("Prompt get failed: {e}")),
1733 request.id,
1734 ),
1735 }
1736 } else {
1737 JsonRpcResponse::error(
1738 JsonRpcError::new(-32601, "Prompts not supported".to_string()),
1739 request.id,
1740 )
1741 }
1742 }
1743
1744 "completion/complete" => {
1746 if !self.can_operate().await {
1747 return JsonRpcResponse::error(
1748 JsonRpcError::new(-32000, "Server not ready".to_string()),
1749 request.id,
1750 );
1751 }
1752
1753 let complete_request = self.deserialize_complete_request(request.params.clone());
1754
1755 if let Some(handler) = &self.completion_handler {
1756 match handler.complete(complete_request).await {
1757 Ok(response) => JsonRpcResponse::success(
1758 serde_json::to_value(response).unwrap(),
1759 request.id,
1760 ),
1761 Err(e) => JsonRpcResponse::error(
1762 JsonRpcError::new(-32603, format!("Completion failed: {e}")),
1763 request.id,
1764 ),
1765 }
1766 } else {
1767 JsonRpcResponse::error(
1768 JsonRpcError::new(-32601, "Completion not supported".to_string()),
1769 request.id,
1770 )
1771 }
1772 }
1773
1774 "sampling/createMessage" => {
1776 if !self.can_operate().await {
1777 return JsonRpcResponse::error(
1778 JsonRpcError::new(-32000, "Server not ready".to_string()),
1779 request.id,
1780 );
1781 }
1782
1783 let create_request =
1784 self.deserialize_create_message_request(request.params.clone());
1785
1786 if let Some(handler) = &self.sampling_handler {
1787 match handler.create_message(create_request).await {
1788 Ok(response) => JsonRpcResponse::success(
1789 serde_json::to_value(response).unwrap(),
1790 request.id,
1791 ),
1792 Err(e) => JsonRpcResponse::error(
1793 JsonRpcError::new(-32603, format!("Message creation failed: {e}")),
1794 request.id,
1795 ),
1796 }
1797 } else {
1798 JsonRpcResponse::error(
1799 JsonRpcError::new(-32601, "Sampling not supported".to_string()),
1800 request.id,
1801 )
1802 }
1803 }
1804
1805 "roots/list" => {
1807 if !self.can_operate().await {
1808 return JsonRpcResponse::error(
1809 JsonRpcError::new(-32000, "Server not ready".to_string()),
1810 request.id,
1811 );
1812 }
1813
1814 if let Some(handler) = &self.roots_handler {
1815 match handler.list_roots().await {
1816 Ok(response) => JsonRpcResponse::success(
1817 serde_json::to_value(response).unwrap(),
1818 request.id,
1819 ),
1820 Err(e) => JsonRpcResponse::error(
1821 JsonRpcError::new(-32603, format!("Roots list failed: {e}")),
1822 request.id,
1823 ),
1824 }
1825 } else {
1826 JsonRpcResponse::error(
1827 JsonRpcError::new(-32601, "Roots not supported".to_string()),
1828 request.id,
1829 )
1830 }
1831 }
1832
1833 "elicitation/create" => {
1835 if !self.can_operate().await {
1836 return JsonRpcResponse::error(
1837 JsonRpcError::new(-32000, "Server not ready".to_string()),
1838 request.id,
1839 );
1840 }
1841
1842 let elicitation_request =
1843 self.deserialize_elicitation_request(request.params.clone());
1844
1845 if let Some(handler) = &self.elicitation_handler {
1846 match handler.handle_elicitation(elicitation_request).await {
1847 Ok(response) => JsonRpcResponse::success(
1848 serde_json::to_value(response).unwrap(),
1849 request.id,
1850 ),
1851 Err(e) => JsonRpcResponse::error(
1852 JsonRpcError::new(-32603, format!("Elicitation failed: {e}")),
1853 request.id,
1854 ),
1855 }
1856 } else {
1857 JsonRpcResponse::error(
1858 JsonRpcError::new(-32601, "Elicitation not supported".to_string()),
1859 request.id,
1860 )
1861 }
1862 }
1863
1864 "elicitation/respond" => {
1865 if !self.can_operate().await {
1866 return JsonRpcResponse::error(
1867 JsonRpcError::new(-32000, "Server not ready".to_string()),
1868 request.id,
1869 );
1870 }
1871
1872 let elicitation_response = match serde_json::from_value::<
1873 ultrafast_mcp_core::types::elicitation::ElicitationResponse,
1874 >(
1875 request.params.unwrap_or_default()
1876 ) {
1877 Ok(response) => response,
1878 Err(e) => {
1879 return JsonRpcResponse::error(
1880 JsonRpcError::new(-32602, format!("Invalid elicitation response: {e}")),
1881 request.id,
1882 );
1883 }
1884 };
1885
1886 info!(
1888 "Received elicitation response: {:?}",
1889 elicitation_response.action
1890 );
1891
1892 JsonRpcResponse::success(serde_json::json!({}), request.id)
1895 }
1896
1897 "logging/setLevel" => {
1899 let params = match &request.params {
1900 Some(params) => params,
1901 None => {
1902 return JsonRpcResponse::error(
1903 JsonRpcError::new(-32602, "Missing parameters".to_string()),
1904 request.id,
1905 );
1906 }
1907 };
1908
1909 match serde_json::from_value::<LogLevelSetRequest>(params.clone()) {
1910 Ok(set_request) => match self.set_log_level(set_request.level).await {
1911 Ok(()) => {
1912 let response = LogLevelSetResponse::new();
1913 JsonRpcResponse::success(
1914 serde_json::to_value(response).unwrap(),
1915 request.id,
1916 )
1917 }
1918 Err(e) => JsonRpcResponse::error(
1919 JsonRpcError::new(-32603, format!("Failed to set log level: {e}")),
1920 request.id,
1921 ),
1922 },
1923 Err(e) => JsonRpcResponse::error(
1924 JsonRpcError::new(-32602, format!("Invalid log level set request: {e}")),
1925 request.id,
1926 ),
1927 }
1928 }
1929
1930 "ping" => {
1932 let ping_request = match serde_json::from_value::<
1933 ultrafast_mcp_core::types::notifications::PingRequest,
1934 >(request.params.unwrap_or_default())
1935 {
1936 Ok(req) => req,
1937 Err(_) => ultrafast_mcp_core::types::notifications::PingRequest { data: None },
1938 };
1939
1940 match self.ping_manager.handle_ping(ping_request).await {
1941 Ok(response) => JsonRpcResponse::success(
1942 serde_json::to_value(response).unwrap(),
1943 request.id,
1944 ),
1945 Err(e) => JsonRpcResponse::error(
1946 JsonRpcError::new(-32603, format!("Ping failed: {e}")),
1947 request.id,
1948 ),
1949 }
1950 }
1951
1952 "roots/set" => {
1954 let params = match &request.params {
1955 Some(params) => params,
1956 None => {
1957 return JsonRpcResponse::error(
1958 JsonRpcError::new(-32602, "Missing parameters".to_string()),
1959 request.id,
1960 );
1961 }
1962 };
1963
1964 match serde_json::from_value::<SetRootsRequest>(params.clone()) {
1965 Ok(set_request) => {
1966 let response = self
1967 .handle_set_roots(
1968 set_request.roots,
1969 &mut Box::new(
1970 create_transport(TransportConfig::Stdio).await.unwrap(),
1971 ),
1972 )
1973 .await;
1974 JsonRpcResponse::success(
1975 serde_json::to_value(response).unwrap(),
1976 request.id,
1977 )
1978 }
1979 Err(e) => JsonRpcResponse::error(
1980 JsonRpcError::new(-32602, format!("Invalid roots set request: {e}")),
1981 request.id,
1982 ),
1983 }
1984 }
1985
1986 _ => JsonRpcResponse::error(
1988 JsonRpcError::new(
1989 -32601,
1990 format!("Method not implemented: {}", request.method),
1991 ),
1992 request.id,
1993 ),
1994 }
1995 }
1996
1997 async fn handle_notification(&self, notification: JsonRpcRequest) -> MCPResult<()> {
1999 info!("Handling notification: {}", notification.method);
2000
2001 match notification.method.as_str() {
2002 "initialized" => {
2003 let notification = ultrafast_mcp_core::protocol::InitializedNotification {};
2004 self.handle_initialized(notification).await?;
2005 Ok(())
2006 }
2007 "notifications/cancelled" => {
2008 if let Some(params) = notification.params {
2010 let cancellation_notification: ultrafast_mcp_core::types::notifications::CancelledNotification =
2011 serde_json::from_value(params)?;
2012
2013 let _cancelled = self
2015 .cancellation_manager
2016 .handle_cancellation(cancellation_notification)
2017 .await?;
2018 info!("Cancellation notification processed");
2019 }
2020 Ok(())
2021 }
2022 _ => {
2023 warn!("Unknown notification method: {}", notification.method);
2024 Ok(())
2025 }
2026 }
2027 }
2028
2029 pub async fn notify_tools_changed(&self, transport: &mut Box<dyn Transport>) -> MCPResult<()> {
2033 let notification =
2034 ultrafast_mcp_core::types::notifications::ToolsListChangedNotification::new();
2035 self.send_notification(
2036 "notifications/tools/listChanged",
2037 Some(serde_json::to_value(notification)?),
2038 transport,
2039 )
2040 .await
2041 }
2042
2043 pub async fn notify_resources_changed(
2045 &self,
2046 transport: &mut Box<dyn Transport>,
2047 ) -> MCPResult<()> {
2048 let notification =
2049 ultrafast_mcp_core::types::notifications::ResourcesListChangedNotification::new();
2050 self.send_notification(
2051 "notifications/resources/listChanged",
2052 Some(serde_json::to_value(notification)?),
2053 transport,
2054 )
2055 .await
2056 }
2057
2058 pub async fn notify_prompts_changed(
2060 &self,
2061 transport: &mut Box<dyn Transport>,
2062 ) -> MCPResult<()> {
2063 let notification =
2064 ultrafast_mcp_core::types::notifications::PromptsListChangedNotification::new();
2065 self.send_notification(
2066 "notifications/prompts/listChanged",
2067 Some(serde_json::to_value(notification)?),
2068 transport,
2069 )
2070 .await
2071 }
2072
2073 pub async fn notify_resource_updated(
2075 &self,
2076 uri: String,
2077 transport: &mut Box<dyn Transport>,
2078 ) -> MCPResult<()> {
2079 let notification =
2080 ultrafast_mcp_core::types::resources::ResourceUpdatedNotification { uri };
2081 self.send_notification(
2082 "notifications/resources/updated",
2083 Some(serde_json::to_value(notification)?),
2084 transport,
2085 )
2086 .await
2087 }
2088
2089 pub async fn notify_progress(
2091 &self,
2092 progress_token: serde_json::Value,
2093 progress: f64,
2094 total: Option<f64>,
2095 message: Option<String>,
2096 transport: &mut Box<dyn Transport>,
2097 ) -> MCPResult<()> {
2098 let mut notification = ultrafast_mcp_core::types::notifications::ProgressNotification::new(
2099 progress_token,
2100 progress,
2101 );
2102 if let Some(total) = total {
2103 notification = notification.with_total(total);
2104 }
2105 if let Some(message) = message {
2106 notification = notification.with_message(message);
2107 }
2108 self.send_notification(
2109 "notifications/progress",
2110 Some(serde_json::to_value(notification)?),
2111 transport,
2112 )
2113 .await
2114 }
2115
2116 pub async fn notify_logging_message(
2118 &self,
2119 level: ultrafast_mcp_core::types::notifications::LogLevel,
2120 data: serde_json::Value,
2121 logger: Option<String>,
2122 transport: &mut Box<dyn Transport>,
2123 ) -> MCPResult<()> {
2124 let mut notification =
2125 ultrafast_mcp_core::types::notifications::LoggingMessageNotification::new(level, data);
2126 if let Some(logger) = logger {
2127 notification = notification.with_logger(logger);
2128 }
2129 self.send_notification(
2130 "notifications/logging/message",
2131 Some(serde_json::to_value(notification)?),
2132 transport,
2133 )
2134 .await
2135 }
2136
2137 pub async fn notify_cancelled(
2139 &self,
2140 request_id: serde_json::Value,
2141 reason: Option<String>,
2142 transport: &mut Box<dyn Transport>,
2143 ) -> MCPResult<()> {
2144 let mut notification =
2145 ultrafast_mcp_core::types::notifications::CancelledNotification::new(request_id);
2146 if let Some(reason) = reason {
2147 notification = notification.with_reason(reason);
2148 }
2149 self.send_notification(
2150 "notifications/cancelled",
2151 Some(serde_json::to_value(notification)?),
2152 transport,
2153 )
2154 .await
2155 }
2156
2157 pub async fn notify_roots_changed(&self, transport: &mut Box<dyn Transport>) -> MCPResult<()> {
2159 let notification =
2160 ultrafast_mcp_core::types::notifications::RootsListChangedNotification::new();
2161 self.send_notification(
2162 "notifications/roots/listChanged",
2163 Some(serde_json::to_value(notification)?),
2164 transport,
2165 )
2166 .await
2167 }
2168
2169 async fn send_notification(
2171 &self,
2172 method: &str,
2173 params: Option<serde_json::Value>,
2174 transport: &mut Box<dyn Transport>,
2175 ) -> MCPResult<()> {
2176 let notification = JsonRpcRequest {
2177 jsonrpc: Cow::Borrowed("2.0"),
2178 id: None, method: method.to_string(),
2180 params,
2181 meta: std::collections::HashMap::new(),
2182 };
2183
2184 transport
2185 .send_message(JsonRpcMessage::Request(notification))
2186 .await
2187 .map_err(|e| MCPError::internal_error(format!("Failed to send notification: {e}")))?;
2188
2189 info!("Sent notification: {}", method);
2190 Ok(())
2191 }
2192
2193 pub fn with_advanced_sampling_handler(
2195 mut self,
2196 handler: Arc<dyn AdvancedSamplingHandler>,
2197 ) -> Self {
2198 self.advanced_sampling_handler = Some(handler);
2199 self
2200 }
2201
2202 pub fn with_default_advanced_sampling(mut self) -> Self {
2204 let default_handler = Arc::new(DefaultAdvancedSamplingHandler::new(self.info.clone()));
2205 self.advanced_sampling_handler = Some(default_handler);
2206 self
2207 }
2208
2209 pub async fn handle_set_roots(
2211 &self,
2212 roots: Vec<ultrafast_mcp_core::types::roots::Root>,
2213 transport: &mut Box<dyn Transport>,
2214 ) -> SetRootsResponse {
2215 if let Some(handler) = &self.roots_handler {
2216 match handler.set_roots(roots.clone()).await {
2217 Ok(_) => {
2218 let notification = RootsListChangedNotification { roots };
2220 let params = serde_json::to_value(notification).ok();
2221 let _ = self
2222 .send_notification("roots/listChanged", params, transport)
2223 .await;
2224 SetRootsResponse {
2225 success: true,
2226 error: None,
2227 }
2228 }
2229 Err(e) => SetRootsResponse {
2230 success: false,
2231 error: Some(e.to_string()),
2232 },
2233 }
2234 } else {
2235 SetRootsResponse {
2236 success: false,
2237 error: Some("Roots handler not available".to_string()),
2238 }
2239 }
2240 }
2241}
2242
2243#[cfg(test)]
2244mod tests {
2245 use super::*;
2246 use serde_json::json;
2247 use ultrafast_mcp_core::types::{
2248 server::ServerInfo,
2249 tools::{Tool, ToolContent},
2250 };
2251
2252 struct MockToolHandler;
2254
2255 #[async_trait::async_trait]
2256 impl ToolHandler for MockToolHandler {
2257 async fn handle_tool_call(
2258 &self,
2259 call: ultrafast_mcp_core::types::tools::ToolCall,
2260 ) -> MCPResult<ultrafast_mcp_core::types::tools::ToolResult> {
2261 if call.name == "nonexistent_tool" {
2263 return Err(ultrafast_mcp_core::error::MCPError::not_found(
2264 "Tool not found".to_string(),
2265 ));
2266 }
2267 if let Some(args) = &call.arguments {
2268 if args.get("input").is_none() {
2269 return Err(ultrafast_mcp_core::error::MCPError::invalid_params(
2270 "Invalid parameters".to_string(),
2271 ));
2272 }
2273 } else {
2274 return Err(ultrafast_mcp_core::error::MCPError::invalid_params(
2275 "Missing arguments".to_string(),
2276 ));
2277 }
2278 Ok(ultrafast_mcp_core::types::tools::ToolResult {
2279 content: vec![ToolContent::text(format!("Mock result for {}", call.name))],
2280 is_error: None,
2281 })
2282 }
2283
2284 async fn list_tools(
2285 &self,
2286 _request: ultrafast_mcp_core::types::tools::ListToolsRequest,
2287 ) -> MCPResult<ultrafast_mcp_core::types::tools::ListToolsResponse> {
2288 Ok(ultrafast_mcp_core::types::tools::ListToolsResponse {
2290 tools: vec![],
2291 next_cursor: None,
2292 })
2293 }
2294 }
2295
2296 fn create_test_server() -> UltraFastServer {
2297 let info = ServerInfo {
2298 name: "test-server".to_string(),
2299 version: "1.0.0".to_string(),
2300 description: Some("Test server".to_string()),
2301 homepage: None,
2302 repository: None,
2303 authors: Some(vec!["test".to_string()]),
2304 license: Some("MIT".to_string()),
2305 };
2306 let capabilities = ServerCapabilities::default();
2307 UltraFastServer::new(info, capabilities).with_tool_handler(Arc::new(MockToolHandler))
2308 }
2309
2310 async fn create_initialized_test_server() -> UltraFastServer {
2311 let server = create_test_server();
2312
2313 let init_request = ultrafast_mcp_core::protocol::InitializeRequest {
2315 protocol_version: "2025-06-18".to_string(),
2316 capabilities: ultrafast_mcp_core::protocol::ClientCapabilities::default(),
2317 client_info: ultrafast_mcp_core::types::client::ClientInfo {
2318 name: "test-client".to_string(),
2319 version: "1.0.0".to_string(),
2320 description: Some("Test client".to_string()),
2321 homepage: None,
2322 repository: None,
2323 authors: Some(vec!["test".to_string()]),
2324 license: Some("MIT".to_string()),
2325 },
2326 };
2327
2328 let _response = server.handle_initialize(init_request).await;
2329
2330 let notification = ultrafast_mcp_core::protocol::InitializedNotification {};
2332 let _ = server.handle_initialized(notification).await;
2333
2334 server
2335 }
2336
2337 fn create_valid_tool(name: &str) -> Tool {
2338 Tool {
2339 name: name.to_string(),
2340 description: "A test tool".to_string(),
2341 input_schema: json!({
2342 "type": "object",
2343 "properties": {
2344 "input": {"type": "string"}
2345 },
2346 "required": ["input"]
2347 }),
2348 output_schema: Some(json!({
2349 "type": "object",
2350 "properties": {
2351 "output": {"type": "string"}
2352 }
2353 })),
2354 annotations: None,
2355 }
2356 }
2357
2358 #[tokio::test]
2359 async fn test_register_valid_tool() {
2360 let server = create_test_server();
2361 let tool = create_valid_tool("test_tool");
2362
2363 let result = server.register_tool(tool).await;
2364 assert!(result.is_ok());
2365
2366 assert!(server.has_tool("test_tool").await);
2367 assert_eq!(server.tool_count().await, 1);
2368 }
2369
2370 #[tokio::test]
2371 async fn test_register_duplicate_tool() {
2372 let server = create_test_server();
2373 let tool1 = create_valid_tool("test_tool");
2374 let tool2 = create_valid_tool("test_tool");
2375
2376 server.register_tool(tool1).await.unwrap();
2377 let result = server.register_tool(tool2).await;
2378
2379 assert!(matches!(
2380 result,
2381 Err(ToolRegistrationError::ToolAlreadyExists(_))
2382 ));
2383 assert_eq!(server.tool_count().await, 1);
2384 }
2385
2386 #[tokio::test]
2387 async fn test_register_reserved_name() {
2388 let server = create_test_server();
2389 let tool = create_valid_tool("initialize");
2390
2391 let result = server.register_tool(tool).await;
2392 assert!(matches!(
2393 result,
2394 Err(ToolRegistrationError::ReservedName(_))
2395 ));
2396 assert_eq!(server.tool_count().await, 0);
2397 }
2398
2399 #[tokio::test]
2400 async fn test_register_tool_without_description() {
2401 let server = create_test_server();
2402 let mut tool = create_valid_tool("test_tool");
2403 tool.description = "".to_string();
2404
2405 let result = server.register_tool(tool).await;
2406 assert!(matches!(
2407 result,
2408 Err(ToolRegistrationError::MissingDescription)
2409 ));
2410 }
2411
2412 #[tokio::test]
2413 async fn test_register_tool_with_invalid_input_schema() {
2414 let server = create_test_server();
2415 let mut tool = create_valid_tool("test_tool");
2416 tool.input_schema = json!("invalid schema");
2417
2418 let result = server.register_tool(tool).await;
2419 assert!(matches!(
2420 result,
2421 Err(ToolRegistrationError::InvalidSchema(_))
2422 ));
2423 }
2424
2425 #[tokio::test]
2426 async fn test_register_tool_without_output_schema() {
2427 let server = create_test_server();
2428 let mut tool = create_valid_tool("test_tool");
2429 tool.output_schema = None;
2430
2431 let result = server.register_tool(tool).await;
2432 assert!(matches!(
2433 result,
2434 Err(ToolRegistrationError::MissingOutputSchema)
2435 ));
2436 }
2437
2438 #[tokio::test]
2439 async fn test_register_tool_with_invalid_schema() {
2440 let server = create_test_server();
2441 let mut tool = create_valid_tool("test_tool");
2442 tool.input_schema = json!("invalid schema");
2443
2444 let result = server.register_tool(tool).await;
2445 assert!(matches!(
2446 result,
2447 Err(ToolRegistrationError::InvalidSchema(_))
2448 ));
2449 }
2450
2451 #[tokio::test]
2452 async fn test_unregister_tool() {
2453 let server = create_test_server();
2454 let tool = create_valid_tool("test_tool");
2455
2456 server.register_tool(tool).await.unwrap();
2457 assert!(server.has_tool("test_tool").await);
2458
2459 let result = server.unregister_tool("test_tool");
2460 assert!(result.await);
2461 assert!(!server.has_tool("test_tool").await);
2462 assert_eq!(server.tool_count().await, 0);
2463 }
2464
2465 #[tokio::test]
2466 async fn test_unregister_nonexistent_tool() {
2467 let server = create_test_server();
2468 let result = server.unregister_tool("nonexistent");
2469 assert!(!result.await);
2470 }
2471
2472 #[tokio::test]
2473 async fn test_register_multiple_tools() {
2474 let server = create_test_server();
2475 let tools = vec![
2476 create_valid_tool("tool1"),
2477 create_valid_tool("tool2"),
2478 create_valid_tool("tool3"),
2479 ];
2480
2481 let result = server.register_tools(tools).await;
2482 assert!(result.is_ok());
2483 assert_eq!(server.tool_count().await, 3);
2484 assert!(server.has_tool("tool1").await);
2485 assert!(server.has_tool("tool2").await);
2486 assert!(server.has_tool("tool3").await);
2487 }
2488
2489 #[tokio::test]
2490 async fn test_register_multiple_tools_with_duplicate() {
2491 let server = create_test_server();
2492 let tools = vec![
2493 create_valid_tool("tool1"),
2494 create_valid_tool("tool1"), create_valid_tool("tool2"),
2496 ];
2497
2498 let result = server.register_tools(tools).await;
2499 assert!(matches!(
2500 result,
2501 Err(ToolRegistrationError::ToolAlreadyExists(_))
2502 ));
2503 assert_eq!(server.tool_count().await, 1); }
2505
2506 #[tokio::test]
2507 async fn test_get_tool() {
2508 let server = create_test_server();
2509 let tool = create_valid_tool("test_tool");
2510
2511 server.register_tool(tool.clone()).await.unwrap();
2512
2513 let retrieved = server.get_tool("test_tool").await;
2514 assert!(retrieved.is_some());
2515 assert_eq!(retrieved.unwrap().name, tool.name);
2516 }
2517
2518 #[tokio::test]
2519 async fn test_get_nonexistent_tool() {
2520 let server = create_test_server();
2521 let retrieved = server.get_tool("nonexistent").await;
2522 assert!(retrieved.is_none());
2523 }
2524
2525 #[tokio::test]
2526 async fn test_list_tools() {
2527 let server = create_test_server();
2528 let tools = vec![create_valid_tool("tool1"), create_valid_tool("tool2")];
2529
2530 server.register_tools(tools).await.unwrap();
2531
2532 let listed = server.list_tools().await;
2533 assert_eq!(listed.len(), 2);
2534 assert!(listed.iter().any(|t| t.name == "tool1"));
2535 assert!(listed.iter().any(|t| t.name == "tool2"));
2536 }
2537
2538 #[tokio::test]
2539 async fn test_clear_tools() {
2540 let server = create_test_server();
2541 let tools = vec![create_valid_tool("tool1"), create_valid_tool("tool2")];
2542
2543 server.register_tools(tools).await.unwrap();
2544 assert_eq!(server.tool_count().await, 2);
2545
2546 server.clear_tools().await;
2547 assert_eq!(server.tool_count().await, 0);
2548 assert!(!server.has_tool("tool1").await);
2549 assert!(!server.has_tool("tool2").await);
2550 }
2551
2552 #[tokio::test]
2553 async fn test_validate_tool_call() {
2554 let server = create_test_server();
2555 let tool = create_valid_tool("test_tool");
2556 server.register_tool(tool).await.unwrap();
2557
2558 let valid_args = json!({"input": "test input"});
2559 let result = server.validate_tool_call("test_tool", &valid_args).await;
2560 assert!(result.is_ok());
2561 }
2562
2563 #[tokio::test]
2564 async fn test_validate_tool_call_invalid_args() {
2565 let server = create_test_server();
2566 let tool = create_valid_tool("test_tool");
2567 server.register_tool(tool).await.unwrap();
2568
2569 let invalid_args = json!({"wrong_field": "test input"});
2570 let result = server.validate_tool_call("test_tool", &invalid_args).await;
2571 assert!(result.is_err());
2572 }
2573
2574 #[tokio::test]
2575 async fn test_validate_nonexistent_tool_call() {
2576 let server = create_test_server();
2577 let args = json!({"input": "test input"});
2578 let result = server.validate_tool_call("nonexistent", &args).await;
2579 assert!(result.is_err());
2580 }
2581
2582 #[tokio::test]
2583 async fn test_execute_tool_call() {
2584 let server = create_test_server();
2585 let tool = create_valid_tool("test_tool");
2586 server.register_tool(tool).await.unwrap();
2587
2588 let args = json!({"input": "test input"});
2589 let result = server.execute_tool_call("test_tool", args).await;
2590 assert!(result.is_ok());
2591
2592 let tool_result = result.unwrap();
2593 assert_eq!(tool_result.content.len(), 1);
2594 assert!(!tool_result.is_error.unwrap_or(false));
2595 }
2596
2597 #[tokio::test]
2598 async fn test_execute_tool_call_without_handler() {
2599 let server = UltraFastServer::new(
2600 ServerInfo {
2601 name: "test-server".to_string(),
2602 version: "1.0.0".to_string(),
2603 description: Some("Test server".to_string()),
2604 homepage: None,
2605 repository: None,
2606 authors: Some(vec!["test".to_string()]),
2607 license: Some("MIT".to_string()),
2608 },
2609 ServerCapabilities::default(),
2610 );
2611 let tool = create_valid_tool("test_tool");
2612 server.register_tool(tool).await.unwrap();
2613
2614 let args = json!({"input": "test input"});
2615 let result = server.execute_tool_call("test_tool", args).await;
2616 assert!(result.is_err());
2617 }
2618
2619 #[tokio::test]
2620 async fn test_reserved_names() {
2621 let server = create_test_server();
2622 let reserved_names = [
2623 "initialize",
2624 "initialized",
2625 "shutdown",
2626 "exit",
2627 "ping",
2628 "tools/list",
2629 "tools/call",
2630 "resources/list",
2631 "resources/read",
2632 "resources/subscribe",
2633 "resources/unsubscribe",
2634 "prompts/list",
2635 "prompts/get",
2636 "sampling/create",
2637 "completion/complete",
2638 "roots/list",
2639 "elicitation/request",
2640 ];
2641
2642 for name in &reserved_names {
2643 let tool = create_valid_tool(name);
2644 let result = server.register_tool(tool).await;
2645 assert!(matches!(
2646 result,
2647 Err(ToolRegistrationError::ReservedName(_))
2648 ));
2649 }
2650 }
2651
2652 #[tokio::test]
2653 async fn test_tools_list_jsonrpc() {
2654 let server = create_initialized_test_server().await;
2655
2656 let tools = vec![create_valid_tool("tool1"), create_valid_tool("tool2")];
2658 server.register_tools(tools).await.unwrap();
2659
2660 let request = JsonRpcRequest {
2662 jsonrpc: Cow::Borrowed("2.0"),
2663 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2664 "test-id",
2665 )),
2666 method: "tools/list".to_string(),
2667 params: None,
2668 meta: std::collections::HashMap::new(),
2669 };
2670
2671 let response = server.handle_request(request).await;
2672
2673 if let Some(result) = &response.result {
2675 assert_eq!(
2676 response.id,
2677 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2678 "test-id"
2679 ))
2680 );
2681 let tools_array = result.get("tools").and_then(|t| t.as_array()).unwrap();
2682 assert_eq!(tools_array.len(), 2);
2683
2684 let tool_names: Vec<&str> = tools_array
2685 .iter()
2686 .filter_map(|t| t.get("name").and_then(|n| n.as_str()))
2687 .collect();
2688 assert!(tool_names.contains(&"tool1"));
2689 assert!(tool_names.contains(&"tool2"));
2690 } else {
2691 panic!("Expected success response");
2692 }
2693 }
2694
2695 #[tokio::test]
2696 async fn test_tools_call_jsonrpc_success() {
2697 let server = create_initialized_test_server().await;
2698
2699 let tool = create_valid_tool("test_tool");
2701 server.register_tool(tool).await.unwrap();
2702
2703 let request = JsonRpcRequest {
2705 jsonrpc: Cow::Borrowed("2.0"),
2706 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2707 "test-id",
2708 )),
2709 method: "tools/call".to_string(),
2710 params: Some(json!({
2711 "name": "test_tool",
2712 "arguments": {
2713 "input": "test input"
2714 }
2715 })),
2716 meta: std::collections::HashMap::new(),
2717 };
2718
2719 let response = server.handle_request(request).await;
2720
2721 if let Some(result) = &response.result {
2723 assert_eq!(
2724 response.id,
2725 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2726 "test-id"
2727 ))
2728 );
2729
2730 let content = result.get("content").and_then(|c| c.as_array()).unwrap();
2732 assert_eq!(content.len(), 1);
2733
2734 let text_content = content[0].get("text").and_then(|t| t.as_str()).unwrap();
2736 assert!(text_content.contains("Mock result for test_tool"));
2737 } else {
2738 panic!("Expected success response");
2739 }
2740 }
2741
2742 #[tokio::test]
2743 async fn test_tools_call_jsonrpc_missing_params() {
2744 let server = create_initialized_test_server().await;
2745
2746 let request = JsonRpcRequest {
2748 jsonrpc: Cow::Borrowed("2.0"),
2749 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2750 "test-id",
2751 )),
2752 method: "tools/call".to_string(),
2753 params: None,
2754 meta: std::collections::HashMap::new(),
2755 };
2756
2757 let response = server.handle_request(request).await;
2758
2759 if let Some(error) = &response.error {
2761 assert_eq!(
2762 response.id,
2763 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2764 "test-id"
2765 ))
2766 );
2767 assert_eq!(error.code, -32602); assert!(error.message.contains("Missing parameters"));
2769 } else {
2770 panic!("Expected error response");
2771 }
2772 }
2773
2774 #[tokio::test]
2775 async fn test_tools_call_jsonrpc_missing_name() {
2776 let server = create_initialized_test_server().await;
2777
2778 let request = JsonRpcRequest {
2780 jsonrpc: Cow::Borrowed("2.0"),
2781 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2782 "test-id",
2783 )),
2784 method: "tools/call".to_string(),
2785 params: Some(json!({
2786 "arguments": {
2787 "input": "test input"
2788 }
2789 })),
2790 meta: std::collections::HashMap::new(),
2791 };
2792
2793 let response = server.handle_request(request).await;
2794
2795 if let Some(error) = &response.error {
2797 assert_eq!(
2798 response.id,
2799 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2800 "test-id"
2801 ))
2802 );
2803 assert_eq!(error.code, -32602); assert!(error.message.contains("Missing or invalid tool name"));
2805 } else {
2806 panic!("Expected error response");
2807 }
2808 }
2809
2810 #[tokio::test]
2811 async fn test_tools_call_jsonrpc_nonexistent_tool() {
2812 let server = create_initialized_test_server().await;
2813
2814 let request = JsonRpcRequest {
2816 jsonrpc: Cow::Borrowed("2.0"),
2817 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2818 "test-id",
2819 )),
2820 method: "tools/call".to_string(),
2821 params: Some(json!({
2822 "name": "nonexistent_tool",
2823 "arguments": {
2824 "input": "test input"
2825 }
2826 })),
2827 meta: std::collections::HashMap::new(),
2828 };
2829
2830 let response = server.handle_request(request).await;
2831
2832 if let Some(error) = &response.error {
2834 assert_eq!(
2835 response.id,
2836 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2837 "test-id"
2838 ))
2839 );
2840 assert_eq!(error.code, -32602); assert!(error.message.contains("Tool call failed:"));
2842 assert!(error.message.contains("Tool not found"));
2843 } else {
2844 panic!("Expected error response");
2845 }
2846 }
2847
2848 #[tokio::test]
2849 async fn test_tools_call_jsonrpc_invalid_arguments() {
2850 let server = create_initialized_test_server().await;
2851
2852 let tool = create_valid_tool("test_tool");
2854 server.register_tool(tool).await.unwrap();
2855
2856 let request = JsonRpcRequest {
2858 jsonrpc: Cow::Borrowed("2.0"),
2859 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2860 "test-id",
2861 )),
2862 method: "tools/call".to_string(),
2863 params: Some(json!({
2864 "name": "test_tool",
2865 "arguments": {
2866 "wrong_field": "test input"
2867 }
2868 })),
2869 meta: std::collections::HashMap::new(),
2870 };
2871
2872 let response = server.handle_request(request).await;
2873
2874 if let Some(error) = &response.error {
2876 assert_eq!(
2877 response.id,
2878 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2879 "test-id"
2880 ))
2881 );
2882 assert_eq!(error.code, -32602); assert!(error.message.contains("Invalid parameters"));
2884 } else {
2885 panic!("Expected error response");
2886 }
2887 }
2888
2889 #[tokio::test]
2890 async fn test_tools_call_jsonrpc_empty_arguments() {
2891 let server = create_initialized_test_server().await;
2892
2893 let tool = create_valid_tool("test_tool");
2895 server.register_tool(tool).await.unwrap();
2896
2897 let request = JsonRpcRequest {
2899 jsonrpc: Cow::Borrowed("2.0"),
2900 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2901 "test-id",
2902 )),
2903 method: "tools/call".to_string(),
2904 params: Some(json!({
2905 "name": "test_tool",
2906 "arguments": {}
2907 })),
2908 meta: std::collections::HashMap::new(),
2909 };
2910
2911 let response = server.handle_request(request).await;
2912
2913 if let Some(error) = &response.error {
2915 assert_eq!(
2916 response.id,
2917 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2918 "test-id"
2919 ))
2920 );
2921 assert_eq!(error.code, -32602); assert!(error.message.contains("Invalid parameters"));
2924 } else {
2925 panic!("Expected error response");
2926 }
2927 }
2928
2929 #[tokio::test]
2930 async fn test_unknown_method() {
2931 let server = create_test_server();
2932
2933 let request = JsonRpcRequest {
2935 jsonrpc: Cow::Borrowed("2.0"),
2936 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2937 "test-id",
2938 )),
2939 method: "unknown/method".to_string(),
2940 params: None,
2941 meta: std::collections::HashMap::new(),
2942 };
2943
2944 let response = server.handle_request(request).await;
2945
2946 if let Some(error) = &response.error {
2948 assert_eq!(
2949 response.id,
2950 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2951 "test-id"
2952 ))
2953 );
2954 assert_eq!(error.code, -32601); assert!(error.message.contains("Method not implemented"));
2956 } else {
2957 panic!("Expected error response");
2958 }
2959 }
2960
2961 #[tokio::test]
2962 async fn test_tools_integration_workflow() {
2963 let server = create_initialized_test_server().await;
2964
2965 let tools = vec![
2967 create_valid_tool("calculator"),
2968 create_valid_tool("file_reader"),
2969 ];
2970 server.register_tools(tools).await.unwrap();
2971 assert_eq!(server.tool_count().await, 2);
2972
2973 let list_request = JsonRpcRequest {
2975 jsonrpc: Cow::Borrowed("2.0"),
2976 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2977 "list-id",
2978 )),
2979 method: "tools/list".to_string(),
2980 params: None,
2981 meta: std::collections::HashMap::new(),
2982 };
2983
2984 let list_response = server.handle_request(list_request).await;
2985 if let Some(result) = &list_response.result {
2986 assert_eq!(
2987 list_response.id,
2988 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
2989 "list-id"
2990 ))
2991 );
2992 let tools_array = result.get("tools").and_then(|t| t.as_array()).unwrap();
2993 assert_eq!(tools_array.len(), 2);
2994 } else {
2995 panic!("Expected success response for tools/list");
2996 }
2997
2998 let call_request = JsonRpcRequest {
3000 jsonrpc: Cow::Borrowed("2.0"),
3001 id: Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
3002 "call-id",
3003 )),
3004 method: "tools/call".to_string(),
3005 params: Some(json!({
3006 "name": "calculator",
3007 "arguments": {
3008 "input": "2 + 2"
3009 }
3010 })),
3011 meta: std::collections::HashMap::new(),
3012 };
3013
3014 let call_response = server.handle_request(call_request).await;
3015 if let Some(result) = &call_response.result {
3016 assert_eq!(
3017 call_response.id,
3018 Some(ultrafast_mcp_core::protocol::jsonrpc::RequestId::string(
3019 "call-id"
3020 ))
3021 );
3022 let content = result
3023 .get("content")
3024 .and_then(|c| c.as_array())
3025 .expect("Expected content array");
3026 assert_eq!(content.len(), 1);
3027 } else {
3028 panic!("Expected success response for tools/call");
3029 }
3030
3031 assert!(server.has_tool("calculator").await);
3033 assert!(server.has_tool("file_reader").await);
3034 }
3035}