modelcontextprotocol_server/
server.rs

1// mcp-server/src/server.rs
2use anyhow::{anyhow, Result};
3use serde_json::json;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tracing::debug;
9
10use mcp_protocol::{
11    constants::{error_codes, methods, PROTOCOL_VERSION},
12    messages::{InitializeParams, InitializeResult, JsonRpcMessage, ServerCapabilities},
13    types::{
14        resource::{
15            Resource, ResourceContent, ResourceReadParams, ResourceSubscribeParams,
16            ResourcesListParams,
17        },
18        tool::{Tool, ToolCallParams, ToolCallResult},
19        ServerInfo, ServerState,
20    },
21    version::{is_supported_version, version_mismatch_error},
22};
23
24use crate::prompts::PromptManager;
25use crate::resources::ResourceManager;
26use crate::tools::ToolManager;
27use crate::transport::Transport;
28
29/// MCP server builder
30pub struct ServerBuilder {
31    name: String,
32    version: String,
33    transport: Option<Box<dyn Transport>>,
34    tool_manager: Option<Arc<ToolManager>>,
35    resource_manager: Option<Arc<ResourceManager>>,
36    prompt_manager: Option<Arc<PromptManager>>,
37}
38
39impl ServerBuilder {
40    /// Create a new server builder
41    pub fn new(name: &str, version: &str) -> Self {
42        debug!("Creating new server builder");
43        Self {
44            name: name.to_string(),
45            version: version.to_string(),
46            transport: None,
47            tool_manager: None,
48            resource_manager: None,
49            prompt_manager: None,
50        }
51    }
52
53    /// Set the transport to use
54    pub fn with_transport<T: Transport>(mut self, transport: T) -> Self {
55        self.transport = Some(Box::new(transport));
56        self
57    }
58
59    /// Set the tool manager
60    pub fn with_tool_manager(mut self, tool_manager: Arc<ToolManager>) -> Self {
61        self.tool_manager = Some(tool_manager);
62        self
63    }
64
65    /// Set the resource manager
66    pub fn with_resource_manager(mut self, resource_manager: Arc<ResourceManager>) -> Self {
67        self.resource_manager = Some(resource_manager);
68        self
69    }
70
71    /// Set the prompt manager
72    pub fn with_prompt_manager(mut self, prompt_manager: Arc<PromptManager>) -> Self {
73        self.prompt_manager = Some(prompt_manager);
74        self
75    }
76
77    /// Register a tool (creates a tool manager if not already set)
78    pub fn with_tool(
79        mut self,
80        name: &str,
81        description: Option<&str>,
82        input_schema: serde_json::Value,
83        handler: impl Fn(serde_json::Value) -> Result<ToolCallResult> + Send + Sync + 'static,
84    ) -> Self {
85        debug!("Registering tool: {}", name);
86        // Create tool manager if not already set
87        if self.tool_manager.is_none() {
88            self.tool_manager = Some(Arc::new(ToolManager::new()));
89        }
90
91        // Create tool
92        let tool = Tool {
93            name: name.to_string(),
94            description: description.map(|s| s.to_string()),
95            input_schema,
96            annotations: None,
97        };
98
99        // Register tool
100        let tool_manager = self.tool_manager.as_ref().unwrap();
101        tool_manager.register_tool(tool, handler);
102
103        self
104    }
105
106    /// Register a resource (creates a resource manager if not already set)
107    pub fn with_resource(
108        mut self,
109        uri: &str,
110        name: &str,
111        description: Option<&str>,
112        mime_type: Option<&str>,
113        size: Option<u64>,
114        content_provider: impl Fn() -> Result<Vec<ResourceContent>> + Send + Sync + 'static,
115    ) -> Self {
116        // Create resource manager if not already set
117        if self.resource_manager.is_none() {
118            self.resource_manager = Some(Arc::new(ResourceManager::new()));
119        }
120
121        // Create resource
122        let resource = Resource {
123            uri: uri.to_string(),
124            name: name.to_string(),
125            description: description.map(|s| s.to_string()),
126            mime_type: mime_type.map(|s| s.to_string()),
127            size,
128            annotations: None,
129        };
130
131        // Register resource
132        let resource_manager = self.resource_manager.as_ref().unwrap();
133        resource_manager.register_resource(resource, content_provider);
134
135        self
136    }
137
138    /// Register a resource template (creates a resource manager if not already set)
139    pub fn with_template(
140        mut self,
141        uri_template: &str,
142        name: &str,
143        description: Option<&str>,
144        mime_type: Option<&str>,
145        expander: impl Fn(String, HashMap<String, String>) -> Result<String> + Send + Sync + 'static,
146    ) -> Self {
147        // Create resource manager if not already set
148        if self.resource_manager.is_none() {
149            self.resource_manager = Some(Arc::new(ResourceManager::new()));
150        }
151
152        // Create template
153        let template = mcp_protocol::types::resource::ResourceTemplate {
154            uri_template: uri_template.to_string(),
155            name: name.to_string(),
156            description: description.map(|s| s.to_string()),
157            mime_type: mime_type.map(|s| s.to_string()),
158            annotations: None,
159        };
160
161        // Register template
162        let resource_manager = self.resource_manager.as_ref().unwrap();
163        resource_manager.register_template(template, expander);
164
165        self
166    }
167
168    /// Register a template parameter completion provider
169    pub fn with_template_completion(
170        mut self,
171        template_uri: &str,
172        provider: impl Fn(
173                String,
174                String,
175                Option<String>,
176            ) -> Result<Vec<mcp_protocol::types::completion::CompletionItem>>
177            + Send
178            + Sync
179            + 'static,
180    ) -> Self {
181        // Create resource manager if not already set
182        if self.resource_manager.is_none() {
183            self.resource_manager = Some(Arc::new(ResourceManager::new()));
184        }
185
186        // Register completion provider
187        let resource_manager = self.resource_manager.as_ref().unwrap();
188        resource_manager.register_completion_provider(template_uri, provider);
189
190        self
191    }
192
193    /// Register a prompt parameter completion provider
194    pub fn with_prompt_completion(
195        mut self,
196        prompt_name: &str,
197        param_name: &str,
198        provider: impl Fn(String, Option<String>) -> Result<Vec<String>> + Send + Sync + 'static,
199    ) -> Self {
200        // Create prompt manager if not already set
201        if self.prompt_manager.is_none() {
202            self.prompt_manager = Some(Arc::new(PromptManager::new()));
203        }
204
205        // Register completion provider
206        let prompt_manager = self.prompt_manager.as_ref().unwrap();
207        prompt_manager.register_completion_provider(prompt_name, param_name, provider);
208
209        self
210    }
211
212    /// Register a prompt (creates a prompt manager if not already set)
213    pub fn with_prompt(
214        mut self,
215        name: &str,
216        description: Option<&str>,
217        arguments: Option<Vec<mcp_protocol::types::prompt::PromptArgument>>,
218        handler: impl Fn(
219                Option<HashMap<String, String>>,
220            ) -> Result<Vec<mcp_protocol::types::prompt::PromptMessage>>
221            + Send
222            + Sync
223            + 'static,
224    ) -> Self {
225        // Create prompt manager if not already set
226        if self.prompt_manager.is_none() {
227            self.prompt_manager = Some(Arc::new(PromptManager::new()));
228        }
229
230        // Create prompt
231        let prompt = mcp_protocol::types::prompt::Prompt {
232            name: name.to_string(),
233            description: description.map(|s| s.to_string()),
234            arguments,
235            annotations: None,
236        };
237
238        // Register prompt
239        let prompt_manager = self.prompt_manager.as_ref().unwrap();
240        prompt_manager.register_prompt(prompt, handler);
241
242        self
243    }
244
245    /// Build the server
246    pub fn build(self) -> Result<Server> {
247        let transport = self
248            .transport
249            .ok_or_else(|| anyhow!("Transport is required"))?;
250
251        Ok(Server {
252            name: self.name,
253            version: self.version,
254            transport,
255            tool_manager: self
256                .tool_manager
257                .unwrap_or_else(|| Arc::new(ToolManager::new())),
258            resource_manager: self
259                .resource_manager
260                .unwrap_or_else(|| Arc::new(ResourceManager::new())),
261            prompt_manager: self
262                .prompt_manager
263                .unwrap_or_else(|| Arc::new(PromptManager::new())),
264            state: Arc::new(AtomicU8::new(ServerState::Created as u8)),
265        })
266    }
267}
268
269/// MCP server
270pub struct Server {
271    name: String,
272    version: String,
273    transport: Box<dyn Transport>,
274    tool_manager: Arc<ToolManager>,
275    resource_manager: Arc<ResourceManager>,
276    prompt_manager: Arc<PromptManager>,
277    state: Arc<AtomicU8>,
278}
279
280impl Server {
281    /// Get the tool capabilities
282    fn get_tool_capabilities(&self) -> HashMap<String, bool> {
283        let mut capabilities = HashMap::new();
284        capabilities.insert("listChanged".to_string(), true);
285        capabilities
286    }
287
288    /// Get the resource capabilities
289    fn get_resource_capabilities(&self) -> HashMap<String, bool> {
290        let mut capabilities = HashMap::new();
291        capabilities.insert("listChanged".to_string(), true);
292        capabilities.insert("subscribe".to_string(), true);
293        capabilities
294    }
295
296    /// Get the prompt capabilities
297    fn get_prompt_capabilities(&self) -> HashMap<String, bool> {
298        let mut capabilities = HashMap::new();
299        capabilities.insert("listChanged".to_string(), true);
300        capabilities
301    }
302
303    /// Get the server info
304    fn get_server_info(&self) -> ServerInfo {
305        ServerInfo {
306            name: self.name.clone(),
307            version: self.version.clone(),
308        }
309    }
310
311    /// Handle initialize request
312    async fn handle_initialize(&self, message: JsonRpcMessage) -> Result<()> {
313        match message {
314            JsonRpcMessage::Request { id, params, .. } => {
315                // Parse initialize parameters
316                let params: InitializeParams = match params {
317                    Some(params) => match serde_json::from_value(params) {
318                        Ok(params) => params,
319                        Err(err) => {
320                            // Send error response
321                            self.transport
322                                .send(JsonRpcMessage::error(
323                                    id,
324                                    error_codes::INVALID_PARAMS,
325                                    &format!("Invalid initialize parameters: {}", err),
326                                    None,
327                                ))
328                                .await?;
329                            return Ok(());
330                        }
331                    },
332                    None => {
333                        // Send error response
334                        self.transport
335                            .send(JsonRpcMessage::error(
336                                id,
337                                error_codes::INVALID_PARAMS,
338                                "Missing initialize parameters",
339                                None,
340                            ))
341                            .await?;
342                        return Ok(());
343                    }
344                };
345
346                // Validate protocol version
347                if !is_supported_version(&params.protocol_version) {
348                    // Send error response
349                    self.transport
350                        .send(JsonRpcMessage::error(
351                            id,
352                            error_codes::INVALID_PARAMS,
353                            "Unsupported protocol version",
354                            Some(json!(version_mismatch_error(&params.protocol_version))),
355                        ))
356                        .await?;
357                    return Ok(());
358                }
359
360                // Update server state
361                self.state
362                    .store(ServerState::Initializing as u8, Ordering::SeqCst);
363
364                // Get capabilities
365                let tools_capabilities = self.get_tool_capabilities();
366                let resources_capabilities = self.get_resource_capabilities();
367                let prompts_capabilities = self.get_prompt_capabilities();
368
369                // Create initialize result
370                let result = InitializeResult {
371                    protocol_version: PROTOCOL_VERSION.to_string(),
372                    capabilities: ServerCapabilities {
373                        tools: Some(tools_capabilities),
374                        resources: Some(resources_capabilities),
375                        prompts: Some(prompts_capabilities),
376                        ..Default::default()
377                    },
378                    server_info: self.get_server_info(),
379                    instructions: None,
380                };
381
382                // Send initialize response
383                self.transport
384                    .send(JsonRpcMessage::response(id, json!(result)))
385                    .await?;
386
387                Ok(())
388            }
389            _ => Err(anyhow!("Expected request message for initialize")),
390        }
391    }
392
393    /// Handle initialized notification
394    async fn handle_initialized(&self) -> Result<()> {
395        // Update server state
396        self.state.store(ServerState::Ready as u8, Ordering::SeqCst);
397
398        // No response needed for notifications
399        Ok(())
400    }
401
402    /// Handle tools/list request
403    async fn handle_tools_list(&self, message: JsonRpcMessage) -> Result<()> {
404        match message {
405            JsonRpcMessage::Request { id, .. } => {
406                // Check if server is ready
407                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
408                    // Send error response
409                    self.transport
410                        .send(JsonRpcMessage::error(
411                            id,
412                            error_codes::SERVER_NOT_INITIALIZED,
413                            "Server not initialized",
414                            None,
415                        ))
416                        .await?;
417                    return Ok(());
418                }
419
420                // Get tools from manager
421                let tools = self.tool_manager.list_tools().await;
422
423                // Send response
424                self.transport
425                    .send(JsonRpcMessage::response(
426                        id,
427                        json!({
428                            "tools": tools,
429                            "nextCursor": ""
430                        }),
431                    ))
432                    .await?;
433
434                Ok(())
435            }
436            _ => Err(anyhow!("Expected request message for tools/list")),
437        }
438    }
439
440    /// Handle tools/call request
441    async fn handle_tools_call(&self, message: JsonRpcMessage) -> Result<()> {
442        match message {
443            JsonRpcMessage::Request { id, params, .. } => {
444                // Check if server is ready
445                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
446                    // Send error response
447                    self.transport
448                        .send(JsonRpcMessage::error(
449                            id,
450                            error_codes::SERVER_NOT_INITIALIZED,
451                            "Server not initialized",
452                            None,
453                        ))
454                        .await?;
455                    return Ok(());
456                }
457
458                // Parse tool call parameters
459                let params: ToolCallParams = match params {
460                    Some(params) => match serde_json::from_value(params) {
461                        Ok(params) => params,
462                        Err(err) => {
463                            // Send error response
464                            self.transport
465                                .send(JsonRpcMessage::error(
466                                    id,
467                                    error_codes::INVALID_PARAMS,
468                                    &format!("Invalid tool call parameters: {}", err),
469                                    None,
470                                ))
471                                .await?;
472                            return Ok(());
473                        }
474                    },
475                    None => {
476                        // Send error response
477                        self.transport
478                            .send(JsonRpcMessage::error(
479                                id,
480                                error_codes::INVALID_PARAMS,
481                                "Missing tool call parameters",
482                                None,
483                            ))
484                            .await?;
485                        return Ok(());
486                    }
487                };
488
489                let args = params.arguments.unwrap_or_else(|| json!({}));
490
491                // Execute tool
492                match self.tool_manager.execute_tool(&params.name, args).await {
493                    Ok(result) => {
494                        // Send response
495                        self.transport
496                            .send(JsonRpcMessage::response(id, json!(result)))
497                            .await?;
498                    }
499                    Err(err) => {
500                        // Send error response
501                        self.transport
502                            .send(JsonRpcMessage::error(
503                                id,
504                                error_codes::INTERNAL_ERROR,
505                                &format!("Tool execution error: {}", err),
506                                None,
507                            ))
508                            .await?;
509                    }
510                }
511
512                Ok(())
513            }
514            _ => Err(anyhow!("Expected request message for tools/call")),
515        }
516    }
517
518    /// Handle resources/list request
519    async fn handle_resources_list(&self, message: JsonRpcMessage) -> Result<()> {
520        match message {
521            JsonRpcMessage::Request { id, params, .. } => {
522                // Check if server is ready
523                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
524                    // Send error response
525                    self.transport
526                        .send(JsonRpcMessage::error(
527                            id,
528                            error_codes::SERVER_NOT_INITIALIZED,
529                            "Server not initialized",
530                            None,
531                        ))
532                        .await?;
533                    return Ok(());
534                }
535
536                // Parse parameters (optional)
537                let params: Option<ResourcesListParams> = match params {
538                    Some(params) => match serde_json::from_value(params) {
539                        Ok(params) => Some(params),
540                        Err(err) => {
541                            // Send error response
542                            self.transport
543                                .send(JsonRpcMessage::error(
544                                    id,
545                                    error_codes::INVALID_PARAMS,
546                                    &format!("Invalid resource list parameters: {}", err),
547                                    None,
548                                ))
549                                .await?;
550                            return Ok(());
551                        }
552                    },
553                    None => None,
554                };
555
556                // Get cursor from parameters
557                let cursor = params.and_then(|p| p.cursor);
558
559                // Get resources from manager with pagination
560                let (resources, next_cursor) = self.resource_manager.list_resources(cursor).await;
561
562                // Send response
563                self.transport
564                    .send(JsonRpcMessage::response(
565                        id,
566                        json!({
567                            "resources": resources,
568                            "nextCursor": next_cursor.unwrap_or_default()
569                        }),
570                    ))
571                    .await?;
572
573                Ok(())
574            }
575            _ => Err(anyhow!("Expected request message for resources/list")),
576        }
577    }
578
579    /// Handle resources/read request
580    async fn handle_resources_read(&self, message: JsonRpcMessage) -> Result<()> {
581        match message {
582            JsonRpcMessage::Request { id, params, .. } => {
583                // Check if server is ready
584                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
585                    // Send error response
586                    self.transport
587                        .send(JsonRpcMessage::error(
588                            id,
589                            error_codes::SERVER_NOT_INITIALIZED,
590                            "Server not initialized",
591                            None,
592                        ))
593                        .await?;
594                    return Ok(());
595                }
596
597                // Parse parameters
598                let params: ResourceReadParams = match params {
599                    Some(params) => match serde_json::from_value(params) {
600                        Ok(params) => params,
601                        Err(err) => {
602                            // Send error response
603                            self.transport
604                                .send(JsonRpcMessage::error(
605                                    id,
606                                    error_codes::INVALID_PARAMS,
607                                    &format!("Invalid resource read parameters: {}", err),
608                                    None,
609                                ))
610                                .await?;
611                            return Ok(());
612                        }
613                    },
614                    None => {
615                        // Send error response
616                        self.transport
617                            .send(JsonRpcMessage::error(
618                                id,
619                                error_codes::INVALID_PARAMS,
620                                "Missing resource read parameters",
621                                None,
622                            ))
623                            .await?;
624                        return Ok(());
625                    }
626                };
627
628                // Read resource
629                match self
630                    .resource_manager
631                    .get_resource_content(&params.uri)
632                    .await
633                {
634                    Ok(contents) => {
635                        // Send response
636                        self.transport
637                            .send(JsonRpcMessage::response(
638                                id,
639                                json!({
640                                    "contents": contents
641                                }),
642                            ))
643                            .await?;
644                    }
645                    Err(err) => {
646                        // Send error response
647                        self.transport
648                            .send(JsonRpcMessage::error(
649                                id,
650                                error_codes::RESOURCE_NOT_FOUND,
651                                &format!("Resource not found: {}", err),
652                                Some(json!({
653                                    "uri": params.uri
654                                })),
655                            ))
656                            .await?;
657                    }
658                }
659
660                Ok(())
661            }
662            _ => Err(anyhow!("Expected request message for resources/read")),
663        }
664    }
665
666    /// Handle resources/subscribe request
667    async fn handle_resources_subscribe(&self, message: JsonRpcMessage) -> Result<()> {
668        match message {
669            JsonRpcMessage::Request { id, params, .. } => {
670                // Check if server is ready
671                if self.state.load(Ordering::SeqCst) != ServerState::Ready as u8 {
672                    // Send error response
673                    self.transport
674                        .send(JsonRpcMessage::error(
675                            id,
676                            error_codes::SERVER_NOT_INITIALIZED,
677                            "Server not initialized",
678                            None,
679                        ))
680                        .await?;
681                    return Ok(());
682                }
683
684                // Parse parameters
685                let params: ResourceSubscribeParams = match params {
686                    Some(params) => match serde_json::from_value(params) {
687                        Ok(params) => params,
688                        Err(err) => {
689                            // Send error response
690                            self.transport
691                                .send(JsonRpcMessage::error(
692                                    id,
693                                    error_codes::INVALID_PARAMS,
694                                    &format!("Invalid resource subscribe parameters: {}", err),
695                                    None,
696                                ))
697                                .await?;
698                            return Ok(());
699                        }
700                    },
701                    None => {
702                        // Send error response
703                        self.transport
704                            .send(JsonRpcMessage::error(
705                                id,
706                                error_codes::INVALID_PARAMS,
707                                "Missing resource subscribe parameters",
708                                None,
709                            ))
710                            .await?;
711                        return Ok(());
712                    }
713                };
714
715                // Subscribe to resource
716                let client_id = id.to_string(); // Use request ID as client ID for simplicity
717                match self
718                    .resource_manager
719                    .subscribe(&client_id, &params.uri)
720                    .await
721                {
722                    Ok(_) => {
723                        // Send success response
724                        self.transport
725                            .send(JsonRpcMessage::response(
726                                id,
727                                json!({
728                                    "success": true
729                                }),
730                            ))
731                            .await?;
732                    }
733                    Err(err) => {
734                        // Send error response
735                        self.transport
736                            .send(JsonRpcMessage::error(
737                                id,
738                                error_codes::RESOURCE_NOT_FOUND,
739                                &format!("Resource subscription error: {}", err),
740                                Some(json!({
741                                    "uri": params.uri
742                                })),
743                            ))
744                            .await?;
745                    }
746                }
747
748                Ok(())
749            }
750            _ => Err(anyhow!("Expected request message for resources/subscribe")),
751        }
752    }
753
754    /// Handle incoming messages
755    async fn handle_message(&self, message: JsonRpcMessage) -> Result<()> {
756        match &message.clone() {
757            JsonRpcMessage::Request { method, .. } => {
758                match method.as_str() {
759                    methods::INITIALIZE => self.handle_initialize(message).await?,
760                    methods::TOOLS_LIST => self.handle_tools_list(message).await?,
761                    methods::TOOLS_CALL => self.handle_tools_call(message).await?,
762                    methods::RESOURCES_LIST => self.handle_resources_list(message).await?,
763                    methods::RESOURCES_READ => self.handle_resources_read(message).await?,
764                    methods::RESOURCES_SUBSCRIBE => {
765                        self.handle_resources_subscribe(message).await?
766                    }
767                    methods::RESOURCES_UNSUBSCRIBE => {
768                        self.handle_resources_unsubscribe(message).await?
769                    }
770                    methods::RESOURCES_TEMPLATES_LIST => {
771                        self.handle_resources_templates_list(message).await?
772                    }
773                    methods::PROMPTS_LIST => self.handle_prompts_list(message).await?,
774                    methods::PROMPTS_GET => self.handle_prompts_get(message).await?,
775                    methods::COMPLETION_COMPLETE => {
776                        self.handle_completion_complete(message).await?
777                    }
778                    _ => {
779                        if let JsonRpcMessage::Request { id, .. } = message {
780                            // Method not found
781                            self.transport
782                                .send(JsonRpcMessage::error(
783                                    id,
784                                    error_codes::METHOD_NOT_FOUND,
785                                    &format!("Method not found: {}", method),
786                                    None,
787                                ))
788                                .await?;
789                        }
790                    }
791                }
792            }
793            JsonRpcMessage::Notification { method, .. } => match method.as_str() {
794                methods::INITIALIZED => self.handle_initialized().await?,
795                _ => {
796                    tracing::debug!("Unhandled notification: {}", method);
797                }
798            },
799            _ => {
800                // Not sure what to do with responses from the client
801                tracing::debug!("Unexpected message type from client");
802            }
803        }
804
805        Ok(())
806    }
807
808    /// Start the server and run until shutdown
809    pub async fn run(&self) -> Result<()> {
810        // Create message channel
811        let (tx, mut rx) = mpsc::channel::<JsonRpcMessage>(100);
812
813        // Start transport
814        self.transport.start(tx).await?;
815
816        // Set up resource update listener
817        let resource_update_rx = self.resource_manager.subscribe_to_updates();
818        let resource_transport = self.transport.box_clone();
819
820        // Spawn a task to handle resource updates
821        tokio::spawn(async move {
822            let mut update_rx = resource_update_rx;
823            while let Ok(uri) = update_rx.recv().await {
824                // Send notification
825                let _ = resource_transport
826                    .send(JsonRpcMessage::notification(
827                        methods::RESOURCES_UPDATED,
828                        Some(json!({ "uri": uri })),
829                    ))
830                    .await;
831            }
832        });
833
834        // Set up prompt update listener
835        let prompt_update_rx = self.prompt_manager.subscribe_to_updates();
836        let prompt_transport = self.transport.box_clone();
837
838        // Spawn a task to handle prompt updates
839        tokio::spawn(async move {
840            let mut update_rx = prompt_update_rx;
841            while let Ok(_) = update_rx.recv().await {
842                // Send notification
843                let _ = prompt_transport
844                    .send(JsonRpcMessage::notification(
845                        methods::PROMPTS_LIST_CHANGED,
846                        None,
847                    ))
848                    .await;
849            }
850        });
851
852        // Process messages
853        while let Some(message) = rx.recv().await {
854            if let Err(err) = self.handle_message(message).await {
855                tracing::error!("Error handling message: {}", err);
856            }
857        }
858
859        // Update state
860        self.state
861            .store(ServerState::ShuttingDown as u8, Ordering::SeqCst);
862
863        // Close transport
864        self.transport.close().await?;
865
866        Ok(())
867    }
868
869    /// Get a reference to the tool manager
870    pub fn tool_manager(&self) -> &Arc<ToolManager> {
871        &self.tool_manager
872    }
873
874    /// Get a reference to the resource manager
875    pub fn resource_manager(&self) -> &Arc<ResourceManager> {
876        &self.resource_manager
877    }
878
879    /// Get a reference to the prompt manager
880    pub fn prompt_manager(&self) -> &Arc<PromptManager> {
881        &self.prompt_manager
882    }
883
884    /// Get a reference to the transport
885    pub(crate) fn transport(&self) -> &Box<dyn Transport> {
886        &self.transport
887    }
888
889    /// Get the server state
890    pub(crate) fn state(&self) -> &Arc<AtomicU8> {
891        &self.state
892    }
893}