1use anyhow::{anyhow, Result};
3use serde_json::json;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tracing::debug;
9
10use mcp_protocol::{
11 constants::{error_codes, methods, PROTOCOL_VERSION},
12 messages::{InitializeParams, InitializeResult, JsonRpcMessage, ServerCapabilities},
13 types::{
14 resource::{
15 Resource, ResourceContent, ResourceReadParams, ResourceSubscribeParams,
16 ResourcesListParams,
17 },
18 tool::{Tool, ToolCallParams, ToolCallResult},
19 ServerInfo, ServerState,
20 },
21 version::{is_supported_version, version_mismatch_error},
22};
23
24use crate::prompts::PromptManager;
25use crate::resources::ResourceManager;
26use crate::tools::ToolManager;
27use crate::transport::Transport;
28
29pub struct ServerBuilder {
31 name: String,
32 version: String,
33 transport: Option<Box<dyn Transport>>,
34 tool_manager: Option<Arc<ToolManager>>,
35 resource_manager: Option<Arc<ResourceManager>>,
36 prompt_manager: Option<Arc<PromptManager>>,
37}
38
39impl ServerBuilder {
40 pub fn new(name: &str, version: &str) -> Self {
42 debug!("Creating new server builder");
43 Self {
44 name: name.to_string(),
45 version: version.to_string(),
46 transport: None,
47 tool_manager: None,
48 resource_manager: None,
49 prompt_manager: None,
50 }
51 }
52
53 pub fn with_transport<T: Transport>(mut self, transport: T) -> Self {
55 self.transport = Some(Box::new(transport));
56 self
57 }
58
59 pub fn with_tool_manager(mut self, tool_manager: Arc<ToolManager>) -> Self {
61 self.tool_manager = Some(tool_manager);
62 self
63 }
64
65 pub fn with_resource_manager(mut self, resource_manager: Arc<ResourceManager>) -> Self {
67 self.resource_manager = Some(resource_manager);
68 self
69 }
70
71 pub fn with_prompt_manager(mut self, prompt_manager: Arc<PromptManager>) -> Self {
73 self.prompt_manager = Some(prompt_manager);
74 self
75 }
76
77 pub fn with_tool(
79 mut self,
80 name: &str,
81 description: Option<&str>,
82 input_schema: serde_json::Value,
83 handler: impl Fn(serde_json::Value) -> Result<ToolCallResult> + Send + Sync + 'static,
84 ) -> Self {
85 debug!("Registering tool: {}", name);
86 if self.tool_manager.is_none() {
88 self.tool_manager = Some(Arc::new(ToolManager::new()));
89 }
90
91 let tool = Tool {
93 name: name.to_string(),
94 description: description.map(|s| s.to_string()),
95 input_schema,
96 annotations: None,
97 };
98
99 let tool_manager = self.tool_manager.as_ref().unwrap();
101 tool_manager.register_tool(tool, handler);
102
103 self
104 }
105
106 pub fn with_resource(
108 mut self,
109 uri: &str,
110 name: &str,
111 description: Option<&str>,
112 mime_type: Option<&str>,
113 size: Option<u64>,
114 content_provider: impl Fn() -> Result<Vec<ResourceContent>> + Send + Sync + 'static,
115 ) -> Self {
116 if self.resource_manager.is_none() {
118 self.resource_manager = Some(Arc::new(ResourceManager::new()));
119 }
120
121 let resource = Resource {
123 uri: uri.to_string(),
124 name: name.to_string(),
125 description: description.map(|s| s.to_string()),
126 mime_type: mime_type.map(|s| s.to_string()),
127 size,
128 annotations: None,
129 };
130
131 let resource_manager = self.resource_manager.as_ref().unwrap();
133 resource_manager.register_resource(resource, content_provider);
134
135 self
136 }
137
138 pub fn with_template(
140 mut self,
141 uri_template: &str,
142 name: &str,
143 description: Option<&str>,
144 mime_type: Option<&str>,
145 expander: impl Fn(String, HashMap<String, String>) -> Result<String> + Send + Sync + 'static,
146 ) -> Self {
147 if self.resource_manager.is_none() {
149 self.resource_manager = Some(Arc::new(ResourceManager::new()));
150 }
151
152 let template = mcp_protocol::types::resource::ResourceTemplate {
154 uri_template: uri_template.to_string(),
155 name: name.to_string(),
156 description: description.map(|s| s.to_string()),
157 mime_type: mime_type.map(|s| s.to_string()),
158 annotations: None,
159 };
160
161 let resource_manager = self.resource_manager.as_ref().unwrap();
163 resource_manager.register_template(template, expander);
164
165 self
166 }
167
168 pub fn with_template_completion(
170 mut self,
171 template_uri: &str,
172 provider: impl Fn(
173 String,
174 String,
175 Option<String>,
176 ) -> Result<Vec<mcp_protocol::types::completion::CompletionItem>>
177 + Send
178 + Sync
179 + 'static,
180 ) -> Self {
181 if self.resource_manager.is_none() {
183 self.resource_manager = Some(Arc::new(ResourceManager::new()));
184 }
185
186 let resource_manager = self.resource_manager.as_ref().unwrap();
188 resource_manager.register_completion_provider(template_uri, provider);
189
190 self
191 }
192
193 pub fn with_prompt_completion(
195 mut self,
196 prompt_name: &str,
197 param_name: &str,
198 provider: impl Fn(String, Option<String>) -> Result<Vec<String>> + Send + Sync + 'static,
199 ) -> Self {
200 if self.prompt_manager.is_none() {
202 self.prompt_manager = Some(Arc::new(PromptManager::new()));
203 }
204
205 let prompt_manager = self.prompt_manager.as_ref().unwrap();
207 prompt_manager.register_completion_provider(prompt_name, param_name, provider);
208
209 self
210 }
211
212 pub fn with_prompt(
214 mut self,
215 name: &str,
216 description: Option<&str>,
217 arguments: Option<Vec<mcp_protocol::types::prompt::PromptArgument>>,
218 handler: impl Fn(
219 Option<HashMap<String, String>>,
220 ) -> Result<Vec<mcp_protocol::types::prompt::PromptMessage>>
221 + Send
222 + Sync
223 + 'static,
224 ) -> Self {
225 if self.prompt_manager.is_none() {
227 self.prompt_manager = Some(Arc::new(PromptManager::new()));
228 }
229
230 let prompt = mcp_protocol::types::prompt::Prompt {
232 name: name.to_string(),
233 description: description.map(|s| s.to_string()),
234 arguments,
235 annotations: None,
236 };
237
238 let prompt_manager = self.prompt_manager.as_ref().unwrap();
240 prompt_manager.register_prompt(prompt, handler);
241
242 self
243 }
244
245 pub fn build(self) -> Result<Server> {
247 let transport = self
248 .transport
249 .ok_or_else(|| anyhow!("Transport is required"))?;
250
251 Ok(Server {
252 name: self.name,
253 version: self.version,
254 transport,
255 tool_manager: self
256 .tool_manager
257 .unwrap_or_else(|| Arc::new(ToolManager::new())),
258 resource_manager: self
259 .resource_manager
260 .unwrap_or_else(|| Arc::new(ResourceManager::new())),
261 prompt_manager: self
262 .prompt_manager
263 .unwrap_or_else(|| Arc::new(PromptManager::new())),
264 state: Arc::new(AtomicU8::new(ServerState::Created as u8)),
265 })
266 }
267}
268
269pub struct Server {
271 name: String,
272 version: String,
273 transport: Box<dyn Transport>,
274 tool_manager: Arc<ToolManager>,
275 resource_manager: Arc<ResourceManager>,
276 prompt_manager: Arc<PromptManager>,
277 state: Arc<AtomicU8>,
278}
279
280impl Server {
281 fn get_tool_capabilities(&self) -> HashMap<String, bool> {
283 let mut capabilities = HashMap::new();
284 capabilities.insert("listChanged".to_string(), true);
285 capabilities
286 }
287
288 fn get_resource_capabilities(&self) -> HashMap<String, bool> {
290 let mut capabilities = HashMap::new();
291 capabilities.insert("listChanged".to_string(), true);
292 capabilities.insert("subscribe".to_string(), true);
293 capabilities
294 }
295
296 fn get_prompt_capabilities(&self) -> HashMap<String, bool> {
298 let mut capabilities = HashMap::new();
299 capabilities.insert("listChanged".to_string(), true);
300 capabilities
301 }
302
303 fn get_server_info(&self) -> ServerInfo {
305 ServerInfo {
306 name: self.name.clone(),
307 version: self.version.clone(),
308 }
309 }
310
311 async fn handle_initialize(&self, message: JsonRpcMessage) -> Result<()> {
313 match message {
314 JsonRpcMessage::Request { id, params, .. } => {
315 let params: InitializeParams = match params {
317 Some(params) => match serde_json::from_value(params) {
318 Ok(params) => params,
319 Err(err) => {
320 self.transport
322 .send(JsonRpcMessage::error(
323 id,
324 error_codes::INVALID_PARAMS,
325 &format!("Invalid initialize parameters: {}", err),
326 None,
327 ))
328 .await?;
329 return Ok(());
330 }
331 },
332 None => {
333 self.transport
335 .send(JsonRpcMessage::error(
336 id,
337 error_codes::INVALID_PARAMS,
338 "Missing initialize parameters",
339 None,
340 ))
341 .await?;
342 return Ok(());
343 }
344 };
345
346 if !is_supported_version(¶ms.protocol_version) {
348 self.transport
350 .send(JsonRpcMessage::error(
351 id,
352 error_codes::INVALID_PARAMS,
353 "Unsupported protocol version",
354 Some(json!(version_mismatch_error(¶ms.protocol_version))),
355 ))
356 .await?;
357 return Ok(());
358 }
359
360 self.state
362 .store(ServerState::Initializing as u8, Ordering::SeqCst);
363
364 let tools_capabilities = self.get_tool_capabilities();
366 let resources_capabilities = self.get_resource_capabilities();
367 let prompts_capabilities = self.get_prompt_capabilities();
368
369 let result = InitializeResult {
371 protocol_version: PROTOCOL_VERSION.to_string(),
372 capabilities: ServerCapabilities {
373 tools: Some(tools_capabilities),
374 resources: Some(resources_capabilities),
375 prompts: Some(prompts_capabilities),
376 ..Default::default()
377 },
378 server_info: self.get_server_info(),
379 instructions: None,
380 };
381
382 self.transport
384 .send(JsonRpcMessage::response(id, json!(result)))
385 .await?;
386
387 Ok(())
388 }
389 _ => Err(anyhow!("Expected request message for initialize")),
390 }
391 }
392
393 async fn handle_initialized(&self) -> Result<()> {
395 self.state.store(ServerState::Ready as u8, Ordering::SeqCst);
397
398 Ok(())
400 }
401
402 async fn handle_tools_list(&self, message: JsonRpcMessage) -> Result<()> {
404 match message {
405 JsonRpcMessage::Request { id, .. } => {
406 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
408 self.transport
410 .send(JsonRpcMessage::error(
411 id,
412 error_codes::SERVER_NOT_INITIALIZED,
413 "Server not initialized",
414 None,
415 ))
416 .await?;
417 return Ok(());
418 }
419
420 let tools = self.tool_manager.list_tools().await;
422
423 self.transport
425 .send(JsonRpcMessage::response(
426 id,
427 json!({
428 "tools": tools,
429 "nextCursor": ""
430 }),
431 ))
432 .await?;
433
434 Ok(())
435 }
436 _ => Err(anyhow!("Expected request message for tools/list")),
437 }
438 }
439
440 async fn handle_tools_call(&self, message: JsonRpcMessage) -> Result<()> {
442 match message {
443 JsonRpcMessage::Request { id, params, .. } => {
444 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
446 self.transport
448 .send(JsonRpcMessage::error(
449 id,
450 error_codes::SERVER_NOT_INITIALIZED,
451 "Server not initialized",
452 None,
453 ))
454 .await?;
455 return Ok(());
456 }
457
458 let params: ToolCallParams = match params {
460 Some(params) => match serde_json::from_value(params) {
461 Ok(params) => params,
462 Err(err) => {
463 self.transport
465 .send(JsonRpcMessage::error(
466 id,
467 error_codes::INVALID_PARAMS,
468 &format!("Invalid tool call parameters: {}", err),
469 None,
470 ))
471 .await?;
472 return Ok(());
473 }
474 },
475 None => {
476 self.transport
478 .send(JsonRpcMessage::error(
479 id,
480 error_codes::INVALID_PARAMS,
481 "Missing tool call parameters",
482 None,
483 ))
484 .await?;
485 return Ok(());
486 }
487 };
488
489 match self
491 .tool_manager
492 .execute_tool(¶ms.name, params.arguments)
493 .await
494 {
495 Ok(result) => {
496 self.transport
498 .send(JsonRpcMessage::response(id, json!(result)))
499 .await?;
500 }
501 Err(err) => {
502 self.transport
504 .send(JsonRpcMessage::error(
505 id,
506 error_codes::INTERNAL_ERROR,
507 &format!("Tool execution error: {}", err),
508 None,
509 ))
510 .await?;
511 }
512 }
513
514 Ok(())
515 }
516 _ => Err(anyhow!("Expected request message for tools/call")),
517 }
518 }
519
520 async fn handle_resources_list(&self, message: JsonRpcMessage) -> Result<()> {
522 match message {
523 JsonRpcMessage::Request { id, params, .. } => {
524 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
526 self.transport
528 .send(JsonRpcMessage::error(
529 id,
530 error_codes::SERVER_NOT_INITIALIZED,
531 "Server not initialized",
532 None,
533 ))
534 .await?;
535 return Ok(());
536 }
537
538 let params: Option<ResourcesListParams> = match params {
540 Some(params) => match serde_json::from_value(params) {
541 Ok(params) => Some(params),
542 Err(err) => {
543 self.transport
545 .send(JsonRpcMessage::error(
546 id,
547 error_codes::INVALID_PARAMS,
548 &format!("Invalid resource list parameters: {}", err),
549 None,
550 ))
551 .await?;
552 return Ok(());
553 }
554 },
555 None => None,
556 };
557
558 let cursor = params.and_then(|p| p.cursor);
560
561 let (resources, next_cursor) = self.resource_manager.list_resources(cursor).await;
563
564 self.transport
566 .send(JsonRpcMessage::response(
567 id,
568 json!({
569 "resources": resources,
570 "nextCursor": next_cursor.unwrap_or_default()
571 }),
572 ))
573 .await?;
574
575 Ok(())
576 }
577 _ => Err(anyhow!("Expected request message for resources/list")),
578 }
579 }
580
581 async fn handle_resources_read(&self, message: JsonRpcMessage) -> Result<()> {
583 match message {
584 JsonRpcMessage::Request { id, params, .. } => {
585 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
587 self.transport
589 .send(JsonRpcMessage::error(
590 id,
591 error_codes::SERVER_NOT_INITIALIZED,
592 "Server not initialized",
593 None,
594 ))
595 .await?;
596 return Ok(());
597 }
598
599 let params: ResourceReadParams = match params {
601 Some(params) => match serde_json::from_value(params) {
602 Ok(params) => params,
603 Err(err) => {
604 self.transport
606 .send(JsonRpcMessage::error(
607 id,
608 error_codes::INVALID_PARAMS,
609 &format!("Invalid resource read parameters: {}", err),
610 None,
611 ))
612 .await?;
613 return Ok(());
614 }
615 },
616 None => {
617 self.transport
619 .send(JsonRpcMessage::error(
620 id,
621 error_codes::INVALID_PARAMS,
622 "Missing resource read parameters",
623 None,
624 ))
625 .await?;
626 return Ok(());
627 }
628 };
629
630 match self
632 .resource_manager
633 .get_resource_content(¶ms.uri)
634 .await
635 {
636 Ok(contents) => {
637 self.transport
639 .send(JsonRpcMessage::response(
640 id,
641 json!({
642 "contents": contents
643 }),
644 ))
645 .await?;
646 }
647 Err(err) => {
648 self.transport
650 .send(JsonRpcMessage::error(
651 id,
652 error_codes::RESOURCE_NOT_FOUND,
653 &format!("Resource not found: {}", err),
654 Some(json!({
655 "uri": params.uri
656 })),
657 ))
658 .await?;
659 }
660 }
661
662 Ok(())
663 }
664 _ => Err(anyhow!("Expected request message for resources/read")),
665 }
666 }
667
668 async fn handle_resources_subscribe(&self, message: JsonRpcMessage) -> Result<()> {
670 match message {
671 JsonRpcMessage::Request { id, params, .. } => {
672 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
674 self.transport
676 .send(JsonRpcMessage::error(
677 id,
678 error_codes::SERVER_NOT_INITIALIZED,
679 "Server not initialized",
680 None,
681 ))
682 .await?;
683 return Ok(());
684 }
685
686 let params: ResourceSubscribeParams = match params {
688 Some(params) => match serde_json::from_value(params) {
689 Ok(params) => params,
690 Err(err) => {
691 self.transport
693 .send(JsonRpcMessage::error(
694 id,
695 error_codes::INVALID_PARAMS,
696 &format!("Invalid resource subscribe parameters: {}", err),
697 None,
698 ))
699 .await?;
700 return Ok(());
701 }
702 },
703 None => {
704 self.transport
706 .send(JsonRpcMessage::error(
707 id,
708 error_codes::INVALID_PARAMS,
709 "Missing resource subscribe parameters",
710 None,
711 ))
712 .await?;
713 return Ok(());
714 }
715 };
716
717 let client_id = id.to_string(); match self
720 .resource_manager
721 .subscribe(&client_id, ¶ms.uri)
722 .await
723 {
724 Ok(_) => {
725 self.transport
727 .send(JsonRpcMessage::response(
728 id,
729 json!({
730 "success": true
731 }),
732 ))
733 .await?;
734 }
735 Err(err) => {
736 self.transport
738 .send(JsonRpcMessage::error(
739 id,
740 error_codes::RESOURCE_NOT_FOUND,
741 &format!("Resource subscription error: {}", err),
742 Some(json!({
743 "uri": params.uri
744 })),
745 ))
746 .await?;
747 }
748 }
749
750 Ok(())
751 }
752 _ => Err(anyhow!("Expected request message for resources/subscribe")),
753 }
754 }
755
756 async fn handle_message(&self, message: JsonRpcMessage) -> Result<()> {
758 match &message.clone() {
759 JsonRpcMessage::Request { method, .. } => {
760 match method.as_str() {
761 methods::INITIALIZE => self.handle_initialize(message).await?,
762 methods::TOOLS_LIST => self.handle_tools_list(message).await?,
763 methods::TOOLS_CALL => self.handle_tools_call(message).await?,
764 methods::RESOURCES_LIST => self.handle_resources_list(message).await?,
765 methods::RESOURCES_READ => self.handle_resources_read(message).await?,
766 methods::RESOURCES_SUBSCRIBE => {
767 self.handle_resources_subscribe(message).await?
768 }
769 methods::RESOURCES_UNSUBSCRIBE => {
770 self.handle_resources_unsubscribe(message).await?
771 }
772 methods::RESOURCES_TEMPLATES_LIST => {
773 self.handle_resources_templates_list(message).await?
774 }
775 methods::PROMPTS_LIST => self.handle_prompts_list(message).await?,
776 methods::PROMPTS_GET => self.handle_prompts_get(message).await?,
777 methods::COMPLETION_COMPLETE => {
778 self.handle_completion_complete(message).await?
779 }
780 _ => {
781 if let JsonRpcMessage::Request { id, .. } = message {
782 self.transport
784 .send(JsonRpcMessage::error(
785 id,
786 error_codes::METHOD_NOT_FOUND,
787 &format!("Method not found: {}", method),
788 None,
789 ))
790 .await?;
791 }
792 }
793 }
794 }
795 JsonRpcMessage::Notification { method, .. } => match method.as_str() {
796 methods::INITIALIZED => self.handle_initialized().await?,
797 _ => {
798 tracing::debug!("Unhandled notification: {}", method);
799 }
800 },
801 _ => {
802 tracing::debug!("Unexpected message type from client");
804 }
805 }
806
807 Ok(())
808 }
809
810 pub async fn run(&self) -> Result<()> {
812 let (tx, mut rx) = mpsc::channel::<JsonRpcMessage>(100);
814
815 self.transport.start(tx).await?;
817
818 let resource_update_rx = self.resource_manager.subscribe_to_updates();
820 let resource_transport = self.transport.box_clone();
821
822 tokio::spawn(async move {
824 let mut update_rx = resource_update_rx;
825 while let Ok(uri) = update_rx.recv().await {
826 let _ = resource_transport
828 .send(JsonRpcMessage::notification(
829 methods::RESOURCES_UPDATED,
830 Some(json!({ "uri": uri })),
831 ))
832 .await;
833 }
834 });
835
836 let prompt_update_rx = self.prompt_manager.subscribe_to_updates();
838 let prompt_transport = self.transport.box_clone();
839
840 tokio::spawn(async move {
842 let mut update_rx = prompt_update_rx;
843 while let Ok(_) = update_rx.recv().await {
844 let _ = prompt_transport
846 .send(JsonRpcMessage::notification(
847 methods::PROMPTS_LIST_CHANGED,
848 None,
849 ))
850 .await;
851 }
852 });
853
854 while let Some(message) = rx.recv().await {
856 if let Err(err) = self.handle_message(message).await {
857 tracing::error!("Error handling message: {}", err);
858 }
859 }
860
861 self.state
863 .store(ServerState::ShuttingDown as u8, Ordering::SeqCst);
864
865 self.transport.close().await?;
867
868 Ok(())
869 }
870
871 pub fn tool_manager(&self) -> &Arc<ToolManager> {
873 &self.tool_manager
874 }
875
876 pub fn resource_manager(&self) -> &Arc<ResourceManager> {
878 &self.resource_manager
879 }
880
881 pub fn prompt_manager(&self) -> &Arc<PromptManager> {
883 &self.prompt_manager
884 }
885
886 pub(crate) fn transport(&self) -> &Box<dyn Transport> {
888 &self.transport
889 }
890
891 pub(crate) fn state(&self) -> &Arc<AtomicU8> {
893 &self.state
894 }
895}