1use anyhow::{anyhow, Result};
3use serde_json::json;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tracing::debug;
9
10use mcp_protocol::{
11 constants::{error_codes, methods, PROTOCOL_VERSION},
12 messages::{InitializeParams, InitializeResult, JsonRpcMessage, ServerCapabilities},
13 types::{
14 resource::{
15 Resource, ResourceContent, ResourceReadParams, ResourceSubscribeParams,
16 ResourcesListParams,
17 },
18 tool::{Tool, ToolCallParams, ToolCallResult},
19 ServerInfo, ServerState,
20 },
21 version::{is_supported_version, version_mismatch_error},
22};
23
24use crate::prompts::PromptManager;
25use crate::resources::ResourceManager;
26use crate::tools::ToolManager;
27use crate::transport::Transport;
28
29pub struct ServerBuilder {
31 name: String,
32 version: String,
33 transport: Option<Box<dyn Transport>>,
34 tool_manager: Option<Arc<ToolManager>>,
35 resource_manager: Option<Arc<ResourceManager>>,
36 prompt_manager: Option<Arc<PromptManager>>,
37}
38
39impl ServerBuilder {
40 pub fn new(name: &str, version: &str) -> Self {
42 debug!("Creating new server builder");
43 Self {
44 name: name.to_string(),
45 version: version.to_string(),
46 transport: None,
47 tool_manager: None,
48 resource_manager: None,
49 prompt_manager: None,
50 }
51 }
52
53 pub fn with_transport<T: Transport>(mut self, transport: T) -> Self {
55 self.transport = Some(Box::new(transport));
56 self
57 }
58
59 pub fn with_tool_manager(mut self, tool_manager: Arc<ToolManager>) -> Self {
61 self.tool_manager = Some(tool_manager);
62 self
63 }
64
65 pub fn with_resource_manager(mut self, resource_manager: Arc<ResourceManager>) -> Self {
67 self.resource_manager = Some(resource_manager);
68 self
69 }
70
71 pub fn with_prompt_manager(mut self, prompt_manager: Arc<PromptManager>) -> Self {
73 self.prompt_manager = Some(prompt_manager);
74 self
75 }
76
77 pub fn with_tool(
79 mut self,
80 name: &str,
81 description: Option<&str>,
82 input_schema: serde_json::Value,
83 handler: impl Fn(serde_json::Value) -> Result<ToolCallResult> + Send + Sync + 'static,
84 ) -> Self {
85 debug!("Registering tool: {}", name);
86 if self.tool_manager.is_none() {
88 self.tool_manager = Some(Arc::new(ToolManager::new()));
89 }
90
91 let tool = Tool {
93 name: name.to_string(),
94 description: description.map(|s| s.to_string()),
95 input_schema,
96 annotations: None,
97 };
98
99 let tool_manager = self.tool_manager.as_ref().unwrap();
101 tool_manager.register_tool(tool, handler);
102
103 self
104 }
105
106 pub fn with_resource(
108 mut self,
109 uri: &str,
110 name: &str,
111 description: Option<&str>,
112 mime_type: Option<&str>,
113 size: Option<u64>,
114 content_provider: impl Fn() -> Result<Vec<ResourceContent>> + Send + Sync + 'static,
115 ) -> Self {
116 if self.resource_manager.is_none() {
118 self.resource_manager = Some(Arc::new(ResourceManager::new()));
119 }
120
121 let resource = Resource {
123 uri: uri.to_string(),
124 name: name.to_string(),
125 description: description.map(|s| s.to_string()),
126 mime_type: mime_type.map(|s| s.to_string()),
127 size,
128 annotations: None,
129 };
130
131 let resource_manager = self.resource_manager.as_ref().unwrap();
133 resource_manager.register_resource(resource, content_provider);
134
135 self
136 }
137
138 pub fn with_template(
140 mut self,
141 uri_template: &str,
142 name: &str,
143 description: Option<&str>,
144 mime_type: Option<&str>,
145 expander: impl Fn(String, HashMap<String, String>) -> Result<String> + Send + Sync + 'static,
146 ) -> Self {
147 if self.resource_manager.is_none() {
149 self.resource_manager = Some(Arc::new(ResourceManager::new()));
150 }
151
152 let template = mcp_protocol::types::resource::ResourceTemplate {
154 uri_template: uri_template.to_string(),
155 name: name.to_string(),
156 description: description.map(|s| s.to_string()),
157 mime_type: mime_type.map(|s| s.to_string()),
158 annotations: None,
159 };
160
161 let resource_manager = self.resource_manager.as_ref().unwrap();
163 resource_manager.register_template(template, expander);
164
165 self
166 }
167
168 pub fn with_template_completion(
170 mut self,
171 template_uri: &str,
172 provider: impl Fn(
173 String,
174 String,
175 Option<String>,
176 ) -> Result<Vec<mcp_protocol::types::completion::CompletionItem>>
177 + Send
178 + Sync
179 + 'static,
180 ) -> Self {
181 if self.resource_manager.is_none() {
183 self.resource_manager = Some(Arc::new(ResourceManager::new()));
184 }
185
186 let resource_manager = self.resource_manager.as_ref().unwrap();
188 resource_manager.register_completion_provider(template_uri, provider);
189
190 self
191 }
192
193 pub fn with_prompt_completion(
195 mut self,
196 prompt_name: &str,
197 param_name: &str,
198 provider: impl Fn(String, Option<String>) -> Result<Vec<String>> + Send + Sync + 'static,
199 ) -> Self {
200 if self.prompt_manager.is_none() {
202 self.prompt_manager = Some(Arc::new(PromptManager::new()));
203 }
204
205 let prompt_manager = self.prompt_manager.as_ref().unwrap();
207 prompt_manager.register_completion_provider(prompt_name, param_name, provider);
208
209 self
210 }
211
212 pub fn with_prompt(
214 mut self,
215 name: &str,
216 description: Option<&str>,
217 arguments: Option<Vec<mcp_protocol::types::prompt::PromptArgument>>,
218 handler: impl Fn(
219 Option<HashMap<String, String>>,
220 ) -> Result<Vec<mcp_protocol::types::prompt::PromptMessage>>
221 + Send
222 + Sync
223 + 'static,
224 ) -> Self {
225 if self.prompt_manager.is_none() {
227 self.prompt_manager = Some(Arc::new(PromptManager::new()));
228 }
229
230 let prompt = mcp_protocol::types::prompt::Prompt {
232 name: name.to_string(),
233 description: description.map(|s| s.to_string()),
234 arguments,
235 annotations: None,
236 };
237
238 let prompt_manager = self.prompt_manager.as_ref().unwrap();
240 prompt_manager.register_prompt(prompt, handler);
241
242 self
243 }
244
245 pub fn build(self) -> Result<Server> {
247 let transport = self
248 .transport
249 .ok_or_else(|| anyhow!("Transport is required"))?;
250
251 Ok(Server {
252 name: self.name,
253 version: self.version,
254 transport,
255 tool_manager: self
256 .tool_manager
257 .unwrap_or_else(|| Arc::new(ToolManager::new())),
258 resource_manager: self
259 .resource_manager
260 .unwrap_or_else(|| Arc::new(ResourceManager::new())),
261 prompt_manager: self
262 .prompt_manager
263 .unwrap_or_else(|| Arc::new(PromptManager::new())),
264 state: Arc::new(AtomicU8::new(ServerState::Created as u8)),
265 })
266 }
267}
268
269pub struct Server {
271 name: String,
272 version: String,
273 transport: Box<dyn Transport>,
274 tool_manager: Arc<ToolManager>,
275 resource_manager: Arc<ResourceManager>,
276 prompt_manager: Arc<PromptManager>,
277 state: Arc<AtomicU8>,
278}
279
280impl Server {
281 fn get_tool_capabilities(&self) -> HashMap<String, bool> {
283 let mut capabilities = HashMap::new();
284 capabilities.insert("listChanged".to_string(), true);
285 capabilities
286 }
287
288 fn get_resource_capabilities(&self) -> HashMap<String, bool> {
290 let mut capabilities = HashMap::new();
291 capabilities.insert("listChanged".to_string(), true);
292 capabilities.insert("subscribe".to_string(), true);
293 capabilities
294 }
295
296 fn get_prompt_capabilities(&self) -> HashMap<String, bool> {
298 let mut capabilities = HashMap::new();
299 capabilities.insert("listChanged".to_string(), true);
300 capabilities
301 }
302
303 fn get_server_info(&self) -> ServerInfo {
305 ServerInfo {
306 name: self.name.clone(),
307 version: self.version.clone(),
308 }
309 }
310
311 async fn handle_initialize(&self, message: JsonRpcMessage) -> Result<()> {
313 match message {
314 JsonRpcMessage::Request { id, params, .. } => {
315 let params: InitializeParams = match params {
317 Some(params) => match serde_json::from_value(params) {
318 Ok(params) => params,
319 Err(err) => {
320 self.transport
322 .send(JsonRpcMessage::error(
323 id,
324 error_codes::INVALID_PARAMS,
325 &format!("Invalid initialize parameters: {}", err),
326 None,
327 ))
328 .await?;
329 return Ok(());
330 }
331 },
332 None => {
333 self.transport
335 .send(JsonRpcMessage::error(
336 id,
337 error_codes::INVALID_PARAMS,
338 "Missing initialize parameters",
339 None,
340 ))
341 .await?;
342 return Ok(());
343 }
344 };
345
346 if !is_supported_version(¶ms.protocol_version) {
348 self.transport
350 .send(JsonRpcMessage::error(
351 id,
352 error_codes::INVALID_PARAMS,
353 "Unsupported protocol version",
354 Some(json!(version_mismatch_error(¶ms.protocol_version))),
355 ))
356 .await?;
357 return Ok(());
358 }
359
360 self.state
362 .store(ServerState::Initializing as u8, Ordering::SeqCst);
363
364 let tools_capabilities = self.get_tool_capabilities();
366 let resources_capabilities = self.get_resource_capabilities();
367 let prompts_capabilities = self.get_prompt_capabilities();
368
369 let result = InitializeResult {
371 protocol_version: PROTOCOL_VERSION.to_string(),
372 capabilities: ServerCapabilities {
373 tools: Some(tools_capabilities),
374 resources: Some(resources_capabilities),
375 prompts: Some(prompts_capabilities),
376 ..Default::default()
377 },
378 server_info: self.get_server_info(),
379 instructions: None,
380 };
381
382 self.transport
384 .send(JsonRpcMessage::response(id, json!(result)))
385 .await?;
386
387 Ok(())
388 }
389 _ => Err(anyhow!("Expected request message for initialize")),
390 }
391 }
392
393 async fn handle_initialized(&self) -> Result<()> {
395 self.state.store(ServerState::Ready as u8, Ordering::SeqCst);
397
398 Ok(())
400 }
401
402 async fn handle_tools_list(&self, message: JsonRpcMessage) -> Result<()> {
404 match message {
405 JsonRpcMessage::Request { id, .. } => {
406 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
408 self.transport
410 .send(JsonRpcMessage::error(
411 id,
412 error_codes::SERVER_NOT_INITIALIZED,
413 "Server not initialized",
414 None,
415 ))
416 .await?;
417 return Ok(());
418 }
419
420 let tools = self.tool_manager.list_tools().await;
422
423 self.transport
425 .send(JsonRpcMessage::response(
426 id,
427 json!({
428 "tools": tools,
429 "nextCursor": ""
430 }),
431 ))
432 .await?;
433
434 Ok(())
435 }
436 _ => Err(anyhow!("Expected request message for tools/list")),
437 }
438 }
439
440 async fn handle_tools_call(&self, message: JsonRpcMessage) -> Result<()> {
442 match message {
443 JsonRpcMessage::Request { id, params, .. } => {
444 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
446 self.transport
448 .send(JsonRpcMessage::error(
449 id,
450 error_codes::SERVER_NOT_INITIALIZED,
451 "Server not initialized",
452 None,
453 ))
454 .await?;
455 return Ok(());
456 }
457
458 let params: ToolCallParams = match params {
460 Some(params) => match serde_json::from_value(params) {
461 Ok(params) => params,
462 Err(err) => {
463 self.transport
465 .send(JsonRpcMessage::error(
466 id,
467 error_codes::INVALID_PARAMS,
468 &format!("Invalid tool call parameters: {}", err),
469 None,
470 ))
471 .await?;
472 return Ok(());
473 }
474 },
475 None => {
476 self.transport
478 .send(JsonRpcMessage::error(
479 id,
480 error_codes::INVALID_PARAMS,
481 "Missing tool call parameters",
482 None,
483 ))
484 .await?;
485 return Ok(());
486 }
487 };
488
489 let args = params.arguments.unwrap_or_else(|| json!({}));
490
491 match self.tool_manager.execute_tool(¶ms.name, args).await {
493 Ok(result) => {
494 self.transport
496 .send(JsonRpcMessage::response(id, json!(result)))
497 .await?;
498 }
499 Err(err) => {
500 self.transport
502 .send(JsonRpcMessage::error(
503 id,
504 error_codes::INTERNAL_ERROR,
505 &format!("Tool execution error: {}", err),
506 None,
507 ))
508 .await?;
509 }
510 }
511
512 Ok(())
513 }
514 _ => Err(anyhow!("Expected request message for tools/call")),
515 }
516 }
517
518 async fn handle_resources_list(&self, message: JsonRpcMessage) -> Result<()> {
520 match message {
521 JsonRpcMessage::Request { id, params, .. } => {
522 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
524 self.transport
526 .send(JsonRpcMessage::error(
527 id,
528 error_codes::SERVER_NOT_INITIALIZED,
529 "Server not initialized",
530 None,
531 ))
532 .await?;
533 return Ok(());
534 }
535
536 let params: Option<ResourcesListParams> = match params {
538 Some(params) => match serde_json::from_value(params) {
539 Ok(params) => Some(params),
540 Err(err) => {
541 self.transport
543 .send(JsonRpcMessage::error(
544 id,
545 error_codes::INVALID_PARAMS,
546 &format!("Invalid resource list parameters: {}", err),
547 None,
548 ))
549 .await?;
550 return Ok(());
551 }
552 },
553 None => None,
554 };
555
556 let cursor = params.and_then(|p| p.cursor);
558
559 let (resources, next_cursor) = self.resource_manager.list_resources(cursor).await;
561
562 self.transport
564 .send(JsonRpcMessage::response(
565 id,
566 json!({
567 "resources": resources,
568 "nextCursor": next_cursor.unwrap_or_default()
569 }),
570 ))
571 .await?;
572
573 Ok(())
574 }
575 _ => Err(anyhow!("Expected request message for resources/list")),
576 }
577 }
578
579 async fn handle_resources_read(&self, message: JsonRpcMessage) -> Result<()> {
581 match message {
582 JsonRpcMessage::Request { id, params, .. } => {
583 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
585 self.transport
587 .send(JsonRpcMessage::error(
588 id,
589 error_codes::SERVER_NOT_INITIALIZED,
590 "Server not initialized",
591 None,
592 ))
593 .await?;
594 return Ok(());
595 }
596
597 let params: ResourceReadParams = match params {
599 Some(params) => match serde_json::from_value(params) {
600 Ok(params) => params,
601 Err(err) => {
602 self.transport
604 .send(JsonRpcMessage::error(
605 id,
606 error_codes::INVALID_PARAMS,
607 &format!("Invalid resource read parameters: {}", err),
608 None,
609 ))
610 .await?;
611 return Ok(());
612 }
613 },
614 None => {
615 self.transport
617 .send(JsonRpcMessage::error(
618 id,
619 error_codes::INVALID_PARAMS,
620 "Missing resource read parameters",
621 None,
622 ))
623 .await?;
624 return Ok(());
625 }
626 };
627
628 match self
630 .resource_manager
631 .get_resource_content(¶ms.uri)
632 .await
633 {
634 Ok(contents) => {
635 self.transport
637 .send(JsonRpcMessage::response(
638 id,
639 json!({
640 "contents": contents
641 }),
642 ))
643 .await?;
644 }
645 Err(err) => {
646 self.transport
648 .send(JsonRpcMessage::error(
649 id,
650 error_codes::RESOURCE_NOT_FOUND,
651 &format!("Resource not found: {}", err),
652 Some(json!({
653 "uri": params.uri
654 })),
655 ))
656 .await?;
657 }
658 }
659
660 Ok(())
661 }
662 _ => Err(anyhow!("Expected request message for resources/read")),
663 }
664 }
665
666 async fn handle_resources_subscribe(&self, message: JsonRpcMessage) -> Result<()> {
668 match message {
669 JsonRpcMessage::Request { id, params, .. } => {
670 if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
672 self.transport
674 .send(JsonRpcMessage::error(
675 id,
676 error_codes::SERVER_NOT_INITIALIZED,
677 "Server not initialized",
678 None,
679 ))
680 .await?;
681 return Ok(());
682 }
683
684 let params: ResourceSubscribeParams = match params {
686 Some(params) => match serde_json::from_value(params) {
687 Ok(params) => params,
688 Err(err) => {
689 self.transport
691 .send(JsonRpcMessage::error(
692 id,
693 error_codes::INVALID_PARAMS,
694 &format!("Invalid resource subscribe parameters: {}", err),
695 None,
696 ))
697 .await?;
698 return Ok(());
699 }
700 },
701 None => {
702 self.transport
704 .send(JsonRpcMessage::error(
705 id,
706 error_codes::INVALID_PARAMS,
707 "Missing resource subscribe parameters",
708 None,
709 ))
710 .await?;
711 return Ok(());
712 }
713 };
714
715 let client_id = id.to_string(); match self
718 .resource_manager
719 .subscribe(&client_id, ¶ms.uri)
720 .await
721 {
722 Ok(_) => {
723 self.transport
725 .send(JsonRpcMessage::response(
726 id,
727 json!({
728 "success": true
729 }),
730 ))
731 .await?;
732 }
733 Err(err) => {
734 self.transport
736 .send(JsonRpcMessage::error(
737 id,
738 error_codes::RESOURCE_NOT_FOUND,
739 &format!("Resource subscription error: {}", err),
740 Some(json!({
741 "uri": params.uri
742 })),
743 ))
744 .await?;
745 }
746 }
747
748 Ok(())
749 }
750 _ => Err(anyhow!("Expected request message for resources/subscribe")),
751 }
752 }
753
754 async fn handle_message(&self, message: JsonRpcMessage) -> Result<()> {
756 match &message.clone() {
757 JsonRpcMessage::Request { method, .. } => {
758 match method.as_str() {
759 methods::INITIALIZE => self.handle_initialize(message).await?,
760 methods::TOOLS_LIST => self.handle_tools_list(message).await?,
761 methods::TOOLS_CALL => self.handle_tools_call(message).await?,
762 methods::RESOURCES_LIST => self.handle_resources_list(message).await?,
763 methods::RESOURCES_READ => self.handle_resources_read(message).await?,
764 methods::RESOURCES_SUBSCRIBE => {
765 self.handle_resources_subscribe(message).await?
766 }
767 methods::RESOURCES_UNSUBSCRIBE => {
768 self.handle_resources_unsubscribe(message).await?
769 }
770 methods::RESOURCES_TEMPLATES_LIST => {
771 self.handle_resources_templates_list(message).await?
772 }
773 methods::PROMPTS_LIST => self.handle_prompts_list(message).await?,
774 methods::PROMPTS_GET => self.handle_prompts_get(message).await?,
775 methods::COMPLETION_COMPLETE => {
776 self.handle_completion_complete(message).await?
777 }
778 _ => {
779 if let JsonRpcMessage::Request { id, .. } = message {
780 self.transport
782 .send(JsonRpcMessage::error(
783 id,
784 error_codes::METHOD_NOT_FOUND,
785 &format!("Method not found: {}", method),
786 None,
787 ))
788 .await?;
789 }
790 }
791 }
792 }
793 JsonRpcMessage::Notification { method, .. } => match method.as_str() {
794 methods::INITIALIZED => self.handle_initialized().await?,
795 _ => {
796 tracing::debug!("Unhandled notification: {}", method);
797 }
798 },
799 _ => {
800 tracing::debug!("Unexpected message type from client");
802 }
803 }
804
805 Ok(())
806 }
807
808 pub async fn run(&self) -> Result<()> {
810 let (tx, mut rx) = mpsc::channel::<JsonRpcMessage>(100);
812
813 self.transport.start(tx).await?;
815
816 let resource_update_rx = self.resource_manager.subscribe_to_updates();
818 let resource_transport = self.transport.box_clone();
819
820 tokio::spawn(async move {
822 let mut update_rx = resource_update_rx;
823 while let Ok(uri) = update_rx.recv().await {
824 let _ = resource_transport
826 .send(JsonRpcMessage::notification(
827 methods::RESOURCES_UPDATED,
828 Some(json!({ "uri": uri })),
829 ))
830 .await;
831 }
832 });
833
834 let prompt_update_rx = self.prompt_manager.subscribe_to_updates();
836 let prompt_transport = self.transport.box_clone();
837
838 tokio::spawn(async move {
840 let mut update_rx = prompt_update_rx;
841 while let Ok(_) = update_rx.recv().await {
842 let _ = prompt_transport
844 .send(JsonRpcMessage::notification(
845 methods::PROMPTS_LIST_CHANGED,
846 None,
847 ))
848 .await;
849 }
850 });
851
852 while let Some(message) = rx.recv().await {
854 if let Err(err) = self.handle_message(message).await {
855 tracing::error!("Error handling message: {}", err);
856 }
857 }
858
859 self.state
861 .store(ServerState::ShuttingDown as u8, Ordering::SeqCst);
862
863 self.transport.close().await?;
865
866 Ok(())
867 }
868
869 pub fn tool_manager(&self) -> &Arc<ToolManager> {
871 &self.tool_manager
872 }
873
874 pub fn resource_manager(&self) -> &Arc<ResourceManager> {
876 &self.resource_manager
877 }
878
879 pub fn prompt_manager(&self) -> &Arc<PromptManager> {
881 &self.prompt_manager
882 }
883
884 pub(crate) fn transport(&self) -> &Box<dyn Transport> {
886 &self.transport
887 }
888
889 pub(crate) fn state(&self) -> &Arc<AtomicU8> {
891 &self.state
892 }
893}