modelcontextprotocol_server/
server.rs

1// mcp-server/src/server.rs
2use anyhow::{anyhow, Result};
3use serde_json::json;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tracing::debug;
9
10use mcp_protocol::{
11    constants::{error_codes, methods, PROTOCOL_VERSION},
12    messages::{InitializeParams, InitializeResult, JsonRpcMessage, ServerCapabilities},
13    types::{
14        resource::{
15            Resource, ResourceContent, ResourceReadParams, ResourceSubscribeParams,
16            ResourcesListParams,
17        },
18        tool::{Tool, ToolCallParams, ToolCallResult},
19        ServerInfo, ServerState,
20    },
21    version::{is_supported_version, version_mismatch_error},
22};
23
24use crate::prompts::PromptManager;
25use crate::resources::ResourceManager;
26use crate::tools::ToolManager;
27use crate::transport::Transport;
28
29/// MCP server builder
30pub struct ServerBuilder {
31    name: String,
32    version: String,
33    transport: Option<Box<dyn Transport>>,
34    tool_manager: Option<Arc<ToolManager>>,
35    resource_manager: Option<Arc<ResourceManager>>,
36    prompt_manager: Option<Arc<PromptManager>>,
37}
38
39impl ServerBuilder {
40    /// Create a new server builder
41    pub fn new(name: &str, version: &str) -> Self {
42        debug!("Creating new server builder");
43        Self {
44            name: name.to_string(),
45            version: version.to_string(),
46            transport: None,
47            tool_manager: None,
48            resource_manager: None,
49            prompt_manager: None,
50        }
51    }
52
53    /// Set the transport to use
54    pub fn with_transport<T: Transport>(mut self, transport: T) -> Self {
55        self.transport = Some(Box::new(transport));
56        self
57    }
58
59    /// Set the tool manager
60    pub fn with_tool_manager(mut self, tool_manager: Arc<ToolManager>) -> Self {
61        self.tool_manager = Some(tool_manager);
62        self
63    }
64
65    /// Set the resource manager
66    pub fn with_resource_manager(mut self, resource_manager: Arc<ResourceManager>) -> Self {
67        self.resource_manager = Some(resource_manager);
68        self
69    }
70
71    /// Set the prompt manager
72    pub fn with_prompt_manager(mut self, prompt_manager: Arc<PromptManager>) -> Self {
73        self.prompt_manager = Some(prompt_manager);
74        self
75    }
76
77    /// Register a tool (creates a tool manager if not already set)
78    pub fn with_tool(
79        mut self,
80        name: &str,
81        description: Option<&str>,
82        input_schema: serde_json::Value,
83        handler: impl Fn(serde_json::Value) -> Result<ToolCallResult> + Send + Sync + 'static,
84    ) -> Self {
85        debug!("Registering tool: {}", name);
86        // Create tool manager if not already set
87        if self.tool_manager.is_none() {
88            self.tool_manager = Some(Arc::new(ToolManager::new()));
89        }
90
91        // Create tool
92        let tool = Tool {
93            name: name.to_string(),
94            description: description.map(|s| s.to_string()),
95            input_schema,
96            annotations: None,
97        };
98
99        // Register tool
100        let tool_manager = self.tool_manager.as_ref().unwrap();
101        tool_manager.register_tool(tool, handler);
102
103        self
104    }
105
106    /// Register a resource (creates a resource manager if not already set)
107    pub fn with_resource(
108        mut self,
109        uri: &str,
110        name: &str,
111        description: Option<&str>,
112        mime_type: Option<&str>,
113        size: Option<u64>,
114        content_provider: impl Fn() -> Result<Vec<ResourceContent>> + Send + Sync + 'static,
115    ) -> Self {
116        // Create resource manager if not already set
117        if self.resource_manager.is_none() {
118            self.resource_manager = Some(Arc::new(ResourceManager::new()));
119        }
120
121        // Create resource
122        let resource = Resource {
123            uri: uri.to_string(),
124            name: name.to_string(),
125            description: description.map(|s| s.to_string()),
126            mime_type: mime_type.map(|s| s.to_string()),
127            size,
128            annotations: None,
129        };
130
131        // Register resource
132        let resource_manager = self.resource_manager.as_ref().unwrap();
133        resource_manager.register_resource(resource, content_provider);
134
135        self
136    }
137
138    /// Register a resource template (creates a resource manager if not already set)
139    pub fn with_template(
140        mut self,
141        uri_template: &str,
142        name: &str,
143        description: Option<&str>,
144        mime_type: Option<&str>,
145        expander: impl Fn(String, HashMap<String, String>) -> Result<String> + Send + Sync + 'static,
146    ) -> Self {
147        // Create resource manager if not already set
148        if self.resource_manager.is_none() {
149            self.resource_manager = Some(Arc::new(ResourceManager::new()));
150        }
151
152        // Create template
153        let template = mcp_protocol::types::resource::ResourceTemplate {
154            uri_template: uri_template.to_string(),
155            name: name.to_string(),
156            description: description.map(|s| s.to_string()),
157            mime_type: mime_type.map(|s| s.to_string()),
158            annotations: None,
159        };
160
161        // Register template
162        let resource_manager = self.resource_manager.as_ref().unwrap();
163        resource_manager.register_template(template, expander);
164
165        self
166    }
167
168    /// Register a template parameter completion provider
169    pub fn with_template_completion(
170        mut self,
171        template_uri: &str,
172        provider: impl Fn(
173                String,
174                String,
175                Option<String>,
176            ) -> Result<Vec<mcp_protocol::types::completion::CompletionItem>>
177            + Send
178            + Sync
179            + 'static,
180    ) -> Self {
181        // Create resource manager if not already set
182        if self.resource_manager.is_none() {
183            self.resource_manager = Some(Arc::new(ResourceManager::new()));
184        }
185
186        // Register completion provider
187        let resource_manager = self.resource_manager.as_ref().unwrap();
188        resource_manager.register_completion_provider(template_uri, provider);
189
190        self
191    }
192
193    /// Register a prompt parameter completion provider
194    pub fn with_prompt_completion(
195        mut self,
196        prompt_name: &str,
197        param_name: &str,
198        provider: impl Fn(String, Option<String>) -> Result<Vec<String>> + Send + Sync + 'static,
199    ) -> Self {
200        // Create prompt manager if not already set
201        if self.prompt_manager.is_none() {
202            self.prompt_manager = Some(Arc::new(PromptManager::new()));
203        }
204
205        // Register completion provider
206        let prompt_manager = self.prompt_manager.as_ref().unwrap();
207        prompt_manager.register_completion_provider(prompt_name, param_name, provider);
208
209        self
210    }
211
212    /// Register a prompt (creates a prompt manager if not already set)
213    pub fn with_prompt(
214        mut self,
215        name: &str,
216        description: Option<&str>,
217        arguments: Option<Vec<mcp_protocol::types::prompt::PromptArgument>>,
218        handler: impl Fn(
219                Option<HashMap<String, String>>,
220            ) -> Result<Vec<mcp_protocol::types::prompt::PromptMessage>>
221            + Send
222            + Sync
223            + 'static,
224    ) -> Self {
225        // Create prompt manager if not already set
226        if self.prompt_manager.is_none() {
227            self.prompt_manager = Some(Arc::new(PromptManager::new()));
228        }
229
230        // Create prompt
231        let prompt = mcp_protocol::types::prompt::Prompt {
232            name: name.to_string(),
233            description: description.map(|s| s.to_string()),
234            arguments,
235            annotations: None,
236        };
237
238        // Register prompt
239        let prompt_manager = self.prompt_manager.as_ref().unwrap();
240        prompt_manager.register_prompt(prompt, handler);
241
242        self
243    }
244
245    /// Build the server
246    pub fn build(self) -> Result<Server> {
247        let transport = self
248            .transport
249            .ok_or_else(|| anyhow!("Transport is required"))?;
250
251        Ok(Server {
252            name: self.name,
253            version: self.version,
254            transport,
255            tool_manager: self
256                .tool_manager
257                .unwrap_or_else(|| Arc::new(ToolManager::new())),
258            resource_manager: self
259                .resource_manager
260                .unwrap_or_else(|| Arc::new(ResourceManager::new())),
261            prompt_manager: self
262                .prompt_manager
263                .unwrap_or_else(|| Arc::new(PromptManager::new())),
264            state: Arc::new(AtomicU8::new(ServerState::Created as u8)),
265        })
266    }
267}
268
269/// MCP server
270pub struct Server {
271    name: String,
272    version: String,
273    transport: Box<dyn Transport>,
274    tool_manager: Arc<ToolManager>,
275    resource_manager: Arc<ResourceManager>,
276    prompt_manager: Arc<PromptManager>,
277    state: Arc<AtomicU8>,
278}
279
280impl Server {
281    /// Get the tool capabilities
282    fn get_tool_capabilities(&self) -> HashMap<String, bool> {
283        let mut capabilities = HashMap::new();
284        capabilities.insert("listChanged".to_string(), true);
285        capabilities
286    }
287
288    /// Get the resource capabilities
289    fn get_resource_capabilities(&self) -> HashMap<String, bool> {
290        let mut capabilities = HashMap::new();
291        capabilities.insert("listChanged".to_string(), true);
292        capabilities.insert("subscribe".to_string(), true);
293        capabilities
294    }
295
296    /// Get the prompt capabilities
297    fn get_prompt_capabilities(&self) -> HashMap<String, bool> {
298        let mut capabilities = HashMap::new();
299        capabilities.insert("listChanged".to_string(), true);
300        capabilities
301    }
302
303    /// Get the server info
304    fn get_server_info(&self) -> ServerInfo {
305        ServerInfo {
306            name: self.name.clone(),
307            version: self.version.clone(),
308        }
309    }
310
311    /// Handle initialize request
312    async fn handle_initialize(&self, message: JsonRpcMessage) -> Result<()> {
313        match message {
314            JsonRpcMessage::Request { id, params, .. } => {
315                // Parse initialize parameters
316                let params: InitializeParams = match params {
317                    Some(params) => match serde_json::from_value(params) {
318                        Ok(params) => params,
319                        Err(err) => {
320                            // Send error response
321                            self.transport
322                                .send(JsonRpcMessage::error(
323                                    id,
324                                    error_codes::INVALID_PARAMS,
325                                    &format!("Invalid initialize parameters: {}", err),
326                                    None,
327                                ))
328                                .await?;
329                            return Ok(());
330                        }
331                    },
332                    None => {
333                        // Send error response
334                        self.transport
335                            .send(JsonRpcMessage::error(
336                                id,
337                                error_codes::INVALID_PARAMS,
338                                "Missing initialize parameters",
339                                None,
340                            ))
341                            .await?;
342                        return Ok(());
343                    }
344                };
345
346                // Validate protocol version
347                if !is_supported_version(&params.protocol_version) {
348                    // Send error response
349                    self.transport
350                        .send(JsonRpcMessage::error(
351                            id,
352                            error_codes::INVALID_PARAMS,
353                            "Unsupported protocol version",
354                            Some(json!(version_mismatch_error(&params.protocol_version))),
355                        ))
356                        .await?;
357                    return Ok(());
358                }
359
360                // Update server state
361                self.state
362                    .store(ServerState::Initializing as u8, Ordering::SeqCst);
363
364                // Get capabilities
365                let tools_capabilities = self.get_tool_capabilities();
366                let resources_capabilities = self.get_resource_capabilities();
367                let prompts_capabilities = self.get_prompt_capabilities();
368
369                // Create initialize result
370                let result = InitializeResult {
371                    protocol_version: PROTOCOL_VERSION.to_string(),
372                    capabilities: ServerCapabilities {
373                        tools: Some(tools_capabilities),
374                        resources: Some(resources_capabilities),
375                        prompts: Some(prompts_capabilities),
376                        ..Default::default()
377                    },
378                    server_info: self.get_server_info(),
379                    instructions: None,
380                };
381
382                // Send initialize response
383                self.transport
384                    .send(JsonRpcMessage::response(id, json!(result)))
385                    .await?;
386
387                Ok(())
388            }
389            _ => Err(anyhow!("Expected request message for initialize")),
390        }
391    }
392
393    /// Handle initialized notification
394    async fn handle_initialized(&self) -> Result<()> {
395        // Update server state
396        self.state.store(ServerState::Ready as u8, Ordering::SeqCst);
397
398        // No response needed for notifications
399        Ok(())
400    }
401
402    /// Handle tools/list request
403    async fn handle_tools_list(&self, message: JsonRpcMessage) -> Result<()> {
404        match message {
405            JsonRpcMessage::Request { id, .. } => {
406                // Check if server is ready
407                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
408                    // Send error response
409                    self.transport
410                        .send(JsonRpcMessage::error(
411                            id,
412                            error_codes::SERVER_NOT_INITIALIZED,
413                            "Server not initialized",
414                            None,
415                        ))
416                        .await?;
417                    return Ok(());
418                }
419
420                // Get tools from manager
421                let tools = self.tool_manager.list_tools().await;
422
423                // Send response
424                self.transport
425                    .send(JsonRpcMessage::response(
426                        id,
427                        json!({
428                            "tools": tools,
429                            "nextCursor": ""
430                        }),
431                    ))
432                    .await?;
433
434                Ok(())
435            }
436            _ => Err(anyhow!("Expected request message for tools/list")),
437        }
438    }
439
440    /// Handle tools/call request
441    async fn handle_tools_call(&self, message: JsonRpcMessage) -> Result<()> {
442        match message {
443            JsonRpcMessage::Request { id, params, .. } => {
444                // Check if server is ready
445                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
446                    // Send error response
447                    self.transport
448                        .send(JsonRpcMessage::error(
449                            id,
450                            error_codes::SERVER_NOT_INITIALIZED,
451                            "Server not initialized",
452                            None,
453                        ))
454                        .await?;
455                    return Ok(());
456                }
457
458                // Parse tool call parameters
459                let params: ToolCallParams = match params {
460                    Some(params) => match serde_json::from_value(params) {
461                        Ok(params) => params,
462                        Err(err) => {
463                            // Send error response
464                            self.transport
465                                .send(JsonRpcMessage::error(
466                                    id,
467                                    error_codes::INVALID_PARAMS,
468                                    &format!("Invalid tool call parameters: {}", err),
469                                    None,
470                                ))
471                                .await?;
472                            return Ok(());
473                        }
474                    },
475                    None => {
476                        // Send error response
477                        self.transport
478                            .send(JsonRpcMessage::error(
479                                id,
480                                error_codes::INVALID_PARAMS,
481                                "Missing tool call parameters",
482                                None,
483                            ))
484                            .await?;
485                        return Ok(());
486                    }
487                };
488
489                // Execute tool
490                match self
491                    .tool_manager
492                    .execute_tool(&params.name, params.arguments)
493                    .await
494                {
495                    Ok(result) => {
496                        // Send response
497                        self.transport
498                            .send(JsonRpcMessage::response(id, json!(result)))
499                            .await?;
500                    }
501                    Err(err) => {
502                        // Send error response
503                        self.transport
504                            .send(JsonRpcMessage::error(
505                                id,
506                                error_codes::INTERNAL_ERROR,
507                                &format!("Tool execution error: {}", err),
508                                None,
509                            ))
510                            .await?;
511                    }
512                }
513
514                Ok(())
515            }
516            _ => Err(anyhow!("Expected request message for tools/call")),
517        }
518    }
519
520    /// Handle resources/list request
521    async fn handle_resources_list(&self, message: JsonRpcMessage) -> Result<()> {
522        match message {
523            JsonRpcMessage::Request { id, params, .. } => {
524                // Check if server is ready
525                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
526                    // Send error response
527                    self.transport
528                        .send(JsonRpcMessage::error(
529                            id,
530                            error_codes::SERVER_NOT_INITIALIZED,
531                            "Server not initialized",
532                            None,
533                        ))
534                        .await?;
535                    return Ok(());
536                }
537
538                // Parse parameters (optional)
539                let params: Option<ResourcesListParams> = match params {
540                    Some(params) => match serde_json::from_value(params) {
541                        Ok(params) => Some(params),
542                        Err(err) => {
543                            // Send error response
544                            self.transport
545                                .send(JsonRpcMessage::error(
546                                    id,
547                                    error_codes::INVALID_PARAMS,
548                                    &format!("Invalid resource list parameters: {}", err),
549                                    None,
550                                ))
551                                .await?;
552                            return Ok(());
553                        }
554                    },
555                    None => None,
556                };
557
558                // Get cursor from parameters
559                let cursor = params.and_then(|p| p.cursor);
560
561                // Get resources from manager with pagination
562                let (resources, next_cursor) = self.resource_manager.list_resources(cursor).await;
563
564                // Send response
565                self.transport
566                    .send(JsonRpcMessage::response(
567                        id,
568                        json!({
569                            "resources": resources,
570                            "nextCursor": next_cursor.unwrap_or_default()
571                        }),
572                    ))
573                    .await?;
574
575                Ok(())
576            }
577            _ => Err(anyhow!("Expected request message for resources/list")),
578        }
579    }
580
581    /// Handle resources/read request
582    async fn handle_resources_read(&self, message: JsonRpcMessage) -> Result<()> {
583        match message {
584            JsonRpcMessage::Request { id, params, .. } => {
585                // Check if server is ready
586                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
587                    // Send error response
588                    self.transport
589                        .send(JsonRpcMessage::error(
590                            id,
591                            error_codes::SERVER_NOT_INITIALIZED,
592                            "Server not initialized",
593                            None,
594                        ))
595                        .await?;
596                    return Ok(());
597                }
598
599                // Parse parameters
600                let params: ResourceReadParams = match params {
601                    Some(params) => match serde_json::from_value(params) {
602                        Ok(params) => params,
603                        Err(err) => {
604                            // Send error response
605                            self.transport
606                                .send(JsonRpcMessage::error(
607                                    id,
608                                    error_codes::INVALID_PARAMS,
609                                    &format!("Invalid resource read parameters: {}", err),
610                                    None,
611                                ))
612                                .await?;
613                            return Ok(());
614                        }
615                    },
616                    None => {
617                        // Send error response
618                        self.transport
619                            .send(JsonRpcMessage::error(
620                                id,
621                                error_codes::INVALID_PARAMS,
622                                "Missing resource read parameters",
623                                None,
624                            ))
625                            .await?;
626                        return Ok(());
627                    }
628                };
629
630                // Read resource
631                match self
632                    .resource_manager
633                    .get_resource_content(&params.uri)
634                    .await
635                {
636                    Ok(contents) => {
637                        // Send response
638                        self.transport
639                            .send(JsonRpcMessage::response(
640                                id,
641                                json!({
642                                    "contents": contents
643                                }),
644                            ))
645                            .await?;
646                    }
647                    Err(err) => {
648                        // Send error response
649                        self.transport
650                            .send(JsonRpcMessage::error(
651                                id,
652                                error_codes::RESOURCE_NOT_FOUND,
653                                &format!("Resource not found: {}", err),
654                                Some(json!({
655                                    "uri": params.uri
656                                })),
657                            ))
658                            .await?;
659                    }
660                }
661
662                Ok(())
663            }
664            _ => Err(anyhow!("Expected request message for resources/read")),
665        }
666    }
667
668    /// Handle resources/subscribe request
669    async fn handle_resources_subscribe(&self, message: JsonRpcMessage) -> Result<()> {
670        match message {
671            JsonRpcMessage::Request { id, params, .. } => {
672                // Check if server is ready
673                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
674                    // Send error response
675                    self.transport
676                        .send(JsonRpcMessage::error(
677                            id,
678                            error_codes::SERVER_NOT_INITIALIZED,
679                            "Server not initialized",
680                            None,
681                        ))
682                        .await?;
683                    return Ok(());
684                }
685
686                // Parse parameters
687                let params: ResourceSubscribeParams = match params {
688                    Some(params) => match serde_json::from_value(params) {
689                        Ok(params) => params,
690                        Err(err) => {
691                            // Send error response
692                            self.transport
693                                .send(JsonRpcMessage::error(
694                                    id,
695                                    error_codes::INVALID_PARAMS,
696                                    &format!("Invalid resource subscribe parameters: {}", err),
697                                    None,
698                                ))
699                                .await?;
700                            return Ok(());
701                        }
702                    },
703                    None => {
704                        // Send error response
705                        self.transport
706                            .send(JsonRpcMessage::error(
707                                id,
708                                error_codes::INVALID_PARAMS,
709                                "Missing resource subscribe parameters",
710                                None,
711                            ))
712                            .await?;
713                        return Ok(());
714                    }
715                };
716
717                // Subscribe to resource
718                let client_id = id.to_string(); // Use request ID as client ID for simplicity
719                match self
720                    .resource_manager
721                    .subscribe(&client_id, &params.uri)
722                    .await
723                {
724                    Ok(_) => {
725                        // Send success response
726                        self.transport
727                            .send(JsonRpcMessage::response(
728                                id,
729                                json!({
730                                    "success": true
731                                }),
732                            ))
733                            .await?;
734                    }
735                    Err(err) => {
736                        // Send error response
737                        self.transport
738                            .send(JsonRpcMessage::error(
739                                id,
740                                error_codes::RESOURCE_NOT_FOUND,
741                                &format!("Resource subscription error: {}", err),
742                                Some(json!({
743                                    "uri": params.uri
744                                })),
745                            ))
746                            .await?;
747                    }
748                }
749
750                Ok(())
751            }
752            _ => Err(anyhow!("Expected request message for resources/subscribe")),
753        }
754    }
755
756    /// Handle incoming messages
757    async fn handle_message(&self, message: JsonRpcMessage) -> Result<()> {
758        match &message.clone() {
759            JsonRpcMessage::Request { method, .. } => {
760                match method.as_str() {
761                    methods::INITIALIZE => self.handle_initialize(message).await?,
762                    methods::TOOLS_LIST => self.handle_tools_list(message).await?,
763                    methods::TOOLS_CALL => self.handle_tools_call(message).await?,
764                    methods::RESOURCES_LIST => self.handle_resources_list(message).await?,
765                    methods::RESOURCES_READ => self.handle_resources_read(message).await?,
766                    methods::RESOURCES_SUBSCRIBE => {
767                        self.handle_resources_subscribe(message).await?
768                    }
769                    methods::RESOURCES_UNSUBSCRIBE => {
770                        self.handle_resources_unsubscribe(message).await?
771                    }
772                    methods::RESOURCES_TEMPLATES_LIST => {
773                        self.handle_resources_templates_list(message).await?
774                    }
775                    methods::PROMPTS_LIST => self.handle_prompts_list(message).await?,
776                    methods::PROMPTS_GET => self.handle_prompts_get(message).await?,
777                    methods::COMPLETION_COMPLETE => {
778                        self.handle_completion_complete(message).await?
779                    }
780                    _ => {
781                        if let JsonRpcMessage::Request { id, .. } = message {
782                            // Method not found
783                            self.transport
784                                .send(JsonRpcMessage::error(
785                                    id,
786                                    error_codes::METHOD_NOT_FOUND,
787                                    &format!("Method not found: {}", method),
788                                    None,
789                                ))
790                                .await?;
791                        }
792                    }
793                }
794            }
795            JsonRpcMessage::Notification { method, .. } => match method.as_str() {
796                methods::INITIALIZED => self.handle_initialized().await?,
797                _ => {
798                    tracing::debug!("Unhandled notification: {}", method);
799                }
800            },
801            _ => {
802                // Not sure what to do with responses from the client
803                tracing::debug!("Unexpected message type from client");
804            }
805        }
806
807        Ok(())
808    }
809
810    /// Start the server and run until shutdown
811    pub async fn run(&self) -> Result<()> {
812        // Create message channel
813        let (tx, mut rx) = mpsc::channel::<JsonRpcMessage>(100);
814
815        // Start transport
816        self.transport.start(tx).await?;
817
818        // Set up resource update listener
819        let resource_update_rx = self.resource_manager.subscribe_to_updates();
820        let resource_transport = self.transport.box_clone();
821
822        // Spawn a task to handle resource updates
823        tokio::spawn(async move {
824            let mut update_rx = resource_update_rx;
825            while let Ok(uri) = update_rx.recv().await {
826                // Send notification
827                let _ = resource_transport
828                    .send(JsonRpcMessage::notification(
829                        methods::RESOURCES_UPDATED,
830                        Some(json!({ "uri": uri })),
831                    ))
832                    .await;
833            }
834        });
835
836        // Set up prompt update listener
837        let prompt_update_rx = self.prompt_manager.subscribe_to_updates();
838        let prompt_transport = self.transport.box_clone();
839
840        // Spawn a task to handle prompt updates
841        tokio::spawn(async move {
842            let mut update_rx = prompt_update_rx;
843            while let Ok(_) = update_rx.recv().await {
844                // Send notification
845                let _ = prompt_transport
846                    .send(JsonRpcMessage::notification(
847                        methods::PROMPTS_LIST_CHANGED,
848                        None,
849                    ))
850                    .await;
851            }
852        });
853
854        // Process messages
855        while let Some(message) = rx.recv().await {
856            if let Err(err) = self.handle_message(message).await {
857                tracing::error!("Error handling message: {}", err);
858            }
859        }
860
861        // Update state
862        self.state
863            .store(ServerState::ShuttingDown as u8, Ordering::SeqCst);
864
865        // Close transport
866        self.transport.close().await?;
867
868        Ok(())
869    }
870
871    /// Get a reference to the tool manager
872    pub fn tool_manager(&self) -> &Arc<ToolManager> {
873        &self.tool_manager
874    }
875
876    /// Get a reference to the resource manager
877    pub fn resource_manager(&self) -> &Arc<ResourceManager> {
878        &self.resource_manager
879    }
880
881    /// Get a reference to the prompt manager
882    pub fn prompt_manager(&self) -> &Arc<PromptManager> {
883        &self.prompt_manager
884    }
885
886    /// Get a reference to the transport
887    pub(crate) fn transport(&self) -> &Box<dyn Transport> {
888        &self.transport
889    }
890
891    /// Get the server state
892    pub(crate) fn state(&self) -> &Arc<AtomicU8> {
893        &self.state
894    }
895}