1use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11
12use crate::core::{
13 PromptInfo, ResourceInfo, ToolInfo,
14 error::{McpError, McpResult},
15 prompt::{Prompt, PromptHandler},
16 resource::{Resource, ResourceHandler},
17 tool::{Tool, ToolHandler},
18};
19use crate::protocol::{error_codes::*, messages::*, methods, types::*, validation::*};
20use crate::transport::traits::ServerTransport;
21
/// Tunable settings for an [`McpServer`].
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Intended cap on simultaneously processed requests.
    /// NOTE(review): not consulted by the code visible in this file — confirm where it is enforced.
    pub max_concurrent_requests: usize,
    /// Request timeout in milliseconds.
    /// NOTE(review): not consulted by the code visible in this file — confirm where it is enforced.
    pub request_timeout_ms: u64,
    /// When `true`, `McpServer::handle_request` validates each request before dispatching it.
    pub validate_requests: bool,
    /// Logging toggle.
    /// NOTE(review): not consulted by the code visible in this file.
    pub enable_logging: bool,
}

impl Default for ServerConfig {
    /// Defaults: 100 concurrent requests, a 30 000 ms timeout, validation and logging enabled.
    fn default() -> Self {
        let max_concurrent_requests: usize = 100;
        let request_timeout_ms: u64 = 30_000;

        Self {
            max_concurrent_requests,
            request_timeout_ms,
            validate_requests: true,
            enable_logging: true,
        }
    }
}
45
/// MCP server holding the registered resources, tools and prompts together
/// with the transport used to talk to the client.
///
/// The registries are wrapped in `Arc<RwLock<..>>` so they can be shared with
/// the per-request handler that `start` installs on the transport.
pub struct McpServer {
    /// Name/version record returned to clients during `initialize`.
    info: ServerInfo,
    /// Capabilities returned to clients during `initialize`.
    capabilities: ServerCapabilities,
    /// Runtime settings (see `ServerConfig`).
    config: ServerConfig,
    /// Registered resources, keyed by resource URI.
    resources: Arc<RwLock<HashMap<String, Resource>>>,
    /// Registered tools, keyed by tool name.
    tools: Arc<RwLock<HashMap<String, Tool>>>,
    /// Registered prompts, keyed by prompt name.
    prompts: Arc<RwLock<HashMap<String, Prompt>>>,
    /// Active transport; `None` until `start` installs one.
    transport: Arc<Mutex<Option<Box<dyn ServerTransport>>>>,
    /// Current lifecycle state.
    state: Arc<RwLock<ServerState>>,
    /// Counter backing `next_request_id`; currently unused, hence the lint allowance.
    #[allow(dead_code)]
    request_counter: Arc<Mutex<u64>>,
}
68
/// Lifecycle state of an `McpServer`.
#[derive(Debug, Clone, PartialEq)]
pub enum ServerState {
    /// Freshly constructed; `start` has not been called yet.
    Uninitialized,
    /// `start` is in progress (handler being wired, transport being started).
    Initializing,
    /// The transport has been started and the server is serving requests.
    Running,
    /// `stop` is in progress.
    Stopping,
    /// The transport has been stopped.
    Stopped,
}
83
84impl McpServer {
85 pub fn new(name: String, version: String) -> Self {
87 Self {
88 info: ServerInfo::new(name, version),
89 capabilities: ServerCapabilities {
90 prompts: Some(PromptsCapability {
91 list_changed: Some(true),
92 }),
93 resources: Some(ResourcesCapability {
94 subscribe: Some(true),
95 list_changed: Some(true),
96 }),
97 tools: Some(ToolsCapability {
98 list_changed: Some(true),
99 }),
100 sampling: None,
101 logging: None,
102 experimental: None,
103 completions: None,
104 },
105 config: ServerConfig::default(),
106 resources: Arc::new(RwLock::new(HashMap::new())),
107 tools: Arc::new(RwLock::new(HashMap::new())),
108 prompts: Arc::new(RwLock::new(HashMap::new())),
109 transport: Arc::new(Mutex::new(None)),
110 state: Arc::new(RwLock::new(ServerState::Uninitialized)),
111 request_counter: Arc::new(Mutex::new(0)),
112 }
113 }
114
115 pub fn with_config(name: String, version: String, config: ServerConfig) -> Self {
117 let mut server = Self::new(name, version);
118 server.config = config;
119 server
120 }
121
122 pub fn set_capabilities(&mut self, capabilities: ServerCapabilities) {
124 self.capabilities = capabilities;
125 }
126
127 pub fn info(&self) -> &ServerInfo {
129 &self.info
130 }
131
132 pub fn name(&self) -> &str {
134 &self.info.name
135 }
136
137 pub fn version(&self) -> &str {
139 &self.info.version
140 }
141
142 pub fn capabilities(&self) -> &ServerCapabilities {
144 &self.capabilities
145 }
146
147 pub fn config(&self) -> &ServerConfig {
149 &self.config
150 }
151
152 pub async fn add_resource<H>(&self, name: String, uri: String, handler: H) -> McpResult<()>
158 where
159 H: ResourceHandler + 'static,
160 {
161 let resource_info = ResourceInfo {
162 uri: uri.clone(),
163 name: name.clone(),
164 description: None,
165 mime_type: None,
166 annotations: None,
167 size: None,
168 title: None,
169 meta: None,
170 };
171
172 validate_resource_info(&resource_info)?;
173
174 let resource = Resource::new(resource_info, handler);
175
176 {
177 let mut resources = self.resources.write().await;
178 resources.insert(uri, resource);
179 }
180
181 self.emit_resources_list_changed().await?;
183
184 Ok(())
185 }
186
187 pub async fn add_resource_detailed<H>(&self, info: ResourceInfo, handler: H) -> McpResult<()>
189 where
190 H: ResourceHandler + 'static,
191 {
192 validate_resource_info(&info)?;
193
194 let uri = info.uri.clone();
195 let resource = Resource::new(info, handler);
196
197 {
198 let mut resources = self.resources.write().await;
199 resources.insert(uri, resource);
200 }
201
202 self.emit_resources_list_changed().await?;
203
204 Ok(())
205 }
206
207 pub async fn remove_resource(&self, uri: &str) -> McpResult<bool> {
209 let removed = {
210 let mut resources = self.resources.write().await;
211 resources.remove(uri).is_some()
212 };
213
214 if removed {
215 self.emit_resources_list_changed().await?;
216 }
217
218 Ok(removed)
219 }
220
221 pub async fn list_resources(&self) -> McpResult<Vec<ResourceInfo>> {
223 let resources = self.resources.read().await;
224 Ok(resources.values().map(|r| r.info.clone()).collect())
225 }
226
227 pub async fn read_resource(&self, uri: &str) -> McpResult<Vec<ResourceContents>> {
229 let resources = self.resources.read().await;
230
231 match resources.get(uri) {
232 Some(resource) => {
233 let params = HashMap::new(); resource.handler.read(uri, ¶ms).await
235 }
236 None => Err(McpError::ResourceNotFound(uri.to_string())),
237 }
238 }
239
240 pub async fn add_tool<H>(
246 &self,
247 name: String,
248 description: Option<String>,
249 schema: Value,
250 handler: H,
251 ) -> McpResult<()>
252 where
253 H: ToolHandler + 'static,
254 {
255 let tool_schema = ToolInputSchema {
256 schema_type: "object".to_string(),
257 properties: schema
258 .get("properties")
259 .and_then(|p| p.as_object())
260 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect()),
261 required: schema.get("required").and_then(|r| {
262 r.as_array().map(|arr| {
263 arr.iter()
264 .filter_map(|v| v.as_str().map(|s| s.to_string()))
265 .collect()
266 })
267 }),
268 additional_properties: schema
269 .as_object()
270 .unwrap_or(&serde_json::Map::new())
271 .iter()
272 .map(|(k, v)| (k.clone(), v.clone()))
273 .collect(),
274 };
275
276 let tool_info = ToolInfo {
277 name: name.clone(),
278 description,
279 input_schema: tool_schema,
280 annotations: None,
281 title: None,
282 meta: None,
283 };
284
285 validate_tool_info(&tool_info)?;
286
287 let tool = Tool::new(
288 name.clone(),
289 tool_info.description.clone(),
290 serde_json::to_value(&tool_info.input_schema)?,
291 handler,
292 );
293
294 {
295 let mut tools = self.tools.write().await;
296 tools.insert(name, tool);
297 }
298
299 self.emit_tools_list_changed().await?;
300
301 Ok(())
302 }
303
304 pub async fn add_tool_detailed<H>(&self, info: ToolInfo, handler: H) -> McpResult<()>
306 where
307 H: ToolHandler + 'static,
308 {
309 validate_tool_info(&info)?;
310
311 let name = info.name.clone();
312 let tool = Tool::new(
313 name.clone(),
314 info.description.clone(),
315 serde_json::to_value(&info.input_schema)?,
316 handler,
317 );
318
319 {
320 let mut tools = self.tools.write().await;
321 tools.insert(name, tool);
322 }
323
324 self.emit_tools_list_changed().await?;
325
326 Ok(())
327 }
328
329 pub async fn remove_tool(&self, name: &str) -> McpResult<bool> {
331 let removed = {
332 let mut tools = self.tools.write().await;
333 tools.remove(name).is_some()
334 };
335
336 if removed {
337 self.emit_tools_list_changed().await?;
338 }
339
340 Ok(removed)
341 }
342
343 pub async fn list_tools(&self) -> McpResult<Vec<ToolInfo>> {
345 let tools = self.tools.read().await;
346 Ok(tools.values().map(|t| t.info.clone()).collect())
347 }
348
349 pub async fn call_tool(
351 &self,
352 name: &str,
353 arguments: Option<HashMap<String, Value>>,
354 ) -> McpResult<ToolResult> {
355 let tools = self.tools.read().await;
356
357 match tools.get(name) {
358 Some(tool) => {
359 if !tool.enabled {
360 return Err(McpError::ToolNotFound(format!("Tool '{name}' is disabled")));
361 }
362
363 let args = arguments.unwrap_or_default();
364 tool.handler.call(args).await
365 }
366 None => Err(McpError::ToolNotFound(name.to_string())),
367 }
368 }
369
370 pub async fn add_prompt<H>(&self, info: PromptInfo, handler: H) -> McpResult<()>
376 where
377 H: PromptHandler + 'static,
378 {
379 validate_prompt_info(&info)?;
380
381 let name = info.name.clone();
382 let prompt = Prompt::new(info, handler);
383
384 {
385 let mut prompts = self.prompts.write().await;
386 prompts.insert(name, prompt);
387 }
388
389 self.emit_prompts_list_changed().await?;
390
391 Ok(())
392 }
393
394 pub async fn remove_prompt(&self, name: &str) -> McpResult<bool> {
396 let removed = {
397 let mut prompts = self.prompts.write().await;
398 prompts.remove(name).is_some()
399 };
400
401 if removed {
402 self.emit_prompts_list_changed().await?;
403 }
404
405 Ok(removed)
406 }
407
408 pub async fn list_prompts(&self) -> McpResult<Vec<PromptInfo>> {
410 let prompts = self.prompts.read().await;
411 Ok(prompts.values().map(|p| p.info.clone()).collect())
412 }
413
414 pub async fn get_prompt(
416 &self,
417 name: &str,
418 arguments: Option<HashMap<String, Value>>,
419 ) -> McpResult<PromptResult> {
420 let prompts = self.prompts.read().await;
421
422 match prompts.get(name) {
423 Some(prompt) => {
424 let args = arguments.unwrap_or_default();
425 prompt.handler.get(args).await
426 }
427 None => Err(McpError::PromptNotFound(name.to_string())),
428 }
429 }
430
431 pub async fn start<T>(&mut self, mut transport: T) -> McpResult<()>
437 where
438 T: ServerTransport + 'static,
439 {
440 let mut state = self.state.write().await;
441
442 match *state {
443 ServerState::Uninitialized => {
444 *state = ServerState::Initializing;
445 }
446 _ => return Err(McpError::Protocol("Server is already started".to_string())),
447 }
448
449 drop(state);
450
451 let resources = self.resources.clone();
453 let tools = self.tools.clone();
454 let prompts = self.prompts.clone();
455 let info = self.info.clone();
456 let capabilities = self.capabilities.clone();
457 let config = self.config.clone();
458
459 let request_handler: crate::transport::traits::ServerRequestHandler =
460 Arc::new(move |request| {
461 let resources = resources.clone();
462 let tools = tools.clone();
463 let prompts = prompts.clone();
464 let info = info.clone();
465 let capabilities = capabilities.clone();
466 let config = config.clone();
467
468 Box::pin(async move {
469 let temp_server = McpServer {
471 info,
472 capabilities,
473 config,
474 resources,
475 tools,
476 prompts,
477 transport: Arc::new(Mutex::new(None)),
478 state: Arc::new(RwLock::new(ServerState::Running)),
479 request_counter: Arc::new(Mutex::new(0)),
480 };
481 temp_server.handle_request(request).await
482 })
483 });
484
485 transport.set_request_handler(request_handler);
487
488 {
490 let mut transport_guard = self.transport.lock().await;
491 *transport_guard = Some(Box::new(transport));
492 }
493
494 {
496 let mut transport_guard = self.transport.lock().await;
497 if let Some(transport) = transport_guard.as_mut() {
498 transport.start().await?;
499 }
500 }
501
502 {
504 let mut state = self.state.write().await;
505 *state = ServerState::Running;
506 }
507
508 Ok(())
509 }
510
511 pub async fn stop(&self) -> McpResult<()> {
513 let mut state = self.state.write().await;
514
515 match *state {
516 ServerState::Running => {
517 *state = ServerState::Stopping;
518 }
519 ServerState::Stopped => return Ok(()),
520 _ => return Err(McpError::Protocol("Server is not running".to_string())),
521 }
522
523 drop(state);
524
525 {
527 let mut transport_guard = self.transport.lock().await;
528 if let Some(transport) = transport_guard.as_mut() {
529 transport.stop().await?;
530 }
531 }
532
533 {
535 let mut state = self.state.write().await;
536 *state = ServerState::Stopped;
537 }
538
539 Ok(())
540 }
541
542 pub async fn is_running(&self) -> bool {
544 let state = self.state.read().await;
545 matches!(*state, ServerState::Running)
546 }
547
548 pub async fn state(&self) -> ServerState {
550 let state = self.state.read().await;
551 state.clone()
552 }
553
554 pub async fn handle_request(&self, request: JsonRpcRequest) -> McpResult<JsonRpcResponse> {
560 if self.config.validate_requests {
562 validate_jsonrpc_request(&request)?;
563 validate_mcp_request(&request.method, request.params.as_ref())?;
564 }
565
566 let result = match request.method.as_str() {
568 methods::INITIALIZE => self.handle_initialize(request.params).await,
569 methods::PING => self.handle_ping().await,
570 methods::TOOLS_LIST => self.handle_tools_list(request.params).await,
571 methods::TOOLS_CALL => self.handle_tools_call(request.params).await,
572 methods::RESOURCES_LIST => self.handle_resources_list(request.params).await,
573 methods::RESOURCES_READ => self.handle_resources_read(request.params).await,
574 methods::RESOURCES_SUBSCRIBE => self.handle_resources_subscribe(request.params).await,
575 methods::RESOURCES_UNSUBSCRIBE => {
576 self.handle_resources_unsubscribe(request.params).await
577 }
578 methods::PROMPTS_LIST => self.handle_prompts_list(request.params).await,
579 methods::PROMPTS_GET => self.handle_prompts_get(request.params).await,
580 methods::LOGGING_SET_LEVEL => self.handle_logging_set_level(request.params).await,
581 _ => {
582 let method = &request.method;
583 Err(McpError::Protocol(format!("Unknown method: {method}")))
584 }
585 };
586
587 match result {
589 Ok(result_value) => Ok(JsonRpcResponse::success(request.id, result_value)?),
590 Err(error) => {
591 let (code, message) = match error {
592 McpError::ToolNotFound(_) => (TOOL_NOT_FOUND, error.to_string()),
593 McpError::ResourceNotFound(_) => (RESOURCE_NOT_FOUND, error.to_string()),
594 McpError::PromptNotFound(_) => (PROMPT_NOT_FOUND, error.to_string()),
595 McpError::Validation(_) => (INVALID_PARAMS, error.to_string()),
596 _ => (INTERNAL_ERROR, error.to_string()),
597 };
598 Ok(JsonRpcResponse::success(
601 request.id,
602 serde_json::json!({
603 "error": {
604 "code": code,
605 "message": message,
606 }
607 }),
608 )?)
609 }
610 }
611 }
612
613 async fn handle_initialize(&self, params: Option<Value>) -> McpResult<Value> {
618 let params: InitializeParams = match params {
619 Some(p) => serde_json::from_value(p)?,
620 None => {
621 return Err(McpError::Validation(
622 "Missing initialize parameters".to_string(),
623 ));
624 }
625 };
626
627 validate_initialize_params(¶ms)?;
628
629 let result = InitializeResult::new(
630 crate::protocol::LATEST_PROTOCOL_VERSION.to_string(),
631 self.capabilities.clone(),
632 self.info.clone(),
633 );
634
635 Ok(serde_json::to_value(result)?)
636 }
637
638 async fn handle_ping(&self) -> McpResult<Value> {
639 Ok(serde_json::to_value(PingResult { meta: None })?)
640 }
641
642 async fn handle_tools_list(&self, params: Option<Value>) -> McpResult<Value> {
643 let _params: ListToolsParams = match params {
644 Some(p) => serde_json::from_value(p)?,
645 None => ListToolsParams::default(),
646 };
647
648 let tools = self.list_tools().await?;
649 let result = ListToolsResult {
650 tools,
651 next_cursor: None, meta: None,
653 };
654
655 Ok(serde_json::to_value(result)?)
656 }
657
658 async fn handle_tools_call(&self, params: Option<Value>) -> McpResult<Value> {
659 let params: CallToolParams = match params {
660 Some(p) => serde_json::from_value(p)?,
661 None => {
662 return Err(McpError::Validation(
663 "Missing tool call parameters".to_string(),
664 ));
665 }
666 };
667
668 validate_call_tool_params(¶ms)?;
669
670 let result = self.call_tool(¶ms.name, params.arguments).await?;
671 Ok(serde_json::to_value(result)?)
672 }
673
674 async fn handle_resources_list(&self, params: Option<Value>) -> McpResult<Value> {
675 let _params: ListResourcesParams = match params {
676 Some(p) => serde_json::from_value(p)?,
677 None => ListResourcesParams::default(),
678 };
679
680 let resources = self.list_resources().await?;
681 let result = ListResourcesResult {
682 resources,
683 next_cursor: None, meta: None,
685 };
686
687 Ok(serde_json::to_value(result)?)
688 }
689
690 async fn handle_resources_read(&self, params: Option<Value>) -> McpResult<Value> {
691 let params: ReadResourceParams = match params {
692 Some(p) => serde_json::from_value(p)?,
693 None => {
694 return Err(McpError::Validation(
695 "Missing resource read parameters".to_string(),
696 ));
697 }
698 };
699
700 validate_read_resource_params(¶ms)?;
701
702 let contents = self.read_resource(¶ms.uri).await?;
703 let result = ReadResourceResult {
704 contents,
705 meta: None,
706 };
707
708 Ok(serde_json::to_value(result)?)
709 }
710
711 async fn handle_resources_subscribe(&self, params: Option<Value>) -> McpResult<Value> {
712 let params: SubscribeResourceParams = match params {
713 Some(p) => serde_json::from_value(p)?,
714 None => {
715 return Err(McpError::Validation(
716 "Missing resource subscribe parameters".to_string(),
717 ));
718 }
719 };
720
721 let _uri = params.uri;
723 let result = SubscribeResourceResult { meta: None };
724
725 Ok(serde_json::to_value(result)?)
726 }
727
728 async fn handle_resources_unsubscribe(&self, params: Option<Value>) -> McpResult<Value> {
729 let params: UnsubscribeResourceParams = match params {
730 Some(p) => serde_json::from_value(p)?,
731 None => {
732 return Err(McpError::Validation(
733 "Missing resource unsubscribe parameters".to_string(),
734 ));
735 }
736 };
737
738 let _uri = params.uri;
740 let result = UnsubscribeResourceResult { meta: None };
741
742 Ok(serde_json::to_value(result)?)
743 }
744
745 async fn handle_prompts_list(&self, params: Option<Value>) -> McpResult<Value> {
746 let _params: ListPromptsParams = match params {
747 Some(p) => serde_json::from_value(p)?,
748 None => ListPromptsParams::default(),
749 };
750
751 let prompts = self.list_prompts().await?;
752 let result = ListPromptsResult {
753 prompts,
754 next_cursor: None, meta: None,
756 };
757
758 Ok(serde_json::to_value(result)?)
759 }
760
761 async fn handle_prompts_get(&self, params: Option<Value>) -> McpResult<Value> {
762 let params: GetPromptParams = match params {
763 Some(p) => serde_json::from_value(p)?,
764 None => {
765 return Err(McpError::Validation(
766 "Missing prompt get parameters".to_string(),
767 ));
768 }
769 };
770
771 validate_get_prompt_params(¶ms)?;
772
773 let arguments = params.arguments.map(|args| {
774 args.into_iter()
775 .map(|(k, v)| (k, serde_json::Value::String(v)))
776 .collect()
777 });
778 let result = self.get_prompt(¶ms.name, arguments).await?;
779 Ok(serde_json::to_value(result)?)
780 }
781
782 async fn handle_logging_set_level(&self, params: Option<Value>) -> McpResult<Value> {
783 let _params: SetLoggingLevelParams = match params {
784 Some(p) => serde_json::from_value(p)?,
785 None => {
786 return Err(McpError::Validation(
787 "Missing logging level parameters".to_string(),
788 ));
789 }
790 };
791
792 let result = SetLoggingLevelResult { meta: None };
794 Ok(serde_json::to_value(result)?)
795 }
796
797 async fn emit_resources_list_changed(&self) -> McpResult<()> {
802 let notification = JsonRpcNotification::new(
803 methods::RESOURCES_LIST_CHANGED.to_string(),
804 Some(ResourceListChangedParams { meta: None }),
805 )?;
806
807 self.send_notification(notification).await
808 }
809
810 async fn emit_tools_list_changed(&self) -> McpResult<()> {
811 let notification = JsonRpcNotification::new(
812 methods::TOOLS_LIST_CHANGED.to_string(),
813 Some(ToolListChangedParams { meta: None }),
814 )?;
815
816 self.send_notification(notification).await
817 }
818
819 async fn emit_prompts_list_changed(&self) -> McpResult<()> {
820 let notification = JsonRpcNotification::new(
821 methods::PROMPTS_LIST_CHANGED.to_string(),
822 Some(PromptListChangedParams { meta: None }),
823 )?;
824
825 self.send_notification(notification).await
826 }
827
828 async fn send_notification(&self, notification: JsonRpcNotification) -> McpResult<()> {
830 let mut transport_guard = self.transport.lock().await;
831 if let Some(transport) = transport_guard.as_mut() {
832 transport.send_notification(notification).await?;
833 }
834 Ok(())
835 }
836
837 #[allow(dead_code)]
842 async fn next_request_id(&self) -> u64 {
843 let mut counter = self.request_counter.lock().await;
844 *counter += 1;
845 *counter
846 }
847}
848
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Tool handler stub that always answers with one text content item.
    struct GreetingTool;

    #[async_trait::async_trait]
    impl ToolHandler for GreetingTool {
        async fn call(&self, _arguments: HashMap<String, Value>) -> McpResult<ToolResult> {
            Ok(ToolResult {
                content: vec![Content::text("Hello from tool")],
                is_error: None,
                structured_content: None,
                meta: None,
            })
        }
    }

    /// Builds the server instance shared by all tests.
    fn test_server() -> McpServer {
        McpServer::new("test-server".to_string(), "1.0.0".to_string())
    }

    /// A fresh server reports its identity and is not running.
    #[tokio::test]
    async fn test_server_creation() {
        let server = test_server();

        let identity = server.info();
        assert_eq!(identity.name, "test-server");
        assert_eq!(identity.version, "1.0.0");
        assert!(!server.is_running().await);
    }

    /// A registered tool is listed and can be called without arguments.
    #[tokio::test]
    async fn test_tool_management() {
        let server = test_server();

        let input_schema = json!({
            "type": "object",
            "properties": {
                "name": {"type": "string"}
            }
        });

        let registration = server
            .add_tool(
                "test_tool".to_string(),
                Some("A test tool".to_string()),
                input_schema,
                GreetingTool,
            )
            .await;
        registration.unwrap();

        let listed = server.list_tools().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name, "test_tool");

        let outcome = server.call_tool("test_tool", None).await.unwrap();
        assert_eq!(outcome.content.len(), 1);
    }

    /// An `initialize` request produces a response that carries a result.
    #[tokio::test]
    async fn test_initialize_request() {
        let server = test_server();

        let client = ClientInfo {
            name: "test-client".to_string(),
            version: "1.0.0".to_string(),
            title: Some("Test Client".to_string()),
        };
        let init_params = InitializeParams::new(
            crate::protocol::LATEST_PROTOCOL_VERSION.to_string(),
            ClientCapabilities::default(),
            client,
        );

        let request =
            JsonRpcRequest::new(json!(1), methods::INITIALIZE.to_string(), Some(init_params))
                .unwrap();

        let response = server.handle_request(request).await.unwrap();
        assert!(response.result.is_some());
    }
}
929}